This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 72ff9a7f0c9 [HUDI-7052] Fix partition key validation for custom key generators. (#10014)
72ff9a7f0c9 is described below
commit 72ff9a7f0c9a7da12810669ca0111761ee7adcfe
Author: Rajesh Mahindra <[email protected]>
AuthorDate: Wed Nov 22 20:49:15 2023 -0800
[HUDI-7052] Fix partition key validation for custom key generators. (#10014)
---------
Co-authored-by: rmahindra123 <[email protected]>
---
.../AutoRecordGenWrapperAvroKeyGenerator.java | 27 +++++++++---
.../hudi/keygen/AutoRecordKeyGeneratorWrapper.java | 32 +++++++++++++++
.../keygen/AutoRecordGenWrapperKeyGenerator.java | 48 ++++++++++++++--------
.../org/apache/hudi/util/SparkKeyGenUtils.scala | 31 ++++++++------
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 4 +-
.../scala/org/apache/hudi/HoodieWriterUtils.scala | 5 ++-
.../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 2 +-
.../apache/hudi/functional/TestCOWDataSource.scala | 3 +-
.../deltastreamer/TestHoodieDeltaStreamer.java | 6 +--
9 files changed, 112 insertions(+), 46 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperAvroKeyGenerator.java
index a8ae48e1d67..8431180a2fe 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperAvroKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperAvroKeyGenerator.java
@@ -43,24 +43,24 @@ import java.util.List;
* PartitionId refers to spark's partition Id.
* RowId refers to the row index within the spark partition.
*/
-public class AutoRecordGenWrapperAvroKeyGenerator extends BaseKeyGenerator {
+public class AutoRecordGenWrapperAvroKeyGenerator extends BaseKeyGenerator
implements AutoRecordKeyGeneratorWrapper {
private final BaseKeyGenerator keyGenerator;
- private final int partitionId;
- private final String instantTime;
+ private Integer partitionId;
+ private String instantTime;
private int rowId;
public AutoRecordGenWrapperAvroKeyGenerator(TypedProperties config,
BaseKeyGenerator keyGenerator) {
super(config);
this.keyGenerator = keyGenerator;
this.rowId = 0;
- this.partitionId =
config.getInteger(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG);
- this.instantTime =
config.getString(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG);
+ partitionId = null;
+ instantTime = null;
}
@Override
public String getRecordKey(GenericRecord record) {
- return HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++);
+ return generateSequenceId(rowId++);
}
@Override
@@ -80,4 +80,19 @@ public class AutoRecordGenWrapperAvroKeyGenerator extends BaseKeyGenerator {
public boolean isConsistentLogicalTimestampEnabled() {
return keyGenerator.isConsistentLogicalTimestampEnabled();
}
+
+ @Override
+ public BaseKeyGenerator getPartitionKeyGenerator() {
+ return keyGenerator;
+ }
+
+ private String generateSequenceId(long recordIndex) {
+ if (partitionId == null) {
+ this.partitionId =
config.getInteger(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG);
+ }
+ if (instantTime == null) {
+ this.instantTime =
config.getString(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG);
+ }
+ return HoodieRecord.generateSequenceId(instantTime, partitionId,
recordIndex);
+ }
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGeneratorWrapper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGeneratorWrapper.java
new file mode 100644
index 00000000000..e136bc89cbb
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGeneratorWrapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+/**
+ * Interface for {@link KeyGenerator} implementations that
+ * generate a unique record key internally.
+ */
+public interface AutoRecordKeyGeneratorWrapper {
+
+ /**
+ * @returns the underlying key generator used for the partition path.
+ */
+ BaseKeyGenerator getPartitionKeyGenerator();
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperKeyGenerator.java
index ce767665a6f..5b8287c58d4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperKeyGenerator.java
@@ -47,62 +47,76 @@ import java.util.List;
* PartitionId refers to spark's partition Id.
* RowId refers to the row index within the spark partition.
*/
-public class AutoRecordGenWrapperKeyGenerator extends BuiltinKeyGenerator {
+public class AutoRecordGenWrapperKeyGenerator extends BuiltinKeyGenerator
implements AutoRecordKeyGeneratorWrapper {
- private final BuiltinKeyGenerator builtinKeyGenerator;
- private final int partitionId;
- private final String instantTime;
+ private final BuiltinKeyGenerator keyGenerator;
+ private Integer partitionId;
+ private String instantTime;
private int rowId;
- public AutoRecordGenWrapperKeyGenerator(TypedProperties config,
BuiltinKeyGenerator builtinKeyGenerator) {
+ public AutoRecordGenWrapperKeyGenerator(TypedProperties config,
BuiltinKeyGenerator keyGenerator) {
super(config);
- this.builtinKeyGenerator = builtinKeyGenerator;
+ this.keyGenerator = keyGenerator;
this.rowId = 0;
- this.partitionId =
config.getInteger(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG);
- this.instantTime =
config.getString(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG);
+ partitionId = null;
+ instantTime = null;
}
@Override
public String getRecordKey(GenericRecord record) {
- return HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++);
+ return generateSequenceId(rowId++);
}
@Override
public String getPartitionPath(GenericRecord record) {
- return builtinKeyGenerator.getPartitionPath(record);
+ return keyGenerator.getPartitionPath(record);
}
@Override
public String getRecordKey(Row row) {
- return HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++);
+ return generateSequenceId(rowId++);
}
@Override
public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
- return UTF8String.fromString(HoodieRecord.generateSequenceId(instantTime,
partitionId, rowId++));
+ return UTF8String.fromString(generateSequenceId(rowId++));
}
@Override
public String getPartitionPath(Row row) {
- return builtinKeyGenerator.getPartitionPath(row);
+ return keyGenerator.getPartitionPath(row);
}
@Override
public UTF8String getPartitionPath(InternalRow internalRow, StructType
schema) {
- return builtinKeyGenerator.getPartitionPath(internalRow, schema);
+ return keyGenerator.getPartitionPath(internalRow, schema);
}
@Override
public List<String> getRecordKeyFieldNames() {
- return builtinKeyGenerator.getRecordKeyFieldNames();
+ return keyGenerator.getRecordKeyFieldNames();
}
public List<String> getPartitionPathFields() {
- return builtinKeyGenerator.getPartitionPathFields();
+ return keyGenerator.getPartitionPathFields();
}
public boolean isConsistentLogicalTimestampEnabled() {
- return builtinKeyGenerator.isConsistentLogicalTimestampEnabled();
+ return keyGenerator.isConsistentLogicalTimestampEnabled();
}
+ @Override
+ public BuiltinKeyGenerator getPartitionKeyGenerator() {
+ return keyGenerator;
+ }
+
+ private String generateSequenceId(long recordIndex) {
+ if (partitionId == null) {
+ this.partitionId =
config.getInteger(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG);
+ }
+ if (instantTime == null) {
+ this.instantTime =
config.getString(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG);
+ }
+ return HoodieRecord.generateSequenceId(instantTime, partitionId,
recordIndex);
+ }
}
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
index 932fa0096cf..7b91ae5a728 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
@@ -21,11 +21,8 @@ import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.util.ValidationUtils.checkArgument
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
-import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator,
CustomKeyGenerator, GlobalAvroDeleteKeyGenerator, GlobalDeleteKeyGenerator,
KeyGenerator, NonpartitionedAvroKeyGenerator, NonpartitionedKeyGenerator,
SimpleKeyGenerator}
+import org.apache.hudi.keygen.{AutoRecordKeyGeneratorWrapper,
AutoRecordGenWrapperKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator,
GlobalAvroDeleteKeyGenerator, GlobalDeleteKeyGenerator, KeyGenerator,
NonpartitionedAvroKeyGenerator, NonpartitionedKeyGenerator}
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
-import
org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName
-
-import scala.collection.JavaConverters._
object SparkKeyGenUtils {
@@ -34,26 +31,34 @@ object SparkKeyGenUtils {
* @return partition columns
*/
def getPartitionColumns(props: TypedProperties): String = {
- val keyGeneratorClass = getKeyGeneratorClassName(props)
- getPartitionColumns(keyGeneratorClass, props)
+ val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
+ getPartitionColumns(keyGenerator, props)
}
/**
* @param keyGen key generator class name
* @return partition columns
*/
- def getPartitionColumns(keyGenClass: String, typedProperties:
TypedProperties): String = {
+ def getPartitionColumns(keyGenClass: KeyGenerator, typedProperties:
TypedProperties): String = {
+ // For {@link AutoRecordGenWrapperKeyGenerator} or {@link
AutoRecordGenWrapperAvroKeyGenerator},
+ // get the base key generator for the partition paths
+ var baseKeyGen = keyGenClass match {
+ case autoRecordKeyGenerator: AutoRecordKeyGeneratorWrapper =>
+ autoRecordKeyGenerator.getPartitionKeyGenerator
+ case _ => keyGenClass
+ }
+
// For CustomKeyGenerator and CustomAvroKeyGenerator, the partition path
filed format
// is: "field_name: field_type", we extract the field_name from the
partition path field.
- if (keyGenClass.equals(classOf[CustomKeyGenerator].getCanonicalName) ||
keyGenClass.equals(classOf[CustomAvroKeyGenerator].getCanonicalName)) {
+ if (baseKeyGen.isInstanceOf[CustomKeyGenerator] ||
baseKeyGen.isInstanceOf[CustomAvroKeyGenerator]) {
typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
.split(",").map(pathField => {
pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX)
- .headOption.getOrElse(s"Illegal partition path field format:
'$pathField' for ${keyGenClass}")}).mkString(",")
- } else if
(keyGenClass.equals(classOf[NonpartitionedKeyGenerator].getCanonicalName)
- ||
keyGenClass.equals(classOf[NonpartitionedAvroKeyGenerator].getCanonicalName)
- || keyGenClass.equals(classOf[GlobalDeleteKeyGenerator].getCanonicalName)
- ||
keyGenClass.equals(classOf[GlobalAvroDeleteKeyGenerator].getCanonicalName)) {
+ .headOption.getOrElse(s"Illegal partition path field format:
'$pathField' for ${baseKeyGen}")}).mkString(",")
+ } else if (baseKeyGen.isInstanceOf[NonpartitionedKeyGenerator]
+ || baseKeyGen.isInstanceOf[NonpartitionedAvroKeyGenerator]
+ || baseKeyGen.isInstanceOf[GlobalDeleteKeyGenerator]
+ || baseKeyGen.isInstanceOf[GlobalAvroDeleteKeyGenerator]) {
StringUtils.EMPTY_STRING
} else {
checkArgument(typedProperties.containsKey(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()),
"Partition path needs to be set")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 01a73cd0816..fda3156740d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -268,14 +268,14 @@ class HoodieSparkSqlWriterInternal {
}
}
+ val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new
TypedProperties(hoodieConfig.getProps))
if (mode == SaveMode.Ignore && tableExists) {
log.warn(s"hoodie table at $basePath already exists. Ignoring & not
performing actual writes.")
(false, common.util.Option.empty(), common.util.Option.empty(),
common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
} else {
// Handle various save modes
handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig,
tblName, operation, fs)
- val partitionColumns =
SparkKeyGenUtils.getPartitionColumns(getKeyGeneratorClassName(new
TypedProperties(hoodieConfig.getProps)),
- toProperties(parameters))
+ val partitionColumns =
SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters))
val timelineTimeZone =
HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))
val tableMetaClient = if (tableExists) {
HoodieInstantTimeGenerator.setCommitTimeZone(timelineTimeZone)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index e2c5ad88d7f..133f641d280 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -203,10 +203,11 @@ object HoodieWriterUtils {
}
val datasourcePartitionFields =
params.getOrElse(PARTITIONPATH_FIELD.key(), null)
+ val currentPartitionFields = if (datasourcePartitionFields == null)
null else SparkKeyGenUtils.getPartitionColumns(TypedProperties.fromMap(params))
val tableConfigPartitionFields =
tableConfig.getString(HoodieTableConfig.PARTITION_FIELDS)
if (null != datasourcePartitionFields && null !=
tableConfigPartitionFields
- && datasourcePartitionFields != tableConfigPartitionFields) {
-
diffConfigs.append(s"PartitionPath:\t$datasourcePartitionFields\t$tableConfigPartitionFields\n")
+ && currentPartitionFields != tableConfigPartitionFields) {
+
diffConfigs.append(s"PartitionPath:\t$currentPartitionFields\t$tableConfigPartitionFields\n")
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 7f89817a7f8..865ca147eb0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -470,7 +470,7 @@ class TestHoodieSparkSqlWriter {
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
// try write to Hudi
- assertThrows[IllegalArgumentException] {
+ assertThrows[IOException] {
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, tableOpts -
DataSourceWriteOptions.PARTITIONPATH_FIELD.key, df)
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 02c9b90e75a..e2c719e8782 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -1001,8 +1001,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
writer.save(basePath)
fail("should fail when invalid PartitionKeyType is provided!")
} catch {
- case e: Exception =>
- assertTrue(e.getCause.getMessage.contains("No enum constant
org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY"))
+ case e: Exception => assertTrue(e.getCause.getMessage.contains("Unable
to instantiate class org.apache.hudi.keygen.CustomKeyGenerator"))
}
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index f73ee936b60..67bf90a9853 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -379,7 +379,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
@Test
public void testPropsWithInvalidKeyGenerator() {
- Exception e = assertThrows(IllegalArgumentException.class, () -> {
+ Exception e = assertThrows(IOException.class, () -> {
String tableBasePath = basePath + "/test_table_invalid_key_gen";
HoodieDeltaStreamer deltaStreamer =
new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath,
WriteOperationType.BULK_INSERT,
@@ -387,8 +387,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
deltaStreamer.sync();
}, "Should error out when setting the key generator class property to an
invalid value");
// expected
- LOG.debug("Expected error during getting the key generator", e);
- assertTrue(e.getMessage().contains("No KeyGeneratorType found for class
name"));
+ LOG.warn("Expected error during getting the key generator", e);
+ assertTrue(e.getMessage().contains("Could not load key generator class
invalid"));
}
private static Stream<Arguments> provideInferKeyGenArgs() {