This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4653eaed685 [HUDI-6644] Flink append mode use auto key generator
(#9362)
4653eaed685 is described below
commit 4653eaed685a29ba81d5484c1aa623e57b44e9ac
Author: Bingeng Huang <[email protected]>
AuthorDate: Fri Aug 4 23:04:53 2023 +0800
[HUDI-6644] Flink append mode use auto key generator (#9362)
Automatically generates the record key for primary-key-less (pkless) tables.
---------
Co-authored-by: Danny Chan <[email protected]>
---
.../sink/bucket/BucketBulkInsertWriterHelper.java | 2 +-
.../apache/hudi/sink/bulk/AutoRowDataKeyGen.java | 58 ++++++++++++++++++++++
.../hudi/sink/bulk/BulkInsertWriterHelper.java | 16 +++---
.../org/apache/hudi/sink/bulk/RowDataKeyGen.java | 53 ++++++++------------
.../org/apache/hudi/sink/bulk/RowDataKeyGens.java | 52 +++++++++++++++++++
.../apache/hudi/sink/bulk/TestRowDataKeyGen.java | 10 ++--
6 files changed, 146 insertions(+), 45 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
index 7d1400cb5c1..fcf38e112ad 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
@@ -82,7 +82,7 @@ public class BucketBulkInsertWriterHelper extends
BulkInsertWriterHelper {
close();
}
HoodieRowDataCreateHandle rowCreateHandle = new
HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, fileId,
- instantTime, taskPartitionId, taskId, taskEpochId, rowType,
preserveHoodieMetadata);
+ instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType,
preserveHoodieMetadata);
handles.put(fileId, rowCreateHandle);
}
return handles.get(fileId);
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/AutoRowDataKeyGen.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/AutoRowDataKeyGen.java
new file mode 100644
index 00000000000..bcae6fcd7b6
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/AutoRowDataKeyGen.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Key generator for {@link RowData} that uses an auto key generator.
+ */
+public class AutoRowDataKeyGen extends RowDataKeyGen {
+ private final int taskId;
+ private final String instantTime;
+ private int rowId;
+
+ public AutoRowDataKeyGen(
+ int taskId,
+ String instantTime,
+ String partitionFields,
+ RowType rowType,
+ boolean hiveStylePartitioning,
+ boolean encodePartitionPath) {
+ super(Option.empty(), partitionFields, rowType, hiveStylePartitioning,
encodePartitionPath, false, Option.empty());
+ this.taskId = taskId;
+ this.instantTime = instantTime;
+ }
+
+ public static RowDataKeyGen instance(Configuration conf, RowType rowType,
int taskId, String instantTime) {
+ return new AutoRowDataKeyGen(taskId, instantTime,
conf.getString(FlinkOptions.PARTITION_PATH_FIELD),
+ rowType, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING),
conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING));
+ }
+
+ @Override
+ public String getRecordKey(RowData rowData) {
+ return HoodieRecord.generateSequenceId(instantTime, taskId, rowId++);
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index 3c0d4fb7662..5ac0a122bbc 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -53,7 +53,7 @@ public class BulkInsertWriterHelper {
protected final String instantTime;
protected final int taskPartitionId;
- protected final long taskId;
+ protected final long totalSubtaskNum;
protected final long taskEpochId;
protected final HoodieTable hoodieTable;
protected final HoodieWriteConfig writeConfig;
@@ -70,24 +70,24 @@ public class BulkInsertWriterHelper {
protected final RowDataKeyGen keyGen;
public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable,
HoodieWriteConfig writeConfig,
- String instantTime, int taskPartitionId, long
taskId, long taskEpochId, RowType rowType) {
- this(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId,
taskEpochId, rowType, false);
+ String instantTime, int taskPartitionId, long
totalSubtaskNum, long taskEpochId, RowType rowType) {
+ this(conf, hoodieTable, writeConfig, instantTime, taskPartitionId,
totalSubtaskNum, taskEpochId, rowType, false);
}
public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable,
HoodieWriteConfig writeConfig,
- String instantTime, int taskPartitionId, long
taskId, long taskEpochId, RowType rowType,
+ String instantTime, int taskPartitionId, long
totalSubtaskNum, long taskEpochId, RowType rowType,
boolean preserveHoodieMetadata) {
this.hoodieTable = hoodieTable;
this.writeConfig = writeConfig;
this.instantTime = instantTime;
this.taskPartitionId = taskPartitionId;
- this.taskId = taskId;
+ this.totalSubtaskNum = totalSubtaskNum;
this.taskEpochId = taskEpochId;
this.rowType = preserveHoodieMetadata ? rowType :
addMetadataFields(rowType, writeConfig.allowOperationMetadataField()); // patch
up with metadata fields
this.preserveHoodieMetadata = preserveHoodieMetadata;
this.isInputSorted = OptionsResolver.isBulkInsertOperation(conf) &&
conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
this.fileIdPrefix = UUID.randomUUID().toString();
- this.keyGen = preserveHoodieMetadata ? null : RowDataKeyGen.instance(conf,
rowType);
+ this.keyGen = preserveHoodieMetadata ? null :
RowDataKeyGens.instance(conf, rowType, taskPartitionId, instantTime);
}
/**
@@ -127,7 +127,7 @@ public class BulkInsertWriterHelper {
LOG.info("Creating new file for partition path " + partitionPath);
HoodieRowDataCreateHandle rowCreateHandle = new
HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath,
getNextFileId(),
- instantTime, taskPartitionId, taskId, taskEpochId, rowType,
preserveHoodieMetadata);
+ instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType,
preserveHoodieMetadata);
handles.put(partitionPath, rowCreateHandle);
} else if (!handles.get(partitionPath).canWrite()) {
// even if there is a handle to the partition path, it could have
reached its max size threshold. So, we close the handle here and
@@ -135,7 +135,7 @@ public class BulkInsertWriterHelper {
LOG.info("Rolling max-size file for partition path " + partitionPath);
writeStatusList.add(handles.remove(partitionPath).close());
HoodieRowDataCreateHandle rowCreateHandle = new
HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath,
getNextFileId(),
- instantTime, taskPartitionId, taskId, taskEpochId, rowType,
preserveHoodieMetadata);
+ instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType,
preserveHoodieMetadata);
handles.put(partitionPath, rowCreateHandle);
}
return handles.get(partitionPath);
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
index 00b2c49e980..a9f34b36d27 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
@@ -57,8 +57,6 @@ public class RowDataKeyGen implements Serializable {
private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
- private final boolean hasRecordKey;
-
private final String[] recordKeyFields;
private final String[] partitionPathFields;
@@ -80,35 +78,38 @@ public class RowDataKeyGen implements Serializable {
private boolean nonPartitioned;
- private RowDataKeyGen(
- String recordKeys,
+ protected RowDataKeyGen(
+ Option<String> recordKeys,
String partitionFields,
RowType rowType,
boolean hiveStylePartitioning,
boolean encodePartitionPath,
boolean consistentLogicalTimestampEnabled,
Option<TimestampBasedAvroKeyGenerator> keyGenOpt) {
- this.recordKeyFields = recordKeys.split(",");
this.partitionPathFields = partitionFields.split(",");
- List<String> fieldNames = rowType.getFieldNames();
- List<LogicalType> fieldTypes = rowType.getChildren();
-
this.hiveStylePartitioning = hiveStylePartitioning;
this.encodePartitionPath = encodePartitionPath;
this.consistentLogicalTimestampEnabled = consistentLogicalTimestampEnabled;
- this.hasRecordKey = hasRecordKey(fieldNames);
- if (!hasRecordKey) {
- this.recordKeyProjection = null;
- } else if (this.recordKeyFields.length == 1) {
- // efficient code path
- this.simpleRecordKey = true;
- int recordKeyIdx = fieldNames.indexOf(this.recordKeyFields[0]);
- this.recordKeyFieldGetter =
RowData.createFieldGetter(fieldTypes.get(recordKeyIdx), recordKeyIdx);
+ List<String> fieldNames = rowType.getFieldNames();
+ List<LogicalType> fieldTypes = rowType.getChildren();
+
+ if (!recordKeys.isPresent()) {
+ this.recordKeyFields = null;
this.recordKeyProjection = null;
} else {
- this.recordKeyProjection = getProjection(this.recordKeyFields,
fieldNames, fieldTypes);
+ this.recordKeyFields = recordKeys.get().split(",");
+ if (this.recordKeyFields.length == 1) {
+ // efficient code path
+ this.simpleRecordKey = true;
+ int recordKeyIdx = fieldNames.indexOf(this.recordKeyFields[0]);
+ this.recordKeyFieldGetter =
RowData.createFieldGetter(fieldTypes.get(recordKeyIdx), recordKeyIdx);
+ this.recordKeyProjection = null;
+ } else {
+ this.recordKeyProjection = getProjection(this.recordKeyFields,
fieldNames, fieldTypes);
+ }
}
+
if (this.partitionPathFields.length == 1) {
// efficient code path
if (this.partitionPathFields[0].equals("")) {
@@ -125,14 +126,6 @@ public class RowDataKeyGen implements Serializable {
this.keyGenOpt = keyGenOpt;
}
- /**
- * Checks whether user provides any record key.
- */
- private boolean hasRecordKey(List<String> fieldNames) {
- return recordKeyFields.length != 1
- || fieldNames.contains(recordKeyFields[0]);
- }
-
public static RowDataKeyGen instance(Configuration conf, RowType rowType) {
Option<TimestampBasedAvroKeyGenerator> keyGeneratorOpt = Option.empty();
if
(TimestampBasedAvroKeyGenerator.class.getName().equals(conf.getString(FlinkOptions.KEYGEN_CLASS_NAME)))
{
@@ -143,9 +136,9 @@ public class RowDataKeyGen implements Serializable {
}
}
boolean consistentLogicalTimestampEnabled =
OptionsResolver.isConsistentLogicalTimestampEnabled(conf);
- return new RowDataKeyGen(conf.getString(FlinkOptions.RECORD_KEY_FIELD),
conf.getString(FlinkOptions.PARTITION_PATH_FIELD),
+ return new
RowDataKeyGen(Option.of(conf.getString(FlinkOptions.RECORD_KEY_FIELD)),
conf.getString(FlinkOptions.PARTITION_PATH_FIELD),
rowType, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING),
conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING),
- consistentLogicalTimestampEnabled,keyGeneratorOpt);
+ consistentLogicalTimestampEnabled, keyGeneratorOpt);
}
public HoodieKey getHoodieKey(RowData rowData) {
@@ -153,11 +146,7 @@ public class RowDataKeyGen implements Serializable {
}
public String getRecordKey(RowData rowData) {
- if (!hasRecordKey) {
- // should be optimized to unique values that can be easily calculated
with low cost
- // for e.g, fileId + auto inc integer
- return EMPTY_RECORDKEY_PLACEHOLDER;
- } else if (this.simpleRecordKey) {
+ if (this.simpleRecordKey) {
return getRecordKey(recordKeyFieldGetter.getFieldOrNull(rowData),
this.recordKeyFields[0], consistentLogicalTimestampEnabled);
} else {
Object[] keyValues = this.recordKeyProjection.projectAsValues(rowData);
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java
new file mode 100644
index 00000000000..d5c96570dcd
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+
+/**
+ * Factory class for all kinds of {@link RowDataKeyGen}.
+ */
+public class RowDataKeyGens {
+
+ /**
+ * Creates a {@link RowDataKeyGen} with given configuration.
+ */
+ public static RowDataKeyGen instance(Configuration conf, RowType rowType,
int taskId, String instantTime) {
+ String recordKeys = conf.getString(FlinkOptions.RECORD_KEY_FIELD);
+ if (hasRecordKey(recordKeys, rowType.getFieldNames())) {
+ return RowDataKeyGen.instance(conf, rowType);
+ } else {
+ return AutoRowDataKeyGen.instance(conf, rowType, taskId, instantTime);
+ }
+ }
+
+ /**
+ * Checks whether user provides any record key.
+ */
+ private static boolean hasRecordKey(String recordKeys, List<String>
fieldNames) {
+ return recordKeys.split(",").length != 1
+ || fieldNames.contains(recordKeys);
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
index e0125f35c66..d222489bd80 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
@@ -175,18 +175,20 @@ public class TestRowDataKeyGen {
conf.setString(FlinkOptions.RECORD_KEY_FIELD, "");
final RowData rowData1 = insertRow(StringData.fromString("id1"),
StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(1), StringData.fromString("par1"));
- final RowDataKeyGen keyGen1 = RowDataKeyGen.instance(conf,
TestConfigurations.ROW_TYPE);
- assertThat(keyGen1.getRecordKey(rowData1), is("__empty__"));
+ final int taskId = 3;
+ final String instantTime = "000001";
+ final RowDataKeyGen keyGen1 = RowDataKeyGens.instance(conf,
TestConfigurations.ROW_TYPE, taskId, instantTime);
+ assertThat(keyGen1.getRecordKey(rowData1), is(instantTime + "_" + taskId +
"_0"));
// null record key and partition path
final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE, null,
StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(1), null);
- assertThat(keyGen1.getRecordKey(rowData2), is("__empty__"));
+ assertThat(keyGen1.getRecordKey(rowData2), is(instantTime + "_" + taskId +
"_1"));
// empty record key and partition path
final RowData rowData3 = insertRow(StringData.fromString(""),
StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(1), StringData.fromString(""));
- assertThat(keyGen1.getRecordKey(rowData3), is("__empty__"));
+ assertThat(keyGen1.getRecordKey(rowData3), is(instantTime + "_" + taskId +
"_2"));
}
@Test