codope commented on code in PR #12746:
URL: https://github.com/apache/hudi/pull/12746#discussion_r1937352554
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java:
##########
@@ -222,6 +225,96 @@ public void testUpsertPartitioner(boolean
populateMetaFields) throws Exception {
}
}
+ @Test
+ public void testUpsertPartitionerWithTableVersion6() throws Exception {
+ HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true);
+ addConfigsForPopulateMetaFields(cfgBuilder, true);
+ cfgBuilder.withWriteTableVersion(6);
+ HoodieWriteConfig cfg = cfgBuilder.build();
+
+ // create meta client w/ the table version 6
+ Properties props = getPropertiesForKeyGen(true);
+ props.put(WRITE_TABLE_VERSION.key(), "6");
+ metaClient = getHoodieMetaClient(storageConf(), basePath(), props,
HoodieTableType.MERGE_ON_READ);
+ dataGen = new HoodieTestDataGenerator();
+
+ try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
+ // batch 1 insert
+ String newCommitTime = "001";
+ client.startCommitWithTime(newCommitTime);
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
+ List<WriteStatus> statuses = client.upsert(writeRecords,
newCommitTime).collect();
+ assertNoWriteErrors(statuses);
+
+ HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(),
metaClient);
+
+ Option<HoodieInstant> deltaCommit =
metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
+ assertTrue(deltaCommit.isPresent());
+ assertEquals("001", deltaCommit.get().requestedTime(), "Delta commit
should be 001");
+
+ Option<HoodieInstant> commit =
metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant();
+ assertFalse(commit.isPresent());
+
+ List<StoragePathInfo> allFiles = listAllBaseFilesInPath(hoodieTable);
+ BaseFileOnlyView roView = getHoodieTableFileSystemView(metaClient,
+ metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles);
+
+ Map<String, String> baseFileMapping = new HashMap<>();
+ Map<String, List<String>> baseFileToLogFileMapping = new HashMap<>();
+ BaseFileOnlyView finalRoView = roView;
+
Arrays.stream(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS).forEach(partitionPath
-> {
+ String baseFileName =
finalRoView.getLatestBaseFiles(partitionPath).collect(Collectors.toList()).get(0).getFileName();
+ baseFileMapping.put(partitionPath, baseFileName);
+ baseFileToLogFileMapping.put(baseFileName, new ArrayList<>());
+ });
+
+ writeAndValidateLogFileBaseInstantTimeMatches(client, "002", records,
cfg, baseFileMapping, baseFileToLogFileMapping);
Review Comment:
Added data validation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]