the-other-tim-brown commented on code in PR #13556:
URL: https://github.com/apache/hudi/pull/13556#discussion_r2217390601


##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java:
##########
@@ -443,16 +449,51 @@ public void testLogFileCountsAfterCompaction() throws Exception {
         Dataset<Row> actual = HoodieClientTestUtils.read(
             jsc(), basePath(), sqlContext(), hoodieStorage(), fullPartitionPaths);
         List<Row> rows = actual.collectAsList();
-        assertEquals(updatedRecords.size(), rows.size());
+        assertEquals(90, rows.size());
+        int updatedCount = 0;
         for (Row row : rows) {
-          assertEquals(row.getAs(HoodieRecord.COMMIT_TIME_METADATA_FIELD), newCommitTime);
+          if (row.getAs(HoodieRecord.COMMIT_TIME_METADATA_FIELD).equals(newCommitTime)) {
+            updatedCount++;
+          } else {
+            // check that the commit time is 100 for all records that are not updated
+            assertEquals(firstCommitTime, row.getAs(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
+          }
           // check that file names metadata is updated
           assertTrue(row.getString(HoodieRecord.FILENAME_META_FIELD_ORD).contains(compactionInstantTime));
         }
+        // check that 80 records are updated
+        assertEquals(80, updatedCount);
       }
     }
   }
 
+  private static void validateCompactionMetadata(HoodieCommitMetadata compactionMetadata, String previousCommit, long expectedTotalRecordsWritten, long expectedTotalUpdatedRecords,
+                                                 long expectedTotalInsertedRecords, long expectedTotalDeletedRecords) {
+    long totalRecordsWritten = 0;
+    long totalDeletedRecords = 0;
+    long totalUpdatedRecords = 0;
+    long totalInsertedRecords = 0;
+    for (HoodieWriteStat writeStat : compactionMetadata.getWriteStats()) {
+      totalRecordsWritten += writeStat.getNumWrites();
+      totalDeletedRecords += writeStat.getNumDeletes();
+      totalUpdatedRecords += writeStat.getNumUpdateWrites();
+      totalInsertedRecords += writeStat.getNumInserts();
+      assertEquals(previousCommit, writeStat.getPrevCommit());
+      assertNotNull(writeStat.getFileId());
+      assertNotNull(writeStat.getPath());
+      assertTrue(writeStat.getFileSizeInBytes() > 0);
+      assertTrue(writeStat.getTotalWriteBytes() > 0);
+      assertTrue(writeStat.getTotalLogBlocks() > 0);
+      assertTrue(writeStat.getTotalLogSizeCompacted() > 0);
+      assertTrue(writeStat.getTotalLogFilesCompacted() > 0);
+      assertTrue(writeStat.getTotalLogRecords() > 0);
+    }
+    assertEquals(expectedTotalRecordsWritten, totalRecordsWritten);
+    assertEquals(expectedTotalUpdatedRecords, totalUpdatedRecords);
+    assertEquals(expectedTotalInsertedRecords, totalInsertedRecords);

Review Comment:
   Test has been updated to include this
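   For anyone reading along, a minimal sketch of how the new helper might be wired up. The actual call site is truncated out of this hunk, so the metadata-loading step, the `compactionInstant` variable, and the expected counts below are all illustrative assumptions rather than the PR's exact values:
   ```java
   // Assumed call site (the real one is cut off in the diff above).
   // Scenario in this test: 100 inserts ("100"), 80 updates ("101"), 10 deletes ("102"),
   // so 90 records remain live when compaction runs.
   HoodieCommitMetadata compactionMetadata = HoodieCommitMetadata.fromBytes(
       metaClient.getActiveTimeline().getInstantDetails(compactionInstant).get(),
       HoodieCommitMetadata.class);
   validateCompactionMetadata(compactionMetadata, deleteCommitTime,
       90L,  // expectedTotalRecordsWritten: 100 inserts minus 10 deletes (assumed attribution)
       80L,  // expectedTotalUpdatedRecords
       0L,   // expectedTotalInsertedRecords (assumed: compaction introduces no new inserts)
       10L); // expectedTotalDeletedRecords
   ```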



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java:
##########
@@ -363,27 +362,31 @@ public void testLogFileCountsAfterCompaction() throws Exception {
 
     setUp(config.getProps());
 
-    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) {
-      String newCommitTime = "100";
-      WriteClientTestUtils.startCommitWithTime(writeClient, newCommitTime);
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
+      String firstCommitTime = "100";
+      WriteClientTestUtils.startCommitWithTime(writeClient, firstCommitTime);
 
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
+      List<HoodieRecord> records = dataGen.generateInserts(firstCommitTime, 100);
       JavaRDD<HoodieRecord> recordsRDD = jsc().parallelize(records, 1);
-      List<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime).collect();
-      writeClient.commit(newCommitTime, jsc().parallelize(statuses), Option.empty(), DELTA_COMMIT_ACTION, Collections.emptyMap());
+      List<WriteStatus> statuses = writeClient.insert(recordsRDD, firstCommitTime).collect();
+      writeClient.commit(firstCommitTime, jsc().parallelize(statuses), Option.empty(), DELTA_COMMIT_ACTION, Collections.emptyMap());
 
-      // Update all the 100 records
-      newCommitTime = "101";
-      List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
+      // Update 80 of 100 records
+      String newCommitTime = "101";
+      List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records.subList(0, 80));
       JavaRDD<HoodieRecord> updatedRecordsRDD = jsc().parallelize(updatedRecords, 1);
-
-      SparkRDDReadClient readClient = new SparkRDDReadClient(context(), config);
-      JavaRDD<HoodieRecord> updatedTaggedRecordsRDD = readClient.tagLocation(updatedRecordsRDD);
-
       WriteClientTestUtils.startCommitWithTime(writeClient, newCommitTime);
-      statuses = writeClient.upsertPreppedRecords(updatedTaggedRecordsRDD, newCommitTime).collect();
+      statuses = writeClient.upsert(updatedRecordsRDD, newCommitTime).collect();
       writeClient.commit(newCommitTime, jsc().parallelize(statuses), Option.empty(), DELTA_COMMIT_ACTION, Collections.emptyMap());
 
+      // Delete 10 of the remaining records
+      List<HoodieRecord> deleteRecords = dataGen.generateDeletesFromExistingRecords(records.subList(90, 100));
+      JavaRDD<HoodieRecord> deleteRecordsRDD = jsc().parallelize(deleteRecords, 1);
+      String deleteCommitTime = "102";
+      WriteClientTestUtils.startCommitWithTime(writeClient, deleteCommitTime);
+      statuses = writeClient.upsert(deleteRecordsRDD, deleteCommitTime).collect();
+      writeClient.commit(deleteCommitTime, jsc().parallelize(statuses), Option.empty(), DELTA_COMMIT_ACTION, Collections.emptyMap());

Review Comment:
   Updated to do these validations per write
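   In case it helps the next reviewer, a rough sketch of the shape such a per-write validation can take. The helper name `assertWriteStatuses` and the expected counts are hypothetical; the PR's actual assertions are not shown in this hunk:
   ```java
   // Hypothetical helper, not the PR's code: sums the per-file stats from the
   // WriteStatus list returned by each write and checks them against expectations.
   // Assumes org.apache.hudi.client.WriteStatus, org.apache.hudi.common.model.HoodieWriteStat,
   // java.util.List, and a static import of org.junit.jupiter.api.Assertions.assertEquals.
   private static void assertWriteStatuses(List<WriteStatus> statuses,
                                           long expectedInserts, long expectedUpdates, long expectedDeletes) {
     long inserts = 0;
     long updates = 0;
     long deletes = 0;
     for (WriteStatus status : statuses) {
       HoodieWriteStat stat = status.getStat();
       inserts += stat.getNumInserts();
       updates += stat.getNumUpdateWrites();
       deletes += stat.getNumDeletes();
     }
     assertEquals(expectedInserts, inserts);
     assertEquals(expectedUpdates, updates);
     assertEquals(expectedDeletes, deletes);
   }
   ```
   For the scenario above that would be roughly `assertWriteStatuses(statuses, 100, 0, 0)` after the insert, `assertWriteStatuses(statuses, 0, 80, 0)` after the update, and `assertWriteStatuses(statuses, 0, 0, 10)` after the delete (assuming deletes surface via `getNumDeletes()` on the delta commit stats).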


