waitingF commented on code in PR #8378:
URL: https://github.com/apache/hudi/pull/8378#discussion_r1174498569
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -2604,6 +2605,59 @@ public void testForceEmptyMetaSync() throws Exception {
assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + "
should exist");
}
+ @Test
+ public void testResumeCheckpointAfterChangingCOW2MOR() throws Exception {
+ String tableBasePath = basePath +
"/test_resume_checkpoint_after_changing_cow_to_mor";
+ // default table type is COW
+ HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.BULK_INSERT);
+ new HoodieDeltaStreamer(cfg, jsc).sync();
+ TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
+ TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+ TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+
+ // change cow to mor
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+ .setConf(new Configuration(fs.getConf()))
+ .setBasePath(cfg.targetBasePath)
+ .setLoadActiveTimelineOnLoad(false)
+ .build();
+ Properties hoodieProps = new Properties();
+ hoodieProps.load(fs.open(new Path(cfg.targetBasePath +
"/.hoodie/hoodie.properties")));
+ LOG.info("old props: {}", hoodieProps);
+ hoodieProps.put("hoodie.table.type", HoodieTableType.MERGE_ON_READ.name());
+ LOG.info("new props: {}", hoodieProps);
+ Path metaPathDir = new Path(metaClient.getBasePathV2(), METAFOLDER_NAME);
+ HoodieTableConfig.create(metaClient.getFs(), metaPathDir, hoodieProps);
+
+ // continue deltastreamer
+ cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
+ cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
+ new HoodieDeltaStreamer(cfg, jsc).sync();
+ // out of 1000 new records, 500 are inserts, 450 are updates and 50 are
deletes.
Review Comment:
I copied the test case, without any changes, onto the current master.
The test case then failed on the commit-metadata assertion, as expected.
<img width="992" alt="image"
src="https://user-images.githubusercontent.com/19326824/233815496-53643abf-e524-4657-8719-42e5d7b839d0.png">
<img width="1046" alt="image"
src="https://user-images.githubusercontent.com/19326824/233815576-02aa45a7-49d0-4139-9ddb-ff6196dee174.png">
Here is the test case, unchanged:
```java
@Test
public void testResumeCheckpointAfterChangingCOW2MOR() throws Exception {
String tableBasePath = basePath +
"/test_resume_checkpoint_after_changing_cow_to_mor";
// default table type is COW
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.BULK_INSERT);
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
// change cow to mor
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
.setConf(new Configuration(fs.getConf()))
.setBasePath(cfg.targetBasePath)
.setLoadActiveTimelineOnLoad(false)
.build();
Properties hoodieProps = new Properties();
hoodieProps.load(fs.open(new Path(cfg.targetBasePath +
"/.hoodie/hoodie.properties")));
LOG.info("old props: {}", hoodieProps);
hoodieProps.put("hoodie.table.type",
HoodieTableType.MERGE_ON_READ.name());
LOG.info("new props: {}", hoodieProps);
Path metaPathDir = new Path(metaClient.getBasePathV2(), METAFOLDER_NAME);
HoodieTableConfig.create(metaClient.getFs(), metaPathDir, hoodieProps);
// continue deltastreamer
cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
new HoodieDeltaStreamer(cfg, jsc).sync();
// out of 1000 new records, 500 are inserts, 450 are updates and 50 are
deletes.
TestHelpers.assertRecordCount(1450, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
List<Row> counts = TestHelpers.countsPerCommit(tableBasePath,
sqlContext);
assertEquals(1450, counts.stream().mapToLong(entry ->
entry.getLong(1)).sum());
TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
// currently there should be 1 deltacommits now
TestHelpers.assertAtleastNDeltaCommits(1, tableBasePath, fs);
// test the table type is already mor
new HoodieDeltaStreamer(cfg, jsc).sync();
// out of 1000 new records, 500 are inserts, 450 are updates and 50 are
deletes.
// total records should be 1900 now
TestHelpers.assertRecordCount(1900, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3);
counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
assertEquals(1900, counts.stream().mapToLong(entry ->
entry.getLong(1)).sum());
TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
// currently there should be 2 deltacommits now
TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs);
// clean up
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
```
> One option here is to read commit metadata of all commits in active
timeline (with archiving disabled) and ensuring sum(all commit's
fetchTotalRecordsWritten) == sum(input records).
As for this, I don't think it would work. `fetchTotalRecordsWritten` also
counts records that failed to write with a write error, and its value includes
record data merged in from existing files, so
`sum(all commits' fetchTotalRecordsWritten) == sum(input records)` would be
false even without changing the table from COW to MOR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]