This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f1c25c4d86 [HUDI-6916] Improve performance of Custom Key Generators 
(#9821)
1f1c25c4d86 is described below

commit 1f1c25c4d863fe580f614a3e6a23715165225a9c
Author: Tim Brown <[email protected]>
AuthorDate: Mon Oct 9 03:48:56 2023 -0500

    [HUDI-6916] Improve performance of Custom Key Generators (#9821)
    
    Fixes an issue in the custom key generators where the delegate key
    generator objects were created for every record/row instead of being
    created once and reused. The excess object creation added unnecessary
    garbage-collection pressure.
---
 .../apache/hudi/keygen/CustomAvroKeyGenerator.java | 76 +++++++++--------
 ...estCreateAvroKeyGeneratorByTypeWithFactory.java |  5 +-
 .../org/apache/hudi/keygen/CustomKeyGenerator.java | 97 +++++++++++-----------
 .../apache/hudi/keygen/TestCustomKeyGenerator.java | 16 ++--
 .../TestCreateKeyGeneratorByTypeWithFactory.java   |  4 +
 .../TestHoodieSparkKeyGeneratorFactory.java        |  1 +
 6 files changed, 112 insertions(+), 87 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
index 13ae1d50528..70565b5d81d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
@@ -18,16 +18,18 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieKeyException;
 import org.apache.hudi.exception.HoodieKeyGeneratorException;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
+import org.apache.avro.generic.GenericRecord;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.stream.Collectors;
 
 /**
@@ -47,6 +49,8 @@ public class CustomAvroKeyGenerator extends BaseKeyGenerator {
 
   public static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
   public static final String SPLIT_REGEX = ":";
+  private final List<BaseKeyGenerator> partitionKeyGenerators;
+  private final BaseKeyGenerator recordKeyGenerator;
 
   /**
    * Used as a part of config in CustomKeyGenerator.java.
@@ -63,6 +67,35 @@ public class CustomAvroKeyGenerator extends BaseKeyGenerator 
{
                 .map(String::trim).collect(Collectors.toList())
         ).orElse(Collections.emptyList());
     this.partitionPathFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(",")).map(String::trim).collect(Collectors.toList());
+    this.recordKeyGenerator = getRecordKeyFieldNames().size() == 1 ? new 
SimpleAvroKeyGenerator(config) : new ComplexAvroKeyGenerator(config);
+    this.partitionKeyGenerators = 
getPartitionKeyGenerators(this.partitionPathFields, config);
+  }
+
+  private static List<BaseKeyGenerator> getPartitionKeyGenerators(List<String> 
partitionPathFields, TypedProperties config) {
+    if (partitionPathFields.size() == 1 && 
partitionPathFields.get(0).isEmpty()) {
+      return Collections.emptyList(); // Corresponds to no partition case
+    } else {
+      return partitionPathFields.stream().map(field -> {
+        String[] fieldWithType = field.split(SPLIT_REGEX);
+        if (fieldWithType.length != 2) {
+          throw new HoodieKeyException("Unable to find field names for 
partition path in proper format");
+        }
+        String partitionPathField = fieldWithType[0];
+        PartitionKeyType keyType = 
PartitionKeyType.valueOf(fieldWithType[1].toUpperCase());
+        switch (keyType) {
+          case SIMPLE:
+            return new SimpleAvroKeyGenerator(config, partitionPathField);
+          case TIMESTAMP:
+            try {
+              return new TimestampBasedAvroKeyGenerator(config, 
partitionPathField);
+            } catch (IOException e) {
+              throw new HoodieKeyGeneratorException("Unable to initialise 
TimestampBasedKeyGenerator class", e);
+            }
+          default:
+            throw new HoodieKeyGeneratorException("Please provide valid 
PartitionKeyType with fields! You provided: " + keyType);
+        }
+      }).collect(Collectors.toList());
+    }
   }
 
   @Override
@@ -70,48 +103,25 @@ public class CustomAvroKeyGenerator extends 
BaseKeyGenerator {
     if (getPartitionPathFields() == null) {
       throw new HoodieKeyException("Unable to find field names for partition 
path in cfg");
     }
-
-    String partitionPathField;
-    StringBuilder partitionPath = new StringBuilder();
-
-    //Corresponds to no partition case
-    if (getPartitionPathFields().size() == 1 && 
getPartitionPathFields().get(0).isEmpty()) {
+    // Corresponds to no partition case
+    if (partitionKeyGenerators.isEmpty()) {
       return "";
     }
-    for (String field : getPartitionPathFields()) {
-      String[] fieldWithType = field.split(SPLIT_REGEX);
-      if (fieldWithType.length != 2) {
-        throw new HoodieKeyException("Unable to find field names for partition 
path in proper format");
-      }
-
-      partitionPathField = fieldWithType[0];
-      PartitionKeyType keyType = 
PartitionKeyType.valueOf(fieldWithType[1].toUpperCase());
-      switch (keyType) {
-        case SIMPLE:
-          partitionPath.append(new SimpleAvroKeyGenerator(config, 
partitionPathField).getPartitionPath(record));
-          break;
-        case TIMESTAMP:
-          try {
-            partitionPath.append(new TimestampBasedAvroKeyGenerator(config, 
partitionPathField).getPartitionPath(record));
-          } catch (IOException e) {
-            throw new HoodieKeyGeneratorException("Unable to initialise 
TimestampBasedKeyGenerator class", e);
-          }
-          break;
-        default:
-          throw new HoodieKeyGeneratorException("Please provide valid 
PartitionKeyType with fields! You provided: " + keyType);
+    StringBuilder partitionPath = new StringBuilder();
+    for (int i = 0; i < partitionKeyGenerators.size(); i++) {
+      BaseKeyGenerator partitionKeyGenerator = partitionKeyGenerators.get(i);
+      partitionPath.append(partitionKeyGenerator.getPartitionPath(record));
+      if (i != partitionKeyGenerators.size() - 1) {
+        partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
       }
-      partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
     }
-    partitionPath.deleteCharAt(partitionPath.length() - 1);
     return partitionPath.toString();
   }
 
   @Override
   public String getRecordKey(GenericRecord record) {
     validateRecordKeyFields();
-    return getRecordKeyFieldNames().size() == 1
-        ? new SimpleAvroKeyGenerator(config).getRecordKey(record)
-        : new ComplexAvroKeyGenerator(config).getRecordKey(record);
+    return recordKeyGenerator.getRecordKey(record);
   }
 
   private void validateRecordKeyFields() {
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestCreateAvroKeyGeneratorByTypeWithFactory.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestCreateAvroKeyGeneratorByTypeWithFactory.java
index 96095da3716..0c12547fcbd 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestCreateAvroKeyGeneratorByTypeWithFactory.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestCreateAvroKeyGeneratorByTypeWithFactory.java
@@ -75,7 +75,10 @@ public class TestCreateAvroKeyGeneratorByTypeWithFactory {
   public void testKeyGeneratorTypes(String keyGenType) throws IOException {
     props.put(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), keyGenType);
     KeyGeneratorType keyType = KeyGeneratorType.valueOf(keyGenType);
-
+    if (keyType == KeyGeneratorType.CUSTOM) {
+      // input needs to be properly formatted
+      props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), 
"timestamp:timestamp");
+    }
     KeyGenerator keyGenerator = 
HoodieAvroKeyGeneratorFactory.createKeyGenerator(props);
     switch (keyType) {
       case SIMPLE:
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
index 1526164207f..48c1dfb04c7 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
@@ -34,6 +34,7 @@ import org.apache.spark.unsafe.types.UTF8String;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.stream.Collectors;
 
 /**
@@ -49,12 +50,12 @@ import java.util.stream.Collectors;
  *
  * RecordKey is internally generated using either SimpleKeyGenerator or 
ComplexKeyGenerator.
  *
- * @deprecated
  */
-@Deprecated
 public class CustomKeyGenerator extends BuiltinKeyGenerator {
 
   private final CustomAvroKeyGenerator customAvroKeyGenerator;
+  private final List<BuiltinKeyGenerator> partitionKeyGenerators;
+  private final BuiltinKeyGenerator recordKeyGenerator;
 
   public CustomKeyGenerator(TypedProperties props) {
     // NOTE: We have to strip partition-path configuration, since it could 
only be interpreted by
@@ -71,6 +72,37 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
         ? Collections.emptyList()
         : 
Arrays.stream(partitionPathFields.split(",")).map(String::trim).collect(Collectors.toList());
     this.customAvroKeyGenerator = new CustomAvroKeyGenerator(props);
+    this.recordKeyGenerator = getRecordKeyFieldNames().size() == 1
+        ? new SimpleKeyGenerator(config, 
Option.ofNullable(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())),
 null)
+        : new ComplexKeyGenerator(config);
+    this.partitionKeyGenerators = 
getPartitionKeyGenerators(this.partitionPathFields, config);
+  }
+
+  private static List<BuiltinKeyGenerator> 
getPartitionKeyGenerators(List<String> partitionPathFields, TypedProperties 
config) {
+    if (partitionPathFields.size() == 1 && 
partitionPathFields.get(0).isEmpty()) {
+      return Collections.emptyList();
+    } else {
+      return partitionPathFields.stream().map(field -> {
+        String[] fieldWithType = 
field.split(CustomAvroKeyGenerator.SPLIT_REGEX);
+        if (fieldWithType.length != 2) {
+          throw new HoodieKeyGeneratorException("Unable to find field names 
for partition path in proper format");
+        }
+        String partitionPathField = fieldWithType[0];
+        CustomAvroKeyGenerator.PartitionKeyType keyType = 
CustomAvroKeyGenerator.PartitionKeyType.valueOf(fieldWithType[1].toUpperCase());
+        switch (keyType) {
+          case SIMPLE:
+            return new SimpleKeyGenerator(config, partitionPathField);
+          case TIMESTAMP:
+            try {
+              return new TimestampBasedKeyGenerator(config, 
partitionPathField);
+            } catch (IOException ioe) {
+              throw new HoodieKeyGeneratorException("Unable to initialise 
TimestampBasedKeyGenerator class", ioe);
+            }
+          default:
+            throw new HoodieKeyGeneratorException("Please provide valid 
PartitionKeyType with fields! You provided: " + keyType);
+        }
+      }).collect(Collectors.toList());
+    }
   }
 
   @Override
@@ -85,9 +117,7 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
 
   @Override
   public String getRecordKey(Row row) {
-    return getRecordKeyFieldNames().size() == 1
-        ? new SimpleKeyGenerator(config, 
Option.ofNullable(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())),
 null).getRecordKey(row)
-        : new ComplexKeyGenerator(config).getRecordKey(row);
+    return recordKeyGenerator.getRecordKey(row);
   }
 
   @Override
@@ -104,54 +134,25 @@ public class CustomKeyGenerator extends 
BuiltinKeyGenerator {
     if (getPartitionPathFields() == null) {
       throw new HoodieKeyException("Unable to find field names for partition 
path in cfg");
     }
-
-    String partitionPathField;
-    StringBuilder partitionPath = new StringBuilder();
-
-    //Corresponds to no partition case
-    if (getPartitionPathFields().size() == 1 && 
getPartitionPathFields().get(0).isEmpty()) {
+    // Corresponds to no partition case
+    if (partitionKeyGenerators.isEmpty()) {
       return "";
     }
-    for (String field : getPartitionPathFields()) {
-      String[] fieldWithType = field.split(CustomAvroKeyGenerator.SPLIT_REGEX);
-      if (fieldWithType.length != 2) {
-        throw new HoodieKeyGeneratorException("Unable to find field names for 
partition path in proper format");
+    StringBuilder partitionPath = new StringBuilder();
+    for (int i = 0; i < partitionKeyGenerators.size(); i++) {
+      BuiltinKeyGenerator keyGenerator = partitionKeyGenerators.get(i);
+      if (record.isPresent()) {
+        partitionPath.append(keyGenerator.getPartitionPath(record.get()));
+      } else if (row.isPresent()) {
+        partitionPath.append(keyGenerator.getPartitionPath(row.get()));
+      } else {
+        
partitionPath.append(keyGenerator.getPartitionPath(internalRowStructTypePair.get().getKey(),
+            internalRowStructTypePair.get().getValue()));
       }
-
-      partitionPathField = fieldWithType[0];
-      CustomAvroKeyGenerator.PartitionKeyType keyType = 
CustomAvroKeyGenerator.PartitionKeyType.valueOf(fieldWithType[1].toUpperCase());
-      switch (keyType) {
-        case SIMPLE:
-          if (record.isPresent()) {
-            partitionPath.append(new SimpleKeyGenerator(config, 
partitionPathField).getPartitionPath(record.get()));
-          } else if (row.isPresent()) {
-            partitionPath.append(new SimpleKeyGenerator(config, 
partitionPathField).getPartitionPath(row.get()));
-          } else {
-            partitionPath.append(new SimpleKeyGenerator(config, 
partitionPathField).getPartitionPath(internalRowStructTypePair.get().getKey(),
-                internalRowStructTypePair.get().getValue()));
-          }
-          break;
-        case TIMESTAMP:
-          try {
-            if (record.isPresent()) {
-              partitionPath.append(new TimestampBasedKeyGenerator(config, 
partitionPathField).getPartitionPath(record.get()));
-            } else if (row.isPresent()) {
-              partitionPath.append(new TimestampBasedKeyGenerator(config, 
partitionPathField).getPartitionPath(row.get()));
-            } else {
-              partitionPath.append(new TimestampBasedKeyGenerator(config, 
partitionPathField).getPartitionPath(internalRowStructTypePair.get().getKey(),
-                  internalRowStructTypePair.get().getValue()));
-            }
-          } catch (IOException ioe) {
-            throw new HoodieKeyGeneratorException("Unable to initialise 
TimestampBasedKeyGenerator class", ioe);
-          }
-          break;
-        default:
-          throw new HoodieKeyGeneratorException("Please provide valid 
PartitionKeyType with fields! You provided: " + keyType);
+      if (i != partitionKeyGenerators.size() - 1) {
+        
partitionPath.append(customAvroKeyGenerator.getDefaultPartitionPathSeparator());
       }
-
-      
partitionPath.append(customAvroKeyGenerator.getDefaultPartitionPathSeparator());
     }
-    partitionPath.deleteCharAt(partitionPath.length() - 1);
     return partitionPath.toString();
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
index e001bfc13f5..0ba8d1425e7 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -26,6 +25,8 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
+
+import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.unsafe.types.UTF8String;
@@ -224,7 +225,7 @@ public class TestCustomKeyGenerator extends 
KeyGeneratorTestUtilities {
       keyGenerator.getKey(getRecord());
       Assertions.fail("should fail when invalid PartitionKeyType is 
provided!");
     } catch (Exception e) {
-      Assertions.assertTrue(e.getMessage().contains("No enum constant 
org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY"));
+      
Assertions.assertTrue(getNestedConstructorErrorCause(e).getMessage().contains("No
 enum constant 
org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY"));
     }
 
     try {
@@ -236,7 +237,7 @@ public class TestCustomKeyGenerator extends 
KeyGeneratorTestUtilities {
       keyGenerator.getPartitionPath(row);
       Assertions.fail("should fail when invalid PartitionKeyType is 
provided!");
     } catch (Exception e) {
-      Assertions.assertTrue(e.getMessage().contains("No enum constant 
org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY"));
+      
Assertions.assertTrue(getNestedConstructorErrorCause(e).getMessage().contains("No
 enum constant 
org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY"));
     }
   }
 
@@ -304,7 +305,7 @@ public class TestCustomKeyGenerator extends 
KeyGeneratorTestUtilities {
       keyGenerator.getKey(getRecord());
       Assertions.fail("should fail when partition key field is provided in 
improper format!");
     } catch (Exception e) {
-      Assertions.assertTrue(e.getMessage().contains("Unable to find field 
names for partition path in proper format"));
+      
Assertions.assertTrue(getNestedConstructorErrorCause(e).getMessage().contains("Unable
 to find field names for partition path in proper format"));
     }
 
     try {
@@ -316,7 +317,7 @@ public class TestCustomKeyGenerator extends 
KeyGeneratorTestUtilities {
       keyGenerator.getPartitionPath(row);
       Assertions.fail("should fail when partition key field is provided in 
improper format!");
     } catch (Exception e) {
-      Assertions.assertTrue(e.getMessage().contains("Unable to find field 
names for partition path in proper format"));
+      
Assertions.assertTrue(getNestedConstructorErrorCause(e).getMessage().contains("Unable
 to find field names for partition path in proper format"));
     }
   }
 
@@ -373,4 +374,9 @@ public class TestCustomKeyGenerator extends 
KeyGeneratorTestUtilities {
     InternalRow internalRow = KeyGeneratorTestUtilities.getInternalRow(row);
     
Assertions.assertEquals(UTF8String.fromString("timestamp=4357686/ts_ms=20200321"),
 keyGenerator.getPartitionPath(internalRow, row.schema()));
   }
+
+  private static Throwable getNestedConstructorErrorCause(Exception e) {
+    // custom key generator will fail in the constructor, and we must unwrap 
the cause for asserting error messages
+    return e.getCause().getCause().getCause();
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestCreateKeyGeneratorByTypeWithFactory.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestCreateKeyGeneratorByTypeWithFactory.java
index 45272ec1006..dc597df2cf5 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestCreateKeyGeneratorByTypeWithFactory.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestCreateKeyGeneratorByTypeWithFactory.java
@@ -77,6 +77,10 @@ public class TestCreateKeyGeneratorByTypeWithFactory {
     props.put(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), keyGenType);
     KeyGeneratorType keyType = KeyGeneratorType.valueOf(keyGenType);
 
+    if (keyType == KeyGeneratorType.CUSTOM) {
+      // input needs to be properly formatted
+      props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), 
"timestamp:timestamp");
+    }
     KeyGenerator keyGenerator = 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
     switch (keyType) {
       case SIMPLE:
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java
index 6826af03e87..3cc30e86399 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java
@@ -72,6 +72,7 @@ public class TestHoodieSparkKeyGeneratorFactory {
 
     // set KeyGenerator type only
     props.put(KEYGENERATOR_TYPE.key(), KeyGeneratorType.CUSTOM.name());
+    props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), 
"field:simple");
     KeyGenerator keyGenerator = 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
     assertEquals(CustomKeyGenerator.class.getName(), 
keyGenerator.getClass().getName());
 

Reply via email to