swuferhong commented on a change in pull request #3401:
URL: https://github.com/apache/hudi/pull/3401#discussion_r683900152
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
##########
@@ -231,6 +229,83 @@ public void
testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMap
}
}
+ @ParameterizedTest
+ @MethodSource("testArguments")
+ public void testHoodieMergeHandlePreCombine(ExternalSpillableMap.DiskMapType
diskMapType,
+ boolean isCompressionEnabled)
throws Exception {
+ // Create records in a single partition
+ String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+ dataGen = new HoodieTestDataGenerator(new String[] {partitionPath});
+
+ // Build a common config with diff configs
+ Properties properties = new Properties();
+ properties.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(),
diskMapType.name());
+
properties.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
String.valueOf(isCompressionEnabled));
+
metaClient.getTableConfig().setValue(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP,
"timestamp");
+
+ // Build a write config with insert parallelism set
+ HoodieWriteConfig cfg = getConfigBuilder()
+ .withProperties(properties)
+ .build();
+
+ // test1: combine before insert, small preCombineField value record will
be merged and replaced.
+ cfg.setValue(COMBINE_BEFORE_INSERT_PROP, "true");
+ try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+ String newCommitTime = "001";
+ client.startCommitWithTime(newCommitTime);
+
+ List<HoodieRecord> records =
dataGen.generateRecordsWithTimestampInsert(newCommitTime);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ List<WriteStatus> statuses = client.insert(writeRecords,
newCommitTime).collect();
+ assertNoWriteErrors(statuses);
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTimeline timeline = new
HoodieActiveTimeline(metaClient).getCommitTimeline();
+ Dataset<Row> dataSet = HoodieClientTestUtils.readCommit(basePath,
sqlContext, timeline, newCommitTime);
+ Row[] rows = (Row[]) dataSet.collect();
+ for (Row row : rows) {
+ if (row.getAs("_hoodie_record_key").equals("01")) {
+ assertEquals((Long) row.getAs("timestamp"), 4, "Record with greater
preCombine field is chosen");
+ } else if (row.getAs("_hoodie_record_key").equals("02")) {
+ assertEquals((Long) row.getAs("timestamp"), 5, "Record with equal
preCombine field, New arrival record is chosen");
+ } else if (row.getAs("_hoodie_record_key").equals("03")) {
+ assertEquals((Long) row.getAs("timestamp"), 6, "Record with greater
preCombine field is chosen");
+ }
+ }
+ assertEquals(5, dataSet.count(),
+ "Must contain 5 records, because three records be combined");
+ }
+
+ //test2 not combine before insert, and combine will happen on
HoodieMergeHandle.
+ cfg.setValue(COMBINE_BEFORE_INSERT_PROP, "false");
+ try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
+ //Combine before insert be set false, and combine will happen on
HoodieMergeHandle.
+ String newCommitTime = "002";
+ client.startCommitWithTime(newCommitTime);
+
+ List<HoodieRecord> records =
dataGen.generateRecordsWithTimestampInsert(newCommitTime);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ List<WriteStatus> statuses = client.insert(writeRecords,
newCommitTime).collect();
+ assertNoWriteErrors(statuses);
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTimeline timeline = new
HoodieActiveTimeline(metaClient).getCommitTimeline();
+ Dataset<Row> dataSet = HoodieClientTestUtils.readCommit(basePath,
sqlContext, timeline, newCommitTime);
+ Row[] rows = (Row[]) dataSet.collect();
+ for (Row row : rows) {
Review comment:
> can we reuse the asserts from here?
Ok
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
##########
@@ -231,6 +229,83 @@ public void
testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMap
}
}
+ @ParameterizedTest
+ @MethodSource("testArguments")
+ public void testHoodieMergeHandlePreCombine(ExternalSpillableMap.DiskMapType
diskMapType,
+ boolean isCompressionEnabled)
throws Exception {
+ // Create records in a single partition
+ String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+ dataGen = new HoodieTestDataGenerator(new String[] {partitionPath});
+
+ // Build a common config with diff configs
+ Properties properties = new Properties();
+ properties.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(),
diskMapType.name());
+
properties.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
String.valueOf(isCompressionEnabled));
+
metaClient.getTableConfig().setValue(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP,
"timestamp");
+
+ // Build a write config with insert parallelism set
+ HoodieWriteConfig cfg = getConfigBuilder()
+ .withProperties(properties)
+ .build();
+
+ // test1: combine before insert, small preCombineField value record will
be merged and replaced.
+ cfg.setValue(COMBINE_BEFORE_INSERT_PROP, "true");
+ try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+ String newCommitTime = "001";
+ client.startCommitWithTime(newCommitTime);
+
+ List<HoodieRecord> records =
dataGen.generateRecordsWithTimestampInsert(newCommitTime);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ List<WriteStatus> statuses = client.insert(writeRecords,
newCommitTime).collect();
+ assertNoWriteErrors(statuses);
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTimeline timeline = new
HoodieActiveTimeline(metaClient).getCommitTimeline();
+ Dataset<Row> dataSet = HoodieClientTestUtils.readCommit(basePath,
sqlContext, timeline, newCommitTime);
+ Row[] rows = (Row[]) dataSet.collect();
+ for (Row row : rows) {
+ if (row.getAs("_hoodie_record_key").equals("01")) {
Review comment:
> use the `HoodieRecord.` members?
Ok
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]