nsivabalan commented on code in PR #13670:
URL: https://github.com/apache/hudi/pull/13670#discussion_r2250578252


##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java:
##########
@@ -146,11 +151,54 @@ void readWithEventTimeOrderingAndDeleteBlock() throws IOException {
     assertEquals(2, readStats.getNumUpdates());
   }
 
+  @Test
+  void readWithEventTimeOrderingWithRecords() throws IOException {

Review Comment:
   pushed an update. 



##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java:
##########
@@ -146,11 +151,54 @@ void readWithEventTimeOrderingAndDeleteBlock() throws IOException {
     assertEquals(2, readStats.getNumUpdates());
   }
 
+  @Test
+  void readWithEventTimeOrderingWithRecords() throws IOException {
+    HoodieReadStats readStats = new HoodieReadStats();
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(HoodieTableConfig.PRECOMBINE_FIELDS.key(), "ts");
+    properties.setProperty(DELETE_KEY, "counter");
+    properties.setProperty(DELETE_MARKER, "3");
+    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] {"record_key"}));
+    StorageConfiguration<?> storageConfiguration = mock(StorageConfiguration.class);
+    HoodieReaderContext<IndexedRecord> readerContext = new HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(), Option.empty());
+    readerContext.setHasLogFiles(false);
+    readerContext.setHasBootstrapBaseFile(false);
+    FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), tableConfig,
+        properties);
+    readerContext.setSchemaHandler(schemaHandler);
+    Map<Serializable, BufferedRecord> inputRecords = convertToBufferedRecordsMap(Arrays.asList(testRecord1UpdateWithSameTime, testRecord2Update, testRecord3Update,
+        testRecord4EarlierUpdate), readerContext, properties, new String[]{"ts"});
+    inputRecords.putAll(convertToBufferedRecordsMapForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker, testRecord6DeleteByCustomMarker), false));
+    KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer = buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, null,
+        RecordMergeMode.EVENT_TIME_ORDERING, Collections.singletonList("ts"), properties);
+
+    fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testRecord1, testRecord2, testRecord3, testRecord4,
+        testRecord5, testRecord6).iterator()));
+
+    inputRecords.entrySet().forEach(kv -> {
+      try {
+        fileGroupRecordBuffer.processNextDataRecord(kv.getValue(), kv.getKey());
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to process next data ", e);
+      }
+    });
+
+    List<IndexedRecord> actualRecords = getActualRecords(fileGroupRecordBuffer);
+    // update for 4 is ignored due to lower ordering value.
+    // record5 is deleted.
+    // delete for 6 is ignored due to lower ordering value.
+    assertEquals(Arrays.asList(testRecord1UpdateWithSameTime, testRecord2Update, testRecord3Update, testRecord4, testRecord6), actualRecords);
+    assertEquals(0, readStats.getNumInserts());

Review Comment:
   fixed
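
For context, the expectations in the quoted test follow EVENT_TIME_ORDERING merge semantics: an incoming update or delete only replaces the existing record when its ordering value (here the "ts" field) is at least the existing one, so earlier updates and earlier deletes are ignored. A rough, self-contained sketch of that rule, assuming a simplified (payload, orderingValue) pair rather than Hudi's actual BufferedRecord/merger API (class and method names below are hypothetical):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class EventTimeMergeSketch {
  // A record is modeled as (payload, orderingValue); a null payload marks a delete.
  // Hypothetical helper, not Hudi's merger API: the incoming record wins only when
  // its ordering value is greater than or equal to the existing record's.
  static Map.Entry<String, Long> merge(Map.Entry<String, Long> existing,
                                       Map.Entry<String, Long> incoming) {
    return incoming.getValue() >= existing.getValue() ? incoming : existing;
  }

  public static void main(String[] args) {
    Map.Entry<String, Long> base = new SimpleEntry<>("v1", 5L);
    // Update with the same ordering value wins (mirrors testRecord1UpdateWithSameTime).
    System.out.println(merge(base, new SimpleEntry<>("v1-update", 5L)).getKey());
    // Update with a lower ordering value is ignored (mirrors testRecord4EarlierUpdate).
    System.out.println(merge(base, new SimpleEntry<>("v0", 4L)).getKey());
    // Delete with a higher ordering value applies: null payload means the key is gone
    // (mirrors testRecord5DeleteByCustomMarker).
    System.out.println(merge(base, new SimpleEntry<>(null, 6L)).getKey());
    // Delete with a lower ordering value is ignored (mirrors testRecord6DeleteByCustomMarker).
    System.out.println(merge(base, new SimpleEntry<>(null, 4L)).getKey());
  }
}
```

Under this rule the test's expected output falls out directly: records 1-3 take their updates, record 4 keeps its base version, record 5 disappears, and record 6 survives its stale delete.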



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
