waitingF commented on code in PR #8378:
URL: https://github.com/apache/hudi/pull/8378#discussion_r1174498569


##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -2604,6 +2605,59 @@ public void testForceEmptyMetaSync() throws Exception {
     assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + " should exist");
   }
 
+  @Test
+  public void testResumeCheckpointAfterChangingCOW2MOR() throws Exception {
+    String tableBasePath = basePath + "/test_resume_checkpoint_after_changing_cow_to_mor";
+    // default table type is COW
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+
+    // change cow to mor
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+            .setConf(new Configuration(fs.getConf()))
+            .setBasePath(cfg.targetBasePath)
+            .setLoadActiveTimelineOnLoad(false)
+            .build();
+    Properties hoodieProps = new Properties();
+    hoodieProps.load(fs.open(new Path(cfg.targetBasePath + "/.hoodie/hoodie.properties")));
+    LOG.info("old props: {}", hoodieProps);
+    hoodieProps.put("hoodie.table.type", HoodieTableType.MERGE_ON_READ.name());
+    LOG.info("new props: {}", hoodieProps);
+    Path metaPathDir = new Path(metaClient.getBasePathV2(), METAFOLDER_NAME);
+    HoodieTableConfig.create(metaClient.getFs(), metaPathDir, hoodieProps);
+
+    // continue deltastreamer
+    cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
+    cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.

Review Comment:
   I copied the test case, unchanged, onto current master.
   As expected, it failed on the commit metadata assertion.
   <img width="992" alt="image" src="https://user-images.githubusercontent.com/19326824/233815496-53643abf-e524-4657-8719-42e5d7b839d0.png">
   <img width="1046" alt="image" src="https://user-images.githubusercontent.com/19326824/233815576-02aa45a7-49d0-4139-9ddb-ff6196dee174.png">
   
   
   Here is the test case, unchanged:
   ```java
     @Test
     public void testResumeCheckpointAfterChangingCOW2MOR() throws Exception {
       String tableBasePath = basePath + "/test_resume_checkpoint_after_changing_cow_to_mor";
       // default table type is COW
       HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
       new HoodieDeltaStreamer(cfg, jsc).sync();
       TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
       TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
       TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
   
       // change cow to mor
       HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
           .setConf(new Configuration(fs.getConf()))
           .setBasePath(cfg.targetBasePath)
           .setLoadActiveTimelineOnLoad(false)
           .build();
       Properties hoodieProps = new Properties();
       hoodieProps.load(fs.open(new Path(cfg.targetBasePath + "/.hoodie/hoodie.properties")));
       LOG.info("old props: {}", hoodieProps);
       hoodieProps.put("hoodie.table.type", HoodieTableType.MERGE_ON_READ.name());
       LOG.info("new props: {}", hoodieProps);
       Path metaPathDir = new Path(metaClient.getBasePathV2(), METAFOLDER_NAME);
       HoodieTableConfig.create(metaClient.getFs(), metaPathDir, hoodieProps);
   
       // continue deltastreamer
       cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
       cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
       new HoodieDeltaStreamer(cfg, jsc).sync();
       // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
       TestHelpers.assertRecordCount(1450, tableBasePath, sqlContext);
       TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
       List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
       assertEquals(1450, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
       TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
       // currently there should be 1 deltacommits now
       TestHelpers.assertAtleastNDeltaCommits(1, tableBasePath, fs);
   
       // test the table type is already mor
       new HoodieDeltaStreamer(cfg, jsc).sync();
       // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
       // total records should be 1900 now
       TestHelpers.assertRecordCount(1900, tableBasePath, sqlContext);
       TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3);
       counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
       assertEquals(1900, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
       TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
       // currently there should be 2 deltacommits now
       TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs);
   
       // clean up
       UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
     }
   ```
   
   
   
   
   > One option here is to read commit metadata of all commits in active 
timeline (with archiving disabled) and ensuring sum(all commit's 
fetchTotalRecordsWritten) == sum(input records).
   
   I don't think this check would work. `fetchTotalRecordsWritten` also
   counts records that failed with a write error, and it includes the
   records merged in from existing files. So
   `sum(all commit's fetchTotalRecordsWritten) == sum(input records)` would
   not hold even without changing from COW to MOR.
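   
   To illustrate the merge effect only, here is a toy model of a plain COW table. It is a sketch under stated assumptions, not Hudi code: `RewriteCountSketch` and all of its methods are hypothetical names, and it assumes a single file that is fully rewritten on every commit, so each commit's "records written" is the whole file's record count. It ignores write errors entirely.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   /** Toy model only. None of these names are Hudi APIs. */
   public class RewriteCountSketch {
     // Assumption: one file that every commit rewrites in full.
     private long recordsInFile = 0;
     private long totalInputRecords = 0;
     private final List<Long> recordsWrittenPerCommit = new ArrayList<>();
   
     /** Applies one batch; inserts, updates and deletes all count as input records. */
     void commit(long inserts, long updates, long deletes) {
       totalInputRecords += inserts + updates + deletes;
       // Updates replace existing rows, so only inserts and deletes change the row count.
       recordsInFile += inserts - deletes;
       // The rewritten file contains old and new rows alike.
       recordsWrittenPerCommit.add(recordsInFile);
     }
   
     long recordsInFile() {
       return recordsInFile;
     }
   
     long totalInputRecords() {
       return totalInputRecords;
     }
   
     long sumRecordsWritten() {
       return recordsWrittenPerCommit.stream().mapToLong(Long::longValue).sum();
     }
   
     public static void main(String[] args) {
       RewriteCountSketch table = new RewriteCountSketch();
       table.commit(1000, 0, 0);    // bulk insert
       table.commit(500, 450, 50);  // first upsert batch
       table.commit(500, 450, 50);  // second upsert batch
       System.out.println("records in table:    " + table.recordsInFile());
       System.out.println("sum of input records: " + table.totalInputRecords());
       System.out.println("sum of records written: " + table.sumRecordsWritten());
     }
   }
   ```
   
   With the same batches as the test case, the table ends at 1900 records, the input totals 3000 records, and the per-commit written counts (1000, 1450, 1900) sum to 4350, so the two sums differ under this model.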
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
