waitingF commented on code in PR #8378:
URL: https://github.com/apache/hudi/pull/8378#discussion_r1169632568
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -2604,6 +2605,59 @@ public void testForceEmptyMetaSync() throws Exception {
assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + "
should exist");
}
+ @Test
+ public void testResumeCheckpointAfterChangingCOW2MOR() throws Exception {
+ String tableBasePath = basePath +
"/test_resume_checkpoint_after_changing_cow_to_mor";
+ // default table type is COW
+ HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.BULK_INSERT);
+ new HoodieDeltaStreamer(cfg, jsc).sync();
+ TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
+ TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+ TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+
+ // change cow to mor
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+ .setConf(new Configuration(fs.getConf()))
+ .setBasePath(cfg.targetBasePath)
+ .setLoadActiveTimelineOnLoad(false)
+ .build();
+ Properties hoodieProps = new Properties();
+ hoodieProps.load(fs.open(new Path(cfg.targetBasePath +
"/.hoodie/hoodie.properties")));
+ LOG.info("old props: {}", hoodieProps);
+ hoodieProps.put("hoodie.table.type", HoodieTableType.MERGE_ON_READ.name());
+ LOG.info("new props: {}", hoodieProps);
+ Path metaPathDir = new Path(metaClient.getBasePathV2(), METAFOLDER_NAME);
+ HoodieTableConfig.create(metaClient.getFs(), metaPathDir, hoodieProps);
+
+ // continue deltastreamer
+ cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
+ cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
+ new HoodieDeltaStreamer(cfg, jsc).sync();
+ // out of 1000 new records, 500 are inserts, 450 are updates and 50 are
deletes.
Review Comment:
Sorry, I don't know how to test current master from this branch, so I copied
the test case (with some modifications) to master and ran it there.
Here is the test case as copied to master:
```java
@Test
public void testResumeCheckpointAfterChangingCOW2MOR() throws Exception {
String tableBasePath = basePath +
"/test_resume_checkpoint_after_changing_cow_to_mor";
// default table type is COW
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.BULK_INSERT);
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
// change cow to mor
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
.setConf(new Configuration(fs.getConf()))
.setBasePath(cfg.targetBasePath)
.setLoadActiveTimelineOnLoad(false)
.build();
Properties hoodieProps = new Properties();
hoodieProps.load(fs.open(new Path(cfg.targetBasePath +
"/.hoodie/hoodie.properties")));
LOG.info("old props: {}", hoodieProps);
hoodieProps.put("hoodie.table.type",
HoodieTableType.MERGE_ON_READ.name());
LOG.info("new props: {}", hoodieProps);
Path metaPathDir = new Path(metaClient.getBasePathV2(), METAFOLDER_NAME);
HoodieTableConfig.create(metaClient.getFs(), metaPathDir, hoodieProps);
// continue deltastreamer
cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
new HoodieDeltaStreamer(cfg, jsc).sync();
// out of 1000 new records, 500 are inserts, 450 are updates and 50 are
deletes.
TestHelpers.assertRecordCount(1450, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 2); //
here changed. the first time sync after changing from cow to mor, the
checkpoint lost, so the checkpoint here should be 00000
List<Row> counts = TestHelpers.countsPerCommit(tableBasePath,
sqlContext);
assertEquals(1450, counts.stream().mapToLong(entry ->
entry.getLong(1)).sum());
TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
// currently there should be 1 deltacommits now
TestHelpers.assertAtleastNDeltaCommits(1, tableBasePath, fs);
// test the table type is already mor
new HoodieDeltaStreamer(cfg, jsc).sync();
// out of 1000 new records, 500 are inserts, 450 are updates and 50 are
deletes.
// total records should be 1900 now
TestHelpers.assertRecordCount(1900, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 3); //
here changed. the second time sync after changing, the checkpoint should be
00001
counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
assertEquals(1900, counts.stream().mapToLong(entry ->
entry.getLong(1)).sum());
TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
// currently there should be 2 deltacommits now
TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs);
// clean up
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
```
And the test passed.
<img width="1286" alt="image"
src="https://user-images.githubusercontent.com/19326824/232707466-85af9c13-d0e7-4d5d-a813-35cb441a0158.png">
Is this an acceptable way to assert the failure on master?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]