This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 72ff9a7f0c9 [HUDI-7052] Fix partition key validation for custom key generators. (#10014) 72ff9a7f0c9 is described below commit 72ff9a7f0c9a7da12810669ca0111761ee7adcfe Author: Rajesh Mahindra <76502047+rmahindra...@users.noreply.github.com> AuthorDate: Wed Nov 22 20:49:15 2023 -0800 [HUDI-7052] Fix partition key validation for custom key generators. (#10014) --------- Co-authored-by: rmahindra123 <rmahindra@Rajeshs-MacBook-Pro.local> --- .../AutoRecordGenWrapperAvroKeyGenerator.java | 27 +++++++++--- .../hudi/keygen/AutoRecordKeyGeneratorWrapper.java | 32 +++++++++++++++ .../keygen/AutoRecordGenWrapperKeyGenerator.java | 48 ++++++++++++++-------- .../org/apache/hudi/util/SparkKeyGenUtils.scala | 31 ++++++++------ .../org/apache/hudi/HoodieSparkSqlWriter.scala | 4 +- .../scala/org/apache/hudi/HoodieWriterUtils.scala | 5 ++- .../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 2 +- .../apache/hudi/functional/TestCOWDataSource.scala | 3 +- .../deltastreamer/TestHoodieDeltaStreamer.java | 6 +-- 9 files changed, 112 insertions(+), 46 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperAvroKeyGenerator.java index a8ae48e1d67..8431180a2fe 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperAvroKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperAvroKeyGenerator.java @@ -43,24 +43,24 @@ import java.util.List; * PartitionId refers to spark's partition Id. * RowId refers to the row index within the spark partition. */ -public class AutoRecordGenWrapperAvroKeyGenerator extends BaseKeyGenerator { +public class AutoRecordGenWrapperAvroKeyGenerator extends BaseKeyGenerator implements AutoRecordKeyGeneratorWrapper { private final BaseKeyGenerator keyGenerator; - private final int partitionId; - private final String instantTime; + private Integer partitionId; + private String instantTime; private int rowId; public AutoRecordGenWrapperAvroKeyGenerator(TypedProperties config, BaseKeyGenerator keyGenerator) { super(config); this.keyGenerator = keyGenerator; this.rowId = 0; - this.partitionId = config.getInteger(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG); - this.instantTime = config.getString(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG); + partitionId = null; + instantTime = null; } @Override public String getRecordKey(GenericRecord record) { - return HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++); + return generateSequenceId(rowId++); } @Override @@ -80,4 +80,19 @@ public class AutoRecordGenWrapperAvroKeyGenerator extends BaseKeyGenerator { public boolean isConsistentLogicalTimestampEnabled() { return keyGenerator.isConsistentLogicalTimestampEnabled(); } + + @Override + public BaseKeyGenerator getPartitionKeyGenerator() { + return keyGenerator; + } + + private String generateSequenceId(long recordIndex) { + if (partitionId == null) { + this.partitionId = config.getInteger(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG); + } + if (instantTime == null) { + this.instantTime = config.getString(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG); + } + return HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex); + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGeneratorWrapper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGeneratorWrapper.java new file mode 100644 index 00000000000..e136bc89cbb --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGeneratorWrapper.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.keygen; + +/** + * Interface for {@link KeyGenerator} implementations that + * generate a unique record key internally. + */ +public interface AutoRecordKeyGeneratorWrapper { + + /** + * @returns the underlying key generator used for the partition path. + */ + BaseKeyGenerator getPartitionKeyGenerator(); +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperKeyGenerator.java index ce767665a6f..5b8287c58d4 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperKeyGenerator.java @@ -47,62 +47,76 @@ import java.util.List; * PartitionId refers to spark's partition Id. * RowId refers to the row index within the spark partition. */ -public class AutoRecordGenWrapperKeyGenerator extends BuiltinKeyGenerator { +public class AutoRecordGenWrapperKeyGenerator extends BuiltinKeyGenerator implements AutoRecordKeyGeneratorWrapper { - private final BuiltinKeyGenerator builtinKeyGenerator; - private final int partitionId; - private final String instantTime; + private final BuiltinKeyGenerator keyGenerator; + private Integer partitionId; + private String instantTime; private int rowId; - public AutoRecordGenWrapperKeyGenerator(TypedProperties config, BuiltinKeyGenerator builtinKeyGenerator) { + public AutoRecordGenWrapperKeyGenerator(TypedProperties config, BuiltinKeyGenerator keyGenerator) { super(config); - this.builtinKeyGenerator = builtinKeyGenerator; + this.keyGenerator = keyGenerator; this.rowId = 0; - this.partitionId = config.getInteger(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG); - this.instantTime = config.getString(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG); + partitionId = null; + instantTime = null; } @Override public String getRecordKey(GenericRecord record) { - return HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++); + return generateSequenceId(rowId++); } @Override public String getPartitionPath(GenericRecord record) { - return builtinKeyGenerator.getPartitionPath(record); + return keyGenerator.getPartitionPath(record); } @Override public String getRecordKey(Row row) { - return HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++); + return generateSequenceId(rowId++); } @Override public UTF8String getRecordKey(InternalRow internalRow, StructType schema) { - return UTF8String.fromString(HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++)); + return UTF8String.fromString(generateSequenceId(rowId++)); } @Override public String getPartitionPath(Row row) { - return builtinKeyGenerator.getPartitionPath(row); + return keyGenerator.getPartitionPath(row); } @Override public UTF8String getPartitionPath(InternalRow internalRow, StructType schema) { - return builtinKeyGenerator.getPartitionPath(internalRow, schema); + return keyGenerator.getPartitionPath(internalRow, schema); } @Override public List<String> getRecordKeyFieldNames() { - return builtinKeyGenerator.getRecordKeyFieldNames(); + return keyGenerator.getRecordKeyFieldNames(); } public List<String> getPartitionPathFields() { - return builtinKeyGenerator.getPartitionPathFields(); + return keyGenerator.getPartitionPathFields(); } public boolean isConsistentLogicalTimestampEnabled() { - return builtinKeyGenerator.isConsistentLogicalTimestampEnabled(); + return keyGenerator.isConsistentLogicalTimestampEnabled(); } + @Override + public BuiltinKeyGenerator getPartitionKeyGenerator() { + return keyGenerator; + } + + private String generateSequenceId(long recordIndex) { + if (partitionId == null) { + this.partitionId = config.getInteger(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG); + } + if (instantTime == null) { + this.instantTime = config.getString(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG); + } + return HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex); + } } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala index 932fa0096cf..7b91ae5a728 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala @@ -21,11 +21,8 @@ import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.util.StringUtils import org.apache.hudi.common.util.ValidationUtils.checkArgument import org.apache.hudi.keygen.constant.KeyGeneratorOptions -import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, GlobalAvroDeleteKeyGenerator, GlobalDeleteKeyGenerator, KeyGenerator, NonpartitionedAvroKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator} +import org.apache.hudi.keygen.{AutoRecordKeyGeneratorWrapper, AutoRecordGenWrapperKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, GlobalAvroDeleteKeyGenerator, GlobalDeleteKeyGenerator, KeyGenerator, NonpartitionedAvroKeyGenerator, NonpartitionedKeyGenerator} import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory -import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName - -import scala.collection.JavaConverters._ object SparkKeyGenUtils { @@ -34,26 +31,34 @@ object SparkKeyGenUtils { * @return partition columns */ def getPartitionColumns(props: TypedProperties): String = { - val keyGeneratorClass = getKeyGeneratorClassName(props) - getPartitionColumns(keyGeneratorClass, props) + val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props) + getPartitionColumns(keyGenerator, props) } /** * @param keyGen key generator class name * @return partition columns */ - def getPartitionColumns(keyGenClass: String, typedProperties: TypedProperties): String = { + def getPartitionColumns(keyGenClass: KeyGenerator, typedProperties: TypedProperties): String = { + // For {@link AutoRecordGenWrapperKeyGenerator} or {@link AutoRecordGenWrapperAvroKeyGenerator}, + // get the base key generator for the partition paths + var baseKeyGen = keyGenClass match { + case autoRecordKeyGenerator: AutoRecordKeyGeneratorWrapper => + autoRecordKeyGenerator.getPartitionKeyGenerator + case _ => keyGenClass + } + // For CustomKeyGenerator and CustomAvroKeyGenerator, the partition path filed format // is: "field_name: field_type", we extract the field_name from the partition path field. - if (keyGenClass.equals(classOf[CustomKeyGenerator].getCanonicalName) || keyGenClass.equals(classOf[CustomAvroKeyGenerator].getCanonicalName)) { + if (baseKeyGen.isInstanceOf[CustomKeyGenerator] || baseKeyGen.isInstanceOf[CustomAvroKeyGenerator]) { typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()) .split(",").map(pathField => { pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX) - .headOption.getOrElse(s"Illegal partition path field format: '$pathField' for ${keyGenClass}")}).mkString(",") - } else if (keyGenClass.equals(classOf[NonpartitionedKeyGenerator].getCanonicalName) - || keyGenClass.equals(classOf[NonpartitionedAvroKeyGenerator].getCanonicalName) - || keyGenClass.equals(classOf[GlobalDeleteKeyGenerator].getCanonicalName) - || keyGenClass.equals(classOf[GlobalAvroDeleteKeyGenerator].getCanonicalName)) { + .headOption.getOrElse(s"Illegal partition path field format: '$pathField' for ${baseKeyGen}")}).mkString(",") + } else if (baseKeyGen.isInstanceOf[NonpartitionedKeyGenerator] + || baseKeyGen.isInstanceOf[NonpartitionedAvroKeyGenerator] + || baseKeyGen.isInstanceOf[GlobalDeleteKeyGenerator] + || baseKeyGen.isInstanceOf[GlobalAvroDeleteKeyGenerator]) { StringUtils.EMPTY_STRING } else { checkArgument(typedProperties.containsKey(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()), "Partition path needs to be set") diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 01a73cd0816..fda3156740d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -268,14 +268,14 @@ class HoodieSparkSqlWriterInternal { } } + val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(hoodieConfig.getProps)) if (mode == SaveMode.Ignore && tableExists) { log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.") (false, common.util.Option.empty(), common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig) } else { // Handle various save modes handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs) - val partitionColumns = SparkKeyGenUtils.getPartitionColumns(getKeyGeneratorClassName(new TypedProperties(hoodieConfig.getProps)), - toProperties(parameters)) + val partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters)) val timelineTimeZone = HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)) val tableMetaClient = if (tableExists) { HoodieInstantTimeGenerator.setCommitTimeZone(timelineTimeZone) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala index e2c5ad88d7f..133f641d280 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala @@ -203,10 +203,11 @@ object HoodieWriterUtils { } val datasourcePartitionFields = params.getOrElse(PARTITIONPATH_FIELD.key(), null) + val currentPartitionFields = if (datasourcePartitionFields == null) null else SparkKeyGenUtils.getPartitionColumns(TypedProperties.fromMap(params)) val tableConfigPartitionFields = tableConfig.getString(HoodieTableConfig.PARTITION_FIELDS) if (null != datasourcePartitionFields && null != tableConfigPartitionFields - && datasourcePartitionFields != tableConfigPartitionFields) { - diffConfigs.append(s"PartitionPath:\t$datasourcePartitionFields\t$tableConfigPartitionFields\n") + && currentPartitionFields != tableConfigPartitionFields) { + diffConfigs.append(s"PartitionPath:\t$currentPartitionFields\t$tableConfigPartitionFields\n") } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala index 7f89817a7f8..865ca147eb0 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala @@ -470,7 +470,7 @@ class TestHoodieSparkSqlWriter { val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType) // try write to Hudi - assertThrows[IllegalArgumentException] { + assertThrows[IOException] { HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, tableOpts - DataSourceWriteOptions.PARTITIONPATH_FIELD.key, df) } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 02c9b90e75a..e2c719e8782 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -1001,8 +1001,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup writer.save(basePath) fail("should fail when invalid PartitionKeyType is provided!") } catch { - case e: Exception => - assertTrue(e.getCause.getMessage.contains("No enum constant org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY")) + case e: Exception => assertTrue(e.getCause.getMessage.contains("Unable to instantiate class org.apache.hudi.keygen.CustomKeyGenerator")) } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index f73ee936b60..67bf90a9853 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -379,7 +379,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { @Test public void testPropsWithInvalidKeyGenerator() { - Exception e = assertThrows(IllegalArgumentException.class, () -> { + Exception e = assertThrows(IOException.class, () -> { String tableBasePath = basePath + "/test_table_invalid_key_gen"; HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT, @@ -387,8 +387,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { deltaStreamer.sync(); }, "Should error out when setting the key generator class property to an invalid value"); // expected - LOG.debug("Expected error during getting the key generator", e); - assertTrue(e.getMessage().contains("No KeyGeneratorType found for class name")); + LOG.warn("Expected error during getting the key generator", e); + assertTrue(e.getMessage().contains("Could not load key generator class invalid")); } private static Stream<Arguments> provideInferKeyGenArgs() {