This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e63722d24b5 [HUDI-7852] Constrain the comparison of different types of ordering values to limited cases (#11424)
e63722d24b5 is described below
commit e63722d24b5f42d5411f02c0b0872310111887b3
Author: Y Ethan Guo <[email protected]>
AuthorDate: Mon Jun 10 04:50:44 2024 -0700
[HUDI-7852] Constrain the comparison of different types of ordering values to limited cases (#11424)
---
.../hudi/BaseSparkInternalRowReaderContext.java | 10 +++
.../hudi/common/engine/HoodieReaderContext.java | 12 +++
.../read/HoodieBaseFileGroupRecordBuffer.java | 23 ++++--
.../table/read/TestHoodieFileGroupReaderBase.java | 90 +++++++++++++++-------
4 files changed, 99 insertions(+), 36 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index 72063dd472e..7fb4577f896 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -39,6 +39,7 @@ import org.apache.spark.sql.HoodieUnsafeRowUtils;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
import java.util.Map;
import java.util.function.UnaryOperator;
@@ -137,6 +138,15 @@ public abstract class BaseSparkInternalRowReaderContext extends HoodieReaderCont
}
+ @Override
+ public int compareTo(Comparable o1, Comparable o2) {
+ if ((o1 instanceof String && o2 instanceof UTF8String)
+ || (o1 instanceof UTF8String && o2 instanceof String)) {
+ return o1.toString().compareTo(o2.toString());
+ }
+ return super.compareTo(o1, o2);
+ }
+
protected UnaryOperator<InternalRow> getIdentityProjection() {
return row -> row;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index b79562f8b43..94b9e1cd02d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -250,6 +250,18 @@ public abstract class HoodieReaderContext<T> {
*/
public abstract T seal(T record);
+ /**
+ * Compares values in different types which can contain engine-specific types.
+ *
+ * @param o1 {@link Comparable} object.
+ * @param o2 other {@link Comparable} object to compare to.
+ * @return comparison result.
+ */
+ public int compareTo(Comparable o1, Comparable o2) {
+ throw new IllegalArgumentException("Cannot compare values in different types: "
+ + o1 + "(" + o1.getClass() + "), " + o2 + "(" + o2.getClass() + ")");
+ }
+
/**
* Generates metadata map based on the information.
*
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index 984d9740ceb..c25fba0e928 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -157,26 +157,33 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
* Compares two {@link Comparable}s. If both are numbers, converts them to {@link Long} for comparison.
* If one of the {@link Comparable}s is a String, assumes that both are String values for comparison.
*
+ * @param readerContext {@link HoodieReaderContext} instance.
* @param o1 {@link Comparable} object.
* @param o2 other {@link Comparable} object to compare to.
* @return comparison result.
*/
@VisibleForTesting
- static int compareTo(Comparable o1, Comparable o2) {
+ static int compareTo(HoodieReaderContext readerContext, Comparable o1, Comparable o2) {
// TODO(HUDI-7848): fix the delete records to contain the correct ordering value type
// so this util with the number comparison is not necessary.
try {
return o1.compareTo(o2);
} catch (ClassCastException e) {
- if (o1 instanceof Number && o2 instanceof Number) {
+ boolean isO1LongOrInteger = (o1 instanceof Long || o1 instanceof Integer);
+ boolean isO2LongOrInteger = (o2 instanceof Long || o2 instanceof Integer);
+ boolean isO1DoubleOrFloat = (o1 instanceof Double || o1 instanceof Float);
+ boolean isO2DoubleOrFloat = (o2 instanceof Double || o2 instanceof Float);
+ if (isO1LongOrInteger && isO2LongOrInteger) {
Long o1LongValue = ((Number) o1).longValue();
Long o2LongValue = ((Number) o2).longValue();
return o1LongValue.compareTo(o2LongValue);
- } else if (o1 instanceof String || o2 instanceof String) {
- return o1.toString().compareTo(o2.toString());
+ } else if ((isO1LongOrInteger && isO2DoubleOrFloat)
+ || (isO1DoubleOrFloat && isO2LongOrInteger)) {
+ Double o1DoubleValue = ((Number) o1).doubleValue();
+ Double o2DoubleValue = ((Number) o2).doubleValue();
+ return o1DoubleValue.compareTo(o2DoubleValue);
} else {
- throw new IllegalArgumentException("Cannot compare values in different types: "
- + o1 + "(" + o1.getClass() + "), " + o2 + "(" + o2.getClass() + ")");
+ return readerContext.compareTo(o1, o2);
}
} catch (Throwable e) {
throw new HoodieException("Cannot compare values: "
@@ -236,7 +243,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
}
Comparable incomingOrderingValue = readerContext.getOrderingValue(
Option.of(record), metadata, readerSchema, payloadProps);
- if (compareTo(incomingOrderingValue, existingOrderingValue) > 0) {
+ if (compareTo(readerContext, incomingOrderingValue, existingOrderingValue) > 0) {
return Option.of(Pair.of(record, metadata));
}
return Option.empty();
@@ -399,7 +406,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
if (isDeleteRecordWithNaturalOrder(newer, newOrderingValue)) {
return Option.empty();
}
- if (compareTo(oldOrderingValue, newOrderingValue) > 0) {
+ if (compareTo(readerContext, oldOrderingValue, newOrderingValue) > 0) {
return older;
}
return newer;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index 9f3f8acf81c..a8717d8a8e3 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -55,6 +55,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
+import static org.apache.hudi.common.model.WriteOperationType.BULK_INSERT;
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
import static org.apache.hudi.common.table.HoodieTableConfig.PARTITION_FIELDS;
@@ -64,6 +65,7 @@ import static org.apache.hudi.common.table.read.HoodieBaseFileGroupRecordBuffer.
import static org.apache.hudi.common.testutils.HoodieTestUtils.getLogFileListFromFileSlice;
import static org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.params.provider.Arguments.arguments;
/**
@@ -90,36 +92,68 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
public abstract Comparable getComparableUTF8String(String value);
@Test
- public void testCompareToComparable() {
+ public void testCompareToComparable() throws Exception {
+ Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs());
+ // Prepare a table for initializing reader context
+ try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF)) {
+ commitToTable(recordsToStrings(dataGen.generateInserts("001", 1)), BULK_INSERT.value(), writeConfigs);
+ }
+ StorageConfiguration<?> storageConf = getStorageConf();
+ String tablePath = getBasePath();
+ HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storageConf, tablePath);
+ Schema avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+ HoodieReaderContext<T> readerContext = getHoodieReaderContext(tablePath, avroSchema, storageConf);
+
// Test same type
- assertEquals(1, compareTo(Boolean.TRUE, Boolean.FALSE));
- assertEquals(0, compareTo(Boolean.TRUE, Boolean.TRUE));
- assertEquals(-1, compareTo(Boolean.FALSE, Boolean.TRUE));
- assertEquals(1, compareTo(20, 15));
- assertEquals(0, compareTo(15, 15));
- assertEquals(-1, compareTo(10, 15));
- assertEquals(1, compareTo(1.1f, 1.0f));
- assertEquals(0, compareTo(1.0f, 1.0f));
- assertEquals(-1, compareTo(0.9f, 1.0f));
- assertEquals(1, compareTo(1.1, 1.0));
- assertEquals(0, compareTo(1.0, 1.0));
- assertEquals(-1, compareTo(0.9, 1.0));
- assertEquals(1, compareTo("value2", "value1"));
- assertEquals(0, compareTo("value1", "value1"));
- assertEquals(-1, compareTo("value1", "value2"));
+ assertEquals(1, compareTo(readerContext, Boolean.TRUE, Boolean.FALSE));
+ assertEquals(0, compareTo(readerContext, Boolean.TRUE, Boolean.TRUE));
+ assertEquals(-1, compareTo(readerContext, Boolean.FALSE, Boolean.TRUE));
+ assertEquals(1, compareTo(readerContext, 20, 15));
+ assertEquals(0, compareTo(readerContext, 15, 15));
+ assertEquals(-1, compareTo(readerContext, 10, 15));
+ assertEquals(1, compareTo(readerContext, 1.1f, 1.0f));
+ assertEquals(0, compareTo(readerContext, 1.0f, 1.0f));
+ assertEquals(-1, compareTo(readerContext, 0.9f, 1.0f));
+ assertEquals(1, compareTo(readerContext, 1.1, 1.0));
+ assertEquals(0, compareTo(readerContext, 1.0, 1.0));
+ assertEquals(-1, compareTo(readerContext, 0.9, 1.0));
+ assertEquals(1, compareTo(readerContext, 1.1, 1));
+ assertEquals(-1, compareTo(readerContext, 0.9, 1));
+ assertEquals(1, compareTo(readerContext, "value2", "value1"));
+ assertEquals(0, compareTo(readerContext, "value1", "value1"));
+ assertEquals(-1, compareTo(readerContext, "value1", "value2"));
// Test different types which are comparable
- assertEquals(1, compareTo(Long.MAX_VALUE / 2L, 10));
- assertEquals(1, compareTo(20, 10L));
- assertEquals(0, compareTo(10L, 10));
- assertEquals(0, compareTo(10, 10L));
- assertEquals(-1, compareTo(10, Long.MAX_VALUE));
- assertEquals(-1, compareTo(10L, 20));
- assertEquals(1, compareTo(getComparableUTF8String("value2"), "value1"));
- assertEquals(1, compareTo("value2", getComparableUTF8String("value1")));
- assertEquals(0, compareTo(getComparableUTF8String("value1"), "value1"));
- assertEquals(0, compareTo("value1", getComparableUTF8String("value1")));
- assertEquals(-1, compareTo(getComparableUTF8String("value1"), "value2"));
- assertEquals(-1, compareTo("value1", getComparableUTF8String("value2")));
+ assertEquals(1, compareTo(readerContext, Long.MAX_VALUE / 2L, 10));
+ assertEquals(1, compareTo(readerContext, 20, 10L));
+ assertEquals(0, compareTo(readerContext, 10L, 10));
+ assertEquals(0, compareTo(readerContext, 10, 10L));
+ assertEquals(-1, compareTo(readerContext, 10, Long.MAX_VALUE));
+ assertEquals(-1, compareTo(readerContext, 10L, 20));
+ assertEquals(1, compareTo(readerContext, 10.01f, 10));
+ assertEquals(1, compareTo(readerContext, 10.01f, 10L));
+ assertEquals(1, compareTo(readerContext, 10.01, 10));
+ assertEquals(1, compareTo(readerContext, 10.01, 10L));
+ assertEquals(1, compareTo(readerContext, 11L, 10.99f));
+ assertEquals(1, compareTo(readerContext, 11, 10.99));
+ // Throw exception if comparing Double with Float which have different precision
+ assertThrows(IllegalArgumentException.class, () -> compareTo(readerContext, 10.01f, 10.0));
+ assertThrows(IllegalArgumentException.class, () -> compareTo(readerContext, 10.01, 10.0f));
+ assertEquals(0, compareTo(readerContext, 10.0, 10L));
+ assertEquals(0, compareTo(readerContext, 10.0f, 10L));
+ assertEquals(0, compareTo(readerContext, 10.0, 10));
+ assertEquals(0, compareTo(readerContext, 10.0f, 10));
+ assertEquals(-1, compareTo(readerContext, 9.99f, 10));
+ assertEquals(-1, compareTo(readerContext, 9.99f, 10L));
+ assertEquals(-1, compareTo(readerContext, 9.99, 10));
+ assertEquals(-1, compareTo(readerContext, 9.99, 10L));
+ assertEquals(-1, compareTo(readerContext, 10L, 10.01f));
+ assertEquals(-1, compareTo(readerContext, 10, 10.01));
+ assertEquals(1, compareTo(readerContext, getComparableUTF8String("value2"), "value1"));
+ assertEquals(1, compareTo(readerContext, "value2", getComparableUTF8String("value1")));
+ assertEquals(0, compareTo(readerContext, getComparableUTF8String("value1"), "value1"));
+ assertEquals(0, compareTo(readerContext, "value1", getComparableUTF8String("value1")));
+ assertEquals(-1, compareTo(readerContext, getComparableUTF8String("value1"), "value2"));
+ assertEquals(-1, compareTo(readerContext, "value1", getComparableUTF8String("value2")));
}
private static Stream<Arguments> testArguments() {