the-other-tim-brown commented on code in PR #13526:
URL: https://github.com/apache/hudi/pull/13526#discussion_r2226869880
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##########
@@ -479,6 +483,15 @@ public final UnaryOperator<T> projectRecord(Schema from,
Schema to) {
return projectRecord(from, to, Collections.emptyMap());
}
+ /**
+ * Converts the ordering values to the specific engine type.
+ */
+ public final Comparable convertOrderingValueToEngineType(Comparable value) {
+ return value instanceof OrderingValue
Review Comment:
Can we make it so all ordering values are an instance of OrderingValue so
the casts can be avoided?
##########
hudi-common/src/main/java/org/apache/hudi/common/util/OrderingValues.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.model.EmptyOrderingValue;
+import org.apache.hudi.common.model.MultipleFieldsOrderingValue;
+import org.apache.hudi.common.model.OrderingValue;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Factory class for {@link org.apache.hudi.common.model.OrderingValue}.
+ */
+public class OrderingValues {
+ private static final Comparable<?> DEFAULT_VALUE = org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
+
+ /**
+ * Creates an {@code OrderingValue} with given values.
+ *
+ * <p>CAUTION: if the values are fetched through ordering fields,
+ * please use {@link #create(List, Function)} or {@link #create(String[], Function)}
+ * as much as possible to avoid streaming on the ordering fields in row level.</p>
+ */
+ public static Comparable create(Comparable[] orderingValues) {
+ if (orderingValues.length == 1) {
+ return orderingValues[0];
Review Comment:
@danny0405 I saw you originally had `SingleFieldOrderingValue` at one point.
That could make it easier to deal with the ordering values in all the new code
paths with FileGroupReader and BufferedRecord by using a return type of
`OrderingValue` instead of comparable so it is more clear what the value
represents. What was the reason for removing it?
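To make the suggestion concrete, here is a hypothetical sketch of what a `SingleFieldOrderingValue` could look like. The `OrderingValue` interface below is a minimal stand-in, not the PR's actual interface, and all method bodies are illustrative only:

```java
import java.io.Serializable;
import java.util.Objects;

// Minimal stand-in for the PR's OrderingValue interface -- an assumption for
// illustration; the real interface in hudi-common may declare more methods.
interface OrderingValue<T extends OrderingValue<T>> extends Comparable<T>, Serializable {}

// Hypothetical single-field wrapper: even the one-field case is typed as an
// OrderingValue, so code paths like FileGroupReader and BufferedRecord can
// use OrderingValue as the return type instead of a raw Comparable.
final class SingleFieldOrderingValue implements OrderingValue<SingleFieldOrderingValue> {
  private final Comparable<Object> value;

  @SuppressWarnings("unchecked")
  SingleFieldOrderingValue(Comparable<?> value) {
    this.value = (Comparable<Object>) value;
  }

  @Override
  public int compareTo(SingleFieldOrderingValue other) {
    return value.compareTo(other.value);
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof SingleFieldOrderingValue
        && Objects.equals(value, ((SingleFieldOrderingValue) o).value);
  }

  @Override
  public int hashCode() {
    return Objects.hashCode(value);
  }
}
```

The trade-off is an extra allocation per record in the single-field hot path, which may be why it was removed; the question above is asking for that rationale.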
##########
hudi-common/src/main/java/org/apache/hudi/common/model/MultipleFieldsOrderingValue.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.util.ValidationUtils;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+/**
+ * A wrapper class to manage multiple fields ordering values across Hudi.
+ */
+public class MultipleFieldsOrderingValue implements OrderingValue<MultipleFieldsOrderingValue>, Serializable {
Review Comment:
Can you add some unit tests for this class?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -926,8 +927,12 @@ static RecordMergeMode inferRecordMergeModeFromMergeStrategyId(String recordMerg
}
}
- public String getPreCombineField() {
- return getString(PRECOMBINE_FIELD);
+ public Option<List<String>> getPreCombineFields() {
Review Comment:
Should we use an empty list here as well to represent a table without an
ordering field?
##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java:
##########
@@ -100,7 +100,7 @@ public static Stream<Arguments> orderingValueParams() {
{new String[] {"val1", "val2", "val3", null}},
{new Timestamp[] {new Timestamp(1690766971000L), new Timestamp(1672536571000L)}},
// {new LocalDate[] {LocalDate.of(2023, 1, 1), LocalDate.of(1980, 7, 1)}} // HUDI-8854
- {new BigDecimal[] {new BigDecimal("12345678901234.2948"), new BigDecimal("23456789012345.4856")}}
+ {new BigDecimal[] {new BigDecimal("12345678901234.294800000000000"), new BigDecimal("23456789012345.485600000000000")}}
Review Comment:
Why does this need to change?
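For context on why trailing zeros might matter here: `BigDecimal.equals` is scale-sensitive while `compareTo` is not. If the new code path round-trips the value through a fixed-scale decimal encoding (an assumption; the diff does not show this), the deserialized value can be numerically identical yet fail an equality-based assertion unless the expected literal carries the same scale. A quick standalone check:

```java
import java.math.BigDecimal;

public class BigDecimalScaleDemo {
  public static void main(String[] args) {
    BigDecimal original = new BigDecimal("12345678901234.2948");                // scale 4
    BigDecimal roundTripped = new BigDecimal("12345678901234.294800000000000"); // scale 15

    // equals() compares both the unscaled value AND the scale, so these differ...
    System.out.println(original.equals(roundTripped));    // false
    // ...while compareTo() compares the numeric value only.
    System.out.println(original.compareTo(roundTripped)); // 0
    // Rescaling the expected literal makes equals() agree again.
    System.out.println(original.setScale(15).equals(roundTripped)); // true
  }
}
```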
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -1659,6 +1666,11 @@ public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper) {
return ((BytesWrapper) avroValueWrapper).getValue();
} else if (avroValueWrapper instanceof StringWrapper) {
return ((StringWrapper) avroValueWrapper).getValue();
+ } else if (avroValueWrapper instanceof ArrayWrapper) {
Review Comment:
This is not generic; it assumes all usages of ArrayWrapper are confined to
`OrderingValues`. If that is the case, should the model be named something
that indicates that, instead of the more generic name ArrayWrapper?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -255,6 +257,16 @@ object HoodieDatasetBulkInsertHelper
}
}
+ private def getOrderingValue(nestedFieldPaths: List[NestedFieldPath], r: InternalRow): Comparable[_] = {
+ if (nestedFieldPaths.isEmpty) {
+ EmptyOrderingValue.getInstance();
+ } else if (nestedFieldPaths.size == 1) {
+ getNestedInternalRowValue(r, nestedFieldPaths.head).asInstanceOf[Comparable[_]]
+ } else {
+ new MultipleFieldsOrderingValue(nestedFieldPaths.map(_ => getNestedInternalRowValue(r, nestedFieldPaths.head).asInstanceOf[Comparable[_]]).toArray)
Review Comment:
Instead of adding this logic here, can you just use `create` method which
has similar checks?
`OrderingValues.create(nestedFieldPaths.map(f => getNestedInternalRowValue(r, f).asInstanceOf[Comparable[_]]).toArray)`
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -1190,8 +1190,12 @@ public TableBuilder setBaseFileFormat(String baseFileFormat) {
return this;
}
- public TableBuilder setPreCombineField(String preCombineField) {
- this.preCombineField = preCombineField;
+ /**
+ * Sets preCombine fields to be used by the table
Review Comment:
Add a note that it is a comma-separated list.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -142,6 +141,48 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
}
+ @Test
+ def testMultipleOrderingFields() {
+ val (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO)
+
+ // Insert Operation
+ var records = recordsToStrings(dataGen.generateInserts("002", 100)).asScala.toList
+ var inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+ val commonOptsWithMultipleOrderingFields = writeOpts ++ Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+ DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "timestamp,rider",
Review Comment:
Does the test assert on a case where the timestamp is larger but the rider is smaller?
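Assuming the multi-field comparison is lexicographic (first field dominates, later fields only break ties; the actual `compareTo` is not shown in this hunk), the interesting case is exactly a larger timestamp paired with a smaller rider. A sketch of those semantics:

```java
public class LexicographicOrderingDemo {
  // Field-by-field comparison: the first field dominates; later fields only
  // break ties. This mirrors the assumed semantics of "timestamp,rider".
  @SuppressWarnings({"unchecked", "rawtypes"})
  static int compare(Comparable[] left, Comparable[] right) {
    for (int i = 0; i < left.length; i++) {
      int c = left[i].compareTo(right[i]);
      if (c != 0) {
        return c;
      }
    }
    return 0;
  }

  public static void main(String[] args) {
    Comparable[] newer = {2L, "rider-A"}; // larger timestamp, "smaller" rider
    Comparable[] older = {1L, "rider-Z"};
    System.out.println(compare(newer, older) > 0); // true: timestamp decides

    Comparable[] tied = {1L, "rider-A"};  // equal timestamps
    System.out.println(compare(tied, older) < 0); // true: rider breaks the tie
  }
}
```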
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java:
##########
@@ -111,8 +112,11 @@ public static Option<JavaRDD<HoodieRecord>> createHoodieRecords(HoodieStreamer.C
try {
HoodieKey hoodieKey = new HoodieKey(builtinKeyGenerator.getRecordKey(genRec), builtinKeyGenerator.getPartitionPath(genRec));
GenericRecord gr = isDropPartitionColumns(props) ? HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
- HoodieRecordPayload payload = shouldUseOrderingField ? DataSourceUtils.createPayload(payloadClassName, gr,
-     (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, useConsistentLogicalTimestamp))
+ Comparable orderingValue = shouldUseOrderingField
+     ? OrderingValues.create(cfg.sourceOrderingFields.split(","),
+         field -> (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, field, false, useConsistentLogicalTimestamp))
+     : null;
+ HoodieRecordPayload payload = shouldUseOrderingField ? DataSourceUtils.createPayload(payloadClassName, gr, orderingValue)
Review Comment:
Can we swap to `HoodieRecordUtils.loadPayload` here while we're updating? It
caches the constructor lookup and has significantly higher throughput as a
result.
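For readers outside this thread, the win comes from avoiding a reflective lookup per record. A generic sketch of the caching pattern (assumed to be roughly what `HoodieRecordUtils.loadPayload` does internally; its actual signature and cache key may differ):

```java
import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public final class CachedInstantiator {
  // Keyed by class name only, which assumes each payload class is always
  // constructed with the same argument types -- a simplification for the demo.
  private static final Map<String, Constructor<?>> CACHE = new ConcurrentHashMap<>();

  public static Object newInstance(String className, Class<?>[] argTypes, Object... args) {
    Constructor<?> ctor = CACHE.computeIfAbsent(className, name -> {
      try {
        // The expensive part: done once per class instead of once per record.
        return Class.forName(name).getConstructor(argTypes);
      } catch (ReflectiveOperationException e) {
        throw new IllegalArgumentException("Cannot resolve constructor for " + name, e);
      }
    });
    try {
      return ctor.newInstance(args);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate " + className, e);
    }
  }

  private CachedInstantiator() {}
}
```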
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala:
##########
@@ -92,7 +92,7 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext,
* by the query), therefore saving on throughput
*/
protected lazy val mandatoryFieldsForMerging: Seq[String] =
- Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
+ Seq(recordKeyField) ++ preCombineFieldsOpt.orElse(List())
Review Comment:
Is there an equivalent of `Collections.emptyList` in Scala? `Seq.empty` perhaps?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -255,6 +257,16 @@ object HoodieDatasetBulkInsertHelper
}
}
+ private def getOrderingValue(nestedFieldPaths: List[NestedFieldPath], r: InternalRow): Comparable[_] = {
+ if (nestedFieldPaths.isEmpty) {
+ EmptyOrderingValue.getInstance();
+ } else if (nestedFieldPaths.size == 1) {
+ getNestedInternalRowValue(r, nestedFieldPaths.head).asInstanceOf[Comparable[_]]
+ } else {
+ new MultipleFieldsOrderingValue(nestedFieldPaths.map(_ => getNestedInternalRowValue(r, nestedFieldPaths.head).asInstanceOf[Comparable[_]]).toArray)
Review Comment:
I think `nestedFieldPaths.head` will always return the first element instead
of iterating through the list.
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##########
@@ -380,21 +380,23 @@ private BiFunction<T, Schema, String> virtualKeyExtractor(String[] recordKeyFiel
/**
* Gets the ordering value in particular type.
*
- * @param record An option of record.
- * @param schema The Avro schema of the record.
- * @param orderingFieldName name of the ordering field
+ * @param record An option of record.
+ * @param schema The Avro schema of the record.
+ * @param orderingFieldNames name of the ordering field
* @return The ordering value.
*/
public Comparable getOrderingValue(T record,
Schema schema,
- Option<String> orderingFieldName) {
- if (orderingFieldName.isEmpty()) {
- return DEFAULT_ORDERING_VALUE;
+ Option<List<String>> orderingFieldNames) {
Review Comment:
It was an Option because there was no concept of a list of values for ordering
fields, so we needed a way to indicate that no ordering fields were set.
Now that it is a list, an empty list can represent the same.
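A sketch of the suggested shape (method bodies are hypothetical; only the `isEmpty()` branch mirrors the diff's intent): with a plain `List<String>`, "no ordering fields" is just the empty list, so callers do one check instead of unwrapping an Option first.

```java
import java.util.Collections;
import java.util.List;

public class OrderingFieldsSketch {
  // Stand-in for HoodieRecord.DEFAULT_ORDERING_VALUE, for illustration only.
  static final Comparable<?> DEFAULT_ORDERING_VALUE = 0;

  static Comparable<?> getOrderingValue(List<String> orderingFieldNames) {
    if (orderingFieldNames.isEmpty()) {
      return DEFAULT_ORDERING_VALUE; // table has no ordering fields
    }
    // Real code would resolve each named field against the record; returning
    // the first name here just marks the non-empty branch for the demo.
    return orderingFieldNames.get(0);
  }

  public static void main(String[] args) {
    System.out.println(getOrderingValue(Collections.emptyList())); // 0
    System.out.println(getOrderingValue(List.of("ts")));           // ts
  }
}
```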
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala:
##########
@@ -127,9 +128,9 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
lazy val primaryKeys: Array[String] = tableConfig.getRecordKeyFields.orElse(Array.empty)
/**
- * PreCombine Field
+ * Comparables Field
*/
- lazy val preCombineKey: Option[String] = Option(tableConfig.getPreCombineField)
+ lazy val preCombineKeys: Option[String] = toScalaOption(tableConfig.getPreCombineFieldsAsString)
Review Comment:
Can we make this a list/seq of strings instead to ensure all users of this
are properly handling the possibility of multiple ordering fields?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]