nsivabalan commented on code in PR #13954:
URL: https://github.com/apache/hudi/pull/13954#discussion_r2373174206
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -3376,6 +3377,333 @@ void testErrorTableSourcePersist(WriteOperationType
writeOperationType, boolean
assertRecordCount(950, tableBasePath, sqlContext);
}
+ /**
+ * Test incremental source functionality when source table is upgraded from
v6 to v8/v9
+ * while target table remains at v6. This validates backward compatibility
for cross-version
+ * incremental sync scenarios.
+ */
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"EIGHT", "NINE"})
+ public void testIncrementalSourceWithSourceTableUpgrade(HoodieTableVersion
targetUpgradeVersion) throws Exception {
+ // Create unique paths for both tables
+ String sourceTablePath = basePath + "/source_table_v6_to_v" +
targetUpgradeVersion.versionCode();
+ String targetTablePath = basePath + "/target_table_v6_" +
targetUpgradeVersion.versionCode();
+
+ // Phase 1: Create source table at v6 with initial commits
+ HoodieDeltaStreamer.Config sourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ sourceConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ sourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ sourceConfig.sourceLimit = 100;
+
+ // Initialize source table with first commit
+ HoodieDeltaStreamer sourceStreamer = new HoodieDeltaStreamer(sourceConfig,
jsc);
+ sourceStreamer.sync();
+
+ // Add 2 more commits to source table (total 3 commits)
+ sourceConfig.operation = WriteOperationType.BULK_INSERT;
+ for (int i = 0; i < 2; i++) {
+ // Create fresh config to avoid conflicts
+ HoodieDeltaStreamer.Config newSourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ newSourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key()
+ "=false");
+ newSourceConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(newSourceConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source has 3 commits and is at v6
+ HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(3,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+ assertEquals(HoodieTableVersion.SIX,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 2: Setup target table at v6 and sync first 3 commits
+ HoodieDeltaStreamer.Config targetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ targetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ targetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // Sync all 3 commits from source to target, one by one
+ HoodieDeltaStreamer targetStreamer = new HoodieDeltaStreamer(targetConfig,
jsc);
+ targetStreamer.sync();
+
+ // Verify checkpoint is established in V1 format
+ HoodieTableMetaClient targetMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(targetTablePath)
+ .build();
+ HoodieInstant lastTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> commitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, lastTargetInstant);
+ assertTrue(commitMetadata.isPresent());
+ // Checkpoint should be in V1 format
+
assertTrue(commitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String checkpointBeforeUpgrade =
commitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Verify record counts match between source and target
+ long sourceRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ assertEquals(sourceRecordCountOriginal, targetRecordCountOriginal,
+ "Target should have all records from source");
+
+ // Phase 3: Upgrade source table from v6 to target version
+ HoodieDeltaStreamer.Config upgradeConfig =
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+ upgradeConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() +
"=" + targetUpgradeVersion.versionCode());
+ upgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=true");
+ upgradeConfig.sourceLimit = 100;
+
+ // This sync will trigger the upgrade
+ sourceStreamer = new HoodieDeltaStreamer(upgradeConfig, jsc);
+ sourceStreamer.sync();
+
+ // Verify source table is now at target version - create fresh metaclient
after upgrade
+ sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(HoodieTestUtils.getDefaultStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(targetUpgradeVersion,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 4: Add 2 more commits to upgraded source table
+ // After upgrade, don't specify version - let it use the existing table
version
+ for (int i = 0; i < 2; i++) {
+ HoodieDeltaStreamer.Config postUpgradeConfig =
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+
postUpgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ postUpgradeConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(postUpgradeConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source now has 6 total commits
+ sourceMetaClient = HoodieTableMetaClient.reload(sourceMetaClient);
+ assertEquals(6,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+
+ // Phase 5: Resume incremental sync from target table (still at v6)
+ // Create base config following existing test patterns
+ HoodieDeltaStreamer.Config resumeTargetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ resumeTargetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key()
+ "=" + HoodieTableVersion.SIX.versionCode());
+
resumeTargetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // This should successfully pull all remaining commits from upgraded source
+ targetStreamer = new HoodieDeltaStreamer(resumeTargetConfig, jsc);
+ targetStreamer.sync();
+
+ // Phase 6: Validate data integrity and checkpoint continuity
+ targetMetaClient.reloadActiveTimeline();
+ assertEquals(HoodieTableVersion.SIX,
targetMetaClient.getTableConfig().getTableVersion());
+
+ // Verify record counts match between source and target
+ long sourceRecordCount = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCount = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ assertEquals(sourceRecordCount, targetRecordCount,
Review Comment:
Please add full data equality checks, not just record count comparisons.
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -3376,6 +3377,333 @@ void testErrorTableSourcePersist(WriteOperationType
writeOperationType, boolean
assertRecordCount(950, tableBasePath, sqlContext);
}
+ /**
+ * Test incremental source functionality when source table is upgraded from
v6 to v8/v9
+ * while target table remains at v6. This validates backward compatibility
for cross-version
+ * incremental sync scenarios.
+ */
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"EIGHT", "NINE"})
+ public void testIncrementalSourceWithSourceTableUpgrade(HoodieTableVersion
targetUpgradeVersion) throws Exception {
+ // Create unique paths for both tables
+ String sourceTablePath = basePath + "/source_table_v6_to_v" +
targetUpgradeVersion.versionCode();
+ String targetTablePath = basePath + "/target_table_v6_" +
targetUpgradeVersion.versionCode();
+
+ // Phase 1: Create source table at v6 with initial commits
+ HoodieDeltaStreamer.Config sourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ sourceConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ sourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ sourceConfig.sourceLimit = 100;
+
+ // Initialize source table with first commit
+ HoodieDeltaStreamer sourceStreamer = new HoodieDeltaStreamer(sourceConfig,
jsc);
+ sourceStreamer.sync();
+
+ // Add 2 more commits to source table (total 3 commits)
+ sourceConfig.operation = WriteOperationType.BULK_INSERT;
+ for (int i = 0; i < 2; i++) {
+ // Create fresh config to avoid conflicts
+ HoodieDeltaStreamer.Config newSourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ newSourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key()
+ "=false");
+ newSourceConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(newSourceConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source has 3 commits and is at v6
+ HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(3,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+ assertEquals(HoodieTableVersion.SIX,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 2: Setup target table at v6 and sync first 3 commits
+ HoodieDeltaStreamer.Config targetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ targetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ targetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // Sync all 3 commits from source to target, one by one
+ HoodieDeltaStreamer targetStreamer = new HoodieDeltaStreamer(targetConfig,
jsc);
+ targetStreamer.sync();
+
+ // Verify checkpoint is established in V1 format
+ HoodieTableMetaClient targetMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(targetTablePath)
+ .build();
+ HoodieInstant lastTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> commitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, lastTargetInstant);
+ assertTrue(commitMetadata.isPresent());
+ // Checkpoint should be in V1 format
+
assertTrue(commitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String checkpointBeforeUpgrade =
commitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Verify record counts match between source and target
+ long sourceRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ assertEquals(sourceRecordCountOriginal, targetRecordCountOriginal,
Review Comment:
Since we are doing bulk_insert, it should be easy to do a full data equality check,
right?
Just drop the meta fields from both tables and compare the dataframes.
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -3376,6 +3377,346 @@ void testErrorTableSourcePersist(WriteOperationType
writeOperationType, boolean
assertRecordCount(950, tableBasePath, sqlContext);
}
+ /**
+ * Test incremental source functionality when source table is upgraded from
v6 to v9
+ * while target table remains at v6. This validates backward compatibility
for cross-version
+ * incremental sync scenarios.
+ */
+ @Test
+ public void testIncrementalSourceWithSourceTableUpgradeFromV6ToV9() throws
Exception {
+ // Create unique paths for both tables
+ String sourceTablePath = basePath + "/source_table_v6_to_v9";
+ String targetTablePath = basePath + "/target_table_v6";
+
+ // Phase 1: Create source table at v6 with initial commits
+ HoodieDeltaStreamer.Config sourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ sourceConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ sourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ sourceConfig.sourceLimit = 100;
+
+ // Initialize source table with first commit
+ HoodieDeltaStreamer sourceStreamer = new HoodieDeltaStreamer(sourceConfig,
jsc);
+ sourceStreamer.sync();
+
+ // Add 2 more commits to source table (total 3 commits)
+ sourceConfig.operation = WriteOperationType.BULK_INSERT;
+ for (int i = 0; i < 2; i++) {
+ // Create fresh config to avoid conflicts
+ HoodieDeltaStreamer.Config newSourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ newSourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key()
+ "=false");
+ newSourceConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(newSourceConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source has 3 commits and is at v6
+ HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(3,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+ assertEquals(HoodieTableVersion.SIX,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 2: Setup target table at v6 and sync first 3 commits
+ HoodieDeltaStreamer.Config targetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ targetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ targetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // Sync all 3 commits from source to target, one by one
+ HoodieDeltaStreamer targetStreamer = new HoodieDeltaStreamer(targetConfig,
jsc);
+    //try calling this once INSTEAD OF THREE TIMES
+ targetStreamer.sync();
+
+
+ // Verify checkpoint is established in V1 format
+ HoodieTableMetaClient targetMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(targetTablePath)
+ .build();
+ HoodieInstant lastTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> commitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, lastTargetInstant);
+ assertTrue(commitMetadata.isPresent());
+ // Checkpoint should be in V1 format
+
assertTrue(commitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String checkpointBeforeUpgrade =
commitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Verify record counts match between source and target
+ long sourceRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ System.out.println("Source record count: " + sourceRecordCountOriginal);
+ System.out.println("Target record count: " + targetRecordCountOriginal);
+
+ assertEquals(sourceRecordCountOriginal, targetRecordCountOriginal,
+ "Target should have all records from source");
+
+
+ // Phase 3: Upgrade source table from v6 to v9
+ HoodieDeltaStreamer.Config upgradeConfig =
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+ upgradeConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() +
"=" + HoodieTableVersion.NINE.versionCode());
+ upgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=true");
+ upgradeConfig.sourceLimit = 100;
+
+ // This sync will trigger the upgrade
+ sourceStreamer = new HoodieDeltaStreamer(upgradeConfig, jsc);
+ sourceStreamer.sync();
+
+ // Verify source table is now v9 - create fresh metaclient after upgrade
+ sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(HoodieTestUtils.getDefaultStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(HoodieTableVersion.NINE,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 4: Add 2 more commits to upgraded source table
+ // After upgrade, don't specify version - let it use the existing table
version
+ for (int i = 0; i < 2; i++) {
+ HoodieDeltaStreamer.Config postUpgradeConfig =
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+
postUpgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ postUpgradeConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(postUpgradeConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source now has 6 total commits
+ sourceMetaClient = HoodieTableMetaClient.reload(sourceMetaClient);
+ assertEquals(6,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+
+ // Phase 5: Resume incremental sync from target table (still at v6)
+ // Create base config following existing test patterns
+ HoodieDeltaStreamer.Config resumeTargetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ resumeTargetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key()
+ "=" + HoodieTableVersion.SIX.versionCode());
+
resumeTargetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // This should successfully pull all remaining commits from upgraded source
+ targetStreamer = new HoodieDeltaStreamer(resumeTargetConfig, jsc);
+ targetStreamer.sync();
+
+ // Phase 6: Validate data integrity and checkpoint continuity
+ targetMetaClient.reloadActiveTimeline();
+ assertEquals(HoodieTableVersion.SIX,
targetMetaClient.getTableConfig().getTableVersion());
+
+ // Verify record counts match between source and target
+ long sourceRecordCount = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCount = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ System.out.println("Source record count: " + sourceRecordCount);
+ System.out.println("Target record count: " + targetRecordCount);
+
+ assertEquals(sourceRecordCount, targetRecordCount,
+ "Target should have all records from source despite version
difference");
+
+ // Verify checkpoint was properly updated in target
+ HoodieInstant finalTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> finalCommitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, finalTargetInstant);
+ assertTrue(finalCommitMetadata.isPresent());
+ // Target still uses V1 checkpoint format
+
assertTrue(finalCommitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String finalCheckpoint =
finalCommitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Checkpoint should have advanced from the pre-upgrade checkpoint
+ assertNotEquals(checkpointBeforeUpgrade, finalCheckpoint,
+ "Checkpoint should have advanced after consuming new commits");
+
+ // Verify target has correct number of commits
+ assertEquals(2,
targetMetaClient.getActiveTimeline().getCommitsTimeline().countInstants(),
+ "Target should have 2 commits (as its batches 3 source table commits
into one target table commit)");
+
+ // Clean up
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, sourceTablePath);
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, targetTablePath);
+ }
+
+ @Test
+ public void testIncrementalSourceWithTargetTableUpgradeFromV6ToV9() throws
Exception {
+ // Create unique paths for both tables
+ String sourceTablePath = basePath + "/source_table_v6_target_upgrade";
+ String targetTablePath = basePath + "/target_table_v6_to_v9";
+
+ // Phase 1: Create source table at v6 with initial commits
+ HoodieDeltaStreamer.Config sourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ sourceConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ sourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ sourceConfig.sourceLimit = 100;
+
+ // Initialize source table with first commit
+ HoodieDeltaStreamer sourceStreamer = new HoodieDeltaStreamer(sourceConfig,
jsc);
+ sourceStreamer.sync();
+
+ // Add 2 more commits to source table (total 3 commits)
+ sourceConfig.operation = WriteOperationType.BULK_INSERT;
+ for (int i = 0; i < 2; i++) {
+ // Create fresh config to avoid conflicts
+ HoodieDeltaStreamer.Config newSourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ newSourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key()
+ "=false");
+ newSourceConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(newSourceConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source has 3 commits and is at v6
+ HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(3,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+ assertEquals(HoodieTableVersion.SIX,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 2: Setup target table at v6 and sync first 3 commits
+ HoodieDeltaStreamer.Config targetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ targetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ targetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // Sync all 3 commits from source to target
+ HoodieDeltaStreamer targetStreamer = new HoodieDeltaStreamer(targetConfig,
jsc);
+ targetStreamer.sync();
+
+ // Verify checkpoint is established in V1 format
+ HoodieTableMetaClient targetMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(targetTablePath)
+ .build();
+ HoodieInstant lastTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> commitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, lastTargetInstant);
+ assertTrue(commitMetadata.isPresent());
+ // Checkpoint should be in V1 format
+
assertTrue(commitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String checkpointBeforeUpgrade =
commitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Verify record counts match between source and target
+ long sourceRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ System.out.println("Source record count: " + sourceRecordCountOriginal);
+ System.out.println("Target record count: " + targetRecordCountOriginal);
+
+ assertEquals(sourceRecordCountOriginal, targetRecordCountOriginal,
+ "Target should have all records from source");
+
+ // Phase 3: Upgrade target table from v6 to v9
+ HoodieDeltaStreamer.Config upgradeTargetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
Review Comment:
This is an end-to-end functional test, which will add to the overall test run time,
so I was trying to be cautious.
Most folks will likely be on either V6 or V9, so we can leave it as is for
now.
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -3376,6 +3377,333 @@ void testErrorTableSourcePersist(WriteOperationType
writeOperationType, boolean
assertRecordCount(950, tableBasePath, sqlContext);
}
+ /**
+ * Test incremental source functionality when source table is upgraded from
v6 to v8/v9
+ * while target table remains at v6. This validates backward compatibility
for cross-version
+ * incremental sync scenarios.
+ */
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"EIGHT", "NINE"})
+ public void testIncrementalSourceWithSourceTableUpgrade(HoodieTableVersion
targetUpgradeVersion) throws Exception {
+ // Create unique paths for both tables
+ String sourceTablePath = basePath + "/source_table_v6_to_v" +
targetUpgradeVersion.versionCode();
+ String targetTablePath = basePath + "/target_table_v6_" +
targetUpgradeVersion.versionCode();
+
+ // Phase 1: Create source table at v6 with initial commits
+ HoodieDeltaStreamer.Config sourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ sourceConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ sourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ sourceConfig.sourceLimit = 100;
+
+ // Initialize source table with first commit
+ HoodieDeltaStreamer sourceStreamer = new HoodieDeltaStreamer(sourceConfig,
jsc);
+ sourceStreamer.sync();
+
+ // Add 2 more commits to source table (total 3 commits)
+ sourceConfig.operation = WriteOperationType.BULK_INSERT;
+ for (int i = 0; i < 2; i++) {
+ // Create fresh config to avoid conflicts
+ HoodieDeltaStreamer.Config newSourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ newSourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key()
+ "=false");
+ newSourceConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(newSourceConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source has 3 commits and is at v6
+ HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(3,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+ assertEquals(HoodieTableVersion.SIX,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 2: Setup target table at v6 and sync first 3 commits
+ HoodieDeltaStreamer.Config targetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ targetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ targetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // Sync all 3 commits from source to target, one by one
+ HoodieDeltaStreamer targetStreamer = new HoodieDeltaStreamer(targetConfig,
jsc);
+ targetStreamer.sync();
+
+ // Verify checkpoint is established in V1 format
+ HoodieTableMetaClient targetMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(targetTablePath)
+ .build();
+ HoodieInstant lastTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> commitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, lastTargetInstant);
+ assertTrue(commitMetadata.isPresent());
+ // Checkpoint should be in V1 format
+
assertTrue(commitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String checkpointBeforeUpgrade =
commitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Verify record counts match between source and target
+ long sourceRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ assertEquals(sourceRecordCountOriginal, targetRecordCountOriginal,
+ "Target should have all records from source");
+
+ // Phase 3: Upgrade source table from v6 to target version
+ HoodieDeltaStreamer.Config upgradeConfig =
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+ upgradeConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() +
"=" + targetUpgradeVersion.versionCode());
+ upgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=true");
+ upgradeConfig.sourceLimit = 100;
+
+ // This sync will trigger the upgrade
+ sourceStreamer = new HoodieDeltaStreamer(upgradeConfig, jsc);
+ sourceStreamer.sync();
+
+ // Verify source table is now at target version - create fresh metaclient
after upgrade
+ sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(HoodieTestUtils.getDefaultStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(targetUpgradeVersion,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 4: Add 2 more commits to upgraded source table
+ // After upgrade, don't specify version - let it use the existing table
version
+ for (int i = 0; i < 2; i++) {
+ HoodieDeltaStreamer.Config postUpgradeConfig =
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+
postUpgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ postUpgradeConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(postUpgradeConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source now has 6 total commits
+ sourceMetaClient = HoodieTableMetaClient.reload(sourceMetaClient);
+ assertEquals(6,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+
+ // Phase 5: Resume incremental sync from target table (still at v6)
+ // Create base config following existing test patterns
+ HoodieDeltaStreamer.Config resumeTargetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ resumeTargetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key()
+ "=" + HoodieTableVersion.SIX.versionCode());
+
resumeTargetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // This should successfully pull all remaining commits from upgraded source
+ targetStreamer = new HoodieDeltaStreamer(resumeTargetConfig, jsc);
+ targetStreamer.sync();
+
+ // Phase 6: Validate data integrity and checkpoint continuity
+ targetMetaClient.reloadActiveTimeline();
+ assertEquals(HoodieTableVersion.SIX,
targetMetaClient.getTableConfig().getTableVersion());
+
+ // Verify record counts match between source and target
+ long sourceRecordCount = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCount = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ assertEquals(sourceRecordCount, targetRecordCount,
+ "Target should have all records from source despite version
difference");
+
+ // Verify checkpoint was properly updated in target
+ HoodieInstant finalTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> finalCommitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, finalTargetInstant);
+ assertTrue(finalCommitMetadata.isPresent());
+ // Target still uses V1 checkpoint format
+
assertTrue(finalCommitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String finalCheckpoint =
finalCommitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Checkpoint should have advanced from the pre-upgrade checkpoint
+ assertTrue(Long.parseLong(finalCheckpoint) >
Long.parseLong(checkpointBeforeUpgrade),
+ "Final checkpoint should be greater than checkpoint before
upgrade");
+
+ // Verify target has correct number of commits
+ assertEquals(2,
targetMetaClient.getActiveTimeline().getCommitsTimeline().countInstants(),
+ "Target should have 2 commits (as its batches 3 source table commits
into one target table commit)");
+
+ // Clean up
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, sourceTablePath);
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, targetTablePath);
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"EIGHT", "NINE"})
+ public void testIncrementalSourceWithTargetTableUpgrade(HoodieTableVersion
targetUpgradeVersion) throws Exception {
+ // Create unique paths for both tables
+ String sourceTablePath = basePath + "/source_table_v6_target_upgrade_" +
targetUpgradeVersion.versionCode();
+ String targetTablePath = basePath + "/target_table_v6_to_v" +
targetUpgradeVersion.versionCode();
+
+ // Phase 1: Create source table at v6 with initial commits
+ HoodieDeltaStreamer.Config sourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ sourceConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ sourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ sourceConfig.sourceLimit = 100;
+
+ // Initialize source table with first commit
+ HoodieDeltaStreamer sourceStreamer = new HoodieDeltaStreamer(sourceConfig,
jsc);
+ sourceStreamer.sync();
+
+ // Add 2 more commits to source table (total 3 commits)
+ sourceConfig.operation = WriteOperationType.BULK_INSERT;
+ for (int i = 0; i < 2; i++) {
+ // Create fresh config to avoid conflicts
+ HoodieDeltaStreamer.Config newSourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ newSourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key()
+ "=false");
+ newSourceConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(newSourceConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source has 3 commits and is at v6
+ HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(3,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+ assertEquals(HoodieTableVersion.SIX,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 2: Setup target table at v6 and sync first 3 commits
+ HoodieDeltaStreamer.Config targetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ targetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ targetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // Sync all 3 commits from source to target
+ HoodieDeltaStreamer targetStreamer = new HoodieDeltaStreamer(targetConfig,
jsc);
+ targetStreamer.sync();
+
+ // Verify checkpoint is established in V1 format
+ HoodieTableMetaClient targetMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(targetTablePath)
+ .build();
+ HoodieInstant lastTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> commitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, lastTargetInstant);
+ assertTrue(commitMetadata.isPresent());
+ // Checkpoint should be in V1 format
+
assertTrue(commitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String checkpointBeforeUpgrade =
commitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Verify record counts match between source and target
+ long sourceRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ assertEquals(sourceRecordCountOriginal, targetRecordCountOriginal,
+ "Target should have all records from source");
+
+ // Phase 3: Upgrade target table from v6 to target version
+ HoodieDeltaStreamer.Config upgradeTargetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+
upgradeTargetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() +
"=" + targetUpgradeVersion.versionCode());
+
upgradeTargetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=true");
+
upgradeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
upgradeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+ upgradeTargetConfig.allowCommitOnNoCheckpointChange = true; // Allow
commit even with no new data to trigger upgrade
+
+ // This sync will trigger the upgrade of target table and create a new
commit
+ targetStreamer = new HoodieDeltaStreamer(upgradeTargetConfig, jsc);
+ targetStreamer.sync();
+
+ // Verify target table is now at target version - create fresh metaclient
after upgrade
+ targetMetaClient = HoodieTableMetaClient.builder()
+ .setConf(HoodieTestUtils.getDefaultStorageConf())
+ .setBasePath(targetTablePath)
+ .build();
+ assertEquals(targetUpgradeVersion,
targetMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 4: Add 2 more commits to source table (keeping it at v6)
+ for (int i = 0; i < 2; i++) {
+ HoodieDeltaStreamer.Config postTargetUpgradeConfig =
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+
postTargetUpgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key()
+ "=false");
+ postTargetUpgradeConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(postTargetUpgradeConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source now has 5 total commits and is still at v6
+ sourceMetaClient = HoodieTableMetaClient.reload(sourceMetaClient);
+ assertEquals(5,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+ assertEquals(HoodieTableVersion.SIX,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 5: Resume incremental sync from upgraded target table (now at
target version)
+ // Create base config for resuming sync
+ HoodieDeltaStreamer.Config resumeTargetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+
resumeTargetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+ resumeTargetConfig.allowCommitOnNoCheckpointChange = true; // Allow commit
even with no new data to trigger upgrade
+
+ // This should successfully pull all remaining commits from v6 source to
upgraded target
+ targetStreamer = new HoodieDeltaStreamer(resumeTargetConfig, jsc);
+ targetStreamer.sync();
+
+ // Phase 6: Validate data integrity and checkpoint continuity
+ targetMetaClient.reloadActiveTimeline();
+ assertEquals(targetUpgradeVersion,
targetMetaClient.getTableConfig().getTableVersion());
+
+ // Verify record counts match between source and target
+ long sourceRecordCount = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCount = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ assertEquals(sourceRecordCount, targetRecordCount,
Review Comment:
Let's validate the full data. You can cache the source data if needed, to avoid
triggering reads to the Hudi table every time.
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -3376,6 +3377,333 @@ void testErrorTableSourcePersist(WriteOperationType
writeOperationType, boolean
assertRecordCount(950, tableBasePath, sqlContext);
}
+ /**
+ * Test incremental source functionality when source table is upgraded from
v6 to v8/v9
+ * while target table remains at v6. This validates backward compatibility
for cross-version
+ * incremental sync scenarios.
+ */
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"EIGHT", "NINE"})
+ public void testIncrementalSourceWithSourceTableUpgrade(HoodieTableVersion
targetUpgradeVersion) throws Exception {
+ // Create unique paths for both tables
+ String sourceTablePath = basePath + "/source_table_v6_to_v" +
targetUpgradeVersion.versionCode();
+ String targetTablePath = basePath + "/target_table_v6_" +
targetUpgradeVersion.versionCode();
+
+ // Phase 1: Create source table at v6 with initial commits
+ HoodieDeltaStreamer.Config sourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ sourceConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ sourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ sourceConfig.sourceLimit = 100;
+
+ // Initialize source table with first commit
+ HoodieDeltaStreamer sourceStreamer = new HoodieDeltaStreamer(sourceConfig,
jsc);
+ sourceStreamer.sync();
+
+ // Add 2 more commits to source table (total 3 commits)
+ sourceConfig.operation = WriteOperationType.BULK_INSERT;
+ for (int i = 0; i < 2; i++) {
+ // Create fresh config to avoid conflicts
+ HoodieDeltaStreamer.Config newSourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ newSourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key()
+ "=false");
+ newSourceConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(newSourceConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source has 3 commits and is at v6
+ HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(3,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+ assertEquals(HoodieTableVersion.SIX,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 2: Setup target table at v6 and sync first 3 commits
+ HoodieDeltaStreamer.Config targetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ targetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ targetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // Sync all 3 commits from source to target, one by one
+ HoodieDeltaStreamer targetStreamer = new HoodieDeltaStreamer(targetConfig,
jsc);
+ targetStreamer.sync();
+
+ // Verify checkpoint is established in V1 format
+ HoodieTableMetaClient targetMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(targetTablePath)
+ .build();
+ HoodieInstant lastTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> commitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, lastTargetInstant);
+ assertTrue(commitMetadata.isPresent());
+ // Checkpoint should be in V1 format
+
assertTrue(commitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String checkpointBeforeUpgrade =
commitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Verify record counts match between source and target
+ long sourceRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ assertEquals(sourceRecordCountOriginal, targetRecordCountOriginal,
+ "Target should have all records from source");
+
+ // Phase 3: Upgrade source table from v6 to target version
+ HoodieDeltaStreamer.Config upgradeConfig =
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+ upgradeConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() +
"=" + targetUpgradeVersion.versionCode());
+ upgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=true");
+ upgradeConfig.sourceLimit = 100;
+
+ // This sync will trigger the upgrade
+ sourceStreamer = new HoodieDeltaStreamer(upgradeConfig, jsc);
+ sourceStreamer.sync();
+
+ // Verify source table is now at target version - create fresh metaclient
after upgrade
+ sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(HoodieTestUtils.getDefaultStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(targetUpgradeVersion,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 4: Add 2 more commits to upgraded source table
+ // After upgrade, don't specify version - let it use the existing table
version
+ for (int i = 0; i < 2; i++) {
+ HoodieDeltaStreamer.Config postUpgradeConfig =
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+
postUpgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ postUpgradeConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(postUpgradeConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source now has 6 total commits
+ sourceMetaClient = HoodieTableMetaClient.reload(sourceMetaClient);
+ assertEquals(6,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+
+ // Phase 5: Resume incremental sync from target table (still at v6)
+ // Create base config following existing test patterns
+ HoodieDeltaStreamer.Config resumeTargetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ resumeTargetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key()
+ "=" + HoodieTableVersion.SIX.versionCode());
+
resumeTargetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // This should successfully pull all remaining commits from upgraded source
+ targetStreamer = new HoodieDeltaStreamer(resumeTargetConfig, jsc);
+ targetStreamer.sync();
+
+ // Phase 6: Validate data integrity and checkpoint continuity
+ targetMetaClient.reloadActiveTimeline();
+ assertEquals(HoodieTableVersion.SIX,
targetMetaClient.getTableConfig().getTableVersion());
+
+ // Verify record counts match between source and target
+ long sourceRecordCount = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCount = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ assertEquals(sourceRecordCount, targetRecordCount,
+ "Target should have all records from source despite version
difference");
+
+ // Verify checkpoint was properly updated in target
+ HoodieInstant finalTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> finalCommitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, finalTargetInstant);
+ assertTrue(finalCommitMetadata.isPresent());
+ // Target still uses V1 checkpoint format
+
assertTrue(finalCommitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String finalCheckpoint =
finalCommitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Checkpoint should have advanced from the pre-upgrade checkpoint
+ assertTrue(Long.parseLong(finalCheckpoint) >
Long.parseLong(checkpointBeforeUpgrade),
+ "Final checkpoint should be greater than checkpoint before
upgrade");
+
+ // Verify target has correct number of commits
+ assertEquals(2,
targetMetaClient.getActiveTimeline().getCommitsTimeline().countInstants(),
+ "Target should have 2 commits (as its batches 3 source table commits
into one target table commit)");
+
+ // Clean up
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, sourceTablePath);
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, targetTablePath);
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"EIGHT", "NINE"})
+ public void testIncrementalSourceWithTargetTableUpgrade(HoodieTableVersion
targetUpgradeVersion) throws Exception {
Review Comment:
Let's fix the arg name, similar to the other test.
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -3376,6 +3377,333 @@ void testErrorTableSourcePersist(WriteOperationType
writeOperationType, boolean
assertRecordCount(950, tableBasePath, sqlContext);
}
+ /**
+ * Test incremental source functionality when source table is upgraded from
v6 to v8/v9
+ * while target table remains at v6. This validates backward compatibility
for cross-version
+ * incremental sync scenarios.
+ */
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"EIGHT", "NINE"})
+ public void testIncrementalSourceWithSourceTableUpgrade(HoodieTableVersion
targetUpgradeVersion) throws Exception {
Review Comment:
Let's rename the arg to `sourceTableFinalTableVersion`;
it's a bit confusing as of now.
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -3376,6 +3377,333 @@ void testErrorTableSourcePersist(WriteOperationType
writeOperationType, boolean
assertRecordCount(950, tableBasePath, sqlContext);
}
+ /**
+ * Test incremental source functionality when source table is upgraded from
v6 to v8/v9
+ * while target table remains at v6. This validates backward compatibility
for cross-version
+ * incremental sync scenarios.
+ */
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"EIGHT", "NINE"})
+ public void testIncrementalSourceWithSourceTableUpgrade(HoodieTableVersion
targetUpgradeVersion) throws Exception {
+ // Create unique paths for both tables
+ String sourceTablePath = basePath + "/source_table_v6_to_v" +
targetUpgradeVersion.versionCode();
+ String targetTablePath = basePath + "/target_table_v6_" +
targetUpgradeVersion.versionCode();
+
+ // Phase 1: Create source table at v6 with initial commits
+ HoodieDeltaStreamer.Config sourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ sourceConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ sourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ sourceConfig.sourceLimit = 100;
+
+ // Initialize source table with first commit
+ HoodieDeltaStreamer sourceStreamer = new HoodieDeltaStreamer(sourceConfig,
jsc);
+ sourceStreamer.sync();
+
+ // Add 2 more commits to source table (total 3 commits)
+ sourceConfig.operation = WriteOperationType.BULK_INSERT;
+ for (int i = 0; i < 2; i++) {
+ // Create fresh config to avoid conflicts
+ HoodieDeltaStreamer.Config newSourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ newSourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key()
+ "=false");
+ newSourceConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(newSourceConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source has 3 commits and is at v6
+ HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(3,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+ assertEquals(HoodieTableVersion.SIX,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 2: Setup target table at v6 and sync first 3 commits
+ HoodieDeltaStreamer.Config targetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ targetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ targetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // Sync all 3 commits from source to target, one by one
+ HoodieDeltaStreamer targetStreamer = new HoodieDeltaStreamer(targetConfig,
jsc);
+ targetStreamer.sync();
+
+ // Verify checkpoint is established in V1 format
+ HoodieTableMetaClient targetMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(targetTablePath)
+ .build();
+ HoodieInstant lastTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> commitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, lastTargetInstant);
+ assertTrue(commitMetadata.isPresent());
+ // Checkpoint should be in V1 format
+
assertTrue(commitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String checkpointBeforeUpgrade =
commitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Verify record counts match between source and target
+ long sourceRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ assertEquals(sourceRecordCountOriginal, targetRecordCountOriginal,
+ "Target should have all records from source");
+
+ // Phase 3: Upgrade source table from v6 to target version
+ HoodieDeltaStreamer.Config upgradeConfig =
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+ upgradeConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() +
"=" + targetUpgradeVersion.versionCode());
+ upgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=true");
+ upgradeConfig.sourceLimit = 100;
+
+ // This sync will trigger the upgrade
+ sourceStreamer = new HoodieDeltaStreamer(upgradeConfig, jsc);
+ sourceStreamer.sync();
+
+ // Verify source table is now at target version - create fresh metaclient
after upgrade
+ sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(HoodieTestUtils.getDefaultStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(targetUpgradeVersion,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 4: Add 2 more commits to upgraded source table
+ // After upgrade, don't specify version - let it use the existing table
version
+ for (int i = 0; i < 2; i++) {
+ HoodieDeltaStreamer.Config postUpgradeConfig =
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+
postUpgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ postUpgradeConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(postUpgradeConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source now has 6 total commits
+ sourceMetaClient = HoodieTableMetaClient.reload(sourceMetaClient);
+ assertEquals(6,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+
+ // Phase 5: Resume incremental sync from target table (still at v6)
+ // Create base config following existing test patterns
+ HoodieDeltaStreamer.Config resumeTargetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ resumeTargetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key()
+ "=" + HoodieTableVersion.SIX.versionCode());
+
resumeTargetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // This should successfully pull all remaining commits from upgraded source
+ targetStreamer = new HoodieDeltaStreamer(resumeTargetConfig, jsc);
+ targetStreamer.sync();
+
+ // Phase 6: Validate data integrity and checkpoint continuity
+ targetMetaClient.reloadActiveTimeline();
+ assertEquals(HoodieTableVersion.SIX,
targetMetaClient.getTableConfig().getTableVersion());
+
+ // Verify record counts match between source and target
+ long sourceRecordCount = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCount = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ assertEquals(sourceRecordCount, targetRecordCount,
+ "Target should have all records from source despite version
difference");
+
+ // Verify checkpoint was properly updated in target
+ HoodieInstant finalTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> finalCommitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, finalTargetInstant);
+ assertTrue(finalCommitMetadata.isPresent());
+ // Target still uses V1 checkpoint format
+
assertTrue(finalCommitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String finalCheckpoint =
finalCommitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Checkpoint should have advanced from the pre-upgrade checkpoint
+ assertTrue(Long.parseLong(finalCheckpoint) >
Long.parseLong(checkpointBeforeUpgrade),
+ "Final checkpoint should be greater than checkpoint before
upgrade");
+
+ // Verify target has correct number of commits
+ assertEquals(2,
targetMetaClient.getActiveTimeline().getCommitsTimeline().countInstants(),
+ "Target should have 2 commits (as its batches 3 source table commits
into one target table commit)");
+
+ // Clean up
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, sourceTablePath);
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, targetTablePath);
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"EIGHT", "NINE"})
+ public void testIncrementalSourceWithTargetTableUpgrade(HoodieTableVersion
targetUpgradeVersion) throws Exception {
Review Comment:
Javadocs similar to the other test would be nice.
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -3376,6 +3377,333 @@ void testErrorTableSourcePersist(WriteOperationType
writeOperationType, boolean
assertRecordCount(950, tableBasePath, sqlContext);
}
+ /**
+ * Test incremental source functionality when source table is upgraded from
v6 to v8/v9
+ * while target table remains at v6. This validates backward compatibility
for cross-version
+ * incremental sync scenarios.
Review Comment:
Can we extend the test to also upgrade the target table to V9 as a last step,
then add a few more commits to the source table and do one round of validation?
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -3376,6 +3377,333 @@ void testErrorTableSourcePersist(WriteOperationType
writeOperationType, boolean
assertRecordCount(950, tableBasePath, sqlContext);
}
+ /**
+ * Test incremental source functionality when source table is upgraded from
v6 to v8/v9
+ * while target table remains at v6. This validates backward compatibility
for cross-version
+ * incremental sync scenarios.
+ */
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"EIGHT", "NINE"})
+ public void testIncrementalSourceWithSourceTableUpgrade(HoodieTableVersion
targetUpgradeVersion) throws Exception {
+ // Create unique paths for both tables
+ String sourceTablePath = basePath + "/source_table_v6_to_v" +
targetUpgradeVersion.versionCode();
+ String targetTablePath = basePath + "/target_table_v6_" +
targetUpgradeVersion.versionCode();
+
+ // Phase 1: Create source table at v6 with initial commits
+ HoodieDeltaStreamer.Config sourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ sourceConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ sourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ sourceConfig.sourceLimit = 100;
+
+ // Initialize source table with first commit
+ HoodieDeltaStreamer sourceStreamer = new HoodieDeltaStreamer(sourceConfig,
jsc);
+ sourceStreamer.sync();
+
+ // Add 2 more commits to source table (total 3 commits)
+ sourceConfig.operation = WriteOperationType.BULK_INSERT;
+ for (int i = 0; i < 2; i++) {
+ // Create fresh config to avoid conflicts
+ HoodieDeltaStreamer.Config newSourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ newSourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key()
+ "=false");
+ newSourceConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(newSourceConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source has 3 commits and is at v6
+ HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(3,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+ assertEquals(HoodieTableVersion.SIX,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 2: Setup target table at v6 and sync first 3 commits
+ HoodieDeltaStreamer.Config targetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ targetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ targetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // Sync all 3 commits from source to target, one by one
+ HoodieDeltaStreamer targetStreamer = new HoodieDeltaStreamer(targetConfig,
jsc);
+ targetStreamer.sync();
+
+ // Verify checkpoint is established in V1 format
+ HoodieTableMetaClient targetMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(targetTablePath)
+ .build();
+ HoodieInstant lastTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> commitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, lastTargetInstant);
+ assertTrue(commitMetadata.isPresent());
+ // Checkpoint should be in V1 format
+
assertTrue(commitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String checkpointBeforeUpgrade =
commitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Verify record counts match between source and target
+ long sourceRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ assertEquals(sourceRecordCountOriginal, targetRecordCountOriginal,
+ "Target should have all records from source");
+
+ // Phase 3: Upgrade source table from v6 to target version
+ HoodieDeltaStreamer.Config upgradeConfig =
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+ upgradeConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() +
"=" + targetUpgradeVersion.versionCode());
+ upgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=true");
+ upgradeConfig.sourceLimit = 100;
+
+ // This sync will trigger the upgrade
+ sourceStreamer = new HoodieDeltaStreamer(upgradeConfig, jsc);
+ sourceStreamer.sync();
+
+ // Verify source table is now at target version - create fresh metaclient
after upgrade
+ sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(HoodieTestUtils.getDefaultStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(targetUpgradeVersion,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 4: Add 2 more commits to upgraded source table
+ // After upgrade, don't specify version - let it use the existing table
version
+ for (int i = 0; i < 2; i++) {
+ HoodieDeltaStreamer.Config postUpgradeConfig =
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+
postUpgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ postUpgradeConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(postUpgradeConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source now has 6 total commits
+ sourceMetaClient = HoodieTableMetaClient.reload(sourceMetaClient);
+ assertEquals(6,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+
+ // Phase 5: Resume incremental sync from target table (still at v6)
+ // Create base config following existing test patterns
+ HoodieDeltaStreamer.Config resumeTargetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ resumeTargetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key()
+ "=" + HoodieTableVersion.SIX.versionCode());
+
resumeTargetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // This should successfully pull all remaining commits from upgraded source
+ targetStreamer = new HoodieDeltaStreamer(resumeTargetConfig, jsc);
+ targetStreamer.sync();
+
+ // Phase 6: Validate data integrity and checkpoint continuity
+ targetMetaClient.reloadActiveTimeline();
+ assertEquals(HoodieTableVersion.SIX,
targetMetaClient.getTableConfig().getTableVersion());
+
+ // Verify record counts match between source and target
+ long sourceRecordCount = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCount = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ assertEquals(sourceRecordCount, targetRecordCount,
+ "Target should have all records from source despite version
difference");
+
+ // Verify checkpoint was properly updated in target
+ HoodieInstant finalTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> finalCommitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, finalTargetInstant);
+ assertTrue(finalCommitMetadata.isPresent());
+ // Target still uses V1 checkpoint format
+
assertTrue(finalCommitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String finalCheckpoint =
finalCommitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Checkpoint should have advanced from the pre-upgrade checkpoint
+ assertTrue(Long.parseLong(finalCheckpoint) >
Long.parseLong(checkpointBeforeUpgrade),
+ "Final checkpoint should be greater than checkpoint before
upgrade");
+
+ // Verify target has correct number of commits
+ assertEquals(2,
targetMetaClient.getActiveTimeline().getCommitsTimeline().countInstants(),
+ "Target should have 2 commits (as its batches 3 source table commits
into one target table commit)");
+
+ // Clean up
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, sourceTablePath);
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, targetTablePath);
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"EIGHT", "NINE"})
+ public void testIncrementalSourceWithTargetTableUpgrade(HoodieTableVersion
targetUpgradeVersion) throws Exception {
+ // Create unique paths for both tables
+ String sourceTablePath = basePath + "/source_table_v6_target_upgrade_" +
targetUpgradeVersion.versionCode();
+ String targetTablePath = basePath + "/target_table_v6_to_v" +
targetUpgradeVersion.versionCode();
+
+ // Phase 1: Create source table at v6 with initial commits
+ HoodieDeltaStreamer.Config sourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ sourceConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ sourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ sourceConfig.sourceLimit = 100;
+
+ // Initialize source table with first commit
+ HoodieDeltaStreamer sourceStreamer = new HoodieDeltaStreamer(sourceConfig,
jsc);
+ sourceStreamer.sync();
+
+ // Add 2 more commits to source table (total 3 commits)
+ sourceConfig.operation = WriteOperationType.BULK_INSERT;
+ for (int i = 0; i < 2; i++) {
+ // Create fresh config to avoid conflicts
+ HoodieDeltaStreamer.Config newSourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ newSourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key()
+ "=false");
+ newSourceConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(newSourceConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source has 3 commits and is at v6
+ HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(3,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+ assertEquals(HoodieTableVersion.SIX,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 2: Setup target table at v6 and sync first 3 commits
+ HoodieDeltaStreamer.Config targetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ targetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ targetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // Sync all 3 commits from source to target
+ HoodieDeltaStreamer targetStreamer = new HoodieDeltaStreamer(targetConfig,
jsc);
+ targetStreamer.sync();
+
+ // Verify checkpoint is established in V1 format
+ HoodieTableMetaClient targetMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(targetTablePath)
+ .build();
+ HoodieInstant lastTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> commitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, lastTargetInstant);
+ assertTrue(commitMetadata.isPresent());
+ // Checkpoint should be in V1 format
+
assertTrue(commitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String checkpointBeforeUpgrade =
commitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Verify record counts match between source and target
+ long sourceRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ assertEquals(sourceRecordCountOriginal, targetRecordCountOriginal,
+ "Target should have all records from source");
+
+ // Phase 3: Upgrade target table from v6 to target version
+ HoodieDeltaStreamer.Config upgradeTargetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+
upgradeTargetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() +
"=" + targetUpgradeVersion.versionCode());
+
upgradeTargetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=true");
+
upgradeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
upgradeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+ upgradeTargetConfig.allowCommitOnNoCheckpointChange = true; // Allow
commit even with no new data to trigger upgrade
+
+ // This sync will trigger the upgrade of target table and create a new
commit
+ targetStreamer = new HoodieDeltaStreamer(upgradeTargetConfig, jsc);
+ targetStreamer.sync();
+
+ // Verify target table is now at target version - create fresh metaclient
after upgrade
+ targetMetaClient = HoodieTableMetaClient.builder()
+ .setConf(HoodieTestUtils.getDefaultStorageConf())
+ .setBasePath(targetTablePath)
+ .build();
+ assertEquals(targetUpgradeVersion,
targetMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 4: Add 2 more commits to source table (keeping it at v6)
+ for (int i = 0; i < 2; i++) {
+ HoodieDeltaStreamer.Config postTargetUpgradeConfig =
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+
postTargetUpgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key()
+ "=false");
+ postTargetUpgradeConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(postTargetUpgradeConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source now has 5 total commits and is still at v6
+ sourceMetaClient = HoodieTableMetaClient.reload(sourceMetaClient);
+ assertEquals(5,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+ assertEquals(HoodieTableVersion.SIX,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 5: Resume incremental sync from upgraded target table (now at
target version)
+ // Create base config for resuming sync
+ HoodieDeltaStreamer.Config resumeTargetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+
resumeTargetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+ resumeTargetConfig.allowCommitOnNoCheckpointChange = true; // Allow commit
even with no new data to trigger upgrade
+
+ // This should successfully pull all remaining commits from v6 source to
upgraded target
+ targetStreamer = new HoodieDeltaStreamer(resumeTargetConfig, jsc);
+ targetStreamer.sync();
+
+ // Phase 6: Validate data integrity and checkpoint continuity
+ targetMetaClient.reloadActiveTimeline();
+ assertEquals(targetUpgradeVersion,
targetMetaClient.getTableConfig().getTableVersion());
+
+ // Verify record counts match between source and target
+ long sourceRecordCount = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCount = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ assertEquals(sourceRecordCount, targetRecordCount,
+ "Target should have all records from source despite version
difference");
+
+ // Verify checkpoint was properly updated and migrated to V2 format after
upgrade
+ HoodieInstant finalTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> finalCommitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, finalTargetInstant);
+ assertTrue(finalCommitMetadata.isPresent());
+
+ // The first time after upgrading, target checkpoint read is still in v1
+
assertTrue(finalCommitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY));
+ String finalCheckpoint =
finalCommitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY);
+
+ // Checkpoint should have advanced from the pre-upgrade checkpoint
+ assertTrue(Long.parseLong(finalCheckpoint) >
Long.parseLong(checkpointBeforeUpgrade),
+ "Final checkpoint should be greater than checkpoint before
upgrade");
+
+ // Verify target has correct number of commits
+ assertEquals(3,
targetMetaClient.getActiveTimeline().getCommitsTimeline().countInstants(),
+ "Target should have 3 commits (as it batches source table commits into
target table commits) + upgrade commit");
+
+ // Clean up
Review Comment:
Can we move the clean-up to a finally block?
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -3376,6 +3377,333 @@ void testErrorTableSourcePersist(WriteOperationType
writeOperationType, boolean
assertRecordCount(950, tableBasePath, sqlContext);
}
+ /**
+ * Test incremental source functionality when source table is upgraded from
v6 to v8/v9
+ * while target table remains at v6. This validates backward compatibility
for cross-version
+ * incremental sync scenarios.
+ */
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"EIGHT", "NINE"})
+ public void testIncrementalSourceWithSourceTableUpgrade(HoodieTableVersion
targetUpgradeVersion) throws Exception {
+ // Create unique paths for both tables
+ String sourceTablePath = basePath + "/source_table_v6_to_v" +
targetUpgradeVersion.versionCode();
+ String targetTablePath = basePath + "/target_table_v6_" +
targetUpgradeVersion.versionCode();
+
+ // Phase 1: Create source table at v6 with initial commits
+ HoodieDeltaStreamer.Config sourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ sourceConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ sourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ sourceConfig.sourceLimit = 100;
+
+ // Initialize source table with first commit
+ HoodieDeltaStreamer sourceStreamer = new HoodieDeltaStreamer(sourceConfig,
jsc);
+ sourceStreamer.sync();
+
+ // Add 2 more commits to source table (total 3 commits)
+ sourceConfig.operation = WriteOperationType.BULK_INSERT;
+ for (int i = 0; i < 2; i++) {
+ // Create fresh config to avoid conflicts
+ HoodieDeltaStreamer.Config newSourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ newSourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key()
+ "=false");
+ newSourceConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(newSourceConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source has 3 commits and is at v6
+ HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(3,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+ assertEquals(HoodieTableVersion.SIX,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 2: Setup target table at v6 and sync first 3 commits
+ HoodieDeltaStreamer.Config targetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ targetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ targetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // Sync all 3 commits from source to target, one by one
+ HoodieDeltaStreamer targetStreamer = new HoodieDeltaStreamer(targetConfig,
jsc);
+ targetStreamer.sync();
+
+ // Verify checkpoint is established in V1 format
+ HoodieTableMetaClient targetMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(targetTablePath)
+ .build();
+ HoodieInstant lastTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> commitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, lastTargetInstant);
+ assertTrue(commitMetadata.isPresent());
+ // Checkpoint should be in V1 format
+
assertTrue(commitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String checkpointBeforeUpgrade =
commitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Verify record counts match between source and target
+ long sourceRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ assertEquals(sourceRecordCountOriginal, targetRecordCountOriginal,
+ "Target should have all records from source");
+
+ // Phase 3: Upgrade source table from v6 to target version
+ HoodieDeltaStreamer.Config upgradeConfig =
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+ upgradeConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() +
"=" + targetUpgradeVersion.versionCode());
+ upgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=true");
+ upgradeConfig.sourceLimit = 100;
+
+ // This sync will trigger the upgrade
+ sourceStreamer = new HoodieDeltaStreamer(upgradeConfig, jsc);
+ sourceStreamer.sync();
+
+ // Verify source table is now at target version - create fresh metaclient
after upgrade
+ sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(HoodieTestUtils.getDefaultStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(targetUpgradeVersion,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 4: Add 2 more commits to upgraded source table
+ // After upgrade, don't specify version - let it use the existing table
version
+ for (int i = 0; i < 2; i++) {
+ HoodieDeltaStreamer.Config postUpgradeConfig =
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+
postUpgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ postUpgradeConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(postUpgradeConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source now has 6 total commits
+ sourceMetaClient = HoodieTableMetaClient.reload(sourceMetaClient);
+ assertEquals(6,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+
+ // Phase 5: Resume incremental sync from target table (still at v6)
+ // Create base config following existing test patterns
+ HoodieDeltaStreamer.Config resumeTargetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ resumeTargetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key()
+ "=" + HoodieTableVersion.SIX.versionCode());
+
resumeTargetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // This should successfully pull all remaining commits from upgraded source
+ targetStreamer = new HoodieDeltaStreamer(resumeTargetConfig, jsc);
+ targetStreamer.sync();
+
+ // Phase 6: Validate data integrity and checkpoint continuity
+ targetMetaClient.reloadActiveTimeline();
+ assertEquals(HoodieTableVersion.SIX,
targetMetaClient.getTableConfig().getTableVersion());
+
+ // Verify record counts match between source and target
Review Comment:
I see the test is quite monolithic, or rather fat.
Can we decompose it into multiple smaller methods and re-use them?
For example, ingesting 100 records across 3 commits should be a private method.
- comparing the number of commits in the timeline
- comparing the table version for a given table
- comparing data equality across the source and target tables.
All of these should be moved to private methods and re-used.
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -3376,6 +3377,333 @@ void testErrorTableSourcePersist(WriteOperationType
writeOperationType, boolean
assertRecordCount(950, tableBasePath, sqlContext);
}
+ /**
+ * Test incremental source functionality when source table is upgraded from
v6 to v8/v9
+ * while target table remains at v6. This validates backward compatibility
for cross-version
+ * incremental sync scenarios.
+ */
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"EIGHT", "NINE"})
+ public void testIncrementalSourceWithSourceTableUpgrade(HoodieTableVersion
targetUpgradeVersion) throws Exception {
+ // Create unique paths for both tables
+ String sourceTablePath = basePath + "/source_table_v6_to_v" +
targetUpgradeVersion.versionCode();
+ String targetTablePath = basePath + "/target_table_v6_" +
targetUpgradeVersion.versionCode();
+
+ // Phase 1: Create source table at v6 with initial commits
+ HoodieDeltaStreamer.Config sourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ sourceConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ sourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ sourceConfig.sourceLimit = 100;
+
+ // Initialize source table with first commit
+ HoodieDeltaStreamer sourceStreamer = new HoodieDeltaStreamer(sourceConfig,
jsc);
+ sourceStreamer.sync();
+
+ // Add 2 more commits to source table (total 3 commits)
+ sourceConfig.operation = WriteOperationType.BULK_INSERT;
+ for (int i = 0; i < 2; i++) {
+ // Create fresh config to avoid conflicts
+ HoodieDeltaStreamer.Config newSourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ newSourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key()
+ "=false");
+ newSourceConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(newSourceConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source has 3 commits and is at v6
+ HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(3,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+ assertEquals(HoodieTableVersion.SIX,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 2: Setup target table at v6 and sync first 3 commits
+ HoodieDeltaStreamer.Config targetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ targetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ targetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // Sync all 3 commits from source to target, one by one
+ HoodieDeltaStreamer targetStreamer = new HoodieDeltaStreamer(targetConfig,
jsc);
+ targetStreamer.sync();
+
+ // Verify checkpoint is established in V1 format
+ HoodieTableMetaClient targetMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(targetTablePath)
+ .build();
+ HoodieInstant lastTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> commitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, lastTargetInstant);
+ assertTrue(commitMetadata.isPresent());
+ // Checkpoint should be in V1 format
+
assertTrue(commitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String checkpointBeforeUpgrade =
commitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Verify record counts match between source and target
+ long sourceRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ assertEquals(sourceRecordCountOriginal, targetRecordCountOriginal,
+ "Target should have all records from source");
+
+ // Phase 3: Upgrade source table from v6 to target version
+ HoodieDeltaStreamer.Config upgradeConfig =
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+ upgradeConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() +
"=" + targetUpgradeVersion.versionCode());
+ upgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=true");
+ upgradeConfig.sourceLimit = 100;
+
+ // This sync will trigger the upgrade
+ sourceStreamer = new HoodieDeltaStreamer(upgradeConfig, jsc);
+ sourceStreamer.sync();
+
+ // Verify source table is now at target version - create fresh metaclient
after upgrade
+ sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(HoodieTestUtils.getDefaultStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(targetUpgradeVersion,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 4: Add 2 more commits to upgraded source table
+ // After upgrade, don't specify version - let it use the existing table
version
+ for (int i = 0; i < 2; i++) {
+ HoodieDeltaStreamer.Config postUpgradeConfig =
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+
postUpgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ postUpgradeConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(postUpgradeConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source now has 6 total commits
+ sourceMetaClient = HoodieTableMetaClient.reload(sourceMetaClient);
+ assertEquals(6,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+
+ // Phase 5: Resume incremental sync from target table (still at v6)
+ // Create base config following existing test patterns
+ HoodieDeltaStreamer.Config resumeTargetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ resumeTargetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key()
+ "=" + HoodieTableVersion.SIX.versionCode());
+
resumeTargetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // This should successfully pull all remaining commits from upgraded source
+ targetStreamer = new HoodieDeltaStreamer(resumeTargetConfig, jsc);
+ targetStreamer.sync();
+
+ // Phase 6: Validate data integrity and checkpoint continuity
+ targetMetaClient.reloadActiveTimeline();
+ assertEquals(HoodieTableVersion.SIX,
targetMetaClient.getTableConfig().getTableVersion());
+
+ // Verify record counts match between source and target
+ long sourceRecordCount = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCount = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ assertEquals(sourceRecordCount, targetRecordCount,
+ "Target should have all records from source despite version
difference");
+
+ // Verify checkpoint was properly updated in target
+ HoodieInstant finalTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> finalCommitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, finalTargetInstant);
+ assertTrue(finalCommitMetadata.isPresent());
+ // Target still uses V1 checkpoint format
+
assertTrue(finalCommitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String finalCheckpoint =
finalCommitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Checkpoint should have advanced from the pre-upgrade checkpoint
+ assertTrue(Long.parseLong(finalCheckpoint) >
Long.parseLong(checkpointBeforeUpgrade),
+ "Final checkpoint should be greater than checkpoint before
upgrade");
+
+ // Verify target has correct number of commits
+ assertEquals(2,
targetMetaClient.getActiveTimeline().getCommitsTimeline().countInstants(),
+ "Target should have 2 commits (as its batches 3 source table commits
into one target table commit)");
+
+ // Clean up
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, sourceTablePath);
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, targetTablePath);
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"EIGHT", "NINE"})
+ public void testIncrementalSourceWithTargetTableUpgrade(HoodieTableVersion
targetUpgradeVersion) throws Exception {
+ // Create unique paths for both tables
+ String sourceTablePath = basePath + "/source_table_v6_target_upgrade_" +
targetUpgradeVersion.versionCode();
+ String targetTablePath = basePath + "/target_table_v6_to_v" +
targetUpgradeVersion.versionCode();
+
+ // Phase 1: Create source table at v6 with initial commits
+ HoodieDeltaStreamer.Config sourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ sourceConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ sourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ sourceConfig.sourceLimit = 100;
+
+ // Initialize source table with first commit
+ HoodieDeltaStreamer sourceStreamer = new HoodieDeltaStreamer(sourceConfig,
jsc);
+ sourceStreamer.sync();
+
+ // Add 2 more commits to source table (total 3 commits)
+ sourceConfig.operation = WriteOperationType.BULK_INSERT;
+ for (int i = 0; i < 2; i++) {
+ // Create fresh config to avoid conflicts
+ HoodieDeltaStreamer.Config newSourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ newSourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key()
+ "=false");
+ newSourceConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(newSourceConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source has 3 commits and is at v6
+ HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(3,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+ assertEquals(HoodieTableVersion.SIX,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 2: Setup target table at v6 and sync first 3 commits
+ HoodieDeltaStreamer.Config targetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ targetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ targetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // Sync all 3 commits from source to target
+ HoodieDeltaStreamer targetStreamer = new HoodieDeltaStreamer(targetConfig,
jsc);
+ targetStreamer.sync();
+
+ // Verify checkpoint is established in V1 format
+ HoodieTableMetaClient targetMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(targetTablePath)
+ .build();
+ HoodieInstant lastTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> commitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, lastTargetInstant);
+ assertTrue(commitMetadata.isPresent());
+ // Checkpoint should be in V1 format
+
assertTrue(commitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String checkpointBeforeUpgrade =
commitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Verify record counts match between source and target
Review Comment:
same comment as the above test with regard to data validation
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -3376,6 +3377,333 @@ void testErrorTableSourcePersist(WriteOperationType
writeOperationType, boolean
assertRecordCount(950, tableBasePath, sqlContext);
}
+ /**
+ * Test incremental source functionality when source table is upgraded from
v6 to v8/v9
+ * while target table remains at v6. This validates backward compatibility
for cross-version
+ * incremental sync scenarios.
+ */
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"EIGHT", "NINE"})
+ public void testIncrementalSourceWithSourceTableUpgrade(HoodieTableVersion
targetUpgradeVersion) throws Exception {
+ // Create unique paths for both tables
+ String sourceTablePath = basePath + "/source_table_v6_to_v" +
targetUpgradeVersion.versionCode();
+ String targetTablePath = basePath + "/target_table_v6_" +
targetUpgradeVersion.versionCode();
+
+ // Phase 1: Create source table at v6 with initial commits
+ HoodieDeltaStreamer.Config sourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ sourceConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ sourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ sourceConfig.sourceLimit = 100;
+
+ // Initialize source table with first commit
+ HoodieDeltaStreamer sourceStreamer = new HoodieDeltaStreamer(sourceConfig,
jsc);
+ sourceStreamer.sync();
+
+ // Add 2 more commits to source table (total 3 commits)
+ sourceConfig.operation = WriteOperationType.BULK_INSERT;
+ for (int i = 0; i < 2; i++) {
+ // Create fresh config to avoid conflicts
+ HoodieDeltaStreamer.Config newSourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ newSourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key()
+ "=false");
+ newSourceConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(newSourceConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source has 3 commits and is at v6
+ HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(3,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+ assertEquals(HoodieTableVersion.SIX,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 2: Setup target table at v6 and sync first 3 commits
+ HoodieDeltaStreamer.Config targetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ targetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ targetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // Sync all 3 commits from source to target, one by one
+ HoodieDeltaStreamer targetStreamer = new HoodieDeltaStreamer(targetConfig,
jsc);
+ targetStreamer.sync();
+
+ // Verify checkpoint is established in V1 format
+ HoodieTableMetaClient targetMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(targetTablePath)
+ .build();
+ HoodieInstant lastTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> commitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, lastTargetInstant);
+ assertTrue(commitMetadata.isPresent());
+ // Checkpoint should be in V1 format
+
assertTrue(commitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String checkpointBeforeUpgrade =
commitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Verify record counts match between source and target
+ long sourceRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ assertEquals(sourceRecordCountOriginal, targetRecordCountOriginal,
+ "Target should have all records from source");
+
+ // Phase 3: Upgrade source table from v6 to target version
+ HoodieDeltaStreamer.Config upgradeConfig =
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+ upgradeConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() +
"=" + targetUpgradeVersion.versionCode());
+ upgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=true");
+ upgradeConfig.sourceLimit = 100;
+
+ // This sync will trigger the upgrade
+ sourceStreamer = new HoodieDeltaStreamer(upgradeConfig, jsc);
+ sourceStreamer.sync();
+
+ // Verify source table is now at target version - create fresh metaclient
after upgrade
+ sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(HoodieTestUtils.getDefaultStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(targetUpgradeVersion,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 4: Add 2 more commits to upgraded source table
+ // After upgrade, don't specify version - let it use the existing table
version
+ for (int i = 0; i < 2; i++) {
+ HoodieDeltaStreamer.Config postUpgradeConfig =
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+
postUpgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ postUpgradeConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(postUpgradeConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source now has 6 total commits
+ sourceMetaClient = HoodieTableMetaClient.reload(sourceMetaClient);
+ assertEquals(6,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+
+ // Phase 5: Resume incremental sync from target table (still at v6)
+ // Create base config following existing test patterns
+ HoodieDeltaStreamer.Config resumeTargetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ resumeTargetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key()
+ "=" + HoodieTableVersion.SIX.versionCode());
+
resumeTargetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // This should successfully pull all remaining commits from upgraded source
+ targetStreamer = new HoodieDeltaStreamer(resumeTargetConfig, jsc);
+ targetStreamer.sync();
+
+ // Phase 6: Validate data integrity and checkpoint continuity
+ targetMetaClient.reloadActiveTimeline();
+ assertEquals(HoodieTableVersion.SIX,
targetMetaClient.getTableConfig().getTableVersion());
+
+ // Verify record counts match between source and target
+ long sourceRecordCount = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCount = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ assertEquals(sourceRecordCount, targetRecordCount,
+ "Target should have all records from source despite version
difference");
+
+ // Verify checkpoint was properly updated in target
+ HoodieInstant finalTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> finalCommitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, finalTargetInstant);
+ assertTrue(finalCommitMetadata.isPresent());
+ // Target still uses V1 checkpoint format
+
assertTrue(finalCommitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String finalCheckpoint =
finalCommitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Checkpoint should have advanced from the pre-upgrade checkpoint
+ assertTrue(Long.parseLong(finalCheckpoint) >
Long.parseLong(checkpointBeforeUpgrade),
+ "Final checkpoint should be greater than checkpoint before
upgrade");
+
+ // Verify target has correct number of commits
+ assertEquals(2,
targetMetaClient.getActiveTimeline().getCommitsTimeline().countInstants(),
+ "Target should have 2 commits (as its batches 3 source table commits
into one target table commit)");
+
+ // Clean up
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, sourceTablePath);
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, targetTablePath);
+ }
+
+ @ParameterizedTest
Review Comment:
can we extend the test to also upgrade the source table to V9 as a last step,
add a few more commits to the source table, and do one round of validation?
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -3376,6 +3377,346 @@ void testErrorTableSourcePersist(WriteOperationType
writeOperationType, boolean
assertRecordCount(950, tableBasePath, sqlContext);
}
+ /**
+ * Test incremental source functionality when source table is upgraded from
v6 to v9
+ * while target table remains at v6. This validates backward compatibility
for cross-version
+ * incremental sync scenarios.
+ */
+ @Test
+ public void testIncrementalSourceWithSourceTableUpgradeFromV6ToV9() throws
Exception {
+ // Create unique paths for both tables
+ String sourceTablePath = basePath + "/source_table_v6_to_v9";
+ String targetTablePath = basePath + "/target_table_v6";
+
+ // Phase 1: Create source table at v6 with initial commits
+ HoodieDeltaStreamer.Config sourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ sourceConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ sourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ sourceConfig.sourceLimit = 100;
+
+ // Initialize source table with first commit
+ HoodieDeltaStreamer sourceStreamer = new HoodieDeltaStreamer(sourceConfig,
jsc);
+ sourceStreamer.sync();
+
+ // Add 2 more commits to source table (total 3 commits)
+ sourceConfig.operation = WriteOperationType.BULK_INSERT;
+ for (int i = 0; i < 2; i++) {
+ // Create fresh config to avoid conflicts
+ HoodieDeltaStreamer.Config newSourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ newSourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key()
+ "=false");
+ newSourceConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(newSourceConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source has 3 commits and is at v6
+ HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(3,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+ assertEquals(HoodieTableVersion.SIX,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 2: Setup target table at v6 and sync first 3 commits
+ HoodieDeltaStreamer.Config targetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ targetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ targetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // Sync all 3 commits from source to target, one by one
+ HoodieDeltaStreamer targetStreamer = new HoodieDeltaStreamer(targetConfig,
jsc);
+ // try calling this once instead of three separate syncs
+ targetStreamer.sync();
+
+
+ // Verify checkpoint is established in V1 format
+ HoodieTableMetaClient targetMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(targetTablePath)
+ .build();
+ HoodieInstant lastTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> commitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, lastTargetInstant);
+ assertTrue(commitMetadata.isPresent());
+ // Checkpoint should be in V1 format
+
assertTrue(commitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String checkpointBeforeUpgrade =
commitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Verify record counts match between source and target
+ long sourceRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ System.out.println("Source record count: " + sourceRecordCountOriginal);
+ System.out.println("Target record count: " + targetRecordCountOriginal);
+
+ assertEquals(sourceRecordCountOriginal, targetRecordCountOriginal,
+ "Target should have all records from source");
+
+
+ // Phase 3: Upgrade source table from v6 to v9
+ HoodieDeltaStreamer.Config upgradeConfig =
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+ upgradeConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() +
"=" + HoodieTableVersion.NINE.versionCode());
+ upgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=true");
+ upgradeConfig.sourceLimit = 100;
+
+ // This sync will trigger the upgrade
+ sourceStreamer = new HoodieDeltaStreamer(upgradeConfig, jsc);
+ sourceStreamer.sync();
+
+ // Verify source table is now v9 - create fresh metaclient after upgrade
+ sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(HoodieTestUtils.getDefaultStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
Review Comment:
sg
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -3376,6 +3377,333 @@ void testErrorTableSourcePersist(WriteOperationType
writeOperationType, boolean
assertRecordCount(950, tableBasePath, sqlContext);
}
+ /**
+ * Test incremental source functionality when source table is upgraded from
v6 to v8/v9
+ * while target table remains at v6. This validates backward compatibility
for cross-version
+ * incremental sync scenarios.
+ */
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"EIGHT", "NINE"})
+ public void testIncrementalSourceWithSourceTableUpgrade(HoodieTableVersion
targetUpgradeVersion) throws Exception {
+ // Create unique paths for both tables
+ String sourceTablePath = basePath + "/source_table_v6_to_v" +
targetUpgradeVersion.versionCode();
+ String targetTablePath = basePath + "/target_table_v6_" +
targetUpgradeVersion.versionCode();
+
+ // Phase 1: Create source table at v6 with initial commits
+ HoodieDeltaStreamer.Config sourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ sourceConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ sourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ sourceConfig.sourceLimit = 100;
+
+ // Initialize source table with first commit
+ HoodieDeltaStreamer sourceStreamer = new HoodieDeltaStreamer(sourceConfig,
jsc);
+ sourceStreamer.sync();
+
+ // Add 2 more commits to source table (total 3 commits)
+ sourceConfig.operation = WriteOperationType.BULK_INSERT;
+ for (int i = 0; i < 2; i++) {
+ // Create fresh config to avoid conflicts
+ HoodieDeltaStreamer.Config newSourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ newSourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key()
+ "=false");
+ newSourceConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(newSourceConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source has 3 commits and is at v6
+ HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(3,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+ assertEquals(HoodieTableVersion.SIX,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 2: Setup target table at v6 and sync first 3 commits
+ HoodieDeltaStreamer.Config targetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ targetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ targetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // Sync all 3 commits from source to target, one by one
+ HoodieDeltaStreamer targetStreamer = new HoodieDeltaStreamer(targetConfig,
jsc);
+ targetStreamer.sync();
+
+ // Verify checkpoint is established in V1 format
+ HoodieTableMetaClient targetMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(targetTablePath)
+ .build();
+ HoodieInstant lastTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> commitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, lastTargetInstant);
+ assertTrue(commitMetadata.isPresent());
+ // Checkpoint should be in V1 format
+
assertTrue(commitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String checkpointBeforeUpgrade =
commitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Verify record counts match between source and target
+ long sourceRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ assertEquals(sourceRecordCountOriginal, targetRecordCountOriginal,
+ "Target should have all records from source");
+
+ // Phase 3: Upgrade source table from v6 to target version
+ HoodieDeltaStreamer.Config upgradeConfig =
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+ upgradeConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() +
"=" + targetUpgradeVersion.versionCode());
+ upgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=true");
+ upgradeConfig.sourceLimit = 100;
+
+ // This sync will trigger the upgrade
+ sourceStreamer = new HoodieDeltaStreamer(upgradeConfig, jsc);
+ sourceStreamer.sync();
+
+ // Verify source table is now at target version - create fresh metaclient
after upgrade
+ sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(HoodieTestUtils.getDefaultStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(targetUpgradeVersion,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 4: Add 2 more commits to upgraded source table
+ // After upgrade, don't specify version - let it use the existing table
version
+ for (int i = 0; i < 2; i++) {
+ HoodieDeltaStreamer.Config postUpgradeConfig =
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+
postUpgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ postUpgradeConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(postUpgradeConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source now has 6 total commits
+ sourceMetaClient = HoodieTableMetaClient.reload(sourceMetaClient);
+ assertEquals(6,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+
+ // Phase 5: Resume incremental sync from target table (still at v6)
+ // Create base config following existing test patterns
+ HoodieDeltaStreamer.Config resumeTargetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ resumeTargetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key()
+ "=" + HoodieTableVersion.SIX.versionCode());
+
resumeTargetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // This should successfully pull all remaining commits from upgraded source
+ targetStreamer = new HoodieDeltaStreamer(resumeTargetConfig, jsc);
+ targetStreamer.sync();
+
+ // Phase 6: Validate data integrity and checkpoint continuity
+ targetMetaClient.reloadActiveTimeline();
+ assertEquals(HoodieTableVersion.SIX,
targetMetaClient.getTableConfig().getTableVersion());
+
+ // Verify record counts match between source and target
+ long sourceRecordCount = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCount = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ assertEquals(sourceRecordCount, targetRecordCount,
+ "Target should have all records from source despite version
difference");
+
+ // Verify checkpoint was properly updated in target
+ HoodieInstant finalTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> finalCommitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, finalTargetInstant);
+ assertTrue(finalCommitMetadata.isPresent());
+ // Target still uses V1 checkpoint format
+
assertTrue(finalCommitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String finalCheckpoint =
finalCommitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Checkpoint should have advanced from the pre-upgrade checkpoint
+ assertTrue(Long.parseLong(finalCheckpoint) >
Long.parseLong(checkpointBeforeUpgrade),
+ "Final checkpoint should be greater than checkpoint before
upgrade");
+
+ // Verify target has correct number of commits
+ assertEquals(2,
targetMetaClient.getActiveTimeline().getCommitsTimeline().countInstants(),
+ "Target should have 2 commits (as its batches 3 source table commits
into one target table commit)");
+
+ // Clean up
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, sourceTablePath);
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, targetTablePath);
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableVersion.class, names = {"EIGHT", "NINE"})
+ public void testIncrementalSourceWithTargetTableUpgrade(HoodieTableVersion
targetUpgradeVersion) throws Exception {
+ // Create unique paths for both tables
+ String sourceTablePath = basePath + "/source_table_v6_target_upgrade_" +
targetUpgradeVersion.versionCode();
+ String targetTablePath = basePath + "/target_table_v6_to_v" +
targetUpgradeVersion.versionCode();
+
+ // Phase 1: Create source table at v6 with initial commits
+ HoodieDeltaStreamer.Config sourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ sourceConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ sourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ sourceConfig.sourceLimit = 100;
+
+ // Initialize source table with first commit
+ HoodieDeltaStreamer sourceStreamer = new HoodieDeltaStreamer(sourceConfig,
jsc);
+ sourceStreamer.sync();
+
+ // Add 2 more commits to source table (total 3 commits)
+ sourceConfig.operation = WriteOperationType.BULK_INSERT;
+ for (int i = 0; i < 2; i++) {
+ // Create fresh config to avoid conflicts
+ HoodieDeltaStreamer.Config newSourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ newSourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key()
+ "=false");
+ newSourceConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(newSourceConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source has 3 commits and is at v6
+ HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(3,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+ assertEquals(HoodieTableVersion.SIX,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 2: Setup target table at v6 and sync first 3 commits
+ HoodieDeltaStreamer.Config targetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ targetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ targetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // Sync all 3 commits from source to target
+ HoodieDeltaStreamer targetStreamer = new HoodieDeltaStreamer(targetConfig,
jsc);
+ targetStreamer.sync();
+
+ // Verify checkpoint is established in V1 format
+ HoodieTableMetaClient targetMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(targetTablePath)
+ .build();
+ HoodieInstant lastTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> commitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, lastTargetInstant);
+ assertTrue(commitMetadata.isPresent());
+ // Checkpoint should be in V1 format
+
assertTrue(commitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ String checkpointBeforeUpgrade =
commitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+ // Verify record counts match between source and target
+ long sourceRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCountOriginal = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ assertEquals(sourceRecordCountOriginal, targetRecordCountOriginal,
+ "Target should have all records from source");
+
+ // Phase 3: Upgrade target table from v6 to target version
+ HoodieDeltaStreamer.Config upgradeTargetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+
upgradeTargetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() +
"=" + targetUpgradeVersion.versionCode());
+
upgradeTargetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=true");
+
upgradeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
upgradeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+ upgradeTargetConfig.allowCommitOnNoCheckpointChange = true; // Allow
commit even with no new data to trigger upgrade
+
+ // This sync will trigger the upgrade of target table and create a new
commit
+ targetStreamer = new HoodieDeltaStreamer(upgradeTargetConfig, jsc);
+ targetStreamer.sync();
+
+ // Verify target table is now at target version - create fresh metaclient
after upgrade
+ targetMetaClient = HoodieTableMetaClient.builder()
+ .setConf(HoodieTestUtils.getDefaultStorageConf())
+ .setBasePath(targetTablePath)
+ .build();
+ assertEquals(targetUpgradeVersion,
targetMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 4: Add 2 more commits to source table (keeping it at v6)
+ for (int i = 0; i < 2; i++) {
+ HoodieDeltaStreamer.Config postTargetUpgradeConfig =
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+
postTargetUpgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key()
+ "=false");
+ postTargetUpgradeConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(postTargetUpgradeConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source now has 5 total commits and is still at v6
+ sourceMetaClient = HoodieTableMetaClient.reload(sourceMetaClient);
+ assertEquals(5,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+ assertEquals(HoodieTableVersion.SIX,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 5: Resume incremental sync from upgraded target table (now at
target version)
+ // Create base config for resuming sync
+ HoodieDeltaStreamer.Config resumeTargetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+
resumeTargetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+ resumeTargetConfig.allowCommitOnNoCheckpointChange = true; // Allow commit
even with no new data to trigger upgrade
+
+ // This should successfully pull all remaining commits from v6 source to
upgraded target
+ targetStreamer = new HoodieDeltaStreamer(resumeTargetConfig, jsc);
+ targetStreamer.sync();
+
+ // Phase 6: Validate data integrity and checkpoint continuity
+ targetMetaClient.reloadActiveTimeline();
+ assertEquals(targetUpgradeVersion,
targetMetaClient.getTableConfig().getTableVersion());
+
+ // Verify record counts match between source and target
+ long sourceRecordCount = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(sourceTablePath)
+ .count();
+ long targetRecordCount = sqlContext.read()
+ .format("org.apache.hudi")
+ .load(targetTablePath)
+ .count();
+
+ assertEquals(sourceRecordCount, targetRecordCount,
+ "Target should have all records from source despite version
difference");
+
+ // Verify checkpoint was properly updated and migrated to V2 format after
upgrade
+ HoodieInstant finalTargetInstant =
targetMetaClient.getActiveTimeline().lastInstant().get();
+ Option<HoodieCommitMetadata> finalCommitMetadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
+ targetMetaClient, finalTargetInstant);
+ assertTrue(finalCommitMetadata.isPresent());
+
+ // The first time after upgrading, target checkpoint read is still in v1
Review Comment:
Sorry — is this checkpoint expected to be in V1 or V2 format at this point?
And how are we validating the checkpoint format here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]