yihua commented on code in PR #13830:
URL: https://github.com/apache/hudi/pull/13830#discussion_r2325285184
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java:
##########
@@ -159,8 +160,8 @@ public void testMultiTableExecutionWithKafkaSource() throws IOException {
testUtils.createTopic(topicName2, 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
- testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
- testUtils.sendMessages(topicName2, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
+ testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA, 0L)));
Review Comment:
Is the current timestamp not working here, i.e., why pass a fixed `0L`?
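For context, a minimal sketch of the alternative the question implies, assuming the new trailing `long` argument is an epoch-millis timestamp (this diff does not confirm the parameter's meaning):

```java
// Hypothetical alternative: pass the current time instead of a fixed 0L,
// assuming the trailing long argument is an epoch-millis timestamp.
testUtils.sendMessages(topicName1,
    Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema(
        "000", 5, HoodieTestDataGenerator.TRIP_SCHEMA, System.currentTimeMillis())));
```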
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala:
##########
@@ -291,15 +292,54 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
insertDf.cache()
val instantTime = getNewInstantTime
- // Issue two deletes, one with the original partition and one with an updated partition
- val recordsToDelete = dataGen.generateUniqueDeleteRecords(instantTime, 1)
- recordsToDelete.addAll(dataGen.generateUniqueDeleteRecordsWithUpdatedPartition(instantTime, 1))
- val deleteBatch = recordsToStrings(recordsToDelete).asScala
+ // Issue four deletes, one with the original partition, one with an updated partition,
+ // and two with an older ordering value that should be ignored
+ val deletedRecords = dataGen.generateUniqueDeleteRecords(instantTime, 1)
+ deletedRecords.addAll(dataGen.generateUniqueDeleteRecordsWithUpdatedPartition(instantTime, 1))
+ val inputRecords = new util.ArrayList[HoodieRecord[_]](deletedRecords)
+ val lowerOrderingValue = 1L
+ inputRecords.addAll(dataGen.generateUniqueDeleteRecords(instantTime, 1, lowerOrderingValue))
+ inputRecords.addAll(dataGen.generateUniqueDeleteRecordsWithUpdatedPartition(instantTime, 1, lowerOrderingValue))
+ val deleteBatch = recordsToStrings(inputRecords).asScala
val deleteDf = spark.read.json(spark.sparkContext.parallelize(deleteBatch.toSeq, 1))
deleteDf.cache()
val recordKeyToDelete1 = deleteDf.collectAsList().get(0).getAs("_row_key").asInstanceOf[String]
val recordKeyToDelete2 = deleteDf.collectAsList().get(1).getAs("_row_key").asInstanceOf[String]
- deleteDf.write.format("org.apache.hudi")
+ deleteDf.write.format("hudi")
+ .options(hudiOpts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ val prevDf = mergedDfList.last
+ mergedDfList = mergedDfList :+ prevDf.filter(row => row.getAs("_row_key").asInstanceOf[String] != recordKeyToDelete1 &&
+ row.getAs("_row_key").asInstanceOf[String] != recordKeyToDelete2)
+ validateDataAndRecordIndices(hudiOpts, spark.read.json(spark.sparkContext.parallelize(recordsToStrings(deletedRecords).asScala.toSeq, 1)))
Review Comment:
nit: The pattern of updating `mergedDfList` in this class should be refactored separately: `validateDataAndRecordIndices` depends on that mutable state, and it's not obvious at first glance that it reads `mergedDfList`.
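For illustration, one possible direction; the extra `expectedDf` parameter is hypothetical and not part of the current helper. The idea is to compute the expected snapshot at the call site and pass it in explicitly, so validation no longer relies on hidden class-level state:

```scala
// Hypothetical refactor sketch: thread the expected state explicitly instead of
// mutating the class-level mergedDfList that validateDataAndRecordIndices reads.
val expectedDf = mergedDfList.last.filter(row =>
  row.getAs("_row_key").asInstanceOf[String] != recordKeyToDelete1 &&
    row.getAs("_row_key").asInstanceOf[String] != recordKeyToDelete2)
// validateDataAndRecordIndices(hudiOpts, deletedDf, expectedDf)
// where expectedDf would replace the implicit mergedDfList.last lookup inside the helper
```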
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java:
##########
@@ -358,11 +358,11 @@ protected static HoodieTestDataGenerator prepareParquetDFSFiles(int numRecords,
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(makeDatesAmbiguous);
if (useCustomSchema) {
Helpers.saveParquetToDFS(Helpers.toGenericRecords(
- dataGenerator.generateInsertsAsPerSchema("000", numRecords, schemaStr),
+ dataGenerator.generateInsertsAsPerSchema("000", numRecords, schemaStr, 0L),
Review Comment:
Same question here: is the current timestamp not working, i.e., why pass a fixed `0L`?
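As in the earlier comment, a minimal sketch of the implied alternative, again assuming the trailing `long` argument is an epoch-millis timestamp:

```java
// Hypothetical alternative: pass the current time instead of a fixed 0L.
dataGenerator.generateInsertsAsPerSchema("000", numRecords, schemaStr, System.currentTimeMillis());
```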