This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a34067826c2 [HUDI-5514][HUDI-5574][HUDI-5604][HUDI-5535] Adding auto
generation of record keys support to Hudi/Spark (#8107)
a34067826c2 is described below
commit a34067826c2f367be942052cf86d64a1cfdd01b8
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue May 9 05:33:56 2023 -0700
[HUDI-5514][HUDI-5574][HUDI-5604][HUDI-5535] Adding auto generation of
record keys support to Hudi/Spark (#8107)
* Adding auto generation of record keys support to Hudi
* Fixing non partitioned key generator for inference
---
.../AutoRecordGenWrapperAvroKeyGenerator.java | 83 +++++++
.../hudi/keygen/ComplexAvroKeyGenerator.java | 5 +-
.../apache/hudi/keygen/CustomAvroKeyGenerator.java | 10 +-
.../hudi/keygen/GlobalAvroDeleteKeyGenerator.java | 6 +-
.../java/org/apache/hudi/keygen/KeyGenUtils.java | 79 ++++--
.../keygen/NonpartitionedAvroKeyGenerator.java | 7 +-
.../apache/hudi/keygen/SimpleAvroKeyGenerator.java | 11 +-
.../keygen/TimestampBasedAvroKeyGenerator.java | 2 +-
.../factory/HoodieAvroKeyGeneratorFactory.java | 28 ++-
.../org/apache/hudi/keygen/TestKeyGenUtils.java | 13 +-
...estCreateAvroKeyGeneratorByTypeWithFactory.java | 13 +
.../keygen/AutoRecordGenWrapperKeyGenerator.java | 108 +++++++++
.../apache/hudi/keygen/ComplexKeyGenerator.java | 5 +-
.../org/apache/hudi/keygen/CustomKeyGenerator.java | 20 +-
.../hudi/keygen/GlobalDeleteKeyGenerator.java | 4 +-
.../hudi/keygen/NonpartitionedKeyGenerator.java | 10 +-
.../org/apache/hudi/keygen/SimpleKeyGenerator.java | 15 +-
.../hudi/keygen/TimestampBasedKeyGenerator.java | 5 +-
.../factory/HoodieSparkKeyGeneratorFactory.java | 15 +-
.../hudi/HoodieDatasetBulkInsertHelper.scala | 24 +-
.../org/apache/hudi/util/SparkKeyGenUtils.scala | 41 ++--
.../hudi/common/table/HoodieTableConfig.java | 18 +-
.../org/apache/hudi/keygen/BaseKeyGenerator.java | 1 +
.../testsuite/HoodieDeltaStreamerWrapper.java | 4 +-
.../apache/hudi/AutoRecordKeyGenerationUtils.scala | 50 ++++
.../scala/org/apache/hudi/DataSourceOptions.scala | 6 +-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 57 +++--
.../scala/org/apache/hudi/HoodieWriterUtils.scala | 7 +-
.../sql/catalyst/catalog/HoodieCatalogTable.scala | 1 +
.../apache/spark/sql/hudi/HoodieOptionConfig.scala | 9 +-
.../spark/sql/hudi/ProvidesHoodieConfig.scala | 8 +-
.../spark/sql/hudi/command/SqlKeyGenerator.scala | 14 +-
.../TestHoodieDatasetBulkInsertHelper.java | 24 +-
.../hudi/keygen/TestComplexKeyGenerator.java | 6 +-
.../apache/hudi/keygen/TestCustomKeyGenerator.java | 26 +-
.../keygen/TestGlobalDeleteRecordGenerator.java | 6 +-
.../keygen/TestNonpartitionedKeyGenerator.java | 6 +-
.../apache/hudi/keygen/TestSimpleKeyGenerator.java | 6 +-
.../TestCreateKeyGeneratorByTypeWithFactory.java | 13 +
.../org/apache/hudi/TestDataSourceDefaults.scala | 10 +-
.../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 2 +-
.../apache/hudi/functional/CommonOptionUtils.scala | 56 +++++
.../TestAutoGenerationOfRecordKeys.scala | 264 +++++++++++++++++++++
.../apache/hudi/functional/TestCOWDataSource.scala | 16 +-
.../hudi/functional/TestStreamingSource.scala | 4 +
.../spark/sql/hudi/TestHoodieOptionConfig.scala | 10 -
.../apache/spark/sql/hudi/TestInsertTable.scala | 40 +++-
.../hudi/utilities/deltastreamer/DeltaSync.java | 103 ++++----
.../deltastreamer/TestHoodieDeltaStreamer.java | 40 +++-
49 files changed, 1042 insertions(+), 269 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperAvroKeyGenerator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperAvroKeyGenerator.java
new file mode 100644
index 00000000000..a8ae48e1d67
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperAvroKeyGenerator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+
+import org.apache.avro.generic.GenericRecord;
+
+import java.util.List;
+
+/**
+ * A wrapper key generator to intercept getRecordKey calls for auto record key
generator.
+ * <ol>
+ * <li>Generated keys will be unique not only w/in provided
[[org.apache.spark.sql.DataFrame]], but
+ * globally unique w/in the target table</li>
+ * <li>Generated keys have minimal overhead (to compute, persist and
read)</li>
+ * </ol>
+ *
+ * Keys adhere to the following format:
+ *
+ * [instantTime]_[PartitionId]_[RowId]
+ *
+ * where
+ * instantTime refers to the commit time of the batch being ingested.
+ * PartitionId refers to spark's partition Id.
+ * RowId refers to the row index within the spark partition.
+ */
+public class AutoRecordGenWrapperAvroKeyGenerator extends BaseKeyGenerator {
+
+ private final BaseKeyGenerator keyGenerator;
+ private final int partitionId;
+ private final String instantTime;
+ private int rowId;
+
+ public AutoRecordGenWrapperAvroKeyGenerator(TypedProperties config,
BaseKeyGenerator keyGenerator) {
+ super(config);
+ this.keyGenerator = keyGenerator;
+ this.rowId = 0;
+ this.partitionId =
config.getInteger(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG);
+ this.instantTime =
config.getString(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG);
+ }
+
+ @Override
+ public String getRecordKey(GenericRecord record) {
+ return HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++);
+ }
+
+ @Override
+ public String getPartitionPath(GenericRecord record) {
+ return keyGenerator.getPartitionPath(record);
+ }
+
+ @Override
+ public List<String> getRecordKeyFieldNames() {
+ return keyGenerator.getRecordKeyFieldNames();
+ }
+
+ public List<String> getPartitionPathFields() {
+ return keyGenerator.getPartitionPathFields();
+ }
+
+ public boolean isConsistentLogicalTimestampEnabled() {
+ return keyGenerator.isConsistentLogicalTimestampEnabled();
+ }
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
index 9ff5c522e45..1c4860779cb 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
@@ -32,10 +32,7 @@ public class ComplexAvroKeyGenerator extends
BaseKeyGenerator {
public ComplexAvroKeyGenerator(TypedProperties props) {
super(props);
- this.recordKeyFields =
Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","))
- .map(String::trim)
- .filter(s -> !s.isEmpty())
- .collect(Collectors.toList());
+ this.recordKeyFields = KeyGenUtils.getRecordKeyFields(props);
this.partitionPathFields =
Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
index 77377de7ab8..13ae1d50528 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
@@ -20,12 +20,14 @@ package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.hudi.exception.HoodieKeyGeneratorException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.stream.Collectors;
/**
@@ -43,7 +45,7 @@ import java.util.stream.Collectors;
*/
public class CustomAvroKeyGenerator extends BaseKeyGenerator {
- private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
+ public static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
public static final String SPLIT_REGEX = ":";
/**
@@ -55,7 +57,11 @@ public class CustomAvroKeyGenerator extends BaseKeyGenerator
{
public CustomAvroKeyGenerator(TypedProperties props) {
super(props);
- this.recordKeyFields =
Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")).map(String::trim).collect(Collectors.toList());
+ this.recordKeyFields =
Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
null))
+ .map(recordKeyConfigValue ->
+ Arrays.stream(recordKeyConfigValue.split(","))
+ .map(String::trim).collect(Collectors.toList())
+ ).orElse(Collections.emptyList());
this.partitionPathFields =
Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(",")).map(String::trim).collect(Collectors.toList());
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
index dc0bc3cef2f..517798e7e7c 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
@@ -19,10 +19,8 @@ package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
/**
@@ -31,11 +29,9 @@ import java.util.List;
*/
public class GlobalAvroDeleteKeyGenerator extends BaseKeyGenerator {
- private static final String EMPTY_PARTITION = "";
-
public GlobalAvroDeleteKeyGenerator(TypedProperties config) {
super(config);
- this.recordKeyFields =
Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","));
+ this.recordKeyFields = KeyGenUtils.getRecordKeyFields(config);
}
@Override
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
index fdc17f1c799..5c24dd09291 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.keygen.parser.BaseHoodieDateTimeParser;
@@ -37,6 +38,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.stream.Collectors;
public class KeyGenUtils {
@@ -48,6 +50,9 @@ public class KeyGenUtils {
public static final String DEFAULT_RECORD_KEY_PARTS_SEPARATOR = ",";
public static final String DEFAULT_COMPOSITE_KEY_FILED_VALUE = ":";
+ public static final String RECORD_KEY_GEN_PARTITION_ID_CONFIG =
"_hoodie.record.key.gen.partition.id";
+ public static final String RECORD_KEY_GEN_INSTANT_TIME_CONFIG =
"_hoodie.record.key.gen.instant.time";
+
/**
* Infers the key generator type based on the record key and partition
fields.
* <p>
@@ -60,11 +65,28 @@ public class KeyGenUtils {
* @return Inferred key generator type.
*/
public static KeyGeneratorType inferKeyGeneratorType(
- String recordsKeyFields, String partitionFields) {
+ Option<String> recordsKeyFields, String partitionFields) {
+ boolean autoGenerateRecordKeys = !recordsKeyFields.isPresent();
+ if (autoGenerateRecordKeys) {
+ return inferKeyGeneratorTypeForAutoKeyGen(partitionFields);
+ } else {
+ if (!StringUtils.isNullOrEmpty(partitionFields)) {
+ int numPartFields = partitionFields.split(",").length;
+ int numRecordKeyFields = recordsKeyFields.get().split(",").length;
+ if (numPartFields == 1 && numRecordKeyFields == 1) {
+ return KeyGeneratorType.SIMPLE;
+ }
+ return KeyGeneratorType.COMPLEX;
+ }
+ return KeyGeneratorType.NON_PARTITION;
+ }
+ }
+
+ // When auto record key gen is enabled, our inference will be based on
partition path only.
+ private static KeyGeneratorType inferKeyGeneratorTypeForAutoKeyGen(String
partitionFields) {
if (!StringUtils.isNullOrEmpty(partitionFields)) {
int numPartFields = partitionFields.split(",").length;
- int numRecordKeyFields = recordsKeyFields.split(",").length;
- if (numPartFields == 1 && numRecordKeyFields == 1) {
+ if (numPartFields == 1) {
return KeyGeneratorType.SIMPLE;
}
return KeyGeneratorType.COMPLEX;
@@ -85,7 +107,8 @@ public class KeyGenUtils {
/**
* Fetches partition path from the GenericRecord.
- * @param genericRecord generic record of interest.
+ *
+ * @param genericRecord generic record of interest.
* @param keyGeneratorOpt Optional BaseKeyGenerator. If not, meta field will
be used.
* @return the partition path for the passed in generic record.
*/
@@ -107,18 +130,18 @@ public class KeyGenUtils {
public static String[] extractRecordKeysByFields(String recordKey,
List<String> fields) {
String[] fieldKV = recordKey.split(DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
return Arrays.stream(fieldKV).map(kv ->
kv.split(DEFAULT_COMPOSITE_KEY_FILED_VALUE, 2))
- .filter(kvArray -> kvArray.length == 1 || fields.isEmpty() ||
(fields.contains(kvArray[0])))
- .map(kvArray -> {
- if (kvArray.length == 1) {
- return kvArray[0];
- } else if (kvArray[1].equals(NULL_RECORDKEY_PLACEHOLDER)) {
- return null;
- } else if (kvArray[1].equals(EMPTY_RECORDKEY_PLACEHOLDER)) {
- return "";
- } else {
- return kvArray[1];
- }
- }).toArray(String[]::new);
+ .filter(kvArray -> kvArray.length == 1 || fields.isEmpty() ||
(fields.contains(kvArray[0])))
+ .map(kvArray -> {
+ if (kvArray.length == 1) {
+ return kvArray[0];
+ } else if (kvArray[1].equals(NULL_RECORDKEY_PLACEHOLDER)) {
+ return null;
+ } else if (kvArray[1].equals(EMPTY_RECORDKEY_PLACEHOLDER)) {
+ return "";
+ } else {
+ return kvArray[1];
+ }
+ }).toArray(String[]::new);
}
public static String getRecordKey(GenericRecord record, List<String>
recordKeyFields, boolean consistentLogicalTimestampEnabled) {
@@ -144,7 +167,7 @@ public class KeyGenUtils {
}
public static String getRecordPartitionPath(GenericRecord record,
List<String> partitionPathFields,
- boolean hiveStylePartitioning, boolean encodePartitionPath, boolean
consistentLogicalTimestampEnabled) {
+ boolean hiveStylePartitioning,
boolean encodePartitionPath, boolean consistentLogicalTimestampEnabled) {
if (partitionPathFields.isEmpty()) {
return "";
}
@@ -176,7 +199,7 @@ public class KeyGenUtils {
}
public static String getPartitionPath(GenericRecord record, String
partitionPathField,
- boolean hiveStylePartitioning, boolean encodePartitionPath, boolean
consistentLogicalTimestampEnabled) {
+ boolean hiveStylePartitioning, boolean
encodePartitionPath, boolean consistentLogicalTimestampEnabled) {
String partitionPath = HoodieAvroUtils.getNestedFieldValAsString(record,
partitionPathField, true, consistentLogicalTimestampEnabled);
if (partitionPath == null || partitionPath.isEmpty()) {
partitionPath = HUDI_DEFAULT_PARTITION_PATH;
@@ -193,7 +216,7 @@ public class KeyGenUtils {
/**
* Create a date time parser class for TimestampBasedKeyGenerator, passing
in any configs needed.
*/
- public static BaseHoodieDateTimeParser createDateTimeParser(TypedProperties
props, String parserClass) throws IOException {
+ public static BaseHoodieDateTimeParser createDateTimeParser(TypedProperties
props, String parserClass) throws IOException {
try {
return (BaseHoodieDateTimeParser) ReflectionUtils.loadClass(parserClass,
props);
} catch (Throwable e) {
@@ -228,4 +251,22 @@ public class KeyGenUtils {
}
return keyGenerator;
}
+
+ public static List<String> getRecordKeyFields(TypedProperties props) {
+ return
Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
null))
+ .map(recordKeyConfigValue ->
+ Arrays.stream(recordKeyConfigValue.split(","))
+ .map(String::trim)
+ .filter(s -> !s.isEmpty())
+ .collect(Collectors.toList())
+ ).orElse(Collections.emptyList());
+ }
+
+ /**
+ * @param props props of interest.
+ * @return true if record keys need to be auto generated. false otherwise.
+ */
+ public static boolean enableAutoGenerateRecordKeys(TypedProperties props) {
+ return !props.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
+ }
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
index 59c883eaadc..6f127d0db15 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
@@ -18,27 +18,22 @@
package org.apache.hudi.keygen;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.avro.generic.GenericRecord;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
-import java.util.stream.Collectors;
/**
* Avro simple Key generator for non-partitioned Hive Tables.
*/
public class NonpartitionedAvroKeyGenerator extends BaseKeyGenerator {
- private static final String EMPTY_PARTITION = "";
private static final List<String> EMPTY_PARTITION_FIELD_LIST = new
ArrayList<>();
public NonpartitionedAvroKeyGenerator(TypedProperties props) {
super(props);
- this.recordKeyFields =
Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
- .split(",")).map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toList());
+ this.recordKeyFields = KeyGenUtils.getRecordKeyFields(config);
this.partitionPathFields = EMPTY_PARTITION_FIELD_LIST;
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
index c7398e94ece..82a137f7cb1 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
@@ -19,6 +19,7 @@ package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import java.util.Collections;
@@ -29,19 +30,17 @@ import java.util.Collections;
public class SimpleAvroKeyGenerator extends BaseKeyGenerator {
public SimpleAvroKeyGenerator(TypedProperties props) {
- this(props,
props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
+ this(props,
Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
null)),
props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));
}
SimpleAvroKeyGenerator(TypedProperties props, String partitionPathField) {
- this(props, null, partitionPathField);
+ this(props, Option.empty(), partitionPathField);
}
- SimpleAvroKeyGenerator(TypedProperties props, String recordKeyField, String
partitionPathField) {
+ SimpleAvroKeyGenerator(TypedProperties props, Option<String> recordKeyField,
String partitionPathField) {
super(props);
- this.recordKeyFields = recordKeyField == null
- ? Collections.emptyList()
- : Collections.singletonList(recordKeyField);
+ this.recordKeyFields = recordKeyField.map(keyField ->
Collections.singletonList(keyField)).orElse(Collections.emptyList());
this.partitionPathFields = Collections.singletonList(partitionPathField);
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
index 60ccc694f94..2c75867f8da 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
@@ -76,7 +76,7 @@ public class TimestampBasedAvroKeyGenerator extends
SimpleAvroKeyGenerator {
}
TimestampBasedAvroKeyGenerator(TypedProperties config, String
recordKeyField, String partitionPathField) throws IOException {
- super(config, recordKeyField, partitionPathField);
+ super(config, Option.ofNullable(recordKeyField), partitionPathField);
String dateTimeParserClass =
config.getString(KeyGeneratorOptions.Config.DATE_TIME_PARSER_PROP,
HoodieDateTimeParser.class.getName());
this.parser = KeyGenUtils.createDateTimeParser(config,
dateTimeParserClass);
this.inputDateTimeZone = parser.getInputDateTimeZone();
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java
index b24b9a8e2d9..f375095122d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java
@@ -21,9 +21,11 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.CustomAvroKeyGenerator;
import org.apache.hudi.keygen.GlobalAvroDeleteKeyGenerator;
+import org.apache.hudi.keygen.AutoRecordGenWrapperAvroKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
@@ -71,22 +73,36 @@ public class HoodieAvroKeyGeneratorFactory {
throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type " +
keyGeneratorType);
}
+ BaseKeyGenerator keyGenerator = null;
+
switch (keyGeneratorTypeEnum) {
case SIMPLE:
- return new SimpleAvroKeyGenerator(props);
+ keyGenerator = new SimpleAvroKeyGenerator(props);
+ break;
case COMPLEX:
- return new ComplexAvroKeyGenerator(props);
+ keyGenerator = new ComplexAvroKeyGenerator(props);
+ break;
case TIMESTAMP:
- return new TimestampBasedAvroKeyGenerator(props);
+ keyGenerator = new TimestampBasedAvroKeyGenerator(props);
+ break;
case CUSTOM:
- return new CustomAvroKeyGenerator(props);
+ keyGenerator = new CustomAvroKeyGenerator(props);
+ break;
case NON_PARTITION:
- return new NonpartitionedAvroKeyGenerator(props);
+ keyGenerator = new NonpartitionedAvroKeyGenerator(props);
+ break;
case GLOBAL_DELETE:
- return new GlobalAvroDeleteKeyGenerator(props);
+ keyGenerator = new GlobalAvroDeleteKeyGenerator(props);
+ break;
default:
throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type "
+ keyGeneratorType);
}
+
+ if (KeyGenUtils.enableAutoGenerateRecordKeys(props)) {
+ return new AutoRecordGenWrapperAvroKeyGenerator(props, keyGenerator);
+ } else {
+ return keyGenerator;
+ }
}
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
index b79404ff23e..ae0b259dd73 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
@@ -18,6 +18,7 @@
package org.apache.hudi.keygen;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.junit.jupiter.api.Assertions;
@@ -34,22 +35,22 @@ public class TestKeyGenUtils {
public void testInferKeyGeneratorType() {
assertEquals(
KeyGeneratorType.SIMPLE,
- KeyGenUtils.inferKeyGeneratorType("col1", "partition1"));
+ KeyGenUtils.inferKeyGeneratorType(Option.of("col1"), "partition1"));
assertEquals(
KeyGeneratorType.COMPLEX,
- KeyGenUtils.inferKeyGeneratorType("col1", "partition1,partition2"));
+ KeyGenUtils.inferKeyGeneratorType(Option.of("col1"),
"partition1,partition2"));
assertEquals(
KeyGeneratorType.COMPLEX,
- KeyGenUtils.inferKeyGeneratorType("col1,col2", "partition1"));
+ KeyGenUtils.inferKeyGeneratorType(Option.of("col1,col2"),
"partition1"));
assertEquals(
KeyGeneratorType.COMPLEX,
- KeyGenUtils.inferKeyGeneratorType("col1,col2",
"partition1,partition2"));
+ KeyGenUtils.inferKeyGeneratorType(Option.of("col1,col2"),
"partition1,partition2"));
assertEquals(
KeyGeneratorType.NON_PARTITION,
- KeyGenUtils.inferKeyGeneratorType("col1,col2", ""));
+ KeyGenUtils.inferKeyGeneratorType(Option.of("col1,col2"), ""));
assertEquals(
KeyGeneratorType.NON_PARTITION,
- KeyGenUtils.inferKeyGeneratorType("col1,col2", null));
+ KeyGenUtils.inferKeyGeneratorType(Option.of("col1,col2"), null));
}
@Test
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestCreateAvroKeyGeneratorByTypeWithFactory.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestCreateAvroKeyGeneratorByTypeWithFactory.java
index b69d84442bc..96095da3716 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestCreateAvroKeyGeneratorByTypeWithFactory.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestCreateAvroKeyGeneratorByTypeWithFactory.java
@@ -20,9 +20,11 @@ package org.apache.hudi.keygen.factory;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.keygen.AutoRecordGenWrapperAvroKeyGenerator;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.CustomAvroKeyGenerator;
import org.apache.hudi.keygen.GlobalAvroDeleteKeyGenerator;
+import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
@@ -32,6 +34,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@@ -97,4 +100,14 @@ public class TestCreateAvroKeyGeneratorByTypeWithFactory {
throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type "
+ keyGenType);
}
}
+
+ @Test
+ public void testAutoRecordKeyGenerator() throws IOException {
+ props = new TypedProperties();
+ props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition");
+ props.put(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, "100");
+ props.put(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 1);
+ KeyGenerator keyGenerator =
HoodieAvroKeyGeneratorFactory.createKeyGenerator(props);
+
Assertions.assertEquals(AutoRecordGenWrapperAvroKeyGenerator.class.getName(),
keyGenerator.getClass().getName());
+ }
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperKeyGenerator.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperKeyGenerator.java
new file mode 100644
index 00000000000..ce767665a6f
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/AutoRecordGenWrapperKeyGenerator.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.util.List;
+
+/**
+ * A wrapper key generator to intercept getRecordKey calls for auto record key
generator.
+ * <ol>
+ * <li>Generated keys will be unique not only w/in provided
[[org.apache.spark.sql.DataFrame]], but
+ * globally unique w/in the target table</li>
+ * <li>Generated keys have minimal overhead (to compute, persist and
read)</li>
+ * </ol>
+ *
+ * Keys adhere to the following format:
+ *
+ * [instantTime]_[PartitionId]_[RowId]
+ *
+ * where
+ * instantTime refers to the commit time of the batch being ingested.
+ * PartitionId refers to spark's partition Id.
+ * RowId refers to the row index within the spark partition.
+ */
+public class AutoRecordGenWrapperKeyGenerator extends BuiltinKeyGenerator {
+
+ private final BuiltinKeyGenerator builtinKeyGenerator;
+ private final int partitionId;
+ private final String instantTime;
+ private int rowId;
+
+ public AutoRecordGenWrapperKeyGenerator(TypedProperties config,
BuiltinKeyGenerator builtinKeyGenerator) {
+ super(config);
+ this.builtinKeyGenerator = builtinKeyGenerator;
+ this.rowId = 0;
+ this.partitionId =
config.getInteger(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG);
+ this.instantTime =
config.getString(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG);
+ }
+
+ @Override
+ public String getRecordKey(GenericRecord record) {
+ return HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++);
+ }
+
+ @Override
+ public String getPartitionPath(GenericRecord record) {
+ return builtinKeyGenerator.getPartitionPath(record);
+ }
+
+ @Override
+ public String getRecordKey(Row row) {
+ return HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++);
+ }
+
+ @Override
+ public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+ return UTF8String.fromString(HoodieRecord.generateSequenceId(instantTime,
partitionId, rowId++));
+ }
+
+ @Override
+ public String getPartitionPath(Row row) {
+ return builtinKeyGenerator.getPartitionPath(row);
+ }
+
+ @Override
+ public UTF8String getPartitionPath(InternalRow internalRow, StructType
schema) {
+ return builtinKeyGenerator.getPartitionPath(internalRow, schema);
+ }
+
+ @Override
+ public List<String> getRecordKeyFieldNames() {
+ return builtinKeyGenerator.getRecordKeyFieldNames();
+ }
+
+ public List<String> getPartitionPathFields() {
+ return builtinKeyGenerator.getPartitionPathFields();
+ }
+
+ public boolean isConsistentLogicalTimestampEnabled() {
+ return builtinKeyGenerator.isConsistentLogicalTimestampEnabled();
+ }
+
+}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
index c9cff284e80..d00ca066ced 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
@@ -41,10 +41,7 @@ public class ComplexKeyGenerator extends BuiltinKeyGenerator
{
public ComplexKeyGenerator(TypedProperties props) {
super(props);
- this.recordKeyFields =
Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(FIELDS_SEP))
- .map(String::trim)
- .filter(s -> !s.isEmpty())
- .collect(Collectors.toList());
+ this.recordKeyFields = KeyGenUtils.getRecordKeyFields(props);
this.partitionPathFields =
Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(FIELDS_SEP))
.map(String::trim)
.filter(s -> !s.isEmpty())
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
index fcd94bb4f15..1526164207f 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
@@ -60,17 +60,17 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator
{
// NOTE: We have to strip partition-path configuration, since it could
only be interpreted by
// this key-gen
super(stripPartitionPathConfig(props));
- this.recordKeyFields =
-
Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","))
- .map(String::trim)
- .collect(Collectors.toList());
+ this.recordKeyFields =
Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
null))
+ .map(recordKeyConfigValue ->
+ Arrays.stream(recordKeyConfigValue.split(","))
+ .map(String::trim)
+ .collect(Collectors.toList())
+ ).orElse(Collections.emptyList());
String partitionPathFields =
props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
this.partitionPathFields = partitionPathFields == null
? Collections.emptyList()
:
Arrays.stream(partitionPathFields.split(",")).map(String::trim).collect(Collectors.toList());
this.customAvroKeyGenerator = new CustomAvroKeyGenerator(props);
-
- validateRecordKeyFields();
}
@Override
@@ -86,7 +86,7 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
@Override
public String getRecordKey(Row row) {
return getRecordKeyFieldNames().size() == 1
- ? new SimpleKeyGenerator(config,
config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
null).getRecordKey(row)
+ ? new SimpleKeyGenerator(config,
Option.ofNullable(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())),
null).getRecordKey(row)
: new ComplexKeyGenerator(config).getRecordKey(row);
}
@@ -155,12 +155,6 @@ public class CustomKeyGenerator extends
BuiltinKeyGenerator {
return partitionPath.toString();
}
- private void validateRecordKeyFields() {
- if (getRecordKeyFieldNames() == null ||
getRecordKeyFieldNames().isEmpty()) {
- throw new HoodieKeyException("Unable to find field names for record key
in cfg");
- }
- }
-
private static TypedProperties stripPartitionPathConfig(TypedProperties
props) {
TypedProperties filtered = new TypedProperties(props);
// NOTE: We have to stub it out w/ empty string, since we properties are:
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
index 7fcc16094ea..8a229384307 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
@@ -19,7 +19,6 @@
package org.apache.hudi.keygen;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
@@ -28,7 +27,6 @@ import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
/**
@@ -40,7 +38,7 @@ public class GlobalDeleteKeyGenerator extends
BuiltinKeyGenerator {
private final GlobalAvroDeleteKeyGenerator globalAvroDeleteKeyGenerator;
public GlobalDeleteKeyGenerator(TypedProperties config) {
super(config);
- this.recordKeyFields =
Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","));
+ this.recordKeyFields = KeyGenUtils.getRecordKeyFields(config);
this.globalAvroDeleteKeyGenerator = new
GlobalAvroDeleteKeyGenerator(config);
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
index 100bcc2cd7f..2854088c47c 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
@@ -18,9 +18,9 @@
package org.apache.hudi.keygen;
-import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
@@ -29,7 +29,6 @@ import org.apache.spark.unsafe.types.UTF8String;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.stream.Collectors;
/**
* Simple Key generator for non-partitioned Hive Tables.
@@ -40,10 +39,7 @@ public class NonpartitionedKeyGenerator extends
BuiltinKeyGenerator {
public NonpartitionedKeyGenerator(TypedProperties props) {
super(props);
- this.recordKeyFields =
Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
- .split(","))
- .map(String::trim)
- .collect(Collectors.toList());
+ this.recordKeyFields = KeyGenUtils.getRecordKeyFields(props);
this.partitionPathFields = Collections.emptyList();
this.nonpartitionedAvroKeyGenerator = new
NonpartitionedAvroKeyGenerator(props);
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
index 4b1a5e5cb44..c897d6b657e 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
@@ -20,6 +20,7 @@ package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -38,21 +39,21 @@ public class SimpleKeyGenerator extends BuiltinKeyGenerator
{
private final SimpleAvroKeyGenerator simpleAvroKeyGenerator;
public SimpleKeyGenerator(TypedProperties props) {
- this(props,
props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
+ this(props,
Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
null)),
props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));
}
SimpleKeyGenerator(TypedProperties props, String partitionPathField) {
- this(props, null, partitionPathField);
+ this(props, Option.empty(), partitionPathField);
}
- SimpleKeyGenerator(TypedProperties props, String recordKeyField, String
partitionPathField) {
+ SimpleKeyGenerator(TypedProperties props, Option<String> recordKeyField,
String partitionPathField) {
super(props);
// Make sure key-generator is configured properly
validateRecordKey(recordKeyField);
validatePartitionPath(partitionPathField);
- this.recordKeyFields = recordKeyField == null ? Collections.emptyList() :
Collections.singletonList(recordKeyField);
+ this.recordKeyFields = !recordKeyField.isPresent() ?
Collections.emptyList() : Collections.singletonList(recordKeyField.get());
this.partitionPathFields = partitionPathField == null ?
Collections.emptyList() : Collections.singletonList(partitionPathField);
this.simpleAvroKeyGenerator = new SimpleAvroKeyGenerator(props,
recordKeyField, partitionPathField);
}
@@ -116,10 +117,10 @@ public class SimpleKeyGenerator extends
BuiltinKeyGenerator {
String.format("Single partition-path field is expected; provided
(%s)", partitionPathField));
}
- private static void validateRecordKey(String recordKeyField) {
- checkArgument(recordKeyField == null || !recordKeyField.isEmpty(),
+ private void validateRecordKey(Option<String> recordKeyField) {
+ checkArgument(!recordKeyField.isPresent() ||
!recordKeyField.get().isEmpty(),
"Record key field has to be non-empty!");
- checkArgument(recordKeyField == null ||
!recordKeyField.contains(FIELDS_SEP),
+ checkArgument(!recordKeyField.isPresent() ||
!recordKeyField.get().contains(FIELDS_SEP),
String.format("Single record-key field is expected; provided (%s)",
recordKeyField));
}
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
index fa36f2152cb..470af045485 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
@@ -20,6 +20,7 @@ package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieKeyGeneratorException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.spark.sql.Row;
@@ -41,7 +42,7 @@ public class TimestampBasedKeyGenerator extends
SimpleKeyGenerator {
private final TimestampBasedAvroKeyGenerator timestampBasedAvroKeyGenerator;
public TimestampBasedKeyGenerator(TypedProperties config) throws IOException
{
- this(config,
config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
+ this(config,
config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), null),
config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));
}
@@ -50,7 +51,7 @@ public class TimestampBasedKeyGenerator extends
SimpleKeyGenerator {
}
TimestampBasedKeyGenerator(TypedProperties config, String recordKeyField,
String partitionPathField) throws IOException {
- super(config, recordKeyField, partitionPathField);
+ super(config, Option.ofNullable(recordKeyField), partitionPathField);
timestampBasedAvroKeyGenerator = new
TimestampBasedAvroKeyGenerator(config, recordKeyField, partitionPathField);
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
index b9c4043ecda..ae961b4dcf5 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
@@ -19,13 +19,17 @@
package org.apache.hudi.keygen.factory;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.keygen.ComplexKeyGenerator;
import org.apache.hudi.keygen.CustomKeyGenerator;
import org.apache.hudi.keygen.GlobalDeleteKeyGenerator;
+import org.apache.hudi.keygen.AutoRecordGenWrapperKeyGenerator;
+import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
@@ -72,8 +76,15 @@ public class HoodieSparkKeyGeneratorFactory {
public static KeyGenerator createKeyGenerator(TypedProperties props) throws
IOException {
String keyGeneratorClass = getKeyGeneratorClassName(props);
+ boolean autoRecordKeyGen = KeyGenUtils.enableAutoGenerateRecordKeys(props);
try {
- return (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass,
props);
+ KeyGenerator keyGenerator = (KeyGenerator)
ReflectionUtils.loadClass(keyGeneratorClass, props);
+ if (autoRecordKeyGen) {
+ return new AutoRecordGenWrapperKeyGenerator(props,
(BuiltinKeyGenerator) keyGenerator);
+ } else {
+        // auto key generation is disabled; return the user-configured key generator as-is.
+ return keyGenerator;
+ }
} catch (Throwable e) {
throw new IOException("Could not load key generator class " +
keyGeneratorClass, e);
}
@@ -112,7 +123,7 @@ public class HoodieSparkKeyGeneratorFactory {
public static KeyGeneratorType
inferKeyGeneratorTypeFromWriteConfig(TypedProperties props) {
String partitionFields =
props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), null);
String recordsKeyFields =
props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), null);
- return inferKeyGeneratorType(recordsKeyFields, partitionFields);
+ return inferKeyGeneratorType(Option.ofNullable(recordsKeyFields),
partitionFields);
}
public static String getKeyGeneratorClassName(TypedProperties props) {
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index e239db1b5a5..835a1251aaf 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -27,10 +27,11 @@ import org.apache.hudi.common.util.ReflectionUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.index.SparkHoodieIndexFactory
-import org.apache.hudi.keygen.{BuiltinKeyGenerator, SparkKeyGeneratorInterface}
+import org.apache.hudi.keygen.{AutoRecordGenWrapperKeyGenerator,
BuiltinKeyGenerator, KeyGenUtils, SparkKeyGeneratorInterface}
import
org.apache.hudi.table.action.commit.{BulkInsertDataInternalWriterHelper,
ParallelismHelper}
import org.apache.hudi.table.{BulkInsertPartitioner, HoodieTable}
import org.apache.hudi.util.JFunction.toJavaSerializableFunctionUnchecked
+import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath,
getNestedInternalRowValue}
@@ -60,9 +61,11 @@ object HoodieDatasetBulkInsertHelper
def prepareForBulkInsert(df: DataFrame,
config: HoodieWriteConfig,
partitioner: BulkInsertPartitioner[Dataset[Row]],
- shouldDropPartitionColumns: Boolean): Dataset[Row]
= {
+ shouldDropPartitionColumns: Boolean,
+ instantTime: String): Dataset[Row] = {
val populateMetaFields = config.populateMetaFields()
val schema = df.schema
+ val autoGenerateRecordKeys =
KeyGenUtils.enableAutoGenerateRecordKeys(config.getProps)
val metaFields = Seq(
StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType),
@@ -79,11 +82,22 @@ object HoodieDatasetBulkInsertHelper
val prependedRdd: RDD[InternalRow] =
df.queryExecution.toRdd.mapPartitions { iter =>
- val keyGenerator =
- ReflectionUtils.loadClass(keyGeneratorClassName, new
TypedProperties(config.getProps))
- .asInstanceOf[SparkKeyGeneratorInterface]
+ val typedProps = new TypedProperties(config.getProps)
+ if (autoGenerateRecordKeys) {
+
typedProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG,
String.valueOf(TaskContext.getPartitionId()))
+
typedProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG,
instantTime)
+ }
+ val sparkKeyGenerator =
+ ReflectionUtils.loadClass(keyGeneratorClassName, typedProps)
+ .asInstanceOf[BuiltinKeyGenerator]
+ val keyGenerator: BuiltinKeyGenerator = if
(autoGenerateRecordKeys) {
+ new AutoRecordGenWrapperKeyGenerator(typedProps,
sparkKeyGenerator).asInstanceOf[BuiltinKeyGenerator]
+ } else {
+ sparkKeyGenerator
+ }
iter.map { row =>
+ // auto generate record keys if needed
val recordKey = keyGenerator.getRecordKey(row, schema)
val partitionPath = keyGenerator.getPartitionPath(row, schema)
val commitTimestamp = UTF8String.EMPTY_UTF8
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
index a767fd3c5bf..932fa0096cf 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
@@ -18,9 +18,12 @@
package org.apache.hudi.util
import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.common.util.ValidationUtils.checkArgument
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
-import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator,
CustomKeyGenerator, KeyGenerator}
+import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator,
CustomKeyGenerator, GlobalAvroDeleteKeyGenerator, GlobalDeleteKeyGenerator,
KeyGenerator, NonpartitionedAvroKeyGenerator, NonpartitionedKeyGenerator,
SimpleKeyGenerator}
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
+import
org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName
import scala.collection.JavaConverters._
@@ -31,28 +34,30 @@ object SparkKeyGenUtils {
* @return partition columns
*/
def getPartitionColumns(props: TypedProperties): String = {
- val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
- getPartitionColumns(keyGenerator, props)
+ val keyGeneratorClass = getKeyGeneratorClassName(props)
+ getPartitionColumns(keyGeneratorClass, props)
}
/**
- * @param keyGen key generator
+ * @param keyGenClass key generator class name
* @return partition columns
*/
- def getPartitionColumns(keyGen: KeyGenerator, typedProperties:
TypedProperties): String = {
- keyGen match {
- // For CustomKeyGenerator and CustomAvroKeyGenerator, the partition path
filed format
- // is: "field_name: field_type", we extract the field_name from the
partition path field.
- case c: BaseKeyGenerator
- if c.isInstanceOf[CustomKeyGenerator] ||
c.isInstanceOf[CustomAvroKeyGenerator] =>
- c.getPartitionPathFields.asScala.map(pathField =>
- pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX)
- .headOption.getOrElse(s"Illegal partition path field format:
'$pathField' for ${c.getClass.getSimpleName}"))
- .mkString(",")
-
- case b: BaseKeyGenerator =>
b.getPartitionPathFields.asScala.mkString(",")
- case _ =>
typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
+ def getPartitionColumns(keyGenClass: String, typedProperties:
TypedProperties): String = {
+ // For CustomKeyGenerator and CustomAvroKeyGenerator, the partition path
field format
+ // is: "field_name: field_type", we extract the field_name from the
partition path field.
+ if (keyGenClass.equals(classOf[CustomKeyGenerator].getCanonicalName) ||
keyGenClass.equals(classOf[CustomAvroKeyGenerator].getCanonicalName)) {
+
typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
+ .split(",").map(pathField => {
+ pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX)
+ .headOption.getOrElse(s"Illegal partition path field format:
'$pathField' for ${keyGenClass}")}).mkString(",")
+ } else if
(keyGenClass.equals(classOf[NonpartitionedKeyGenerator].getCanonicalName)
+ ||
keyGenClass.equals(classOf[NonpartitionedAvroKeyGenerator].getCanonicalName)
+ || keyGenClass.equals(classOf[GlobalDeleteKeyGenerator].getCanonicalName)
+ ||
keyGenClass.equals(classOf[GlobalAvroDeleteKeyGenerator].getCanonicalName)) {
+ StringUtils.EMPTY_STRING
+ } else {
+
checkArgument(typedProperties.containsKey(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()),
"Partition path needs to be set")
+
typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
}
}
-
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index f4471e89a58..a5e4456381b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -452,7 +452,6 @@ public class HoodieTableConfig extends HoodieConfig {
if (hoodieConfig.contains(TIMELINE_TIMEZONE)) {
HoodieInstantTimeGenerator.setCommitTimeZone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getString(TIMELINE_TIMEZONE)));
}
-
hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS);
storeProperties(hoodieConfig.getProps(), outputStream);
@@ -520,9 +519,13 @@ public class HoodieTableConfig extends HoodieConfig {
}
public Option<String[]> getRecordKeyFields() {
- String keyFieldsValue = getStringOrDefault(RECORDKEY_FIELDS,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
- return Option.of(Arrays.stream(keyFieldsValue.split(","))
- .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new
String[] {}));
+ String keyFieldsValue = getStringOrDefault(RECORDKEY_FIELDS, null);
+ if (keyFieldsValue == null) {
+ return Option.empty();
+ } else {
+ return Option.of(Arrays.stream(keyFieldsValue.split(","))
+ .filter(p -> p.length() >
0).collect(Collectors.toList()).toArray(new String[] {}));
+ }
}
public Option<String[]> getPartitionFields() {
@@ -639,6 +642,13 @@ public class HoodieTableConfig extends HoodieConfig {
return getStringOrDefault(RECORDKEY_FIELDS,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
}
+ /**
+ * @return the record key field prop, or null if it is not set.
+ */
+ public String getRawRecordKeyFieldProp() {
+ return getStringOrDefault(RECORDKEY_FIELDS, null);
+ }
+
public boolean isCDCEnabled() {
return getBooleanOrDefault(CDC_ENABLED);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
index d0baa903919..c5a13dd49c1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
@@ -32,6 +32,7 @@ import java.util.List;
*/
public abstract class BaseKeyGenerator extends KeyGenerator {
+ public static final String EMPTY_PARTITION = "";
protected List<String> recordKeyFields;
protected List<String> partitionPathFields;
protected final boolean encodePartitionPath;
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
index 632bbecf10d..b6a155c133d 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
@@ -21,6 +21,7 @@ package org.apache.hudi.integ.testsuite;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.deltastreamer.DeltaSync;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
@@ -78,7 +79,8 @@ public class HoodieDeltaStreamerWrapper extends
HoodieDeltaStreamer {
public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>>
fetchSource() throws Exception {
DeltaSync service = getDeltaSync();
service.refreshTimeline();
- return service.readFromSource(service.getCommitsTimelineOpt());
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
+ return service.readFromSource(service.getCommitsTimelineOpt(),
instantTime);
}
public DeltaSync getDeltaSync() {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
new file mode 100644
index 00000000000..9b3a10a3f62
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.DataSourceWriteOptions.INSERT_DROP_DUPS
+import org.apache.hudi.common.config.HoodieConfig
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.HoodieKeyGeneratorException
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+
+object AutoRecordKeyGenerationUtils {
+
+ def mayBeValidateParamsForAutoGenerationOfRecordKeys(parameters: Map[String,
String], hoodieConfig: HoodieConfig): Unit = {
+ val autoGenerateRecordKeys =
!parameters.contains(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) // if
record key is not configured,
+ // hudi will auto generate.
+ if (autoGenerateRecordKeys) {
+ // de-dup is not supported with auto generation of record keys
+ if (parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
+ HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean) {
+ throw new HoodieKeyGeneratorException("Enabling " +
HoodieWriteConfig.COMBINE_BEFORE_INSERT.key() + " is not supported with auto
generation of record keys ")
+ }
+ // drop dupes is not supported
+ if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
+ throw new HoodieKeyGeneratorException("Enabling " +
INSERT_DROP_DUPS.key() + " is not supported with auto generation of record keys
")
+ }
+ // virtual keys are not supported with auto generation of record keys.
+ if (!parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(),
HoodieTableConfig.POPULATE_META_FIELDS.defaultValue().toString).toBoolean) {
+ throw new HoodieKeyGeneratorException("Disabling " +
HoodieTableConfig.POPULATE_META_FIELDS.key() + " is not supported with auto
generation of record keys")
+ }
+ }
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 31e5512a3f9..512e84f3897 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -876,8 +876,8 @@ object DataSourceOptionsHelper {
*/
def fetchMissingWriteConfigsFromTableConfig(tableConfig: HoodieTableConfig,
params: Map[String, String]) : Map[String, String] = {
val missingWriteConfigs = scala.collection.mutable.Map[String, String]()
- if (!params.contains(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) &&
tableConfig.getRecordKeyFieldProp != null) {
- missingWriteConfigs ++=
Map(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key() ->
tableConfig.getRecordKeyFieldProp)
+ if (!params.contains(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) &&
tableConfig.getRawRecordKeyFieldProp != null) {
+ missingWriteConfigs ++=
Map(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key() ->
tableConfig.getRawRecordKeyFieldProp)
}
if (!params.contains(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
&& tableConfig.getPartitionFieldProp != null) {
missingWriteConfigs ++=
Map(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key() ->
tableConfig.getPartitionFieldProp)
@@ -912,7 +912,7 @@ object DataSourceOptionsHelper {
}
def inferKeyGenClazz(recordsKeyFields: String, partitionFields: String):
String = {
- getKeyGeneratorClassNameFromType(inferKeyGeneratorType(recordsKeyFields,
partitionFields))
+
getKeyGeneratorClassNameFromType(inferKeyGeneratorType(Option.ofNullable(recordsKeyFields),
partitionFields))
}
implicit def convert[T, U](prop: ConfigProperty[T])(implicit converter: T =>
U): ConfigProperty[U] = {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index a122e81aa73..9148bf15db1 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -21,6 +21,7 @@ import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
+import
org.apache.hudi.AutoRecordKeyGenerationUtils.mayBeValidateParamsForAutoGenerationOfRecordKeys
import org.apache.hudi.AvroConversionUtils.{convertStructTypeToAvroSchema,
getAvroRecordNameAndNamespace}
import
org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTableConfig
import org.apache.hudi.DataSourceWriteOptions._
@@ -50,8 +51,10 @@ import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import
org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.reconcileNullability
import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils,
SerDeHelper}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
-import org.apache.hudi.keygen.{SparkKeyGeneratorInterface,
TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
+import
org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.{createKeyGenerator,
getKeyGeneratorClassName}
+import org.apache.hudi.keygen.{BaseKeyGenerator, KeyGenUtils,
SparkKeyGeneratorInterface, TimestampBasedAvroKeyGenerator,
TimestampBasedKeyGenerator}
import org.apache.hudi.metrics.Metrics
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.sync.common.util.SyncUtilHelpers
@@ -65,7 +68,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.types.StructType
-import org.apache.spark.{SPARK_VERSION, SparkContext}
+import org.apache.spark.{SPARK_VERSION, SparkContext, TaskContext}
import org.slf4j.LoggerFactory
import java.util.function.BiConsumer
@@ -176,7 +179,6 @@ object HoodieSparkSqlWriter {
jsc.setLocalProperty("spark.scheduler.pool",
SparkConfigs.SPARK_DATASOURCE_WRITER_POOL_NAME)
}
}
- val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new
TypedProperties(hoodieConfig.getProps))
if (mode == SaveMode.Ignore && tableExists) {
log.warn(s"hoodie table at $basePath already exists. Ignoring & not
performing actual writes.")
@@ -184,7 +186,8 @@ object HoodieSparkSqlWriter {
} else {
// Handle various save modes
handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig,
tblName, operation, fs)
- val partitionColumns =
SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters))
+ val partitionColumns =
SparkKeyGenUtils.getPartitionColumns(getKeyGeneratorClassName(new
TypedProperties(hoodieConfig.getProps)),
+ toProperties(parameters))
val timelineTimeZone =
HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))
val tableMetaClient = if (tableExists) {
HoodieInstantTimeGenerator.setCommitTimeZone(timelineTimeZone)
@@ -256,6 +259,7 @@ object HoodieSparkSqlWriter {
case WriteOperationType.DELETE =>
val genericRecords = HoodieSparkUtils.createRdd(df,
avroRecordName, avroRecordNamespace)
// Convert to RDD[HoodieKey]
+ val keyGenerator =
HoodieSparkKeyGeneratorFactory.createKeyGenerator(new
TypedProperties(hoodieConfig.getProps))
val hoodieKeysToDelete = genericRecords.map(gr =>
keyGenerator.getKey(gr)).toJavaRDD()
if (!tableExists) {
@@ -286,6 +290,7 @@ object HoodieSparkSqlWriter {
throw new HoodieException(s"hoodie table at $basePath does not
exist")
}
+ val keyGenerator =
HoodieSparkKeyGeneratorFactory.createKeyGenerator(new
TypedProperties(hoodieConfig.getProps))
// Get list of partitions to delete
val partitionsToDelete = if
(parameters.containsKey(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key())) {
val partitionColsToDelete =
parameters(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key()).split(",")
@@ -317,6 +322,7 @@ object HoodieSparkSqlWriter {
val writerSchema = deduceWriterSchema(sourceSchema,
latestTableSchemaOpt, internalSchemaOpt, parameters)
validateSchemaForHoodieIsDeleted(writerSchema)
+ mayBeValidateParamsForAutoGenerationOfRecordKeys(parameters,
hoodieConfig)
// Short-circuit if bulk_insert via row is enabled.
// scalastyle:off
@@ -355,7 +361,7 @@ object HoodieSparkSqlWriter {
// Convert to RDD[HoodieRecord]
val hoodieRecords =
createHoodieRecordRdd(df, writeConfig, parameters,
avroRecordName, avroRecordNamespace, writerSchema,
- dataFileSchema, operation)
+ dataFileSchema, operation, instantTime)
if (isAsyncCompactionEnabled(client, tableConfig, parameters,
jsc.hadoopConfiguration())) {
asyncCompactionTriggerFn.get.apply(client)
@@ -812,7 +818,8 @@ object HoodieSparkSqlWriter {
}
val shouldDropPartitionColumns =
hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
- val hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(df,
writeConfig, bulkInsertPartitionerRows, shouldDropPartitionColumns)
+ val hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(df,
writeConfig, bulkInsertPartitionerRows, shouldDropPartitionColumns,
+ instantTime)
val optsOverrides = Map(
HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED ->
@@ -1098,10 +1105,11 @@ object HoodieSparkSqlWriter {
recordNameSpace: String,
writerSchema: Schema,
dataFileSchema: Schema,
- operation: WriteOperationType) = {
+ operation: WriteOperationType,
+ instantTime: String) = {
val shouldDropPartitionColumns =
config.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
- val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new
TypedProperties(config.getProps))
val recordType = config.getRecordMerger.getRecordType
+ val autoGenerateRecordKeys : Boolean =
!parameters.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
operation.equals(WriteOperationType.UPSERT) ||
@@ -1121,24 +1129,36 @@ object HoodieSparkSqlWriter {
Some(writerSchema))
avroRecords.mapPartitions(it => {
+ val sparkPartitionId = TaskContext.getPartitionId()
+ val keyGenProps = new TypedProperties(config.getProps)
+ if (autoGenerateRecordKeys) {
+
keyGenProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG,
String.valueOf(sparkPartitionId))
+
keyGenProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG,
instantTime)
+ }
+ val keyGenerator =
HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps)
+ .asInstanceOf[BaseKeyGenerator]
+
val dataFileSchema = new Schema.Parser().parse(dataFileSchemaStr)
val consistentLogicalTimestampEnabled = parameters.getOrElse(
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean
- it.map { avroRecord =>
+ // handle dropping partition columns
+ it.map { avroRec =>
val processedRecord = if (shouldDropPartitionColumns) {
- HoodieAvroUtils.rewriteRecord(avroRecord, dataFileSchema)
+ HoodieAvroUtils.rewriteRecord(avroRec, dataFileSchema)
} else {
- avroRecord
+ avroRec
}
+
+ val hoodieKey = new HoodieKey(keyGenerator.getRecordKey(avroRec),
keyGenerator.getPartitionPath(avroRec))
val hoodieRecord = if (shouldCombine) {
- val orderingVal = HoodieAvroUtils.getNestedFieldVal(avroRecord,
config.getString(PRECOMBINE_FIELD),
+ val orderingVal = HoodieAvroUtils.getNestedFieldVal(avroRec,
config.getString(PRECOMBINE_FIELD),
false,
consistentLogicalTimestampEnabled).asInstanceOf[Comparable[_]]
- DataSourceUtils.createHoodieRecord(processedRecord, orderingVal,
keyGenerator.getKey(avroRecord),
+ DataSourceUtils.createHoodieRecord(processedRecord, orderingVal,
hoodieKey,
config.getString(PAYLOAD_CLASS_NAME))
} else {
- DataSourceUtils.createHoodieRecord(processedRecord,
keyGenerator.getKey(avroRecord),
+ DataSourceUtils.createHoodieRecord(processedRecord, hoodieKey,
config.getString(PAYLOAD_CLASS_NAME))
}
hoodieRecord
@@ -1146,13 +1166,19 @@ object HoodieSparkSqlWriter {
}).toJavaRDD()
case HoodieRecord.HoodieRecordType.SPARK =>
- val sparkKeyGenerator =
keyGenerator.asInstanceOf[SparkKeyGeneratorInterface]
val dataFileSchema = new Schema.Parser().parse(dataFileSchemaStr)
val dataFileStructType =
HoodieInternalRowUtils.getCachedSchema(dataFileSchema)
val writerStructType =
HoodieInternalRowUtils.getCachedSchema(writerSchema)
val sourceStructType = df.schema
df.queryExecution.toRdd.mapPartitions { it =>
+ val sparkPartitionId = TaskContext.getPartitionId()
+ val keyGenProps = new TypedProperties(config.getProps)
+ if (autoGenerateRecordKeys) {
+
keyGenProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG,
String.valueOf(sparkPartitionId))
+
keyGenProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG,
instantTime)
+ }
+ val sparkKeyGenerator =
HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface]
val targetStructType = if (shouldDropPartitionColumns)
dataFileStructType else writerStructType
// NOTE: To make sure we properly transform records
val targetStructTypeRowWriter =
getCachedUnsafeRowWriter(sourceStructType, targetStructType)
@@ -1161,7 +1187,6 @@ object HoodieSparkSqlWriter {
val recordKey = sparkKeyGenerator.getRecordKey(sourceRow,
sourceStructType)
val partitionPath = sparkKeyGenerator.getPartitionPath(sourceRow,
sourceStructType)
val key = new HoodieKey(recordKey.toString, partitionPath.toString)
-
val targetRow = targetStructTypeRowWriter(sourceRow)
new HoodieSparkRecord(key, targetRow, dataFileStructType, false)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index a2acca66114..09a6d873e8b 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -146,8 +146,11 @@ object HoodieWriterUtils {
if (null != tableConfig) {
val datasourceRecordKey = params.getOrElse(RECORDKEY_FIELD.key(), null)
val tableConfigRecordKey =
tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS)
- if (null != datasourceRecordKey && null != tableConfigRecordKey
- && datasourceRecordKey != tableConfigRecordKey) {
+ if ((null != datasourceRecordKey && null != tableConfigRecordKey
+ && datasourceRecordKey != tableConfigRecordKey) || (null !=
datasourceRecordKey && datasourceRecordKey.nonEmpty
+ && tableConfigRecordKey == null)) {
+ // if both are non null, they should match.
+ // if incoming record key is non empty, table config should also be
non empty.
diffConfigs.append(s"RecordKey:\t$datasourceRecordKey\t$tableConfigRecordKey\n")
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 5ee0089f104..b8d93fa51e1 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -25,6 +25,7 @@ import
org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.util.ValidationUtils.checkArgument
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper}
import org.apache.spark.internal.Logging
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index bb682cf9b5f..239ec31eb65 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -196,10 +196,11 @@ object HoodieOptionConfig {
// validate primary key
val primaryKeys = sqlOptions.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName)
.map(_.split(",").filter(_.length > 0))
- ValidationUtils.checkArgument(primaryKeys.nonEmpty, "No `primaryKey` is
specified.")
- primaryKeys.get.foreach { primaryKey =>
- ValidationUtils.checkArgument(schema.exists(f => resolver(f.name,
getRootLevelFieldName(primaryKey))),
- s"Can't find primaryKey `$primaryKey` in ${schema.treeString}.")
+ if (primaryKeys.isDefined) {
+ primaryKeys.get.foreach { primaryKey =>
+ ValidationUtils.checkArgument(schema.exists(f => resolver(f.name,
getRootLevelFieldName(primaryKey))),
+ s"Can't find primaryKey `$primaryKey` in ${schema.treeString}.")
+ }
}
// validate preCombine key
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index ffc4079824f..a2194feb31b 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -196,6 +196,12 @@ trait ProvidesHoodieConfig extends Logging {
HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key ->
hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key ->
hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
)
+ // for auto record key gen
+ val recordKeyConfigValue = if (hoodieCatalogTable.primaryKeys.length > 1) {
+ hoodieCatalogTable.primaryKeys.mkString(",")
+ } else {
+ null
+ }
val overridingOpts = extraOptions ++ Map(
"path" -> path,
@@ -204,7 +210,7 @@ trait ProvidesHoodieConfig extends Logging {
OPERATION.key -> operation,
HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
- RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+ RECORDKEY_FIELD.key -> recordKeyConfigValue,
PRECOMBINE_FIELD.key -> preCombineField,
PARTITIONPATH_FIELD.key -> partitionFieldsStr
)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
index b94705a9a2a..04f1fbd5ba0 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
@@ -49,7 +49,12 @@ class SqlKeyGenerator(props: TypedProperties) extends
BuiltinKeyGenerator(props)
}
}
- private lazy val complexKeyGen = new ComplexKeyGenerator(props)
+ private lazy val autoRecordKeyGen =
KeyGenUtils.enableAutoGenerateRecordKeys(props)
+ private lazy val complexKeyGen = if (autoRecordKeyGen) {
+ new AutoRecordGenWrapperKeyGenerator(props, new ComplexKeyGenerator(props))
+ } else {
+ new ComplexKeyGenerator(props)
+ }
private lazy val originalKeyGen =
Option(props.getString(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME, null))
.map { originalKeyGenClassName =>
@@ -61,7 +66,12 @@ class SqlKeyGenerator(props: TypedProperties) extends
BuiltinKeyGenerator(props)
keyGenProps.remove(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME)
keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key,
convertedKeyGenClassName)
-
KeyGenUtils.createKeyGeneratorByClassName(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface]
+ val keyGenerator =
KeyGenUtils.createKeyGeneratorByClassName(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface]
+ if (autoRecordKeyGen) {
+ new AutoRecordGenWrapperKeyGenerator(keyGenProps,
keyGenerator.asInstanceOf[BuiltinKeyGenerator])
+ } else {
+ keyGenerator
+ }
}
override def getRecordKey(record: GenericRecord): String =
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
index 17c6f23089e..1038e0c9226 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
@@ -129,7 +129,7 @@ public class TestHoodieDatasetBulkInsertHelper extends
HoodieSparkClientTestBase
List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
Dataset<Row> result =
HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
- new NonSortPartitionerWithRows(), false);
+ new NonSortPartitionerWithRows(), false, "0000000001");
StructType resultSchema = result.schema();
assertEquals(result.count(), 10);
@@ -168,7 +168,7 @@ public class TestHoodieDatasetBulkInsertHelper extends
HoodieSparkClientTestBase
.build();
Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
Dataset<Row> result =
HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
- new NonSortPartitionerWithRows(), false);
+ new NonSortPartitionerWithRows(), false, "000001111");
StructType resultSchema = result.schema();
assertEquals(result.count(), 10);
@@ -205,7 +205,7 @@ public class TestHoodieDatasetBulkInsertHelper extends
HoodieSparkClientTestBase
rows.addAll(updates);
Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
Dataset<Row> result =
HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
- new NonSortPartitionerWithRows(), false);
+ new NonSortPartitionerWithRows(), false, "000001111");
StructType resultSchema = result.schema();
assertEquals(result.count(), enablePreCombine ? 10 : 15);
@@ -309,7 +309,7 @@ public class TestHoodieDatasetBulkInsertHelper extends
HoodieSparkClientTestBase
Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
try {
Dataset<Row> preparedDF =
HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
- new NonSortPartitionerWithRows(), false);
+ new NonSortPartitionerWithRows(), false, "000001111");
preparedDF.count();
fail("Should have thrown exception");
} catch (Exception e) {
@@ -321,19 +321,7 @@ public class TestHoodieDatasetBulkInsertHelper extends
HoodieSparkClientTestBase
dataset = sqlContext.createDataFrame(rows, structType);
try {
Dataset<Row> preparedDF =
HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
- new NonSortPartitionerWithRows(), false);
- preparedDF.count();
- fail("Should have thrown exception");
- } catch (Exception e) {
- // ignore
- }
-
- config = getConfigBuilder(schemaStr).withProps(getProps(false, true,
false, true)).build();
- rows = DataSourceTestUtils.generateRandomRows(10);
- dataset = sqlContext.createDataFrame(rows, structType);
- try {
- Dataset<Row> preparedDF =
HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
- new NonSortPartitionerWithRows(), false);
+ new NonSortPartitionerWithRows(), false, "000001111");
preparedDF.count();
fail("Should have thrown exception");
} catch (Exception e) {
@@ -345,7 +333,7 @@ public class TestHoodieDatasetBulkInsertHelper extends
HoodieSparkClientTestBase
dataset = sqlContext.createDataFrame(rows, structType);
try {
Dataset<Row> preparedDF =
HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
- new NonSortPartitionerWithRows(), false);
+ new NonSortPartitionerWithRows(), false, "000001111");
preparedDF.count();
fail("Should have thrown exception");
} catch (Exception e) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
index caed61249a1..d9d1e51059b 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
@@ -77,7 +77,11 @@ public class TestComplexKeyGenerator extends
KeyGeneratorTestUtilities {
@Test
public void testNullRecordKeyFields() {
- Assertions.assertThrows(IllegalArgumentException.class, () -> new
ComplexKeyGenerator(getPropertiesWithoutRecordKeyProp()));
+ GenericRecord record = getRecord();
+ Assertions.assertThrows(StringIndexOutOfBoundsException.class, () -> {
+ ComplexKeyGenerator keyGenerator = new
ComplexKeyGenerator(getPropertiesWithoutRecordKeyProp());
+ keyGenerator.getRecordKey(record);
+ });
}
@Test
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
index 311356a0f71..e001bfc13f5 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
@@ -253,29 +253,23 @@ public class TestCustomKeyGenerator extends
KeyGeneratorTestUtilities {
public void testNoRecordKeyFieldProp(boolean useKeyGeneratorClassName) {
TypedProperties propsWithoutRecordKeyFieldProps =
getPropsWithoutRecordKeyFieldProps(useKeyGeneratorClassName);
try {
- BuiltinKeyGenerator keyGenerator =
- (BuiltinKeyGenerator)
HoodieSparkKeyGeneratorFactory.createKeyGenerator(propsWithoutRecordKeyFieldProps);
+ BuiltinKeyGenerator keyGenerator = new
CustomKeyGenerator(propsWithoutRecordKeyFieldProps);
keyGenerator.getKey(getRecord());
Assertions.fail("should fail when record key field is not provided!");
} catch (Exception e) {
if (useKeyGeneratorClassName) {
// "Property hoodie.datasource.write.recordkey.field not found"
exception cause CustomKeyGenerator init fail
- Assertions.assertTrue(e
- .getCause()
- .getCause()
- .getCause()
- .getMessage()
- .contains("Property hoodie.datasource.write.recordkey.field not
found"));
+ Assertions.assertTrue(e.getMessage()
+ .contains("Unable to find field names for record key in cfg"));
} else {
- Assertions.assertTrue(stackTraceToString(e).contains("Property
hoodie.datasource.write.recordkey.field not found"));
+ Assertions.assertTrue(stackTraceToString(e).contains("Unable to find
field names for record key in cfg"));
}
}
try {
- BuiltinKeyGenerator keyGenerator =
- (BuiltinKeyGenerator)
HoodieSparkKeyGeneratorFactory.createKeyGenerator(propsWithoutRecordKeyFieldProps);
+ BuiltinKeyGenerator keyGenerator = new
CustomKeyGenerator(propsWithoutRecordKeyFieldProps);
GenericRecord record = getRecord();
Row row = KeyGeneratorTestUtilities.getRow(record);
@@ -284,14 +278,10 @@ public class TestCustomKeyGenerator extends
KeyGeneratorTestUtilities {
} catch (Exception e) {
if (useKeyGeneratorClassName) {
// "Property hoodie.datasource.write.recordkey.field not found"
exception cause CustomKeyGenerator init fail
- Assertions.assertTrue(e
- .getCause()
- .getCause()
- .getCause()
- .getMessage()
- .contains("Property hoodie.datasource.write.recordkey.field not
found"));
+ Assertions.assertTrue(e.getMessage()
+ .contains("All of the values for ([]) were either null or empty"));
} else {
- Assertions.assertTrue(stackTraceToString(e).contains("Property
hoodie.datasource.write.recordkey.field not found"));
+ Assertions.assertTrue(stackTraceToString(e).contains("All of the
values for ([]) were either null or empty"));
}
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteRecordGenerator.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteRecordGenerator.java
index 1b25ce65054..df69279cc89 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteRecordGenerator.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteRecordGenerator.java
@@ -61,7 +61,11 @@ public class TestGlobalDeleteRecordGenerator extends
KeyGeneratorTestUtilities {
@Test
public void testNullRecordKeyFields() {
- Assertions.assertThrows(IllegalArgumentException.class, () -> new
GlobalDeleteKeyGenerator(getPropertiesWithoutRecordKeyProp()));
+ GenericRecord record = getRecord();
+ Assertions.assertThrows(StringIndexOutOfBoundsException.class, () -> {
+ BaseKeyGenerator keyGenerator = new
GlobalDeleteKeyGenerator(getPropertiesWithoutRecordKeyProp());
+ keyGenerator.getRecordKey(record);
+ });
}
@Test
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestNonpartitionedKeyGenerator.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestNonpartitionedKeyGenerator.java
index fd299f179c3..fb740d00e2a 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestNonpartitionedKeyGenerator.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestNonpartitionedKeyGenerator.java
@@ -68,7 +68,11 @@ public class TestNonpartitionedKeyGenerator extends
KeyGeneratorTestUtilities {
@Test
public void testNullRecordKeyFields() {
- Assertions.assertThrows(IllegalArgumentException.class, () -> new
NonpartitionedKeyGenerator(getPropertiesWithoutRecordKeyProp()));
+ GenericRecord record = getRecord();
+ Assertions.assertThrows(StringIndexOutOfBoundsException.class, () -> {
+ BaseKeyGenerator keyGenerator = new
NonpartitionedKeyGenerator(getPropertiesWithoutRecordKeyProp());
+ keyGenerator.getRecordKey(record);
+ });
}
@Test
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
index 77d1b34f136..adf522f8354 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
@@ -96,7 +96,11 @@ public class TestSimpleKeyGenerator extends
KeyGeneratorTestUtilities {
@Test
public void testNullRecordKeyFields() {
- assertThrows(IllegalArgumentException.class, () -> new
SimpleKeyGenerator(getPropertiesWithoutRecordKeyProp()));
+ GenericRecord record = getRecord();
+ Assertions.assertThrows(IndexOutOfBoundsException.class, () -> {
+ BaseKeyGenerator keyGenerator = new
SimpleKeyGenerator(getPropertiesWithoutRecordKeyProp());
+ keyGenerator.getRecordKey(record);
+ });
}
@Test
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestCreateKeyGeneratorByTypeWithFactory.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestCreateKeyGeneratorByTypeWithFactory.java
index ad3edd4495a..45272ec1006 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestCreateKeyGeneratorByTypeWithFactory.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/factory/TestCreateKeyGeneratorByTypeWithFactory.java
@@ -21,9 +21,11 @@ package org.apache.hudi.keygen.factory;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.keygen.AutoRecordGenWrapperKeyGenerator;
import org.apache.hudi.keygen.ComplexKeyGenerator;
import org.apache.hudi.keygen.CustomKeyGenerator;
import org.apache.hudi.keygen.GlobalDeleteKeyGenerator;
+import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
@@ -33,6 +35,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@@ -98,4 +101,14 @@ public class TestCreateKeyGeneratorByTypeWithFactory {
throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type "
+ keyGenType);
}
}
+
+ @Test
+ public void testAutoRecordKeyGenerator() throws IOException {
+ props = new TypedProperties();
+ props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition");
+ props.put(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, "100");
+ props.put(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 1);
+ KeyGenerator keyGenerator =
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
+ Assertions.assertEquals(AutoRecordGenWrapperKeyGenerator.class.getName(),
keyGenerator.getClass().getName());
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
index c9b58fdff01..61a7a04823a 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
@@ -96,8 +96,8 @@ class TestDataSourceDefaults extends ScalaAssertionSupport {
val props = new TypedProperties()
props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(),
"partitionField")
- assertThrows(classOf[IllegalArgumentException]) {
- new SimpleKeyGenerator(props)
+ assertThrows(classOf[IndexOutOfBoundsException]) {
+ new SimpleKeyGenerator(props).getRecordKey(baseRecord)
}
}
@@ -262,7 +262,7 @@ class TestDataSourceDefaults extends ScalaAssertionSupport {
}
// Record's key field not specified
- assertThrows(classOf[IllegalArgumentException]) {
+ assertThrows(classOf[StringIndexOutOfBoundsException]) {
val props = new TypedProperties()
props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key,
"partitionField")
val keyGen = new ComplexKeyGenerator(props)
@@ -494,8 +494,8 @@ class TestDataSourceDefaults extends ScalaAssertionSupport {
val props = new TypedProperties()
props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key,
"partitionField")
- assertThrows(classOf[IllegalArgumentException]) {
- new GlobalDeleteKeyGenerator(props)
+ assertThrows(classOf[StringIndexOutOfBoundsException]) {
+ new GlobalDeleteKeyGenerator(props).getRecordKey(baseRecord)
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 9c0c182cd20..8785366fd51 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -467,7 +467,7 @@ class TestHoodieSparkSqlWriter {
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
// try write to Hudi
- assertThrows[IOException] {
+ assertThrows[IllegalArgumentException] {
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, tableOpts -
DataSourceWriteOptions.PARTITIONPATH_FIELD.key, df)
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/CommonOptionUtils.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/CommonOptionUtils.scala
new file mode 100644
index 00000000000..6748c82d130
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/CommonOptionUtils.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions,
HoodieSparkRecordMerger}
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.config.HoodieWriteConfig
+
+object CommonOptionUtils {
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ "hoodie.bulkinsert.shuffle.parallelism" -> "2",
+ "hoodie.delete.shuffle.parallelism" -> "1",
+ HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key() -> "true",
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+ DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1"
+ )
+ val sparkOpts = Map(HoodieWriteConfig.RECORD_MERGER_IMPLS.key ->
classOf[HoodieSparkRecordMerger].getName)
+
+ def getWriterReaderOpts(recordType: HoodieRecordType,
+ opt: Map[String, String] = commonOpts,
+ enableFileIndex: Boolean =
DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.defaultValue()):
+ (Map[String, String], Map[String, String]) = {
+ val fileIndexOpt: Map[String, String] =
+ Map(DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.key ->
enableFileIndex.toString)
+
+ recordType match {
+ case HoodieRecordType.SPARK => (opt ++ sparkOpts, sparkOpts ++
fileIndexOpt)
+ case _ => (opt, fileIndexOpt)
+ }
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
new file mode 100644
index 00000000000..3f737c9dac3
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hudi.HoodieConversionUtils.toJavaOption
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType,
WriteOperationType}
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.common.util
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.ExceptionUtil.getRootCause
+import org.apache.hudi.exception.{HoodieException, HoodieKeyGeneratorException}
+import org.apache.hudi.functional.CommonOptionUtils._
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.keygen.{ComplexKeyGenerator, KeyGenUtils,
NonpartitionedKeyGenerator, SimpleKeyGenerator, TimestampBasedKeyGenerator}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.hudi.util.JFunction
+import org.apache.hudi.{DataSourceWriteOptions, HoodieDataSourceHelpers,
ScalaAssertionSupport}
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.apache.spark.sql.{SaveMode, SparkSession, SparkSessionExtensions}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{CsvSource, EnumSource}
+
+import java.util.function.Consumer
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+class TestAutoGenerationOfRecordKeys extends HoodieSparkClientTestBase with
ScalaAssertionSupport {
+ var spark: SparkSession = null
+ val verificationCol: String = "driver"
+ val updatedVerificationVal: String = "driver_update"
+
+ override def getSparkSessionExtensionsInjector:
util.Option[Consumer[SparkSessionExtensions]] =
+ toJavaOption(
+ Some(
+ JFunction.toJavaConsumer((receiver: SparkSessionExtensions) => new
HoodieSparkSessionExtension().apply(receiver)))
+ )
+
+ @BeforeEach override def setUp() {
+ initPath()
+ initSparkContexts()
+ spark = sqlContext.sparkSession
+ initTestDataGenerator()
+ initFileSystem()
+ }
+
+ @AfterEach override def tearDown() = {
+ cleanupSparkContexts()
+ cleanupTestDataGenerator()
+ cleanupFileSystem()
+ FileSystem.closeAll()
+ System.gc()
+ }
+
+ @ParameterizedTest
+ @CsvSource(value = Array(
+ "AVRO,insert,COPY_ON_WRITE", "AVRO,bulk_insert,COPY_ON_WRITE",
"AVRO,insert,MERGE_ON_READ", "AVRO,bulk_insert,MERGE_ON_READ"
+ ))
+ def testRecordKeysAutoGen(recordType: HoodieRecordType, op: String,
tableType: HoodieTableType): Unit = {
+ testRecordKeysAutoGenInternal(recordType, op, tableType)
+ }
+
+ @Test
+ def testRecordKeyAutoGenWithTimestampBasedKeyGen(): Unit = {
+ testRecordKeysAutoGenInternal(HoodieRecordType.AVRO, "insert",
HoodieTableType.COPY_ON_WRITE,
+ classOf[TimestampBasedKeyGenerator].getName)
+ }
+
+ @Test
+ def testRecordKeyAutoGenWithComplexKeyGen(): Unit = {
+ testRecordKeysAutoGenInternal(HoodieRecordType.AVRO, "insert",
HoodieTableType.COPY_ON_WRITE,
+ classOf[ComplexKeyGenerator].getName,
+ complexPartitionPath = true)
+ }
+
+ @Test
+ def testRecordKeyAutoGenWithNonPartitionedKeyGen(): Unit = {
+ testRecordKeysAutoGenInternal(HoodieRecordType.AVRO, "insert",
HoodieTableType.COPY_ON_WRITE,
+ classOf[NonpartitionedKeyGenerator].getName, complexPartitionPath =
false, nonPartitionedDataset = true)
+ }
+
+ def testRecordKeysAutoGenInternal(recordType: HoodieRecordType, op: String =
"insert", tableType: HoodieTableType = HoodieTableType.COPY_ON_WRITE,
+ keyGenClass: String =
classOf[SimpleKeyGenerator].getCanonicalName,
+ complexPartitionPath: Boolean = false,
nonPartitionedDataset: Boolean = false): Unit = {
+ val (vanillaWriteOpts, readOpts) = getWriterReaderOpts(recordType)
+
+ var options: Map[String, String] = vanillaWriteOpts ++ Map(
+ DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> keyGenClass)
+
+ val isTimestampBasedKeyGen: Boolean =
classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)
+ if (isTimestampBasedKeyGen) {
+ options += Config.TIMESTAMP_TYPE_FIELD_PROP -> "DATE_STRING"
+ options += Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP -> "yyyy/MM/dd"
+ options += Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyyMMdd"
+ }
+
+ if (complexPartitionPath) {
+ options += KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key() ->
"rider,_hoodie_is_deleted"
+ }
+ if (nonPartitionedDataset) {
+ options = options --
Seq(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
+ }
+
+ // add partition Id and instant time
+ options += KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG -> "1"
+ options += KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG -> "100"
+
+    // NOTE: In this test we are deliberately removing the record-key configuration
+    // to validate that Hudi handles this case appropriately
+ val writeOpts = options -- Seq(DataSourceWriteOptions.RECORDKEY_FIELD.key)
+
+ // Insert Operation
+ val records = recordsToStrings(dataGen.generateInserts("000", 5)).toList
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+ inputDF.cache
+
+ //
+ // Step #1: Persist first batch with auto-gen'd record-keys
+ //
+
+ inputDF.write.format("hudi")
+ .options(writeOpts)
+ .option(DataSourceWriteOptions.OPERATION.key, op)
+ .option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType.name())
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+
+ //
+ // Step #2: Persist *same* batch with auto-gen'd record-keys (new record
keys should
+ // be generated this time)
+ //
+ val inputDF2 = inputDF
+ inputDF2.write.format("hudi")
+ .options(writeOpts)
+ .option(DataSourceWriteOptions.OPERATION.key, op)
+ .option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType.name())
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ val readDF = spark.read.format("hudi")
+ .options(readOpts)
+ .load(basePath)
+ readDF.cache
+
+ val recordKeys = readDF.select(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+ .distinct()
+ .collectAsList()
+ .map(_.getString(0))
+
+ // Validate auto-gen'd keys are globally unique
+ assertEquals(10, recordKeys.size)
+
+ // validate entire batch is present in snapshot read
+ val expectedInputDf =
inputDF.union(inputDF2).drop("partition","rider","_hoodie_is_deleted")
+ val actualDf = readDF.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:
_*).drop("partition","rider","_hoodie_is_deleted")
+ assertEquals(expectedInputDf.except(actualDf).count, 0)
+ }
+
+ @ParameterizedTest
+ @CsvSource(value = Array(
+
"hoodie.populate.meta.fields,false","hoodie.combine.before.insert,true","hoodie.datasource.write.insert.drop.duplicates,true"
+ ))
+ def testRecordKeysAutoGenInvalidParams(configKey: String, configValue:
String): Unit = {
+ val (writeOpts, _) = getWriterReaderOpts(HoodieRecordType.AVRO)
+
+    // NOTE: In this test we are deliberately removing the record-key configuration
+    // to validate that Hudi handles this case appropriately
+ var opts = writeOpts -- Seq(DataSourceWriteOptions.RECORDKEY_FIELD.key)
+
+ // add partition Id and instant time
+ opts += KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG -> "1"
+ opts += KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG -> "100"
+
+ // Insert Operation
+ val records = recordsToStrings(dataGen.generateInserts("000", 1)).toList
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+ val e = assertThrows(classOf[HoodieKeyGeneratorException]) {
+ inputDF.write.format("hudi")
+ .options(opts)
+ .option(DataSourceWriteOptions.OPERATION.key, "insert")
+ .option(configKey, configValue)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ }
+
+ assertTrue(getRootCause(e).getMessage.contains(configKey + " is not
supported with auto generation of record keys"))
+ }
+
+
+ @Test
+ def testRecordKeysAutoGenEnableToDisable(): Unit = {
+ val (vanillaWriteOpts, readOpts) =
getWriterReaderOpts(HoodieRecordType.AVRO)
+
+ var options: Map[String, String] = vanillaWriteOpts ++ Map(
+ DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key ->
classOf[SimpleKeyGenerator].getCanonicalName)
+
+    // NOTE: In this test we are deliberately removing the record-key configuration
+    // to validate that Hudi handles this case appropriately
+ var writeOpts = options -- Seq(DataSourceWriteOptions.RECORDKEY_FIELD.key)
+
+ // Insert Operation
+ val records = recordsToStrings(dataGen.generateInserts("000", 5)).toList
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+ inputDF.cache
+
+
+ // add partition Id and instant time
+ writeOpts += KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG -> "1"
+ writeOpts += KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG -> "100"
+
+ //
+ // Step #1: Persist first batch with auto-gen'd record-keys
+ //
+ inputDF.write.format("hudi")
+ .options(writeOpts)
+ .option(DataSourceWriteOptions.OPERATION.key, "insert")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+
+ //
+ // Step #2: Insert w/ explicit record key config. Should fail since we
can't modify this property.
+ //
+ val e = assertThrows(classOf[HoodieException]) {
+ val inputDF2 = inputDF
+ inputDF2.write.format("hudi")
+ .options(writeOpts ++ Map(
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key"
+ ))
+ .option(DataSourceWriteOptions.OPERATION.key, "insert")
+ .mode(SaveMode.Append)
+ .save(basePath)
+ }
+
+ val expectedMsg = s"RecordKey:\t_row_key\tnull"
+ assertTrue(getRootCause(e).getMessage.contains(expectedMsg))
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 6b1773807fe..b229d4f7c2d 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -36,6 +36,7 @@ import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.metrics.HoodieMetricsConfig
import org.apache.hudi.exception.ExceptionUtil.getRootCause
import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.functional.CommonOptionUtils._
import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable
import org.apache.hudi.keygen._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
@@ -69,19 +70,6 @@ import org.junit.jupiter.api.function.Executable
*/
class TestCOWDataSource extends HoodieSparkClientTestBase with
ScalaAssertionSupport {
var spark: SparkSession = null
- val commonOpts = Map(
- "hoodie.insert.shuffle.parallelism" -> "4",
- "hoodie.upsert.shuffle.parallelism" -> "4",
- "hoodie.bulkinsert.shuffle.parallelism" -> "2",
- "hoodie.delete.shuffle.parallelism" -> "1",
- HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key() -> "true",
- DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
- DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
- DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
- HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
- HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1"
- )
- val sparkOpts = Map(HoodieWriteConfig.RECORD_MERGER_IMPLS.key ->
classOf[HoodieSparkRecordMerger].getName)
val verificationCol: String = "driver"
val updatedVerificationVal: String = "driver_update"
@@ -899,7 +887,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
}
@ParameterizedTest
- @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO",
"SPARK"))
+ @EnumSource(value = classOf[HoodieRecordType], names = Array("SPARK"))
def testSparkPartitionByWithCustomKeyGenerator(recordType:
HoodieRecordType): Unit = {
val (writeOpts, readOpts) =
getWriterReaderOptsLessPartitionPath(recordType)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
index 96108b0d550..f2d1cd58e65 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
@@ -55,6 +55,7 @@ class TestStreamingSource extends StreamTest {
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(COPY_ON_WRITE)
.setTableName(getTableName(tablePath))
+ .setRecordKeyFields("id")
.setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
.setPreCombineField("ts")
.initTable(spark.sessionState.newHadoopConf(), tablePath)
@@ -107,6 +108,7 @@ class TestStreamingSource extends StreamTest {
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(MERGE_ON_READ)
.setTableName(getTableName(tablePath))
+ .setRecordKeyFields("id")
.setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
.setPreCombineField("ts")
.initTable(spark.sessionState.newHadoopConf(), tablePath)
@@ -153,6 +155,7 @@ class TestStreamingSource extends StreamTest {
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(COPY_ON_WRITE)
.setTableName(getTableName(tablePath))
+ .setRecordKeyFields("id")
.setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
.setPreCombineField("ts")
.initTable(spark.sessionState.newHadoopConf(), tablePath)
@@ -185,6 +188,7 @@ class TestStreamingSource extends StreamTest {
val metaClient = HoodieTableMetaClient.withPropertyBuilder()
.setTableType(COPY_ON_WRITE)
.setTableName(getTableName(tablePath))
+ .setRecordKeyFields("id")
.setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
.setPreCombineField("ts")
.initTable(spark.sessionState.newHadoopConf(), tablePath)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
index 4e726317844..35ebe872e40 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
@@ -109,16 +109,6 @@ class TestHoodieOptionConfig extends
SparkClientFunctionalTestHarness {
StructField("dt", StringType, true))
)
- // miss primaryKey parameter
- val sqlOptions1 = baseSqlOptions ++ Map(
- "type" -> "mor"
- )
-
- val e1 = intercept[IllegalArgumentException] {
- HoodieOptionConfig.validateTable(spark, schema, sqlOptions1)
- }
- assertTrue(e1.getMessage.contains("No `primaryKey` is specified."))
-
// primary field not found
val sqlOptions2 = baseSqlOptions ++ Map(
"primaryKey" -> "xxx",
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index dfe81391105..d40bae13209 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hudi
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
-import org.apache.hudi.common.model.WriteOperationType
+import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieDuplicateKeyException
@@ -28,6 +28,7 @@ import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.getLastCommitMetadata
import
org.apache.spark.sql.hudi.command.HoodieSparkValidateDuplicateKeyRecordMerger
+import org.junit.jupiter.api.Assertions.assertEquals
import java.io.File
@@ -1267,4 +1268,41 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
})
}
}
+
+ test("Test Insert Into with auto generate record keys") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // Create a partitioned table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | dt string,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | partitioned by (dt)
+ | location '${tmp.getCanonicalPath}'
+ """.stripMargin)
+
+      // Note: Do not write the field alias; the partition field must be
placed last.
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (1, 'a1', 10, 1000, "2021-01-05"),
+ | (2, 'a2', 20, 2000, "2021-01-06"),
+ | (3, 'a3', 30, 3000, "2021-01-07")
+ """.stripMargin)
+
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+ Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+ Seq(3, "a3", 30.0, 3000, "2021-01-07")
+ )
+
+ val df = spark.read.format("hudi").load(tmp.getCanonicalPath)
+ assertEquals(3,
df.select(HoodieRecord.RECORD_KEY_METADATA_FIELD).count())
+ }
+ }
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 64686e3f38f..7d1d0758955 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -70,8 +70,8 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.internal.schema.InternalSchema;
-import org.apache.hudi.keygen.KeyGenerator;
-import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
+import org.apache.hudi.keygen.BuiltinKeyGenerator;
+import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.metrics.HoodieMetrics;
@@ -103,8 +103,10 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.HoodieInternalRowUtils;
@@ -124,6 +126,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -147,6 +150,7 @@ import static
org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT;
import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC;
import static
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
+import static
org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName;
import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
import static
org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.MUTLI_WRITER_SOURCE_CHECKPOINT_ID;
import static
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_FORCE_SKIP_PROP;
@@ -190,10 +194,7 @@ public class DeltaSync implements Serializable, Closeable {
*/
private transient Option<Transformer> transformer;
- /**
- * Extract the key for the target table.
- */
- private KeyGenerator keyGenerator;
+ private String keyGenClassName;
/**
* Filesystem used.
@@ -257,7 +258,6 @@ public class DeltaSync implements Serializable, Closeable {
private transient HoodieIngestionMetrics metrics;
private transient HoodieMetrics hoodieMetrics;
-
/**
* Unique identifier of the deltastreamer
* */
@@ -269,6 +269,8 @@ public class DeltaSync implements Serializable, Closeable {
*/
private transient String latestCheckpointWritten;
+ private final boolean autoGenerateRecordKeys;
+
public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
SchemaProvider schemaProvider,
TypedProperties props, JavaSparkContext jssc, FileSystem
fs, Configuration conf,
Function<SparkRDDWriteClient, Boolean>
onInitializingHoodieWriteClient) throws IOException {
@@ -281,7 +283,8 @@ public class DeltaSync implements Serializable, Closeable {
this.props = props;
this.userProvidedSchemaProvider = schemaProvider;
this.processedSchema = new SchemaSet();
- this.keyGenerator =
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
+ this.autoGenerateRecordKeys =
KeyGenUtils.enableAutoGenerateRecordKeys(props);
+ this.keyGenClassName = getKeyGeneratorClassName(new
TypedProperties(props));
refreshTimeline();
// Register User Provided schema first
registerAvroSchemas(schemaProvider);
@@ -362,7 +365,7 @@ public class DeltaSync implements Serializable, Closeable {
private void initializeEmptyTable() throws IOException {
this.commitsTimelineOpt = Option.empty();
this.allCommitsTimelineOpt = Option.empty();
- String partitionColumns =
SparkKeyGenUtils.getPartitionColumns(keyGenerator, props);
+ String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(cfg.tableType)
.setTableName(cfg.targetTableName)
@@ -373,8 +376,7 @@ public class DeltaSync implements Serializable, Closeable {
.setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key()))
.setPopulateMetaFields(props.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(),
HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))
-
.setKeyGeneratorClassProp(props.getProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(),
- keyGenerator.getClass().getName()))
+ .setKeyGeneratorClassProp(keyGenClassName)
.setPreCombineField(cfg.sourceOrderingField)
.setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
@@ -396,8 +398,9 @@ public class DeltaSync implements Serializable, Closeable {
// Refresh Timeline
refreshTimeline();
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
- Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>>
srcRecordsWithCkpt = readFromSource(commitsTimelineOpt);
+ Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>>
srcRecordsWithCkpt = readFromSource(commitsTimelineOpt, instantTime);
if (srcRecordsWithCkpt != null) {
final JavaRDD<HoodieRecord> recordsFromSource =
srcRecordsWithCkpt.getRight().getRight();
@@ -438,7 +441,7 @@ public class DeltaSync implements Serializable, Closeable {
}
}
- result = writeToSink(recordsFromSource,
+ result = writeToSink(instantTime, recordsFromSource,
srcRecordsWithCkpt.getRight().getLeft(), metrics,
overallTimerContext);
}
@@ -470,14 +473,14 @@ public class DeltaSync implements Serializable, Closeable
{
* of schemaProvider, checkpointStr and hoodieRecord
* @throws Exception in case of any Exception
*/
- public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>>
readFromSource(Option<HoodieTimeline> commitsTimelineOpt) throws IOException {
+ public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>>
readFromSource(Option<HoodieTimeline> commitsTimelineOpt, String instantTime)
throws IOException {
// Retrieve the previous round checkpoints, if any
Option<String> resumeCheckpointStr = Option.empty();
if (commitsTimelineOpt.isPresent()) {
resumeCheckpointStr = getCheckpointToResume(commitsTimelineOpt);
} else {
// initialize the table for the first time.
- String partitionColumns =
SparkKeyGenUtils.getPartitionColumns(keyGenerator, props);
+ String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(cfg.tableType)
.setTableName(cfg.targetTableName)
@@ -488,8 +491,7 @@ public class DeltaSync implements Serializable, Closeable {
.setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key()))
.setPopulateMetaFields(props.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(),
HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))
-
.setKeyGeneratorClassProp(props.getProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(),
- keyGenerator.getClass().getName()))
+ .setKeyGeneratorClassProp(keyGenClassName)
.setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
.setShouldDropPartitionColumns(isDropPartitionColumns())
@@ -507,7 +509,7 @@ public class DeltaSync implements Serializable, Closeable {
Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> sourceDataToSync
= null;
while (curRetryCount++ < maxRetryCount && sourceDataToSync == null) {
try {
- sourceDataToSync = fetchFromSource(resumeCheckpointStr);
+ sourceDataToSync = fetchFromSource(resumeCheckpointStr, instantTime);
} catch (HoodieSourceTimeoutException e) {
if (curRetryCount >= maxRetryCount) {
throw e;
@@ -524,7 +526,7 @@ public class DeltaSync implements Serializable, Closeable {
return sourceDataToSync;
}
- private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>>
fetchFromSource(Option<String> resumeCheckpointStr) {
+ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>>
fetchFromSource(Option<String> resumeCheckpointStr, String instantTime) {
HoodieRecordType recordType = createRecordMerger(props).getRecordType();
if (recordType == HoodieRecordType.SPARK &&
HoodieTableType.valueOf(cfg.tableType) == HoodieTableType.MERGE_ON_READ
&&
HoodieLogBlockType.fromId(props.getProperty(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(),
"avro"))
@@ -620,25 +622,42 @@ public class DeltaSync implements Serializable, Closeable
{
}
boolean shouldCombine = cfg.filterDupes ||
cfg.operation.equals(WriteOperationType.UPSERT);
- Set<String> partitionColumns = getPartitionColumns(keyGenerator, props);
+ Set<String> partitionColumns = getPartitionColumns(props);
JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
JavaRDD<HoodieRecord> records;
SerializableSchema avroSchema = new
SerializableSchema(schemaProvider.getTargetSchema());
SerializableSchema processedAvroSchema = new
SerializableSchema(isDropPartitionColumns() ?
HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get());
if (recordType == HoodieRecordType.AVRO) {
- records = avroRDD.map(record -> {
- GenericRecord gr = isDropPartitionColumns() ?
HoodieAvroUtils.removeFields(record, partitionColumns) : record;
- HoodieRecordPayload payload = shouldCombine ?
DataSourceUtils.createPayload(cfg.payloadClassName, gr,
- (Comparable) HoodieAvroUtils.getNestedFieldVal(gr,
cfg.sourceOrderingField, false, props.getBoolean(
-
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
-
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
- : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
- return new HoodieAvroRecord<>(keyGenerator.getKey(record), payload);
- });
+ records = avroRDD.mapPartitions(
+ (FlatMapFunction<Iterator<GenericRecord>, HoodieRecord>)
genericRecordIterator -> {
+ if (autoGenerateRecordKeys) {
+
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG,
String.valueOf(TaskContext.getPartitionId()));
+
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
+ }
+ BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator)
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
+ List<HoodieRecord> avroRecords = new ArrayList<>();
+ while (genericRecordIterator.hasNext()) {
+ GenericRecord genRec = genericRecordIterator.next();
+ HoodieKey hoodieKey = new
HoodieKey(builtinKeyGenerator.getRecordKey(genRec),
builtinKeyGenerator.getPartitionPath(genRec));
+ GenericRecord gr = isDropPartitionColumns() ?
HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
+ HoodieRecordPayload payload = shouldCombine ?
DataSourceUtils.createPayload(cfg.payloadClassName, gr,
+ (Comparable) HoodieAvroUtils.getNestedFieldVal(gr,
cfg.sourceOrderingField, false, props.getBoolean(
+
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
+
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
+ : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
+ avroRecords.add(new HoodieAvroRecord<>(hoodieKey, payload));
+ }
+ return avroRecords.iterator();
+ });
} else if (recordType == HoodieRecordType.SPARK) {
// TODO we should remove it if we can read InternalRow from source.
records = avroRDD.mapPartitions(itr -> {
+ if (autoGenerateRecordKeys) {
+ props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG,
String.valueOf(TaskContext.getPartitionId()));
+ props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG,
instantTime);
+ }
+ BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator)
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
StructType baseStructType =
AvroConversionUtils.convertAvroSchemaToStructType(processedAvroSchema.get());
StructType targetStructType = isDropPartitionColumns() ?
AvroConversionUtils
.convertAvroSchemaToStructType(HoodieAvroUtils.removeFields(processedAvroSchema.get(),
partitionColumns)) : baseStructType;
@@ -646,9 +665,8 @@ public class DeltaSync implements Serializable, Closeable {
return new CloseableMappingIterator<>(ClosableIterator.wrap(itr), rec
-> {
InternalRow row = (InternalRow) deserializer.deserialize(rec).get();
- SparkKeyGeneratorInterface keyGenerator =
(SparkKeyGeneratorInterface) this.keyGenerator;
- String recordKey = keyGenerator.getRecordKey(row,
baseStructType).toString();
- String partitionPath = keyGenerator.getPartitionPath(row,
baseStructType).toString();
+ String recordKey = builtinKeyGenerator.getRecordKey(row,
baseStructType).toString();
+ String partitionPath = builtinKeyGenerator.getPartitionPath(row,
baseStructType).toString();
return new HoodieSparkRecord(new HoodieKey(recordKey, partitionPath),
HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType,
targetStructType).apply(row), targetStructType, false);
});
@@ -760,13 +778,14 @@ public class DeltaSync implements Serializable, Closeable
{
/**
* Perform Hoodie Write. Run Cleaner, schedule compaction and syncs to hive
if needed.
*
+   * @param instantTime instant time to use for ingestion.
* @param records Input Records
* @param checkpointStr Checkpoint String
* @param metrics Metrics
* @param overallTimerContext Timer Context
* @return Option Compaction instant if one is scheduled
*/
- private Pair<Option<String>, JavaRDD<WriteStatus>>
writeToSink(JavaRDD<HoodieRecord> records, String checkpointStr,
+ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(String
instantTime, JavaRDD<HoodieRecord> records, String checkpointStr,
HoodieIngestionMetrics metrics,
Timer.Context
overallTimerContext) {
Option<String> scheduledCompactionInstant = Option.empty();
@@ -776,9 +795,7 @@ public class DeltaSync implements Serializable, Closeable {
}
boolean isEmpty = records.isEmpty();
-
- // try to start a new commit
- String instantTime = startCommit();
+ instantTime = startCommit(instantTime, !autoGenerateRecordKeys);
LOG.info("Starting commit : " + instantTime);
JavaRDD<WriteStatus> writeStatusRDD;
@@ -892,18 +909,20 @@ public class DeltaSync implements Serializable, Closeable
{
*
* @return Instant time of the commit
*/
- private String startCommit() {
+ private String startCommit(String instantTime, boolean retryEnabled) {
final int maxRetries = 2;
int retryNum = 1;
RuntimeException lastException = null;
while (retryNum <= maxRetries) {
try {
- String instantTime = HoodieActiveTimeline.createNewInstantTime();
String commitActionType =
CommitUtils.getCommitActionType(cfg.operation,
HoodieTableType.valueOf(cfg.tableType));
writeClient.startCommitWithTime(instantTime, commitActionType);
return instantTime;
} catch (IllegalArgumentException ie) {
lastException = ie;
+ if (!retryEnabled) {
+ throw ie;
+ }
LOG.error("Got error trying to start a new commit. Retrying after
sleeping for a sec", ie);
retryNum++;
try {
@@ -912,6 +931,7 @@ public class DeltaSync implements Serializable, Closeable {
// No-Op
}
}
+ instantTime = HoodieActiveTimeline.createNewInstantTime();
}
throw lastException;
}
@@ -974,7 +994,7 @@ public class DeltaSync implements Serializable, Closeable {
private void reInitWriteClient(Schema sourceSchema, Schema targetSchema,
JavaRDD<HoodieRecord> records) throws IOException {
LOG.info("Setting up new Hoodie Write Client");
if (isDropPartitionColumns()) {
- targetSchema = HoodieAvroUtils.removeFields(targetSchema,
getPartitionColumns(keyGenerator, props));
+ targetSchema = HoodieAvroUtils.removeFields(targetSchema,
getPartitionColumns(props));
}
registerAvroSchemas(sourceSchema, targetSchema);
final HoodieWriteConfig initialWriteConfig =
getHoodieClientConfig(targetSchema);
@@ -1205,12 +1225,11 @@ public class DeltaSync implements Serializable,
Closeable {
/**
* Get the partition columns as a set of strings.
*
- * @param keyGenerator KeyGenerator
* @param props TypedProperties
* @return Set of partition columns.
*/
- private Set<String> getPartitionColumns(KeyGenerator keyGenerator,
TypedProperties props) {
- String partitionColumns =
SparkKeyGenUtils.getPartitionColumns(keyGenerator, props);
+ private Set<String> getPartitionColumns(TypedProperties props) {
+ String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
return
Arrays.stream(partitionColumns.split(",")).collect(Collectors.toSet());
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index d749b15d35d..3f09982ab55 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -100,6 +100,7 @@ import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.AnalysisException;
@@ -601,7 +602,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
@Test
public void testPropsWithInvalidKeyGenerator() throws Exception {
- Exception e = assertThrows(IOException.class, () -> {
+ Exception e = assertThrows(SparkException.class, () -> {
String tableBasePath = basePath + "/test_table_invalid_key_gen";
HoodieDeltaStreamer deltaStreamer =
new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath,
WriteOperationType.BULK_INSERT,
@@ -610,7 +611,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
}, "Should error out when setting the key generator class property to an
invalid value");
// expected
LOG.debug("Expected error during getting the key generator", e);
- assertTrue(e.getMessage().contains("Could not load key generator class"));
+ assertTrue(e.getMessage().contains("Could not load key generator class
invalid"));
}
private static Stream<Arguments> provideInferKeyGenArgs() {
@@ -1518,7 +1519,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
* 1 ===============> HUDI TABLE 2 (incr-pull with transform) (incr-pull)
Hudi Table 1 is synced with Hive.
*/
@ParameterizedTest
- @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
+ @EnumSource(value = HoodieRecordType.class, names = {"AVRO","SPARK"})
public void
testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline(HoodieRecordType
recordType) throws Exception {
String tableBasePath = basePath + "/test_table2";
String downstreamTableBasePath = basePath + "/test_downstream_table2";
@@ -2629,10 +2630,10 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
// change cow to mor
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
- .setConf(new Configuration(fs.getConf()))
- .setBasePath(cfg.targetBasePath)
- .setLoadActiveTimelineOnLoad(false)
- .build();
+ .setConf(new Configuration(fs.getConf()))
+ .setBasePath(cfg.targetBasePath)
+ .setLoadActiveTimelineOnLoad(false)
+ .build();
Properties hoodieProps = new Properties();
hoodieProps.load(fs.open(new Path(cfg.targetBasePath +
"/.hoodie/hoodie.properties")));
LOG.info("old props: {}", hoodieProps);
@@ -2670,6 +2671,31 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
+ @Test
+ public void testAutoGenerateRecordKeys() throws Exception {
+ boolean useSchemaProvider = false;
+ List<String> transformerClassNames = null;
+ PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
+ int parquetRecordsCount = 100;
+ boolean hasTransformer = transformerClassNames != null &&
!transformerClassNames.isEmpty();
+ prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT,
FIRST_PARQUET_FILE_NAME, false, null, null);
+ prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc",
"target.avsc", PROPS_FILENAME_TEST_PARQUET,
+ PARQUET_SOURCE_ROOT, false, "partition_path", "");
+
+ String tableBasePath = basePath + "/test_parquet_table" + testNum;
+ HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
+ transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
+ useSchemaProvider, 100000, false, null, null, "timestamp", null);
+ HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(config, jsc);
+ deltaStreamer.sync();
+ TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath,
sqlContext);
+
+ prepareParquetDFSFiles(200, PARQUET_SOURCE_ROOT, "2.parquet", false, null,
null);
+ deltaStreamer.sync();
+ TestHelpers.assertRecordCount(parquetRecordsCount + 200, tableBasePath,
sqlContext);
+ testNum++;
+ }
+
class TestDeltaSync extends DeltaSync {
public TestDeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession
sparkSession, SchemaProvider schemaProvider, TypedProperties props,