linliu-code commented on code in PR #13954:
URL: https://github.com/apache/hudi/pull/13954#discussion_r2370417300


##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -3376,6 +3377,346 @@ void testErrorTableSourcePersist(WriteOperationType 
writeOperationType, boolean
     assertRecordCount(950, tableBasePath, sqlContext);
   }
 
+  /**
+   * Test incremental source functionality when source table is upgraded from 
v6 to v9
+   * while target table remains at v6. This validates backward compatibility 
for cross-version
+   * incremental sync scenarios.
+   */
+  @Test
+  public void testIncrementalSourceWithSourceTableUpgradeFromV6ToV9() throws 
Exception {
+    // Create unique paths for both tables
+    String sourceTablePath = basePath + "/source_table_v6_to_v9";
+    String targetTablePath = basePath + "/target_table_v6";
+
+    // Phase 1: Create source table at v6 with initial commits
+    HoodieDeltaStreamer.Config sourceConfig = TestHelpers.makeConfig(
+        sourceTablePath, WriteOperationType.BULK_INSERT);
+    sourceConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "=" 
+ HoodieTableVersion.SIX.versionCode());
+    sourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + 
"=false");
+    sourceConfig.sourceLimit = 100;
+
+    // Initialize source table with first commit
+    HoodieDeltaStreamer sourceStreamer = new HoodieDeltaStreamer(sourceConfig, 
jsc);
+    sourceStreamer.sync();
+
+    // Add 2 more commits to source table (total 3 commits)
+    sourceConfig.operation = WriteOperationType.BULK_INSERT;
+    for (int i = 0; i < 2; i++) {
+      // Create fresh config to avoid conflicts
+      HoodieDeltaStreamer.Config newSourceConfig = TestHelpers.makeConfig(
+          sourceTablePath, WriteOperationType.BULK_INSERT);
+      newSourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() 
+ "=false");
+      newSourceConfig.sourceLimit = 100;
+      sourceStreamer = new HoodieDeltaStreamer(newSourceConfig, jsc);
+      sourceStreamer.sync();
+    }
+
+    // Verify source has 3 commits and is at v6
+    HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
+        .setConf(context.getStorageConf())
+        .setBasePath(sourceTablePath)
+        .build();
+    assertEquals(3, 
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+    assertEquals(HoodieTableVersion.SIX, 
sourceMetaClient.getTableConfig().getTableVersion());
+
+    // Phase 2: Setup target table at v6 and sync first 3 commits
+    HoodieDeltaStreamer.Config targetConfig = 
TestHelpers.makeConfigForHudiIncrSrc(
+        sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT, 
false, null);
+    targetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "=" 
+ HoodieTableVersion.SIX.versionCode());
+    targetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + 
"=false");
+    
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+    
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+    // Sync all 3 commits from source to target, one by one
+    HoodieDeltaStreamer targetStreamer = new HoodieDeltaStreamer(targetConfig, 
jsc);
+    // try calling this once instead of three times
+    targetStreamer.sync();
+
+
+    // Verify checkpoint is established in V1 format
+    HoodieTableMetaClient targetMetaClient = HoodieTableMetaClient.builder()
+        .setConf(context.getStorageConf())
+        .setBasePath(targetTablePath)
+        .build();
+    HoodieInstant lastTargetInstant = 
targetMetaClient.getActiveTimeline().lastInstant().get();
+    Option<HoodieCommitMetadata> commitMetadata = 
HoodieClientTestUtils.getCommitMetadataForInstant(
+        targetMetaClient, lastTargetInstant);
+    assertTrue(commitMetadata.isPresent());
+    // Checkpoint should be in V1 format
+    
assertTrue(commitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+    String checkpointBeforeUpgrade = 
commitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+    // Verify record counts match between source and target
+    long sourceRecordCountOriginal = sqlContext.read()
+            .format("org.apache.hudi")
+            .load(sourceTablePath)
+            .count();
+    long targetRecordCountOriginal = sqlContext.read()
+            .format("org.apache.hudi")
+            .load(targetTablePath)
+            .count();
+
+    System.out.println("Source record count: " + sourceRecordCountOriginal);
+    System.out.println("Target record count: " + targetRecordCountOriginal);
+
+    assertEquals(sourceRecordCountOriginal, targetRecordCountOriginal,
+            "Target should have all records from source");
+
+
+    // Phase 3: Upgrade source table from v6 to v9
+    HoodieDeltaStreamer.Config upgradeConfig = 
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+    upgradeConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + 
"=" + HoodieTableVersion.NINE.versionCode());
+    upgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + 
"=true");
+    upgradeConfig.sourceLimit = 100;
+
+    // This sync will trigger the upgrade
+    sourceStreamer = new HoodieDeltaStreamer(upgradeConfig, jsc);
+    sourceStreamer.sync();
+
+    // Verify source table is now v9 - create fresh metaclient after upgrade
+    sourceMetaClient = HoodieTableMetaClient.builder()
+        .setConf(HoodieTestUtils.getDefaultStorageConf())
+        .setBasePath(sourceTablePath)
+        .build();
+    assertEquals(HoodieTableVersion.NINE, 
sourceMetaClient.getTableConfig().getTableVersion());
+
+    // Phase 4: Add 2 more commits to upgraded source table
+    // After upgrade, don't specify version - let it use the existing table 
version
+    for (int i = 0; i < 2; i++) {
+      HoodieDeltaStreamer.Config postUpgradeConfig = 
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+      
postUpgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + 
"=false");
+      postUpgradeConfig.sourceLimit = 100;
+      sourceStreamer = new HoodieDeltaStreamer(postUpgradeConfig, jsc);
+      sourceStreamer.sync();
+    }
+
+    // Verify source now has 6 total commits
+    sourceMetaClient = HoodieTableMetaClient.reload(sourceMetaClient);
+    assertEquals(6, 
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+
+    // Phase 5: Resume incremental sync from target table (still at v6)
+    // Create base config following existing test patterns
+    HoodieDeltaStreamer.Config resumeTargetConfig = 
TestHelpers.makeConfigForHudiIncrSrc(
+        sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT, 
false, null);
+    resumeTargetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() 
+ "=" + HoodieTableVersion.SIX.versionCode());
+    
resumeTargetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + 
"=false");
+    
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+    
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+    // This should successfully pull all remaining commits from upgraded source
+    targetStreamer = new HoodieDeltaStreamer(resumeTargetConfig, jsc);
+    targetStreamer.sync();
+
+    // Phase 6: Validate data integrity and checkpoint continuity
+    targetMetaClient.reloadActiveTimeline();
+    assertEquals(HoodieTableVersion.SIX, 
targetMetaClient.getTableConfig().getTableVersion());
+
+    // Verify record counts match between source and target
+    long sourceRecordCount = sqlContext.read()
+        .format("org.apache.hudi")
+        .load(sourceTablePath)
+        .count();
+    long targetRecordCount = sqlContext.read()
+        .format("org.apache.hudi")
+        .load(targetTablePath)
+        .count();
+
+    System.out.println("Source record count: " + sourceRecordCount);
+    System.out.println("Target record count: " + targetRecordCount);
+
+    assertEquals(sourceRecordCount, targetRecordCount,
+        "Target should have all records from source despite version 
difference");
+
+    // Verify checkpoint was properly updated in target
+    HoodieInstant finalTargetInstant = 
targetMetaClient.getActiveTimeline().lastInstant().get();
+    Option<HoodieCommitMetadata> finalCommitMetadata = 
HoodieClientTestUtils.getCommitMetadataForInstant(
+        targetMetaClient, finalTargetInstant);
+    assertTrue(finalCommitMetadata.isPresent());
+    // Target still uses V1 checkpoint format
+    
assertTrue(finalCommitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+    String finalCheckpoint = 
finalCommitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+    // Checkpoint should have advanced from the pre-upgrade checkpoint
+    assertNotEquals(checkpointBeforeUpgrade, finalCheckpoint,
+        "Checkpoint should have advanced after consuming new commits");
+
+    // Verify target has correct number of commits
+    assertEquals(2, 
targetMetaClient.getActiveTimeline().getCommitsTimeline().countInstants(),
+        "Target should have 2 commits (as it batches 3 source table commits into one target table commit)");
+
+    // Clean up
+    UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, sourceTablePath);
+    UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, targetTablePath);
+  }
+
+  @Test
+  public void testIncrementalSourceWithTargetTableUpgradeFromV6ToV9() throws 
Exception {
+    // Create unique paths for both tables
+    String sourceTablePath = basePath + "/source_table_v6_target_upgrade";
+    String targetTablePath = basePath + "/target_table_v6_to_v9";
+
+    // Phase 1: Create source table at v6 with initial commits
+    HoodieDeltaStreamer.Config sourceConfig = TestHelpers.makeConfig(
+        sourceTablePath, WriteOperationType.BULK_INSERT);
+    sourceConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "=" 
+ HoodieTableVersion.SIX.versionCode());
+    sourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + 
"=false");
+    sourceConfig.sourceLimit = 100;
+
+    // Initialize source table with first commit
+    HoodieDeltaStreamer sourceStreamer = new HoodieDeltaStreamer(sourceConfig, 
jsc);
+    sourceStreamer.sync();
+
+    // Add 2 more commits to source table (total 3 commits)
+    sourceConfig.operation = WriteOperationType.BULK_INSERT;
+    for (int i = 0; i < 2; i++) {
+      // Create fresh config to avoid conflicts
+      HoodieDeltaStreamer.Config newSourceConfig = TestHelpers.makeConfig(
+          sourceTablePath, WriteOperationType.BULK_INSERT);
+      newSourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() 
+ "=false");
+      newSourceConfig.sourceLimit = 100;
+      sourceStreamer = new HoodieDeltaStreamer(newSourceConfig, jsc);
+      sourceStreamer.sync();
+    }
+
+    // Verify source has 3 commits and is at v6
+    HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
+        .setConf(context.getStorageConf())
+        .setBasePath(sourceTablePath)
+        .build();
+    assertEquals(3, 
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+    assertEquals(HoodieTableVersion.SIX, 
sourceMetaClient.getTableConfig().getTableVersion());
+
+    // Phase 2: Setup target table at v6 and sync first 3 commits
+    HoodieDeltaStreamer.Config targetConfig = 
TestHelpers.makeConfigForHudiIncrSrc(
+        sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT, 
false, null);
+    targetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "=" 
+ HoodieTableVersion.SIX.versionCode());
+    targetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + 
"=false");
+    
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+    
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+    // Sync all 3 commits from source to target
+    HoodieDeltaStreamer targetStreamer = new HoodieDeltaStreamer(targetConfig, 
jsc);
+    targetStreamer.sync();
+
+    // Verify checkpoint is established in V1 format
+    HoodieTableMetaClient targetMetaClient = HoodieTableMetaClient.builder()
+        .setConf(context.getStorageConf())
+        .setBasePath(targetTablePath)
+        .build();
+    HoodieInstant lastTargetInstant = 
targetMetaClient.getActiveTimeline().lastInstant().get();
+    Option<HoodieCommitMetadata> commitMetadata = 
HoodieClientTestUtils.getCommitMetadataForInstant(
+        targetMetaClient, lastTargetInstant);
+    assertTrue(commitMetadata.isPresent());
+    // Checkpoint should be in V1 format
+    
assertTrue(commitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+    String checkpointBeforeUpgrade = 
commitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+    // Verify record counts match between source and target
+    long sourceRecordCountOriginal = sqlContext.read()
+            .format("org.apache.hudi")
+            .load(sourceTablePath)
+            .count();
+    long targetRecordCountOriginal = sqlContext.read()
+            .format("org.apache.hudi")
+            .load(targetTablePath)
+            .count();
+
+    System.out.println("Source record count: " + sourceRecordCountOriginal);
+    System.out.println("Target record count: " + targetRecordCountOriginal);
+
+    assertEquals(sourceRecordCountOriginal, targetRecordCountOriginal,
+            "Target should have all records from source");
+
+    // Phase 3: Upgrade target table from v6 to v9
+    HoodieDeltaStreamer.Config upgradeTargetConfig = 
TestHelpers.makeConfigForHudiIncrSrc(

Review Comment:
   Do we also need to cover the multi-step path: upgrade to v8, add some commits, and then upgrade to v9?



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -3376,6 +3377,346 @@ void testErrorTableSourcePersist(WriteOperationType 
writeOperationType, boolean
     assertRecordCount(950, tableBasePath, sqlContext);
   }
 
+  /**
+   * Test incremental source functionality when source table is upgraded from 
v6 to v9
+   * while target table remains at v6. This validates backward compatibility 
for cross-version
+   * incremental sync scenarios.
+   */
+  @Test
+  public void testIncrementalSourceWithSourceTableUpgradeFromV6ToV9() throws 
Exception {
+    // Create unique paths for both tables
+    String sourceTablePath = basePath + "/source_table_v6_to_v9";
+    String targetTablePath = basePath + "/target_table_v6";
+
+    // Phase 1: Create source table at v6 with initial commits
+    HoodieDeltaStreamer.Config sourceConfig = TestHelpers.makeConfig(
+        sourceTablePath, WriteOperationType.BULK_INSERT);
+    sourceConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "=" 
+ HoodieTableVersion.SIX.versionCode());
+    sourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + 
"=false");
+    sourceConfig.sourceLimit = 100;
+
+    // Initialize source table with first commit
+    HoodieDeltaStreamer sourceStreamer = new HoodieDeltaStreamer(sourceConfig, 
jsc);
+    sourceStreamer.sync();
+
+    // Add 2 more commits to source table (total 3 commits)
+    sourceConfig.operation = WriteOperationType.BULK_INSERT;
+    for (int i = 0; i < 2; i++) {
+      // Create fresh config to avoid conflicts
+      HoodieDeltaStreamer.Config newSourceConfig = TestHelpers.makeConfig(
+          sourceTablePath, WriteOperationType.BULK_INSERT);
+      newSourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() 
+ "=false");
+      newSourceConfig.sourceLimit = 100;
+      sourceStreamer = new HoodieDeltaStreamer(newSourceConfig, jsc);
+      sourceStreamer.sync();
+    }
+
+    // Verify source has 3 commits and is at v6
+    HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
+        .setConf(context.getStorageConf())
+        .setBasePath(sourceTablePath)
+        .build();
+    assertEquals(3, 
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+    assertEquals(HoodieTableVersion.SIX, 
sourceMetaClient.getTableConfig().getTableVersion());
+
+    // Phase 2: Setup target table at v6 and sync first 3 commits
+    HoodieDeltaStreamer.Config targetConfig = 
TestHelpers.makeConfigForHudiIncrSrc(
+        sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT, 
false, null);
+    targetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "=" 
+ HoodieTableVersion.SIX.versionCode());
+    targetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + 
"=false");
+    
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+    
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+    // Sync all 3 commits from source to target, one by one
+    HoodieDeltaStreamer targetStreamer = new HoodieDeltaStreamer(targetConfig, 
jsc);
+    // try calling this once instead of three times
+    targetStreamer.sync();
+
+
+    // Verify checkpoint is established in V1 format
+    HoodieTableMetaClient targetMetaClient = HoodieTableMetaClient.builder()
+        .setConf(context.getStorageConf())
+        .setBasePath(targetTablePath)
+        .build();
+    HoodieInstant lastTargetInstant = 
targetMetaClient.getActiveTimeline().lastInstant().get();
+    Option<HoodieCommitMetadata> commitMetadata = 
HoodieClientTestUtils.getCommitMetadataForInstant(
+        targetMetaClient, lastTargetInstant);
+    assertTrue(commitMetadata.isPresent());
+    // Checkpoint should be in V1 format
+    
assertTrue(commitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+    String checkpointBeforeUpgrade = 
commitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+    // Verify record counts match between source and target
+    long sourceRecordCountOriginal = sqlContext.read()
+            .format("org.apache.hudi")
+            .load(sourceTablePath)
+            .count();
+    long targetRecordCountOriginal = sqlContext.read()
+            .format("org.apache.hudi")
+            .load(targetTablePath)
+            .count();
+
+    System.out.println("Source record count: " + sourceRecordCountOriginal);
+    System.out.println("Target record count: " + targetRecordCountOriginal);
+
+    assertEquals(sourceRecordCountOriginal, targetRecordCountOriginal,
+            "Target should have all records from source");
+
+
+    // Phase 3: Upgrade source table from v6 to v9
+    HoodieDeltaStreamer.Config upgradeConfig = 
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+    upgradeConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + 
"=" + HoodieTableVersion.NINE.versionCode());
+    upgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + 
"=true");
+    upgradeConfig.sourceLimit = 100;
+
+    // This sync will trigger the upgrade
+    sourceStreamer = new HoodieDeltaStreamer(upgradeConfig, jsc);
+    sourceStreamer.sync();
+
+    // Verify source table is now v9 - create fresh metaclient after upgrade
+    sourceMetaClient = HoodieTableMetaClient.builder()
+        .setConf(HoodieTestUtils.getDefaultStorageConf())
+        .setBasePath(sourceTablePath)
+        .build();
+    assertEquals(HoodieTableVersion.NINE, 
sourceMetaClient.getTableConfig().getTableVersion());
+
+    // Phase 4: Add 2 more commits to upgraded source table
+    // After upgrade, don't specify version - let it use the existing table 
version
+    for (int i = 0; i < 2; i++) {
+      HoodieDeltaStreamer.Config postUpgradeConfig = 
TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
+      
postUpgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + 
"=false");
+      postUpgradeConfig.sourceLimit = 100;
+      sourceStreamer = new HoodieDeltaStreamer(postUpgradeConfig, jsc);
+      sourceStreamer.sync();
+    }
+
+    // Verify source now has 6 total commits
+    sourceMetaClient = HoodieTableMetaClient.reload(sourceMetaClient);
+    assertEquals(6, 
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+
+    // Phase 5: Resume incremental sync from target table (still at v6)
+    // Create base config following existing test patterns
+    HoodieDeltaStreamer.Config resumeTargetConfig = 
TestHelpers.makeConfigForHudiIncrSrc(
+        sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT, 
false, null);
+    resumeTargetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() 
+ "=" + HoodieTableVersion.SIX.versionCode());
+    
resumeTargetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + 
"=false");
+    
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+    
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+    // This should successfully pull all remaining commits from upgraded source
+    targetStreamer = new HoodieDeltaStreamer(resumeTargetConfig, jsc);
+    targetStreamer.sync();
+
+    // Phase 6: Validate data integrity and checkpoint continuity
+    targetMetaClient.reloadActiveTimeline();
+    assertEquals(HoodieTableVersion.SIX, 
targetMetaClient.getTableConfig().getTableVersion());
+
+    // Verify record counts match between source and target
+    long sourceRecordCount = sqlContext.read()
+        .format("org.apache.hudi")
+        .load(sourceTablePath)
+        .count();
+    long targetRecordCount = sqlContext.read()
+        .format("org.apache.hudi")
+        .load(targetTablePath)
+        .count();
+
+    System.out.println("Source record count: " + sourceRecordCount);
+    System.out.println("Target record count: " + targetRecordCount);
+
+    assertEquals(sourceRecordCount, targetRecordCount,
+        "Target should have all records from source despite version 
difference");
+
+    // Verify checkpoint was properly updated in target
+    HoodieInstant finalTargetInstant = 
targetMetaClient.getActiveTimeline().lastInstant().get();
+    Option<HoodieCommitMetadata> finalCommitMetadata = 
HoodieClientTestUtils.getCommitMetadataForInstant(
+        targetMetaClient, finalTargetInstant);
+    assertTrue(finalCommitMetadata.isPresent());
+    // Target still uses V1 checkpoint format
+    
assertTrue(finalCommitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+    String finalCheckpoint = 
finalCommitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+    // Checkpoint should have advanced from the pre-upgrade checkpoint
+    assertNotEquals(checkpointBeforeUpgrade, finalCheckpoint,
+        "Checkpoint should have advanced after consuming new commits");
+
+    // Verify target has correct number of commits
+    assertEquals(2, 
targetMetaClient.getActiveTimeline().getCommitsTimeline().countInstants(),
+        "Target should have 2 commits (as it batches 3 source table commits into one target table commit)");
+
+    // Clean up
+    UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, sourceTablePath);
+    UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, targetTablePath);
+  }
+
+  @Test
+  public void testIncrementalSourceWithTargetTableUpgradeFromV6ToV9() throws 
Exception {
+    // Create unique paths for both tables
+    String sourceTablePath = basePath + "/source_table_v6_target_upgrade";
+    String targetTablePath = basePath + "/target_table_v6_to_v9";
+
+    // Phase 1: Create source table at v6 with initial commits
+    HoodieDeltaStreamer.Config sourceConfig = TestHelpers.makeConfig(
+        sourceTablePath, WriteOperationType.BULK_INSERT);
+    sourceConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "=" 
+ HoodieTableVersion.SIX.versionCode());
+    sourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + 
"=false");
+    sourceConfig.sourceLimit = 100;
+
+    // Initialize source table with first commit
+    HoodieDeltaStreamer sourceStreamer = new HoodieDeltaStreamer(sourceConfig, 
jsc);
+    sourceStreamer.sync();
+
+    // Add 2 more commits to source table (total 3 commits)
+    sourceConfig.operation = WriteOperationType.BULK_INSERT;
+    for (int i = 0; i < 2; i++) {
+      // Create fresh config to avoid conflicts
+      HoodieDeltaStreamer.Config newSourceConfig = TestHelpers.makeConfig(
+          sourceTablePath, WriteOperationType.BULK_INSERT);
+      newSourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() 
+ "=false");
+      newSourceConfig.sourceLimit = 100;
+      sourceStreamer = new HoodieDeltaStreamer(newSourceConfig, jsc);
+      sourceStreamer.sync();
+    }
+
+    // Verify source has 3 commits and is at v6
+    HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
+        .setConf(context.getStorageConf())
+        .setBasePath(sourceTablePath)
+        .build();
+    assertEquals(3, 
sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+    assertEquals(HoodieTableVersion.SIX, 
sourceMetaClient.getTableConfig().getTableVersion());
+
+    // Phase 2: Setup target table at v6 and sync first 3 commits
+    HoodieDeltaStreamer.Config targetConfig = 
TestHelpers.makeConfigForHudiIncrSrc(
+        sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT, 
false, null);
+    targetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "=" 
+ HoodieTableVersion.SIX.versionCode());
+    targetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + 
"=false");
+    
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
+    
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
+
+    // Sync all 3 commits from source to target
+    HoodieDeltaStreamer targetStreamer = new HoodieDeltaStreamer(targetConfig, 
jsc);
+    targetStreamer.sync();
+
+    // Verify checkpoint is established in V1 format
+    HoodieTableMetaClient targetMetaClient = HoodieTableMetaClient.builder()
+        .setConf(context.getStorageConf())
+        .setBasePath(targetTablePath)
+        .build();
+    HoodieInstant lastTargetInstant = 
targetMetaClient.getActiveTimeline().lastInstant().get();
+    Option<HoodieCommitMetadata> commitMetadata = 
HoodieClientTestUtils.getCommitMetadataForInstant(
+        targetMetaClient, lastTargetInstant);
+    assertTrue(commitMetadata.isPresent());
+    // Checkpoint should be in V1 format
+    
assertTrue(commitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
+    String checkpointBeforeUpgrade = 
commitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);
+
+    // Verify record counts match between source and target
+    long sourceRecordCountOriginal = sqlContext.read()
+            .format("org.apache.hudi")
+            .load(sourceTablePath)
+            .count();
+    long targetRecordCountOriginal = sqlContext.read()
+            .format("org.apache.hudi")
+            .load(targetTablePath)
+            .count();
+
+    System.out.println("Source record count: " + sourceRecordCountOriginal);
+    System.out.println("Target record count: " + targetRecordCountOriginal);

Review Comment:
   Remove these debug `System.out.println` statements before merging.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to