This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1f1c25c4d86 [HUDI-6916] Improve performance of Custom Key Generators
(#9821)
1f1c25c4d86 is described below
commit 1f1c25c4d863fe580f614a3e6a23715165225a9c
Author: Tim Brown <[email protected]>
AuthorDate: Mon Oct 9 03:48:56 2023 -0500
[HUDI-6916] Improve performance of Custom Key Generators (#9821)
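Fixes an issue in the custom key generators where new key generator
objects were created for every record/row instead of being built once
and reused. The excess allocations added garbage-collection pressure.
Partition path fields are now parsed and validated when the key
generator is constructed, so a malformed configuration fails at
construction time rather than on the first record.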
---
.../apache/hudi/keygen/CustomAvroKeyGenerator.java | 76 +++++++++--------
...estCreateAvroKeyGeneratorByTypeWithFactory.java | 5 +-
.../org/apache/hudi/keygen/CustomKeyGenerator.java | 97 +++++++++++-----------
.../apache/hudi/keygen/TestCustomKeyGenerator.java | 16 ++--
.../TestCreateKeyGeneratorByTypeWithFactory.java | 4 +
.../TestHoodieSparkKeyGeneratorFactory.java | 1 +
6 files changed, 112 insertions(+), 87 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
index 13ae1d50528..70565b5d81d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
@@ -18,16 +18,18 @@
package org.apache.hudi.keygen;
-import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.hudi.exception.HoodieKeyGeneratorException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.avro.generic.GenericRecord;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import java.util.stream.Collectors;
/**
@@ -47,6 +49,8 @@ public class CustomAvroKeyGenerator extends BaseKeyGenerator {
public static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
public static final String SPLIT_REGEX = ":";
+ private final List<BaseKeyGenerator> partitionKeyGenerators;
+ private final BaseKeyGenerator recordKeyGenerator;
/**
* Used as a part of config in CustomKeyGenerator.java.
@@ -63,6 +67,35 @@ public class CustomAvroKeyGenerator extends BaseKeyGenerator
{
.map(String::trim).collect(Collectors.toList())
).orElse(Collections.emptyList());
this.partitionPathFields =
Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(",")).map(String::trim).collect(Collectors.toList());
+ this.recordKeyGenerator = getRecordKeyFieldNames().size() == 1 ? new
SimpleAvroKeyGenerator(config) : new ComplexAvroKeyGenerator(config);
+ this.partitionKeyGenerators =
getPartitionKeyGenerators(this.partitionPathFields, config);
+ }
+
+ private static List<BaseKeyGenerator> getPartitionKeyGenerators(List<String>
partitionPathFields, TypedProperties config) {
+ if (partitionPathFields.size() == 1 &&
partitionPathFields.get(0).isEmpty()) {
+ return Collections.emptyList(); // Corresponds to no partition case
+ } else {
+ return partitionPathFields.stream().map(field -> {
+ String[] fieldWithType = field.split(SPLIT_REGEX);
+ if (fieldWithType.length != 2) {
+ throw new HoodieKeyException("Unable to find field names for
partition path in proper format");
+ }
+ String partitionPathField = fieldWithType[0];
+ PartitionKeyType keyType =
PartitionKeyType.valueOf(fieldWithType[1].toUpperCase());
+ switch (keyType) {
+ case SIMPLE:
+ return new SimpleAvroKeyGenerator(config, partitionPathField);
+ case TIMESTAMP:
+ try {
+ return new TimestampBasedAvroKeyGenerator(config,
partitionPathField);
+ } catch (IOException e) {
+ throw new HoodieKeyGeneratorException("Unable to initialise
TimestampBasedKeyGenerator class", e);
+ }
+ default:
+ throw new HoodieKeyGeneratorException("Please provide valid
PartitionKeyType with fields! You provided: " + keyType);
+ }
+ }).collect(Collectors.toList());
+ }
}
@Override
@@ -70,48 +103,25 @@ public class CustomAvroKeyGenerator extends
BaseKeyGenerator {
if (getPartitionPathFields() == null) {
throw new HoodieKeyException("Unable to find field names for partition
path in cfg");
}
-
- String partitionPathField;
- StringBuilder partitionPath = new StringBuilder();
-
- //Corresponds to no partition case
- if (getPartitionPathFields().size() == 1 &&
getPartitionPathFields().get(0).isEmpty()) {
+ // Corresponds to no partition case
+ if (partitionKeyGenerators.isEmpty()) {
return "";
}
- for (String field : getPartitionPathFields()) {
- String[] fieldWithType = field.split(SPLIT_REGEX);
- if (fieldWithType.length != 2) {
- throw new HoodieKeyException("Unable to find field names for partition
path in proper format");
- }
-
- partitionPathField = fieldWithType[0];
- PartitionKeyType keyType =
PartitionKeyType.valueOf(fieldWithType[1].toUpperCase());
- switch (keyType) {
- case SIMPLE:
- partitionPath.append(new SimpleAvroKeyGenerator(config,
partitionPathField).getPartitionPath(record));
- break;
- case TIMESTAMP:
- try {
- partitionPath.append(new TimestampBasedAvroKeyGenerator(config,
partitionPathField).getPartitionPath(record));
- } catch (IOException e) {
- throw new HoodieKeyGeneratorException("Unable to initialise
TimestampBasedKeyGenerator class", e);
- }
- break;
- default:
- throw new HoodieKeyGeneratorException("Please provide valid
PartitionKeyType with fields! You provided: " + keyType);
+ StringBuilder partitionPath = new StringBuilder();
+ for (int i = 0; i < partitionKeyGenerators.size(); i++) {
+ BaseKeyGenerator partitionKeyGenerator = partitionKeyGenerators.get(i);
+ partitionPath.append(partitionKeyGenerator.getPartitionPath(record));
+ if (i != partitionKeyGenerators.size() - 1) {
+ partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
}
- partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
}
- partitionPath.deleteCharAt(partitionPath.length() - 1);
return partitionPath.toString();
}
@Override
public String getRecordKey(GenericRecord record) {
validateRecordKeyFields();
- return getRecordKeyFieldNames().size() == 1
- ? new SimpleAvroKeyGenerator(config).getRecordKey(record)
- : new ComplexAvroKeyGenerator(config).getRecordKey(record);
+ return recordKeyGenerator.getRecordKey(record);
}
private void validateRecordKeyFields() {
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestCreateAvroKeyGeneratorByTypeWithFactory.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestCreateAvroKeyGeneratorByTypeWithFactory.java
index 96095da3716..0c12547fcbd 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestCreateAvroKeyGeneratorByTypeWithFactory.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestCreateAvroKeyGeneratorByTypeWithFactory.java
@@ -75,7 +75,10 @@ public class TestCreateAvroKeyGeneratorByTypeWithFactory {
public void testKeyGeneratorTypes(String keyGenType) throws IOException {
props.put(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), keyGenType);
KeyGeneratorType keyType = KeyGeneratorType.valueOf(keyGenType);
-
+ if (keyType == KeyGeneratorType.CUSTOM) {
+ // input needs to be properly formatted
+ props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(),
"timestamp:timestamp");
+ }
KeyGenerator keyGenerator =
HoodieAvroKeyGeneratorFactory.createKeyGenerator(props);
switch (keyType) {
case SIMPLE:
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
index 1526164207f..48c1dfb04c7 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
@@ -34,6 +34,7 @@ import org.apache.spark.unsafe.types.UTF8String;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import java.util.stream.Collectors;
/**
@@ -49,12 +50,12 @@ import java.util.stream.Collectors;
*
* RecordKey is internally generated using either SimpleKeyGenerator or
ComplexKeyGenerator.
*
- * @deprecated
*/
-@Deprecated
public class CustomKeyGenerator extends BuiltinKeyGenerator {
private final CustomAvroKeyGenerator customAvroKeyGenerator;
+ private final List<BuiltinKeyGenerator> partitionKeyGenerators;
+ private final BuiltinKeyGenerator recordKeyGenerator;
public CustomKeyGenerator(TypedProperties props) {
// NOTE: We have to strip partition-path configuration, since it could
only be interpreted by
@@ -71,6 +72,37 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
? Collections.emptyList()
:
Arrays.stream(partitionPathFields.split(",")).map(String::trim).collect(Collectors.toList());
this.customAvroKeyGenerator = new CustomAvroKeyGenerator(props);
+ this.recordKeyGenerator = getRecordKeyFieldNames().size() == 1
+ ? new SimpleKeyGenerator(config,
Option.ofNullable(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())),
null)
+ : new ComplexKeyGenerator(config);
+ this.partitionKeyGenerators =
getPartitionKeyGenerators(this.partitionPathFields, config);
+ }
+
+ private static List<BuiltinKeyGenerator>
getPartitionKeyGenerators(List<String> partitionPathFields, TypedProperties
config) {
+ if (partitionPathFields.size() == 1 &&
partitionPathFields.get(0).isEmpty()) {
+ return Collections.emptyList();
+ } else {
+ return partitionPathFields.stream().map(field -> {
+ String[] fieldWithType =
field.split(CustomAvroKeyGenerator.SPLIT_REGEX);
+ if (fieldWithType.length != 2) {
+ throw new HoodieKeyGeneratorException("Unable to find field names
for partition path in proper format");
+ }
+ String partitionPathField = fieldWithType[0];
+ CustomAvroKeyGenerator.PartitionKeyType keyType =
CustomAvroKeyGenerator.PartitionKeyType.valueOf(fieldWithType[1].toUpperCase());
+ switch (keyType) {
+ case SIMPLE:
+ return new SimpleKeyGenerator(config, partitionPathField);
+ case TIMESTAMP:
+ try {
+ return new TimestampBasedKeyGenerator(config,
partitionPathField);
+ } catch (IOException ioe) {
+ throw new HoodieKeyGeneratorException("Unable to initialise
TimestampBasedKeyGenerator class", ioe);
+ }
+ default:
+ throw new HoodieKeyGeneratorException("Please provide valid
PartitionKeyType with fields! You provided: " + keyType);
+ }
+ }).collect(Collectors.toList());
+ }
}
@Override
@@ -85,9 +117,7 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
@Override
public String getRecordKey(Row row) {
- return getRecordKeyFieldNames().size() == 1
- ? new SimpleKeyGenerator(config,
Option.ofNullable(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())),
null).getRecordKey(row)
- : new ComplexKeyGenerator(config).getRecordKey(row);
+ return recordKeyGenerator.getRecordKey(row);
}
@Override
@@ -104,54 +134,25 @@ public class CustomKeyGenerator extends
BuiltinKeyGenerator {
if (getPartitionPathFields() == null) {
throw new HoodieKeyException("Unable to find field names for partition
path in cfg");
}
-
- String partitionPathField;
- StringBuilder partitionPath = new StringBuilder();
-
- //Corresponds to no partition case
- if (getPartitionPathFields().size() == 1 &&
getPartitionPathFields().get(0).isEmpty()) {
+ // Corresponds to no partition case
+ if (partitionKeyGenerators.isEmpty()) {
return "";
}
- for (String field : getPartitionPathFields()) {
- String[] fieldWithType = field.split(CustomAvroKeyGenerator.SPLIT_REGEX);
- if (fieldWithType.length != 2) {
- throw new HoodieKeyGeneratorException("Unable to find field names for
partition path in proper format");
+ StringBuilder partitionPath = new StringBuilder();
+ for (int i = 0; i < partitionKeyGenerators.size(); i++) {
+ BuiltinKeyGenerator keyGenerator = partitionKeyGenerators.get(i);
+ if (record.isPresent()) {
+ partitionPath.append(keyGenerator.getPartitionPath(record.get()));
+ } else if (row.isPresent()) {
+ partitionPath.append(keyGenerator.getPartitionPath(row.get()));
+ } else {
+
partitionPath.append(keyGenerator.getPartitionPath(internalRowStructTypePair.get().getKey(),
+ internalRowStructTypePair.get().getValue()));
}
-
- partitionPathField = fieldWithType[0];
- CustomAvroKeyGenerator.PartitionKeyType keyType =
CustomAvroKeyGenerator.PartitionKeyType.valueOf(fieldWithType[1].toUpperCase());
- switch (keyType) {
- case SIMPLE:
- if (record.isPresent()) {
- partitionPath.append(new SimpleKeyGenerator(config,
partitionPathField).getPartitionPath(record.get()));
- } else if (row.isPresent()) {
- partitionPath.append(new SimpleKeyGenerator(config,
partitionPathField).getPartitionPath(row.get()));
- } else {
- partitionPath.append(new SimpleKeyGenerator(config,
partitionPathField).getPartitionPath(internalRowStructTypePair.get().getKey(),
- internalRowStructTypePair.get().getValue()));
- }
- break;
- case TIMESTAMP:
- try {
- if (record.isPresent()) {
- partitionPath.append(new TimestampBasedKeyGenerator(config,
partitionPathField).getPartitionPath(record.get()));
- } else if (row.isPresent()) {
- partitionPath.append(new TimestampBasedKeyGenerator(config,
partitionPathField).getPartitionPath(row.get()));
- } else {
- partitionPath.append(new TimestampBasedKeyGenerator(config,
partitionPathField).getPartitionPath(internalRowStructTypePair.get().getKey(),
- internalRowStructTypePair.get().getValue()));
- }
- } catch (IOException ioe) {
- throw new HoodieKeyGeneratorException("Unable to initialise
TimestampBasedKeyGenerator class", ioe);
- }
- break;
- default:
- throw new HoodieKeyGeneratorException("Please provide valid
PartitionKeyType with fields! You provided: " + keyType);
+ if (i != partitionKeyGenerators.size() - 1) {
+
partitionPath.append(customAvroKeyGenerator.getDefaultPartitionPathSeparator());
}
-
-
partitionPath.append(customAvroKeyGenerator.getDefaultPartitionPathSeparator());
}
- partitionPath.deleteCharAt(partitionPath.length() - 1);
return partitionPath.toString();
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
index e001bfc13f5..0ba8d1425e7 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
@@ -18,7 +18,6 @@
package org.apache.hudi.keygen;
-import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -26,6 +25,8 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
+
+import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.unsafe.types.UTF8String;
@@ -224,7 +225,7 @@ public class TestCustomKeyGenerator extends
KeyGeneratorTestUtilities {
keyGenerator.getKey(getRecord());
Assertions.fail("should fail when invalid PartitionKeyType is
provided!");
} catch (Exception e) {
- Assertions.assertTrue(e.getMessage().contains("No enum constant
org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY"));
+
Assertions.assertTrue(getNestedConstructorErrorCause(e).getMessage().contains("No
enum constant
org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY"));
}
try {
@@ -236,7 +237,7 @@ public class TestCustomKeyGenerator extends
KeyGeneratorTestUtilities {
keyGenerator.getPartitionPath(row);
Assertions.fail("should fail when invalid PartitionKeyType is
provided!");
} catch (Exception e) {
- Assertions.assertTrue(e.getMessage().contains("No enum constant
org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY"));
+
Assertions.assertTrue(getNestedConstructorErrorCause(e).getMessage().contains("No
enum constant
org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY"));
}
}
@@ -304,7 +305,7 @@ public class TestCustomKeyGenerator extends
KeyGeneratorTestUtilities {
keyGenerator.getKey(getRecord());
Assertions.fail("should fail when partition key field is provided in
improper format!");
} catch (Exception e) {
- Assertions.assertTrue(e.getMessage().contains("Unable to find field
names for partition path in proper format"));
+
Assertions.assertTrue(getNestedConstructorErrorCause(e).getMessage().contains("Unable
to find field names for partition path in proper format"));
}
try {
@@ -316,7 +317,7 @@ public class TestCustomKeyGenerator extends
KeyGeneratorTestUtilities {
keyGenerator.getPartitionPath(row);
Assertions.fail("should fail when partition key field is provided in
improper format!");
} catch (Exception e) {
- Assertions.assertTrue(e.getMessage().contains("Unable to find field
names for partition path in proper format"));
+
Assertions.assertTrue(getNestedConstructorErrorCause(e).getMessage().contains("Unable
to find field names for partition path in proper format"));
}
}
@@ -373,4 +374,9 @@ public class TestCustomKeyGenerator extends
KeyGeneratorTestUtilities {
InternalRow internalRow = KeyGeneratorTestUtilities.getInternalRow(row);
Assertions.assertEquals(UTF8String.fromString("timestamp=4357686/ts_ms=20200321"),
keyGenerator.getPartitionPath(internalRow, row.schema()));
}
+
+ private static Throwable getNestedConstructorErrorCause(Exception e) {
+ // custom key generator will fail in the constructor, and we must unwrap
the cause for asserting error messages
+ return e.getCause().getCause().getCause();
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestCreateKeyGeneratorByTypeWithFactory.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestCreateKeyGeneratorByTypeWithFactory.java
index 45272ec1006..dc597df2cf5 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestCreateKeyGeneratorByTypeWithFactory.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestCreateKeyGeneratorByTypeWithFactory.java
@@ -77,6 +77,10 @@ public class TestCreateKeyGeneratorByTypeWithFactory {
props.put(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), keyGenType);
KeyGeneratorType keyType = KeyGeneratorType.valueOf(keyGenType);
+ if (keyType == KeyGeneratorType.CUSTOM) {
+ // input needs to be properly formatted
+ props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(),
"timestamp:timestamp");
+ }
KeyGenerator keyGenerator =
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
switch (keyType) {
case SIMPLE:
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java
index 6826af03e87..3cc30e86399 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java
@@ -72,6 +72,7 @@ public class TestHoodieSparkKeyGeneratorFactory {
// set KeyGenerator type only
props.put(KEYGENERATOR_TYPE.key(), KeyGeneratorType.CUSTOM.name());
+ props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(),
"field:simple");
KeyGenerator keyGenerator =
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
assertEquals(CustomKeyGenerator.class.getName(),
keyGenerator.getClass().getName());