This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 72ff9a7f0c9 [HUDI-7052] Fix partition key validation for custom key generators. (#10014)
72ff9a7f0c9 is described below

commit 72ff9a7f0c9a7da12810669ca0111761ee7adcfe
Author: Rajesh Mahindra <76502047+rmahindra...@users.noreply.github.com>
AuthorDate: Wed Nov 22 20:49:15 2023 -0800

    [HUDI-7052] Fix partition key validation for custom key generators. (#10014)
    
    
    ---------
    
    Co-authored-by: rmahindra123 <rmahindra@Rajeshs-MacBook-Pro.local>
---
 .../AutoRecordGenWrapperAvroKeyGenerator.java      | 27 +++++++++---
 .../hudi/keygen/AutoRecordKeyGeneratorWrapper.java | 32 +++++++++++++++
 .../keygen/AutoRecordGenWrapperKeyGenerator.java   | 48 ++++++++++++++--------
 .../org/apache/hudi/util/SparkKeyGenUtils.scala    | 31 ++++++++------
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  4 +-
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |  5 ++-
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala |  2 +-
 .../apache/hudi/functional/TestCOWDataSource.scala |  3 +-
 .../deltastreamer/TestHoodieDeltaStreamer.java     |  6 +--
 9 files changed, 112 insertions(+), 46 deletions(-)
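In short, partition-column resolution now goes through an instantiated key generator rather than a key generator class name, so wrapped and custom key generators resolve to the actual partition field names before validation. A minimal sketch of the expected behaviour, assuming standard Hudi writer configs; the concrete property values below are illustrative, not taken from this commit:

    import org.apache.hudi.common.config.TypedProperties
    import org.apache.hudi.util.SparkKeyGenUtils

    object PartitionColumnsSketch {
      def main(args: Array[String]): Unit = {
        val props = new TypedProperties()
        props.setProperty("hoodie.datasource.write.keygenerator.class",
          "org.apache.hudi.keygen.CustomKeyGenerator")
        props.setProperty("hoodie.datasource.write.recordkey.field", "id")
        // CustomKeyGenerator partition entries use the "field_name:field_type" format.
        props.setProperty("hoodie.datasource.write.partitionpath.field", "ts:simple")

        // With this change, only the field name is kept, so validation compares "ts"
        // (not "ts:simple") against the table's configured partition fields.
        println(SparkKeyGenUtils.getPartitionColumns(props)) // expected: ts
      }
    }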

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperAvroKeyGenerator.java
index a8ae48e1d67..8431180a2fe 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperAvroKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperAvroKeyGenerator.java
@@ -43,24 +43,24 @@ import java.util.List;
  * PartitionId refers to spark's partition Id.
  * RowId refers to the row index within the spark partition.
  */
-public class AutoRecordGenWrapperAvroKeyGenerator extends BaseKeyGenerator {
+public class AutoRecordGenWrapperAvroKeyGenerator extends BaseKeyGenerator implements AutoRecordKeyGeneratorWrapper {
 
   private final BaseKeyGenerator keyGenerator;
-  private final int partitionId;
-  private final String instantTime;
+  private Integer partitionId;
+  private String instantTime;
   private int rowId;
 
   public AutoRecordGenWrapperAvroKeyGenerator(TypedProperties config, BaseKeyGenerator keyGenerator) {
     super(config);
     this.keyGenerator = keyGenerator;
     this.rowId = 0;
-    this.partitionId = config.getInteger(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG);
-    this.instantTime = config.getString(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG);
+    partitionId = null;
+    instantTime = null;
   }
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++);
+    return generateSequenceId(rowId++);
   }
 
   @Override
@@ -80,4 +80,19 @@ public class AutoRecordGenWrapperAvroKeyGenerator extends BaseKeyGenerator {
   public boolean isConsistentLogicalTimestampEnabled() {
     return keyGenerator.isConsistentLogicalTimestampEnabled();
   }
+
+  @Override
+  public BaseKeyGenerator getPartitionKeyGenerator() {
+    return keyGenerator;
+  }
+
+  private String generateSequenceId(long recordIndex) {
+    if (partitionId == null) {
+      this.partitionId = config.getInteger(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG);
+    }
+    if (instantTime == null) {
+      this.instantTime = config.getString(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG);
+    }
+    return HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex);
+  }
 }
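The change above (mirrored in the Spark variant further down) defers reading the KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG and RECORD_KEY_GEN_INSTANT_TIME_CONFIG properties from construction time to first use, so the wrapper can be instantiated from configs that do not yet carry those write-time values, for example when only the partition key generator is needed. A rough usage sketch under that assumption; SimpleAvroKeyGenerator and the property values are illustrative:

    import org.apache.hudi.common.config.TypedProperties
    import org.apache.hudi.keygen.{AutoRecordGenWrapperAvroKeyGenerator, SimpleAvroKeyGenerator}

    object WrapperConstructionSketch {
      def main(args: Array[String]): Unit = {
        val props = new TypedProperties()
        props.setProperty("hoodie.datasource.write.recordkey.field", "id")
        props.setProperty("hoodie.datasource.write.partitionpath.field", "ts")

        // No RECORD_KEY_GEN_* properties are set here; with lazy resolution this
        // construction is expected to succeed, and those properties are only
        // required once getRecordKey() is actually called.
        val wrapper = new AutoRecordGenWrapperAvroKeyGenerator(props, new SimpleAvroKeyGenerator(props))
        println(wrapper.getPartitionKeyGenerator.getClass.getSimpleName) // SimpleAvroKeyGenerator
      }
    }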
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGeneratorWrapper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGeneratorWrapper.java
new file mode 100644
index 00000000000..e136bc89cbb
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGeneratorWrapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+/**
+ * Interface for {@link KeyGenerator} implementations that
+ * generate a unique record key internally.
+ */
+public interface AutoRecordKeyGeneratorWrapper {
+
+  /**
+   * @returns the underlying key generator used for the partition path.
+   */
+  BaseKeyGenerator getPartitionKeyGenerator();
+}
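The new interface gives callers a uniform way to reach the generator that actually produces partition paths when record keys are auto-generated. A small sketch of the intended call pattern, assuming only the Hudi classes named in this commit; the SparkKeyGenUtils change further below uses the same match:

    import org.apache.hudi.keygen.{AutoRecordKeyGeneratorWrapper, KeyGenerator}

    object KeyGenUnwrapSketch {
      // Resolve the key generator that drives partition paths, unwrapping the
      // auto-record-key wrapper when present and using the generator itself otherwise.
      def partitionPathKeyGenerator(keyGen: KeyGenerator): KeyGenerator = keyGen match {
        case wrapper: AutoRecordKeyGeneratorWrapper => wrapper.getPartitionKeyGenerator
        case other => other
      }
    }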
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperKeyGenerator.java
index ce767665a6f..5b8287c58d4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperKeyGenerator.java
@@ -47,62 +47,76 @@ import java.util.List;
  * PartitionId refers to spark's partition Id.
  * RowId refers to the row index within the spark partition.
  */
-public class AutoRecordGenWrapperKeyGenerator extends BuiltinKeyGenerator {
+public class AutoRecordGenWrapperKeyGenerator extends BuiltinKeyGenerator implements AutoRecordKeyGeneratorWrapper {
 
-  private final BuiltinKeyGenerator builtinKeyGenerator;
-  private final int partitionId;
-  private final String instantTime;
+  private final BuiltinKeyGenerator keyGenerator;
+  private Integer partitionId;
+  private String instantTime;
   private int rowId;
 
-  public AutoRecordGenWrapperKeyGenerator(TypedProperties config, BuiltinKeyGenerator builtinKeyGenerator) {
+  public AutoRecordGenWrapperKeyGenerator(TypedProperties config, BuiltinKeyGenerator keyGenerator) {
     super(config);
-    this.builtinKeyGenerator = builtinKeyGenerator;
+    this.keyGenerator = keyGenerator;
     this.rowId = 0;
-    this.partitionId = config.getInteger(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG);
-    this.instantTime = config.getString(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG);
+    partitionId = null;
+    instantTime = null;
   }
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++);
+    return generateSequenceId(rowId++);
   }
 
   @Override
   public String getPartitionPath(GenericRecord record) {
-    return builtinKeyGenerator.getPartitionPath(record);
+    return keyGenerator.getPartitionPath(record);
   }
 
   @Override
   public String getRecordKey(Row row) {
-    return HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++);
+    return generateSequenceId(rowId++);
   }
 
   @Override
   public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
-    return UTF8String.fromString(HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++));
+    return UTF8String.fromString(generateSequenceId(rowId++));
   }
 
   @Override
   public String getPartitionPath(Row row) {
-    return builtinKeyGenerator.getPartitionPath(row);
+    return keyGenerator.getPartitionPath(row);
   }
 
   @Override
   public UTF8String getPartitionPath(InternalRow internalRow, StructType schema) {
-    return builtinKeyGenerator.getPartitionPath(internalRow, schema);
+    return keyGenerator.getPartitionPath(internalRow, schema);
   }
 
   @Override
   public List<String> getRecordKeyFieldNames() {
-    return builtinKeyGenerator.getRecordKeyFieldNames();
+    return keyGenerator.getRecordKeyFieldNames();
   }
 
   public List<String> getPartitionPathFields() {
-    return builtinKeyGenerator.getPartitionPathFields();
+    return keyGenerator.getPartitionPathFields();
   }
 
   public boolean isConsistentLogicalTimestampEnabled() {
-    return builtinKeyGenerator.isConsistentLogicalTimestampEnabled();
+    return keyGenerator.isConsistentLogicalTimestampEnabled();
   }
 
+  @Override
+  public BuiltinKeyGenerator getPartitionKeyGenerator() {
+    return keyGenerator;
+  }
+
+  private String generateSequenceId(long recordIndex) {
+    if (partitionId == null) {
+      this.partitionId = config.getInteger(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG);
+    }
+    if (instantTime == null) {
+      this.instantTime = config.getString(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG);
+    }
+    return HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex);
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
index 932fa0096cf..7b91ae5a728 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
@@ -21,11 +21,8 @@ import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
-import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, GlobalAvroDeleteKeyGenerator, GlobalDeleteKeyGenerator, KeyGenerator, NonpartitionedAvroKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
+import org.apache.hudi.keygen.{AutoRecordKeyGeneratorWrapper, AutoRecordGenWrapperKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, GlobalAvroDeleteKeyGenerator, GlobalDeleteKeyGenerator, KeyGenerator, NonpartitionedAvroKeyGenerator, NonpartitionedKeyGenerator}
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
-import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName
-
-import scala.collection.JavaConverters._
 
 object SparkKeyGenUtils {
 
@@ -34,26 +31,34 @@ object SparkKeyGenUtils {
    * @return partition columns
    */
   def getPartitionColumns(props: TypedProperties): String = {
-    val keyGeneratorClass = getKeyGeneratorClassName(props)
-    getPartitionColumns(keyGeneratorClass, props)
+    val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
+    getPartitionColumns(keyGenerator, props)
   }
 
   /**
    * @param keyGen key generator class name
    * @return partition columns
    */
-  def getPartitionColumns(keyGenClass: String, typedProperties: TypedProperties): String = {
+  def getPartitionColumns(keyGenClass: KeyGenerator, typedProperties: TypedProperties): String = {
+    // For {@link AutoRecordGenWrapperKeyGenerator} or {@link AutoRecordGenWrapperAvroKeyGenerator},
+    // get the base key generator for the partition paths
+    var baseKeyGen = keyGenClass match {
+      case autoRecordKeyGenerator: AutoRecordKeyGeneratorWrapper =>
+        autoRecordKeyGenerator.getPartitionKeyGenerator
+      case _ => keyGenClass
+    }
+
     // For CustomKeyGenerator and CustomAvroKeyGenerator, the partition path filed format
     // is: "field_name: field_type", we extract the field_name from the partition path field.
-    if (keyGenClass.equals(classOf[CustomKeyGenerator].getCanonicalName) || keyGenClass.equals(classOf[CustomAvroKeyGenerator].getCanonicalName)) {
+    if (baseKeyGen.isInstanceOf[CustomKeyGenerator] || baseKeyGen.isInstanceOf[CustomAvroKeyGenerator]) {
       typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
         .split(",").map(pathField => {
         pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX)
-          .headOption.getOrElse(s"Illegal partition path field format: '$pathField' for ${keyGenClass}")}).mkString(",")
-    } else if (keyGenClass.equals(classOf[NonpartitionedKeyGenerator].getCanonicalName)
-      || keyGenClass.equals(classOf[NonpartitionedAvroKeyGenerator].getCanonicalName)
-      || keyGenClass.equals(classOf[GlobalDeleteKeyGenerator].getCanonicalName)
-      || keyGenClass.equals(classOf[GlobalAvroDeleteKeyGenerator].getCanonicalName)) {
+          .headOption.getOrElse(s"Illegal partition path field format: '$pathField' for ${baseKeyGen}")}).mkString(",")
+    } else if (baseKeyGen.isInstanceOf[NonpartitionedKeyGenerator]
+      || baseKeyGen.isInstanceOf[NonpartitionedAvroKeyGenerator]
+      || baseKeyGen.isInstanceOf[GlobalDeleteKeyGenerator]
+      || baseKeyGen.isInstanceOf[GlobalAvroDeleteKeyGenerator]) {
+    } else if (baseKeyGen.isInstanceOf[NonpartitionedKeyGenerator]
+      || baseKeyGen.isInstanceOf[NonpartitionedAvroKeyGenerator]
+      || baseKeyGen.isInstanceOf[GlobalDeleteKeyGenerator]
+      || baseKeyGen.isInstanceOf[GlobalAvroDeleteKeyGenerator]) {
       StringUtils.EMPTY_STRING
     } else {
       checkArgument(typedProperties.containsKey(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()), "Partition path needs to be set")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 01a73cd0816..fda3156740d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -268,14 +268,14 @@ class HoodieSparkSqlWriterInternal {
       }
     }
 
+    val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(hoodieConfig.getProps))
     if (mode == SaveMode.Ignore && tableExists) {
       log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
       (false, common.util.Option.empty(), common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
     } else {
       // Handle various save modes
       handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs)
-      val partitionColumns = SparkKeyGenUtils.getPartitionColumns(getKeyGeneratorClassName(new TypedProperties(hoodieConfig.getProps)),
-        toProperties(parameters))
+      val partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters))
       val timelineTimeZone = HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))
       val tableMetaClient = if (tableExists) {
         HoodieInstantTimeGenerator.setCommitTimeZone(timelineTimeZone)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index e2c5ad88d7f..133f641d280 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -203,10 +203,11 @@ object HoodieWriterUtils {
         }
 
         val datasourcePartitionFields = params.getOrElse(PARTITIONPATH_FIELD.key(), null)
+        val currentPartitionFields = if (datasourcePartitionFields == null) null else SparkKeyGenUtils.getPartitionColumns(TypedProperties.fromMap(params))
         val tableConfigPartitionFields = tableConfig.getString(HoodieTableConfig.PARTITION_FIELDS)
         if (null != datasourcePartitionFields && null != tableConfigPartitionFields
-          && datasourcePartitionFields != tableConfigPartitionFields) {
-          diffConfigs.append(s"PartitionPath:\t$datasourcePartitionFields\t$tableConfigPartitionFields\n")
+          && currentPartitionFields != tableConfigPartitionFields) {
+          diffConfigs.append(s"PartitionPath:\t$currentPartitionFields\t$tableConfigPartitionFields\n")
         }
       }
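This validation change follows from the SparkKeyGenUtils change: the writer-side partition fields are first resolved through the key generator, so a custom key generator spec such as "ts:simple" is compared to the table's stored partition fields as "ts" instead of failing a literal string comparison. A hedged before/after illustration with assumed values:

    object PartitionPathValidationSketch {
      def main(args: Array[String]): Unit = {
        val datasourcePartitionFields = "ts:simple" // hoodie.datasource.write.partitionpath.field
        val tableConfigPartitionFields = "ts"       // hoodie.table.partition.fields
        val currentPartitionFields = "ts"           // resolved via SparkKeyGenUtils.getPartitionColumns

        // Old comparison: raw datasource value vs. table config -> spurious mismatch.
        println(datasourcePartitionFields != tableConfigPartitionFields) // true (flagged a diff)
        // New comparison: resolved partition columns vs. table config -> no diff reported.
        println(currentPartitionFields != tableConfigPartitionFields)    // false
      }
    }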
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 7f89817a7f8..865ca147eb0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -470,7 +470,7 @@ class TestHoodieSparkSqlWriter {
     val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
 
     // try write to Hudi
-    assertThrows[IllegalArgumentException] {
+    assertThrows[IOException] {
       HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, tableOpts - DataSourceWriteOptions.PARTITIONPATH_FIELD.key, df)
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 02c9b90e75a..e2c719e8782 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -1001,8 +1001,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
       writer.save(basePath)
       fail("should fail when invalid PartitionKeyType is provided!")
     } catch {
-      case e: Exception =>
-        assertTrue(e.getCause.getMessage.contains("No enum constant org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY"))
+      case e: Exception => assertTrue(e.getCause.getMessage.contains("Unable to instantiate class org.apache.hudi.keygen.CustomKeyGenerator"))
     }
   }
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index f73ee936b60..67bf90a9853 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -379,7 +379,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 
   @Test
   public void testPropsWithInvalidKeyGenerator() {
-    Exception e = assertThrows(IllegalArgumentException.class, () -> {
+    Exception e = assertThrows(IOException.class, () -> {
       String tableBasePath = basePath + "/test_table_invalid_key_gen";
       HoodieDeltaStreamer deltaStreamer =
           new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT,
@@ -387,8 +387,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
       deltaStreamer.sync();
     }, "Should error out when setting the key generator class property to an invalid value");
     // expected
-    LOG.debug("Expected error during getting the key generator", e);
-    assertTrue(e.getMessage().contains("No KeyGeneratorType found for class name"));
+    LOG.warn("Expected error during getting the key generator", e);
+    assertTrue(e.getMessage().contains("Could not load key generator class invalid"));
   }
 
   private static Stream<Arguments> provideInferKeyGenArgs() {
