nsivabalan commented on code in PR #13670:
URL: https://github.com/apache/hudi/pull/13670#discussion_r2250578252
##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java:
##########
@@ -146,11 +151,54 @@ void readWithEventTimeOrderingAndDeleteBlock() throws
IOException {
assertEquals(2, readStats.getNumUpdates());
}
+ @Test
+ void readWithEventTimeOrderingWithRecords() throws IOException {
Review Comment:
Pushed an update.
##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java:
##########
@@ -146,11 +151,54 @@ void readWithEventTimeOrderingAndDeleteBlock() throws
IOException {
assertEquals(2, readStats.getNumUpdates());
}
+ @Test
+ void readWithEventTimeOrderingWithRecords() throws IOException {
+ HoodieReadStats readStats = new HoodieReadStats();
+ TypedProperties properties = new TypedProperties();
+ properties.setProperty(HoodieTableConfig.PRECOMBINE_FIELDS.key(), "ts");
+ properties.setProperty(DELETE_KEY, "counter");
+ properties.setProperty(DELETE_MARKER, "3");
+ HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+ when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
+ StorageConfiguration<?> storageConfiguration =
mock(StorageConfiguration.class);
+ HoodieReaderContext<IndexedRecord> readerContext = new
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(),
Option.empty());
+ readerContext.setHasLogFiles(false);
+ readerContext.setHasBootstrapBaseFile(false);
+ FileGroupReaderSchemaHandler schemaHandler = new
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
tableConfig,
+ properties);
+ readerContext.setSchemaHandler(schemaHandler);
+ Map<Serializable, BufferedRecord> inputRecords =
convertToBufferedRecordsMap(Arrays.asList(testRecord1UpdateWithSameTime,
testRecord2Update, testRecord3Update,
+ testRecord4EarlierUpdate), readerContext, properties, new
String[]{"ts"});
+
inputRecords.putAll(convertToBufferedRecordsMapForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
testRecord6DeleteByCustomMarker), false));
+ KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer =
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, null,
+ RecordMergeMode.EVENT_TIME_ORDERING, Collections.singletonList("ts"),
properties);
+
+
fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testRecord1,
testRecord2, testRecord3, testRecord4,
+ testRecord5, testRecord6).iterator()));
+
+ inputRecords.entrySet().forEach(kv -> {
+ try {
+ fileGroupRecordBuffer.processNextDataRecord(kv.getValue(),
kv.getKey());
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to process next data ", e);
+ }
+ });
+
+ List<IndexedRecord> actualRecords =
getActualRecords(fileGroupRecordBuffer);
+ // update for 4 is ignored due to lower ordering value.
+ // record5 is deleted.
+ // delete for 6 is ignored due to lower ordering value.
+ assertEquals(Arrays.asList(testRecord1UpdateWithSameTime,
testRecord2Update, testRecord3Update, testRecord4, testRecord6), actualRecords);
+ assertEquals(0, readStats.getNumInserts());
Review Comment:
Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org