nsivabalan commented on code in PR #13526:
URL: https://github.com/apache/hudi/pull/13526#discussion_r2220609107


##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java:
##########
@@ -41,6 +41,7 @@ public class TestDataSource extends AbstractBaseTestSource {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TestDataSource.class);
   public static boolean returnEmptyBatch = false;
+  public static Option<String> recordInstantTime = Option.empty();

Review Comment:
   We should reset this to `Option.empty()` in either a `@BeforeEach` method or the constructor of `TestDataSource`, so the static state does not leak across tests.
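   A minimal sketch of the constructor-reset variant. `TestDataSourceSketch` is an illustrative stand-in for the real class, and `java.util.Optional` stands in for Hudi's `Option` so the example is self-contained:

```java
import java.util.Optional;

// Illustrative stand-in for TestDataSource's static test state.
public class TestDataSourceSketch {
  static Optional<String> recordInstantTime = Optional.empty();

  // Reset shared static state on construction, as the review suggests,
  // so each test run starts from a clean slate.
  TestDataSourceSketch() {
    recordInstantTime = Optional.empty();
  }

  public static void main(String[] args) {
    recordInstantTime = Optional.of("002"); // one test configures it
    new TestDataSourceSketch();             // next test's setup constructs a fresh source
    System.out.println(recordInstantTime.isPresent()); // false: state was reset
  }
}
```

   The same reset could equally live in a `@BeforeEach` method; the constructor variant just guarantees it happens wherever the source is instantiated.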



##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -604,13 +651,30 @@ protected void 
validateOutputFromFileGroupReader(StorageConfiguration<?> storage
                                                  List<HoodieRecord> 
expectedHoodieUnmergedRecords) throws Exception {
     HoodieTableMetaClient metaClient = 
HoodieTestUtils.createMetaClient(storageConf, tablePath);
     Schema avroSchema = new 
TableSchemaResolver(metaClient).getTableAvroSchema();
+    expectedHoodieRecords = 
getExpectedHoodieRecordsWithOrderingValue(expectedHoodieRecords, metaClient, 
avroSchema);
+    expectedHoodieUnmergedRecords = 
getExpectedHoodieRecordsWithOrderingValue(expectedHoodieUnmergedRecords, 
metaClient, avroSchema);
     List<HoodieTestDataGenerator.RecordIdentifier> expectedRecords = 
convertHoodieRecords(expectedHoodieRecords, avroSchema);
     List<HoodieTestDataGenerator.RecordIdentifier> expectedUnmergedRecords = 
convertHoodieRecords(expectedHoodieUnmergedRecords, avroSchema);
     validateOutputFromFileGroupReaderWithExistingRecords(
         storageConf, tablePath, containsBaseFile, expectedLogFileNum, 
recordMergeMode,
         expectedRecords, expectedUnmergedRecords);
   }
 
+  private static List<HoodieRecord> 
getExpectedHoodieRecordsWithOrderingValue(List<HoodieRecord> 
expectedHoodieRecords, HoodieTableMetaClient metaClient, Schema avroSchema) {
+    return expectedHoodieRecords.stream().map(rec -> {
+      RawTripTestPayload oldPayload = (RawTripTestPayload) rec.getData();
+      try {
+        List<String> orderingFields = 
metaClient.getTableConfig().getPreCombineFieldList().get();
+        HoodieAvroRecord avroRecord = ((HoodieAvroRecord) rec);
+        Comparable orderingValue = Comparables.create(orderingFields, field -> 
(Comparable) avroRecord.getColumnValueAsJava(avroSchema, field, new 
TypedProperties()));
+        RawTripTestPayload newPayload = new 
RawTripTestPayload(Option.ofNullable(oldPayload.getJsonData()), 
oldPayload.getRowKey(), oldPayload.getPartitionPath(), null, false, 
orderingValue);

Review Comment:
   Have you thought about, or attempted, fixing `HoodieTestDataGenerator` itself so that it instantiates `RawTripTestPayload` with the right ordering value? That way we fix it for all test data generation, not just `TestHoodieFileGroupReaderBase.java`.
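   A sketch of the generator-side idea: compute the composite ordering value once, at record-generation time, and hand it to the payload constructor. All names below are hypothetical; a real fix would live inside `HoodieTestDataGenerator`:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: extract the ordering value for the configured
// ordering fields directly from a generated record, so the payload can
// be constructed with the right value up front.
public class OrderingAtGeneration {
  static List<Comparable<?>> orderingValue(Map<String, Object> record,
                                           List<String> orderingFields) {
    return orderingFields.stream()
        .map(f -> (Comparable<?>) record.get(f))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Map<String, Object> record = new HashMap<>();
    record.put("timestamp", 1700000000L);
    record.put("rider", "rider-002");
    // The generator would pass this into the payload constructor,
    // instead of tests back-filling it afterwards.
    System.out.println(orderingValue(record, Arrays.asList("timestamp", "rider")));
  }
}
```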



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -1955,6 +1957,62 @@ public void testFilterDupesWithPrecombine(
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testDeltaStreamerWithMultipleOrderingFields(HoodieTableType 
tableType) throws Exception {
+    String tableBasePath = basePath + "/test_with_multiple_ordering_fields";
+    HoodieDeltaStreamer.Config cfg =
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+    cfg.tableType = tableType.name();
+    cfg.filterDupes = true;
+    cfg.sourceOrderingFields = "timestamp,rider";
+    cfg.recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
+    cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
+    cfg.recordMergeStrategyId = 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
+    // Set record merge mode to event time ordering
+    cfg.configs.add(String.format("%s=%s", 
HoodieWriteConfig.RECORD_MERGE_MODE.key(), 
RecordMergeMode.EVENT_TIME_ORDERING.name()));
+    //addRecordMerger(HoodieRecordType.AVRO, cfg.configs);

Review Comment:
   Why is this commented out? If it's not required, can we remove it?



##########
hudi-common/src/main/java/org/apache/hudi/Comparables.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi;
+
+import org.apache.hudi.common.util.ValidationUtils;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
+
+public class Comparables implements Comparable, Serializable {
+  protected static final long serialVersionUID = 1L;
+  private static final Comparables DEFAULT_VALUE = new 
Comparables(DEFAULT_ORDERING_VALUE);
+
+  private final List<Comparable> comparables;
+
+  public Comparables(List<Comparable> comparables) {
+    this.comparables = comparables;
+  }
+
+  public Comparables(Comparable comparable) {
+    this.comparables = Collections.singletonList(comparable);
+  }
+
+  public static Comparables getDefault() {
+    return DEFAULT_VALUE;
+  }
+
+  public static boolean isDefault(Comparable orderingVal) {
+    if (orderingVal instanceof Comparables) {
+      return ((Comparables) orderingVal).comparables.size() == 1
+          && ((Comparables) 
orderingVal).comparables.get(0).equals(DEFAULT_ORDERING_VALUE);
+    } else {
+      return orderingVal.equals(DEFAULT_ORDERING_VALUE);
+    }
+  }
+
+  /**
+   * Returns whether the given two comparable values come from the same 
runtime class.
+   */
+  public static boolean isSameClass(Comparable<?> v, Comparable<?> o) {
+    if (v.getClass() != o.getClass()) {
+      // If class is not same return false
+      return false;
+    } else if (v instanceof Comparables) {
+      if (((Comparables) v).comparables.size() != ((Comparables) 
o).comparables.size()) {
+        // if comparables size is not same return false
+        return false;
+      } else {
+        // compare class of comparable list of both arguments
+        return IntStream.range(0, ((Comparables) v).comparables.size())
+            .mapToObj(i -> ((Comparables) v).comparables.get(i).getClass() == 
((Comparables) o).comparables.get(i).getClass())
+            .reduce(Boolean::logicalAnd)
+            .orElse(true);
+      }
+    }
+    // return true if class is same and input objects are not instance of 
Comparables class
+    return true;
+  }
+
+  public static Comparable getDefaultOrderingValue() {
+    return DEFAULT_ORDERING_VALUE;
+  }
+
+  @Override
+  public int compareTo(Object o) {
+    ValidationUtils.checkArgument(o instanceof Comparables, "Comparables can 
only be compared with another Comparables");
+    Comparables otherComparables = (Comparables) o;
+    ValidationUtils.checkArgument(comparables.size() == 
otherComparables.comparables.size(), "Comparables should be of same size");
+    for (int i = 0; i < comparables.size(); i++) {
+      int comparingValue = 
comparables.get(i).compareTo(otherComparables.comparables.get(i));

Review Comment:
   Yes, let's ensure the `ConfigProperty` docs call this out.
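   For reference, the positional semantics being discussed (earlier ordering fields take precedence, and both sides must have the same arity) can be reproduced standalone. The class below is an illustrative re-implementation, not the PR's `Comparables`:

```java
import java.util.Arrays;
import java.util.List;

// Standalone sketch of lexicographic comparison over a list of ordering
// values: compare positionally, first non-zero result wins.
public class LexicographicCompare {
  @SuppressWarnings({"rawtypes", "unchecked"})
  static int compare(List<? extends Comparable> left, List<? extends Comparable> right) {
    if (left.size() != right.size()) {
      throw new IllegalArgumentException("ordering values must have the same arity");
    }
    for (int i = 0; i < left.size(); i++) {
      int c = left.get(i).compareTo(right.get(i));
      if (c != 0) {
        return c; // earlier fields take precedence over later ones
      }
    }
    return 0;
  }

  public static void main(String[] args) {
    // Tie on timestamp, so rider breaks it: (1, "rider-002") > (1, "rider-001")
    System.out.println(compare(Arrays.asList(1L, "rider-002"),
                               Arrays.asList(1L, "rider-001")) > 0);
  }
}
```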



##########
hudi-common/src/main/java/org/apache/hudi/Comparables.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi;
+
+import org.apache.hudi.common.util.ValidationUtils;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
+
+public class Comparables implements Comparable, Serializable {
+  protected static final long serialVersionUID = 1L;
+
+  private final List<Comparable> comparables;

Review Comment:
   Hey @the-other-tim-brown: what's your take on this? Do you have any suggestions here?
   
   @lokeshj1703: can we do some micro-benchmarking, comparing master (single ordering field) against this branch? We can create a MOR file slice with N log files so that we trigger repeated compare calls, then do a snapshot read and see whether there is any measurable perf difference.
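   For a first signal on the single-field question, a throwaway harness along these lines could help (class and method names are made up for the sketch; a proper measurement of the file-slice scenario should use JMH to control warm-up, dead-code elimination, and JIT effects):

```java
import java.util.Arrays;
import java.util.List;

// Toy harness: does a list-backed ordering value add measurable cost
// over comparing a single raw Comparable? Shape only; use JMH for real numbers.
public class CompareBench {
  static volatile int sink; // keep the compared results observable to the JIT

  static long benchRaw(long iters) {
    Long a = 1L;
    Long b = 2L;
    long start = System.nanoTime();
    for (long i = 0; i < iters; i++) {
      sink += a.compareTo(b);
    }
    return System.nanoTime() - start;
  }

  static long benchList(long iters) {
    List<Long> a = Arrays.asList(1L);
    List<Long> b = Arrays.asList(2L);
    long start = System.nanoTime();
    for (long i = 0; i < iters; i++) {
      for (int j = 0; j < a.size(); j++) {
        sink += a.get(j).compareTo(b.get(j));
      }
    }
    return System.nanoTime() - start;
  }

  public static void main(String[] args) {
    long iters = 5_000_000L;
    benchRaw(iters);  // warm-up pass
    benchList(iters); // warm-up pass
    System.out.printf("raw: %d ns, list: %d ns%n", benchRaw(iters), benchList(iters));
  }
}
```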



##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -185,6 +189,49 @@ public void 
testReadFileGroupInMergeOnReadTable(RecordMergeMode recordMergeMode,
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"avro", "parquet"})
+  public void testReadFileGroupWithMultipleOrderingFields(String 
logDataBlockFormat) throws Exception {
+    RecordMergeMode recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
+    Map<String, String> writeConfigs = new 
HashMap<>(getCommonConfigs(recordMergeMode, true));
+    writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), 
logDataBlockFormat);
+    writeConfigs.put("hoodie.datasource.write.table.type", 
HoodieTableType.MERGE_ON_READ.name());
+    // Use two precombine values - combination of timestamp and rider
+    String orderingValues = "timestamp,rider";
+    writeConfigs.put("hoodie.datasource.write.precombine.field", 
orderingValues);
+    writeConfigs.put("hoodie.payload.ordering.field", orderingValues);
+
+    try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(0xDEEF)) {
+      // Initial commit. rider column gets value of rider-002
+      List<HoodieRecord> initialRecords = dataGen.generateInserts("002", 100);
+      commitToTable(initialRecords, INSERT.value(), true, writeConfigs);
+      validateOutputFromFileGroupReader(
+          getStorageConf(), getBasePath(), true, 0, recordMergeMode,
+          initialRecords, initialRecords);
+
+      // The updates have rider values as rider-001 and the existing records 
have rider values as rider-001

Review Comment:
   Guess there is a typo in this comment: the existing records should have `rider-002` (from the initial commit above), not `rider-001`.



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -1955,6 +1957,62 @@ public void testFilterDupesWithPrecombine(
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testDeltaStreamerWithMultipleOrderingFields(HoodieTableType 
tableType) throws Exception {
+    String tableBasePath = basePath + "/test_with_multiple_ordering_fields";
+    HoodieDeltaStreamer.Config cfg =
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+    cfg.tableType = tableType.name();
+    cfg.filterDupes = true;
+    cfg.sourceOrderingFields = "timestamp,rider";
+    cfg.recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
+    cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
+    cfg.recordMergeStrategyId = 
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
+    // Set record merge mode to event time ordering
+    cfg.configs.add(String.format("%s=%s", 
HoodieWriteConfig.RECORD_MERGE_MODE.key(), 
RecordMergeMode.EVENT_TIME_ORDERING.name()));
+    //addRecordMerger(HoodieRecordType.AVRO, cfg.configs);
+    TestDataSource.recordInstantTime = Option.of("002");
+    new HoodieStreamer(cfg, jsc).sync();
+
+    assertRecordCount(1000, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
+
+    // Generate new updates with lower recordInstantTime so that updates are 
rejected
+    TestDataSource.recordInstantTime = Option.of("001");
+    runStreamSync(cfg, false, 50, WriteOperationType.UPSERT);
+    int numInserts = 25;
+    // TestDataSource generates 500 inserts, 450 updates and 50 deletes
+    assertRecordCount(1025, tableBasePath, sqlContext); // if filter dupes is 
not enabled, we should be expecting 3000 records here.
+    TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
+    // Filter records with rider-001 value and deduct the number of inserts
+    long numUpdates = 
sparkSession.read().format("hudi").load(tableBasePath).filter("rider = 
'rider-001'").count()
+        - numInserts;
+    // There should be no updates since ordering value rider-001 is lower than 
existing record ordering value rider-002
+    assertEquals(0, numUpdates);
+
+    // Generate new updates with lower recordInstantTime so that updates are 
rejected

Review Comment:
   Again, let's fix the comment here.



##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -185,6 +189,49 @@ public void 
testReadFileGroupInMergeOnReadTable(RecordMergeMode recordMergeMode,
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"avro", "parquet"})
+  public void testReadFileGroupWithMultipleOrderingFields(String 
logDataBlockFormat) throws Exception {

Review Comment:
   We may not need different log block formats here.
   But let's add a test at the Spark datasource layer for a COW table (I assume it does not make sense to add COW here with the FG reader).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to