linliu-code commented on code in PR #13954:
URL: https://github.com/apache/hudi/pull/13954#discussion_r2370414313
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -3376,6 +3377,346 @@ void testErrorTableSourcePersist(WriteOperationType
writeOperationType, boolean
assertRecordCount(950, tableBasePath, sqlContext);
}
+ /**
+ * Test incremental source functionality when source table is upgraded from
v6 to v9
+ * while target table remains at v6. This validates backward compatibility
for cross-version
+ * incremental sync scenarios.
+ */
+ @Test
+ public void testIncrementalSourceWithSourceTableUpgradeFromV6ToV9() throws
Exception {
+ // Create unique paths for both tables
+ String sourceTablePath = basePath + "/source_table_v6_to_v9";
+ String targetTablePath = basePath + "/target_table_v6";
+
+ // Phase 1: Create source table at v6 with initial commits
+ HoodieDeltaStreamer.Config sourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ sourceConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ sourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+ sourceConfig.sourceLimit = 100;
+
+ // Initialize source table with first commit
+ HoodieDeltaStreamer sourceStreamer = new HoodieDeltaStreamer(sourceConfig,
jsc);
+ sourceStreamer.sync();
+
+ // Add 2 more commits to source table (total 3 commits)
+ sourceConfig.operation = WriteOperationType.BULK_INSERT;
+ for (int i = 0; i < 2; i++) {
+ // Create fresh config to avoid conflicts
+ HoodieDeltaStreamer.Config newSourceConfig = TestHelpers.makeConfig(
+ sourceTablePath, WriteOperationType.BULK_INSERT);
+ newSourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key()
+ "=false");
+ newSourceConfig.sourceLimit = 100;
+ sourceStreamer = new HoodieDeltaStreamer(newSourceConfig, jsc);
+ sourceStreamer.sync();
+ }
+
+ // Verify source has 3 commits and is at v6
+ HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
+ .setConf(context.getStorageConf())
+ .setBasePath(sourceTablePath)
+ .build();
+ assertEquals(3,
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+ assertEquals(HoodieTableVersion.SIX,
sourceMetaClient.getTableConfig().getTableVersion());
+
+ // Phase 2: Setup target table at v6 and sync first 3 commits
+ HoodieDeltaStreamer.Config targetConfig =
TestHelpers.makeConfigForHudiIncrSrc(
+ sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT,
false, null);
+ targetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "="
+ HoodieTableVersion.SIX.versionCode());
+ targetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() +
"=false");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+ // Sync all 3 commits from source to target, one by one
+ HoodieDeltaStreamer targetStreamer = new HoodieDeltaStreamer(targetConfig,
jsc);
+ //try calling this once INSTEAD OF THREEE TIMES
+ targetStreamer.sync();
+
Review Comment:
remove one blank line.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]