yihua commented on code in PR #13830:
URL: https://github.com/apache/hudi/pull/13830#discussion_r2325285184


##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java:
##########
@@ -159,8 +160,8 @@ public void testMultiTableExecutionWithKafkaSource() throws IOException {
     testUtils.createTopic(topicName2, 2);
 
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
-    testUtils.sendMessages(topicName2, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
+    testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA, 0L)));

Review Comment:
   Is the current timestamp not working here?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala:
##########
@@ -291,15 +292,54 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
     insertDf.cache()
 
     val instantTime = getNewInstantTime
-    // Issue two deletes, one with the original partition and one with an updated partition
-    val recordsToDelete = dataGen.generateUniqueDeleteRecords(instantTime, 1)
-    recordsToDelete.addAll(dataGen.generateUniqueDeleteRecordsWithUpdatedPartition(instantTime, 1))
-    val deleteBatch = recordsToStrings(recordsToDelete).asScala
+    // Issue four deletes, one with the original partition, one with an updated partition,
+    // and two with an older ordering value that should be ignored
+    val deletedRecords = dataGen.generateUniqueDeleteRecords(instantTime, 1)
+    deletedRecords.addAll(dataGen.generateUniqueDeleteRecordsWithUpdatedPartition(instantTime, 1))
+    val inputRecords = new util.ArrayList[HoodieRecord[_]](deletedRecords)
+    val lowerOrderingValue = 1L
+    inputRecords.addAll(dataGen.generateUniqueDeleteRecords(instantTime, 1, lowerOrderingValue))
+    inputRecords.addAll(dataGen.generateUniqueDeleteRecordsWithUpdatedPartition(instantTime, 1, lowerOrderingValue))
+    val deleteBatch = recordsToStrings(inputRecords).asScala
     val deleteDf = spark.read.json(spark.sparkContext.parallelize(deleteBatch.toSeq, 1))
     deleteDf.cache()
     val recordKeyToDelete1 = deleteDf.collectAsList().get(0).getAs("_row_key").asInstanceOf[String]
     val recordKeyToDelete2 = deleteDf.collectAsList().get(1).getAs("_row_key").asInstanceOf[String]
-    deleteDf.write.format("org.apache.hudi")
+    deleteDf.write.format("hudi")
+      .options(hudiOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val prevDf = mergedDfList.last
+    mergedDfList = mergedDfList :+ prevDf.filter(row => row.getAs("_row_key").asInstanceOf[String] != recordKeyToDelete1 &&
+      row.getAs("_row_key").asInstanceOf[String] != recordKeyToDelete2)
+    validateDataAndRecordIndices(hudiOpts, spark.read.json(spark.sparkContext.parallelize(recordsToStrings(deletedRecords).asScala.toSeq, 1)))

Review Comment:
   nit: The pattern of updating `mergedDfList` in this class should be 
refactored separately.  `validateDataAndRecordIndices` depends on this mutable 
state, and it's not obvious at first glance that 
`validateDataAndRecordIndices` uses `mergedDfList`.
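
   A minimal sketch of one possible shape for such a refactor, assuming the 
expected snapshot is passed to the validation helper explicitly rather than 
read from a mutable field.  The class and method names below 
(`ExpectedStateSketch`, `withoutKeys`, `validate`) are hypothetical and use 
plain lists of record keys in place of DataFrames; this is not Hudi code.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical illustration only: plain key lists stand in for the test's DataFrames.
public class ExpectedStateSketch {

  // Pure helper: derives the next expected snapshot from the previous one
  // without mutating any shared field.
  static List<String> withoutKeys(List<String> previous, Set<String> deletedKeys) {
    return previous.stream()
        .filter(key -> !deletedKeys.contains(key))
        .collect(Collectors.toList());
  }

  // The expected snapshot is an explicit parameter, so the dependency is
  // visible at every call site instead of hidden in class-level state.
  static boolean validate(List<String> expected, List<String> actual) {
    return expected.equals(actual);
  }

  public static void main(String[] args) {
    List<String> previous = List.of("key1", "key2", "key3");
    List<String> expected = withoutKeys(previous, Set.of("key1", "key2"));
    System.out.println(validate(expected, List.of("key3")));
  }
}
```

   With this shape, each test step would build its own expected snapshot and 
hand it to the validator, so a reader does not need to know about any field 
updated between calls.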



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java:
##########
@@ -358,11 +358,11 @@ protected static HoodieTestDataGenerator prepareParquetDFSFiles(int numRecords,
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(makeDatesAmbiguous);
     if (useCustomSchema) {
       Helpers.saveParquetToDFS(Helpers.toGenericRecords(
-          dataGenerator.generateInsertsAsPerSchema("000", numRecords, schemaStr),
+          dataGenerator.generateInsertsAsPerSchema("000", numRecords, schemaStr, 0L),

Review Comment:
   Is the current timestamp not working here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
