This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch recordKeyGenRefactor in repository https://gitbox.apache.org/repos/asf/hudi.git
commit b26ffe6600ce80dbeed7c43e5de72b6ff3ddeb99 Author: sivabalan <[email protected]> AuthorDate: Mon Jan 16 15:05:41 2023 -0800 Fixing record key generation so that any key generator class can have any record key generation (simple, custom, auto generation) --- .../org/apache/hudi/config/HoodieWriteConfig.java | 14 ++ .../apache/hudi/keygen/AutoRecordKeyGenerator.java | 235 +++++++++++++++++++++ .../hudi/keygen/ComplexAvroKeyGenerator.java | 11 +- .../hudi/keygen/ComplexAvroRecordKeyGenerator.java | 42 ++++ .../hudi/keygen/GlobalAvroDeleteKeyGenerator.java | 5 +- .../keygen/NonpartitionedAvroKeyGenerator.java | 8 +- .../apache/hudi/keygen/SimpleAvroKeyGenerator.java | 11 +- .../hudi/keygen/SimpleAvroRecordKeyGenerator.java | 40 ++++ .../keygen/factory/RecordKeyGeneratorFactory.java | 45 ++++ .../hudi/keygen/SparkKeyGeneratorInterface.java | 26 +-- ....java => SparkRecordKeyGeneratorInterface.java} | 31 +-- .../org/apache/hudi/keygen/BaseKeyGenerator.java | 4 +- .../org/apache/hudi/keygen/RecordKeyGenerator.java | 32 +++ 13 files changed, 435 insertions(+), 69 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index b70b13c0833..b907905f99c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -549,6 +549,11 @@ public class HoodieWriteConfig extends HoodieConfig { .withDocumentation("When table is upgraded from pre 0.12 to 0.12, we check for \"default\" partition and fail if found one. " + "Users are expected to rewrite the data in those partitions. 
Enabling this config will bypass this validation"); + public static final ConfigProperty<Boolean> AUTO_GENERATE_RECORD_KEYS = ConfigProperty.key("hoodie.auto.generate.record.keys") + .defaultValue(false) + .sinceVersion("0.13.0") + .withDocumentation("to be added"); + private ConsistencyGuardConfig consistencyGuardConfig; private FileSystemRetryConfig fileSystemRetryConfig; @@ -2201,6 +2206,10 @@ public class HoodieWriteConfig extends HoodieConfig { return getBoolean(SKIP_DEFAULT_PARTITION_VALIDATION); } + public Boolean doAutoGenerateRecordKeys() { + return getBooleanOrDefault(AUTO_GENERATE_RECORD_KEYS); + } + /** * Are any table services configured to run inline for both scheduling and execution? * @@ -2723,6 +2732,11 @@ public class HoodieWriteConfig extends HoodieConfig { return this; } + public Builder withAutoGenerateRecordKeys(boolean autoGenerateRecordKeys) { + writeConfig.setValue(AUTO_GENERATE_RECORD_KEYS, String.valueOf(autoGenerateRecordKeys)); + return this; + } + protected void setDefaults() { writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType)); // Check for mandatory properties diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGenerator.java new file mode 100644 index 00000000000..6beba686dfc --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGenerator.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.apache.hudi.keygen.CustomAvroKeyGenerator.SPLIT_REGEX; + +/** + * Auto record key generator. This generator will fetch values from the entire record based on some of the fields and determine the record key. + * Use-cases where users may not be able to configure record keys, can use this auto record key generator. 
+ */ +public class AutoRecordKeyGenerator implements RecordKeyGenerator { + + private final TypedProperties config; + private static final String HOODIE_PREFIX = "_hoodie"; + private static final String DOT = "."; + private final int maxFieldsToConsider; + private final int numFieldsForKey; + private final Set<String> partitionFieldNames; + private int[][] fieldOrdering; + + public AutoRecordKeyGenerator(TypedProperties config, List<String> partitionPathFields) { + this.config = config; + this.numFieldsForKey = config.getInteger(KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.key(), KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.defaultValue()); + // cap the number of fields to order in case of large schemas + this.maxFieldsToConsider = numFieldsForKey * 3; + this.partitionFieldNames = partitionPathFields.stream().map(field -> field.split(SPLIT_REGEX)[0]).collect(Collectors.toSet()); + } + + @Override + public String getRecordKey(GenericRecord record) { + return buildKey(getFieldOrdering(record), record); + } + + int[][] getFieldOrdering(GenericRecord genericRecord) { + if (fieldOrdering == null) { + fieldOrdering = buildFieldOrdering(genericRecord.getSchema().getFields()); + } + return fieldOrdering; + } + + /** + * Deterministically builds a key for the input value based on the provided fieldOrdering. The first {@link #numFieldsForKey} non-null values will be used to generate a string that is passed to + * {@link UUID#nameUUIDFromBytes(byte[])}. + * @param fieldOrdering an array of integer arrays. The integer arrays represent paths to a single field within the input object. 
+ * @param input the input object that needs a key + * @return a deterministically generated {@link UUID} + * @param <T> the input object type + */ + private <T> String buildKey(int[][] fieldOrdering, GenericRecord input) { + StringBuilder key = new StringBuilder(); + int nonNullFields = 0; + for (int[] index : fieldOrdering) { + Object value = getFieldForRecord(input, index); + if (value == null) { + continue; + } + nonNullFields++; + key.append(value.hashCode()); + if (nonNullFields >= numFieldsForKey) { + break; + } + } + return UUID.nameUUIDFromBytes(key.toString().getBytes(StandardCharsets.UTF_8)).toString(); + } + + /** + * Gets the value of the field at the specified path within the record. + * @param record the input record + * @param fieldPath the path to the field as an array of integers representing the field position within the object + * @return value at the path + */ + private static Object getFieldForRecord(GenericRecord record, int[] fieldPath) { + Object value = record; + for (Integer index : fieldPath) { + if (value == null) { + return null; + } + value = ((GenericRecord) value).get(index); + } + return value; + } + + private int[][] buildFieldOrdering(List<Schema.Field> initialFields) { + PriorityQueue<Pair<int[], Integer>> queue = new PriorityQueue<>(maxFieldsToConsider + 1, RankingComparator.getInstance()); + Queue<FieldToProcess> fieldsToProcess = new ArrayDeque<>(); + for (int j = 0; j < initialFields.size(); j++) { + fieldsToProcess.offer(new FieldToProcess(new int[]{j}, initialFields.get(j), initialFields.get(j).name())); + } + while (!fieldsToProcess.isEmpty()) { + FieldToProcess fieldToProcess = fieldsToProcess.poll(); + int[] existingPath = fieldToProcess.getIndexPath(); + Schema fieldSchema = fieldToProcess.getField().schema(); + if (fieldSchema.getType() == Schema.Type.UNION) { + fieldSchema = fieldSchema.getTypes().get(1); + } + if (fieldSchema.getType() == Schema.Type.RECORD) { + List<Schema.Field> nestedFields = 
fieldSchema.getFields(); + for (int i = 0; i < nestedFields.size(); i++) { + int[] path = Arrays.copyOf(existingPath, existingPath.length + 1); + path[existingPath.length] = i; + Schema.Field nestedField = nestedFields.get(i); + fieldsToProcess.add(new FieldToProcess(path, nestedField, fieldToProcess.getNamePath() + DOT + nestedField.name())); + } + } else { + // check that field is not used in partitioning + if (!partitionFieldNames.contains(fieldToProcess.getNamePath())) { + queue.offer(Pair.of(existingPath, getSchemaRanking(fieldToProcess.getField()))); + if (queue.size() > maxFieldsToConsider) { + queue.poll(); + } + } + } + } + Pair<int[], Integer>[] sortedPairs = queue.toArray(new Pair[queue.size()]); + Arrays.sort(sortedPairs, RankingComparator.getInstance().reversed()); + int[][] output = new int[sortedPairs.length][]; + for (int k = 0; k < sortedPairs.length; k++) { + output[k] = sortedPairs[k].getLeft(); + } + return output; + } + + private static class FieldToProcess { + final int[] indexPath; + final Schema.Field field; + final String namePath; + + public FieldToProcess(int[] indexPath, Schema.Field field, String namePath) { + this.indexPath = indexPath; + this.field = field; + this.namePath = namePath; + } + + public int[] getIndexPath() { + return indexPath; + } + + public Schema.Field getField() { + return field; + } + + public String getNamePath() { + return namePath; + } + } + + /** + * Ranks the fields by their type. + * @param field input field + * @return a score of 0 to 4 + */ + private int getSchemaRanking(Schema.Field field) { + if (field.name().startsWith(HOODIE_PREFIX)) { + return 0; + } + Schema schema = field.schema(); + if (schema.getType() == Schema.Type.UNION) { + schema = schema.getTypes().get(0).getType() == Schema.Type.NULL ? 
schema.getTypes().get(1) : schema.getTypes().get(0); + } + Schema.Type type = schema.getType(); + switch (type) { + case LONG: + // assumes long with logical type will be a timestamp + return schema.getLogicalType() != null ? 4 : 3; + case INT: + // assumes int with logical type will be a date which will have low variance in a batch + return schema.getLogicalType() != null ? 1 : 3; + case DOUBLE: + case FLOAT: + return 3; + case BOOLEAN: + case MAP: + case ARRAY: + return 1; + default: + return 2; + } + } + + private static class RankingComparator implements Comparator<Pair<int[], Integer>> { + private static final RankingComparator INSTANCE = new RankingComparator(); + + static RankingComparator getInstance() { + return INSTANCE; + } + + @Override + public int compare(Pair<int[], Integer> o1, Pair<int[], Integer> o2) { + int initialResult = o1.getRight().compareTo(o2.getRight()); + if (initialResult == 0) { + // favor the smaller list (less nested value) on ties + int sizeResult = Integer.compare(o2.getLeft().length, o1.getLeft().length); + if (sizeResult == 0) { + return Integer.compare(o2.getLeft()[0], o1.getLeft()[0]); + } + return sizeResult; + } + return initialResult; + } + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java index 9ff5c522e45..581ddaa90f5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java @@ -20,6 +20,7 @@ package org.apache.hudi.keygen; import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.keygen.factory.RecordKeyGeneratorFactory; import java.util.Arrays; import java.util.stream.Collectors; @@ 
-29,22 +30,22 @@ import java.util.stream.Collectors; */ public class ComplexAvroKeyGenerator extends BaseKeyGenerator { public static final String DEFAULT_RECORD_KEY_SEPARATOR = ":"; + private final RecordKeyGenerator recordKeyGenerator; public ComplexAvroKeyGenerator(TypedProperties props) { super(props); - this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")) - .map(String::trim) - .filter(s -> !s.isEmpty()) - .collect(Collectors.toList()); + this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")); this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(",")) .map(String::trim) .filter(s -> !s.isEmpty()) .collect(Collectors.toList()); + this.recordKeyGenerator = RecordKeyGeneratorFactory.getRecordKeyGenerator(config, recordKeyFields, isConsistentLogicalTimestampEnabled(), + partitionPathFields); } @Override public String getRecordKey(GenericRecord record) { - return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled()); + return recordKeyGenerator.getRecordKey(record); } @Override diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroRecordKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroRecordKeyGenerator.java new file mode 100644 index 00000000000..cd86b8f0834 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroRecordKeyGenerator.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import org.apache.avro.generic.GenericRecord; + +import java.util.List; + +/** + * Complex record key generator. + */ +public class ComplexAvroRecordKeyGenerator implements RecordKeyGenerator { + + private final List<String> recordKeyFields; + private final boolean consistentLogicalTimestampEnabled; + + public ComplexAvroRecordKeyGenerator(List<String> recordKeyFields, boolean consistentLogicalTimestampEnabled) { + this.recordKeyFields = recordKeyFields; + this.consistentLogicalTimestampEnabled = consistentLogicalTimestampEnabled; + } + + @Override + public String getRecordKey(GenericRecord record) { + return KeyGenUtils.getRecordKey(record, recordKeyFields, consistentLogicalTimestampEnabled); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java index dc0bc3cef2f..ba66b1a32f8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java @@ -20,6 +20,7 @@ package org.apache.hudi.keygen; import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.config.TypedProperties; import 
org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.keygen.factory.RecordKeyGeneratorFactory; import java.util.ArrayList; import java.util.Arrays; @@ -32,15 +33,17 @@ import java.util.List; public class GlobalAvroDeleteKeyGenerator extends BaseKeyGenerator { private static final String EMPTY_PARTITION = ""; + private final RecordKeyGenerator recordKeyGenerator; public GlobalAvroDeleteKeyGenerator(TypedProperties config) { super(config); this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")); + this.recordKeyGenerator = RecordKeyGeneratorFactory.getRecordKeyGenerator(config, recordKeyFields, isConsistentLogicalTimestampEnabled(), new ArrayList<>()); } @Override public String getRecordKey(GenericRecord record) { - return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled()); + return recordKeyGenerator.getRecordKey(record); } @Override diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java index 5b5cedcbf88..4efbaf9b857 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java @@ -20,6 +20,7 @@ package org.apache.hudi.keygen; import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.keygen.factory.RecordKeyGeneratorFactory; import java.util.ArrayList; import java.util.Arrays; @@ -33,12 +34,14 @@ public class NonpartitionedAvroKeyGenerator extends BaseKeyGenerator { private static final String EMPTY_PARTITION = ""; private static final List<String> EMPTY_PARTITION_FIELD_LIST = new 
ArrayList<>(); + private final RecordKeyGenerator recordKeyGenerator; public NonpartitionedAvroKeyGenerator(TypedProperties props) { super(props); this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) .split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList()); this.partitionPathFields = EMPTY_PARTITION_FIELD_LIST; + this.recordKeyGenerator = RecordKeyGeneratorFactory.getRecordKeyGenerator(config, recordKeyFields, isConsistentLogicalTimestampEnabled(), partitionPathFields); } @Override @@ -56,10 +59,7 @@ public class NonpartitionedAvroKeyGenerator extends BaseKeyGenerator { // for backward compatibility, we need to use the right format according to the number of record key fields // 1. if there is only one record key field, the format of record key is just "<value>" // 2. if there are multiple record key fields, the format is "<field1>:<value1>,<field2>:<value2>,..." - if (getRecordKeyFieldNames().size() == 1) { - return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled()); - } - return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled()); + return recordKeyGenerator.getRecordKey(record); } public String getEmptyPartition() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java index c7398e94ece..85a3fb74f27 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java @@ -20,7 +20,9 @@ package org.apache.hudi.keygen; import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import 
org.apache.hudi.keygen.factory.RecordKeyGeneratorFactory; +import java.util.Arrays; import java.util.Collections; /** @@ -28,6 +30,8 @@ import java.util.Collections; */ public class SimpleAvroKeyGenerator extends BaseKeyGenerator { + private final RecordKeyGenerator recordKeyGenerator; + public SimpleAvroKeyGenerator(TypedProperties props) { this(props, props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()), props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())); @@ -39,15 +43,14 @@ public class SimpleAvroKeyGenerator extends BaseKeyGenerator { SimpleAvroKeyGenerator(TypedProperties props, String recordKeyField, String partitionPathField) { super(props); - this.recordKeyFields = recordKeyField == null - ? Collections.emptyList() - : Collections.singletonList(recordKeyField); + this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")); this.partitionPathFields = Collections.singletonList(partitionPathField); + this.recordKeyGenerator = RecordKeyGeneratorFactory.getRecordKeyGenerator(config, recordKeyFields, isConsistentLogicalTimestampEnabled(), partitionPathFields); } @Override public String getRecordKey(GenericRecord record) { - return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled()); + return recordKeyGenerator.getRecordKey(record); } @Override diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroRecordKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroRecordKeyGenerator.java new file mode 100644 index 00000000000..52bf7ac872b --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroRecordKeyGenerator.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import org.apache.avro.generic.GenericRecord; + +/** + * Simple record key generator. + */ +public class SimpleAvroRecordKeyGenerator implements RecordKeyGenerator { + + private final String recordKeyField; + private final boolean consistentLogicalTimestampEnabled; + + public SimpleAvroRecordKeyGenerator(String recordKeyField, boolean consistentLogicalTimestampEnabled) { + this.recordKeyField = recordKeyField; + this.consistentLogicalTimestampEnabled = consistentLogicalTimestampEnabled; + } + + @Override + public String getRecordKey(GenericRecord record) { + return KeyGenUtils.getRecordKey(record, recordKeyField, consistentLogicalTimestampEnabled); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/RecordKeyGeneratorFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/RecordKeyGeneratorFactory.java new file mode 100644 index 00000000000..e884d4db5c4 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/RecordKeyGeneratorFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen.factory; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.keygen.ComplexAvroRecordKeyGenerator; +import org.apache.hudi.keygen.AutoRecordKeyGenerator; +import org.apache.hudi.keygen.RecordKeyGenerator; +import org.apache.hudi.keygen.SimpleAvroRecordKeyGenerator; + +import java.util.List; + +/** + * Factory to instantiate RecordKeyGenerator. 
+ */ +public class RecordKeyGeneratorFactory { + + public static RecordKeyGenerator getRecordKeyGenerator(TypedProperties config, List<String> recordKeyFields, boolean consistentLogicalTimestampEnabled, + List<String> partitionPathFields) { + if (config.getBoolean(HoodieWriteConfig.AUTO_GENERATE_RECORD_KEYS.key(), HoodieWriteConfig.AUTO_GENERATE_RECORD_KEYS.defaultValue())) { + return new AutoRecordKeyGenerator(config, partitionPathFields); + } else if (recordKeyFields.size() == 1) { + return new SimpleAvroRecordKeyGenerator(recordKeyFields.get(0), consistentLogicalTimestampEnabled); + } else { + return new ComplexAvroRecordKeyGenerator(recordKeyFields, consistentLogicalTimestampEnabled); + } + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java index 977ff709bb1..38ea518508f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java @@ -30,31 +30,7 @@ import org.apache.spark.unsafe.types.UTF8String; * specifically implement record-key, partition-path generation w/o the need for (expensive) * conversion from Spark internal representation (for ex, to Avro) */ -public interface SparkKeyGeneratorInterface extends KeyGeneratorInterface { - - /** - * Extracts record key from Spark's {@link Row} - * - * @param row instance of {@link Row} from which record-key is extracted - * @return record's (primary) key - */ - @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) - String getRecordKey(Row row); - - /** - * Extracts record key from Spark's {@link InternalRow} - * - * NOTE: Difference b/w {@link Row} and {@link InternalRow} is that {@link InternalRow} could - * internally hold just a binary representation of the data, while {@link Row} has it - * 
deserialized into JVM-native representation (like {@code Integer}, {@code Long}, - * {@code String}, etc) - * - * @param row instance of {@link InternalRow} from which record-key is extracted - * @param schema schema {@link InternalRow} is adhering to - * @return record-key as instance of {@link UTF8String} - */ - @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) - UTF8String getRecordKey(InternalRow row, StructType schema); +public interface SparkKeyGeneratorInterface extends SparkRecordKeyGeneratorInterface { /** * Extracts partition-path from {@link Row} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkRecordKeyGeneratorInterface.java similarity index 61% copy from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java copy to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkRecordKeyGeneratorInterface.java index 977ff709bb1..3d4ccc0de0e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkRecordKeyGeneratorInterface.java @@ -20,17 +20,16 @@ package org.apache.hudi.keygen; import org.apache.hudi.ApiMaturityLevel; import org.apache.hudi.PublicAPIMethod; + import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.types.StructType; import org.apache.spark.unsafe.types.UTF8String; /** - * Spark-specific {@link KeyGenerator} interface extension allowing implementation to - * specifically implement record-key, partition-path generation w/o the need for (expensive) - * conversion from Spark internal representation (for ex, to Avro) + * Spark's record key generator interface to assist in generating record key for a given spark row. 
*/ -public interface SparkKeyGeneratorInterface extends KeyGeneratorInterface { +public interface SparkRecordKeyGeneratorInterface { /** * Extracts record key from Spark's {@link Row} @@ -55,28 +54,4 @@ public interface SparkKeyGeneratorInterface extends KeyGeneratorInterface { */ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) UTF8String getRecordKey(InternalRow row, StructType schema); - - /** - * Extracts partition-path from {@link Row} - * - * @param row instance of {@link Row} from which partition-path is extracted - * @return record's partition-path - */ - @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) - String getPartitionPath(Row row); - - /** - * Extracts partition-path from Spark's {@link InternalRow} - * - * NOTE: Difference b/w {@link Row} and {@link InternalRow} is that {@link InternalRow} could - * internally hold just a binary representation of the data, while {@link Row} has it - * deserialized into JVM-native representation (like {@code Integer}, {@code Long}, - * {@code String}, etc) - * - * @param row instance of {@link InternalRow} from which record-key is extracted - * @param schema schema {@link InternalRow} is adhering to - * @return partition-path as instance of {@link UTF8String} - */ - @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) - UTF8String getPartitionPath(InternalRow row, StructType schema); } diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java index d0baa903919..e3d5a3b18bb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java +++ b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java @@ -30,7 +30,7 @@ import java.util.List; * Base abstract class to extend for {@link KeyGenerator} with default logic of taking * partitioning and timestamp configs. 
*/ -public abstract class BaseKeyGenerator extends KeyGenerator { +public abstract class BaseKeyGenerator extends KeyGenerator implements RecordKeyGenerator { protected List<String> recordKeyFields; protected List<String> partitionPathFields; @@ -51,7 +51,7 @@ public abstract class BaseKeyGenerator extends KeyGenerator { /** * Generate a record Key out of provided generic record. */ - public abstract String getRecordKey(GenericRecord record); + //public abstract String getRecordKey(GenericRecord record); /** * Generate a partition path out of provided generic record. diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/RecordKeyGenerator.java b/hudi-common/src/main/java/org/apache/hudi/keygen/RecordKeyGenerator.java new file mode 100644 index 00000000000..2854c5de2d8 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/keygen/RecordKeyGenerator.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import org.apache.avro.generic.GenericRecord; + +/** + * Interface to fetch record key given a GenericRecord. + */ +public interface RecordKeyGenerator { + + /** + * Generate a record Key out of provided generic record. 
+ */ + String getRecordKey(GenericRecord record); +}
