This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4653eaed685 [HUDI-6644] Flink append mode use auto key generator 
(#9362)
4653eaed685 is described below

commit 4653eaed685a29ba81d5484c1aa623e57b44e9ac
Author: Bingeng Huang <[email protected]>
AuthorDate: Fri Aug 4 23:04:53 2023 +0800

    [HUDI-6644] Flink append mode use auto key generator (#9362)
    
    Auto-generates the record key for tables without a primary key (pk-less tables).
    
    ---------
    
    Co-authored-by: Danny Chan <[email protected]>
---
 .../sink/bucket/BucketBulkInsertWriterHelper.java  |  2 +-
 .../apache/hudi/sink/bulk/AutoRowDataKeyGen.java   | 58 ++++++++++++++++++++++
 .../hudi/sink/bulk/BulkInsertWriterHelper.java     | 16 +++---
 .../org/apache/hudi/sink/bulk/RowDataKeyGen.java   | 53 ++++++++------------
 .../org/apache/hudi/sink/bulk/RowDataKeyGens.java  | 52 +++++++++++++++++++
 .../apache/hudi/sink/bulk/TestRowDataKeyGen.java   | 10 ++--
 6 files changed, 146 insertions(+), 45 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
index 7d1400cb5c1..fcf38e112ad 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
@@ -82,7 +82,7 @@ public class BucketBulkInsertWriterHelper extends 
BulkInsertWriterHelper {
         close();
       }
       HoodieRowDataCreateHandle rowCreateHandle = new 
HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, fileId,
-          instantTime, taskPartitionId, taskId, taskEpochId, rowType, 
preserveHoodieMetadata);
+          instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, 
preserveHoodieMetadata);
       handles.put(fileId, rowCreateHandle);
     }
     return handles.get(fileId);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/AutoRowDataKeyGen.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/AutoRowDataKeyGen.java
new file mode 100644
index 00000000000..bcae6fcd7b6
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/AutoRowDataKeyGen.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Key generator for {@link RowData} that auto-generates the record key from the instant time, the task id and an incrementing row id.
+ */
+public class AutoRowDataKeyGen extends RowDataKeyGen {
+  private final int taskId;
+  private final String instantTime;
+  private int rowId;
+
+  public AutoRowDataKeyGen(
+      int taskId,
+      String instantTime,
+      String partitionFields,
+      RowType rowType,
+      boolean hiveStylePartitioning,
+      boolean encodePartitionPath) {
+    super(Option.empty(), partitionFields, rowType, hiveStylePartitioning, 
encodePartitionPath, false, Option.empty());
+    this.taskId = taskId;
+    this.instantTime = instantTime;
+  }
+
+  public static RowDataKeyGen instance(Configuration conf, RowType rowType, 
int taskId, String instantTime) {
+    return new AutoRowDataKeyGen(taskId, instantTime, 
conf.getString(FlinkOptions.PARTITION_PATH_FIELD),
+        rowType, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING), 
conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING));
+  }
+
+  @Override
+  public String getRecordKey(RowData rowData) {
+    return HoodieRecord.generateSequenceId(instantTime, taskId, rowId++);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index 3c0d4fb7662..5ac0a122bbc 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -53,7 +53,7 @@ public class BulkInsertWriterHelper {
 
   protected final String instantTime;
   protected final int taskPartitionId;
-  protected final long taskId;
+  protected final long totalSubtaskNum;
   protected final long taskEpochId;
   protected final HoodieTable hoodieTable;
   protected final HoodieWriteConfig writeConfig;
@@ -70,24 +70,24 @@ public class BulkInsertWriterHelper {
   protected final RowDataKeyGen keyGen;
 
   public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, 
HoodieWriteConfig writeConfig,
-                                String instantTime, int taskPartitionId, long 
taskId, long taskEpochId, RowType rowType) {
-    this(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, 
taskEpochId, rowType, false);
+                                String instantTime, int taskPartitionId, long 
totalSubtaskNum, long taskEpochId, RowType rowType) {
+    this(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, 
totalSubtaskNum, taskEpochId, rowType, false);
   }
 
   public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, 
HoodieWriteConfig writeConfig,
-                                String instantTime, int taskPartitionId, long 
taskId, long taskEpochId, RowType rowType,
+                                String instantTime, int taskPartitionId, long 
totalSubtaskNum, long taskEpochId, RowType rowType,
                                 boolean preserveHoodieMetadata) {
     this.hoodieTable = hoodieTable;
     this.writeConfig = writeConfig;
     this.instantTime = instantTime;
     this.taskPartitionId = taskPartitionId;
-    this.taskId = taskId;
+    this.totalSubtaskNum = totalSubtaskNum;
     this.taskEpochId = taskEpochId;
     this.rowType = preserveHoodieMetadata ? rowType : 
addMetadataFields(rowType, writeConfig.allowOperationMetadataField()); // patch 
up with metadata fields
     this.preserveHoodieMetadata = preserveHoodieMetadata;
     this.isInputSorted = OptionsResolver.isBulkInsertOperation(conf) && 
conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
     this.fileIdPrefix = UUID.randomUUID().toString();
-    this.keyGen = preserveHoodieMetadata ? null : RowDataKeyGen.instance(conf, 
rowType);
+    this.keyGen = preserveHoodieMetadata ? null : 
RowDataKeyGens.instance(conf, rowType, taskPartitionId, instantTime);
   }
 
   /**
@@ -127,7 +127,7 @@ public class BulkInsertWriterHelper {
 
       LOG.info("Creating new file for partition path " + partitionPath);
       HoodieRowDataCreateHandle rowCreateHandle = new 
HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, 
getNextFileId(),
-          instantTime, taskPartitionId, taskId, taskEpochId, rowType, 
preserveHoodieMetadata);
+          instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, 
preserveHoodieMetadata);
       handles.put(partitionPath, rowCreateHandle);
     } else if (!handles.get(partitionPath).canWrite()) {
       // even if there is a handle to the partition path, it could have 
reached its max size threshold. So, we close the handle here and
@@ -135,7 +135,7 @@ public class BulkInsertWriterHelper {
       LOG.info("Rolling max-size file for partition path " + partitionPath);
       writeStatusList.add(handles.remove(partitionPath).close());
       HoodieRowDataCreateHandle rowCreateHandle = new 
HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, 
getNextFileId(),
-          instantTime, taskPartitionId, taskId, taskEpochId, rowType, 
preserveHoodieMetadata);
+          instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, 
preserveHoodieMetadata);
       handles.put(partitionPath, rowCreateHandle);
     }
     return handles.get(partitionPath);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
index 00b2c49e980..a9f34b36d27 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
@@ -57,8 +57,6 @@ public class RowDataKeyGen implements Serializable {
 
   private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
 
-  private final boolean hasRecordKey;
-
   private final String[] recordKeyFields;
   private final String[] partitionPathFields;
 
@@ -80,35 +78,38 @@ public class RowDataKeyGen implements Serializable {
 
   private boolean nonPartitioned;
 
-  private RowDataKeyGen(
-      String recordKeys,
+  protected RowDataKeyGen(
+      Option<String> recordKeys,
       String partitionFields,
       RowType rowType,
       boolean hiveStylePartitioning,
       boolean encodePartitionPath,
       boolean consistentLogicalTimestampEnabled,
       Option<TimestampBasedAvroKeyGenerator> keyGenOpt) {
-    this.recordKeyFields = recordKeys.split(",");
     this.partitionPathFields = partitionFields.split(",");
-    List<String> fieldNames = rowType.getFieldNames();
-    List<LogicalType> fieldTypes = rowType.getChildren();
-
     this.hiveStylePartitioning = hiveStylePartitioning;
     this.encodePartitionPath = encodePartitionPath;
     this.consistentLogicalTimestampEnabled = consistentLogicalTimestampEnabled;
 
-    this.hasRecordKey = hasRecordKey(fieldNames);
-    if (!hasRecordKey) {
-      this.recordKeyProjection = null;
-    } else if (this.recordKeyFields.length == 1) {
-      // efficient code path
-      this.simpleRecordKey = true;
-      int recordKeyIdx = fieldNames.indexOf(this.recordKeyFields[0]);
-      this.recordKeyFieldGetter = 
RowData.createFieldGetter(fieldTypes.get(recordKeyIdx), recordKeyIdx);
+    List<String> fieldNames = rowType.getFieldNames();
+    List<LogicalType> fieldTypes = rowType.getChildren();
+
+    if (!recordKeys.isPresent()) {
+      this.recordKeyFields = null;
       this.recordKeyProjection = null;
     } else {
-      this.recordKeyProjection = getProjection(this.recordKeyFields, 
fieldNames, fieldTypes);
+      this.recordKeyFields = recordKeys.get().split(",");
+      if (this.recordKeyFields.length == 1) {
+        // efficient code path
+        this.simpleRecordKey = true;
+        int recordKeyIdx = fieldNames.indexOf(this.recordKeyFields[0]);
+        this.recordKeyFieldGetter = 
RowData.createFieldGetter(fieldTypes.get(recordKeyIdx), recordKeyIdx);
+        this.recordKeyProjection = null;
+      } else {
+        this.recordKeyProjection = getProjection(this.recordKeyFields, 
fieldNames, fieldTypes);
+      }
     }
+
     if (this.partitionPathFields.length == 1) {
       // efficient code path
       if (this.partitionPathFields[0].equals("")) {
@@ -125,14 +126,6 @@ public class RowDataKeyGen implements Serializable {
     this.keyGenOpt = keyGenOpt;
   }
 
-  /**
-   * Checks whether user provides any record key.
-   */
-  private boolean hasRecordKey(List<String> fieldNames) {
-    return recordKeyFields.length != 1
-        || fieldNames.contains(recordKeyFields[0]);
-  }
-
   public static RowDataKeyGen instance(Configuration conf, RowType rowType) {
     Option<TimestampBasedAvroKeyGenerator> keyGeneratorOpt = Option.empty();
     if 
(TimestampBasedAvroKeyGenerator.class.getName().equals(conf.getString(FlinkOptions.KEYGEN_CLASS_NAME)))
 {
@@ -143,9 +136,9 @@ public class RowDataKeyGen implements Serializable {
       }
     }
     boolean consistentLogicalTimestampEnabled = 
OptionsResolver.isConsistentLogicalTimestampEnabled(conf);
-    return new RowDataKeyGen(conf.getString(FlinkOptions.RECORD_KEY_FIELD), 
conf.getString(FlinkOptions.PARTITION_PATH_FIELD),
+    return new 
RowDataKeyGen(Option.of(conf.getString(FlinkOptions.RECORD_KEY_FIELD)), 
conf.getString(FlinkOptions.PARTITION_PATH_FIELD),
         rowType, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING), 
conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING),
-        consistentLogicalTimestampEnabled,keyGeneratorOpt);
+        consistentLogicalTimestampEnabled, keyGeneratorOpt);
   }
 
   public HoodieKey getHoodieKey(RowData rowData) {
@@ -153,11 +146,7 @@ public class RowDataKeyGen implements Serializable {
   }
 
   public String getRecordKey(RowData rowData) {
-    if (!hasRecordKey) {
-      // should be optimized to unique values that can be easily calculated 
with low cost
-      // for e.g, fileId + auto inc integer
-      return EMPTY_RECORDKEY_PLACEHOLDER;
-    } else if (this.simpleRecordKey) {
+    if (this.simpleRecordKey) {
       return getRecordKey(recordKeyFieldGetter.getFieldOrNull(rowData), 
this.recordKeyFields[0], consistentLogicalTimestampEnabled);
     } else {
       Object[] keyValues = this.recordKeyProjection.projectAsValues(rowData);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java
new file mode 100644
index 00000000000..d5c96570dcd
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+
+/**
+ * Factory class for all kinds of {@link RowDataKeyGen}.
+ */
+public class RowDataKeyGens {
+
+  /**
+   * Creates a {@link RowDataKeyGen} with given configuration.
+   */
+  public static RowDataKeyGen instance(Configuration conf, RowType rowType, 
int taskId, String instantTime) {
+    String recordKeys = conf.getString(FlinkOptions.RECORD_KEY_FIELD);
+    if (hasRecordKey(recordKeys, rowType.getFieldNames())) {
+      return RowDataKeyGen.instance(conf, rowType);
+    } else {
+      return AutoRowDataKeyGen.instance(conf, rowType, taskId, instantTime);
+    }
+  }
+
+  /**
+   * Checks whether the user provides any record key.
+   */
+  private static boolean hasRecordKey(String recordKeys, List<String> 
fieldNames) {
+    return recordKeys.split(",").length != 1
+        || fieldNames.contains(recordKeys);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
index e0125f35c66..d222489bd80 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
@@ -175,18 +175,20 @@ public class TestRowDataKeyGen {
     conf.setString(FlinkOptions.RECORD_KEY_FIELD, "");
     final RowData rowData1 = insertRow(StringData.fromString("id1"), 
StringData.fromString("Danny"), 23,
         TimestampData.fromEpochMillis(1), StringData.fromString("par1"));
-    final RowDataKeyGen keyGen1 = RowDataKeyGen.instance(conf, 
TestConfigurations.ROW_TYPE);
-    assertThat(keyGen1.getRecordKey(rowData1), is("__empty__"));
+    final int taskId = 3;
+    final String instantTime = "000001";
+    final RowDataKeyGen keyGen1 = RowDataKeyGens.instance(conf, 
TestConfigurations.ROW_TYPE, taskId, instantTime);
+    assertThat(keyGen1.getRecordKey(rowData1), is(instantTime + "_" + taskId + 
"_0"));
 
     // null record key and partition path
     final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE, null, 
StringData.fromString("Danny"), 23,
         TimestampData.fromEpochMillis(1), null);
-    assertThat(keyGen1.getRecordKey(rowData2), is("__empty__"));
+    assertThat(keyGen1.getRecordKey(rowData2), is(instantTime + "_" + taskId + 
"_1"));
 
     // empty record key and partition path
     final RowData rowData3 = insertRow(StringData.fromString(""), 
StringData.fromString("Danny"), 23,
         TimestampData.fromEpochMillis(1), StringData.fromString(""));
-    assertThat(keyGen1.getRecordKey(rowData3), is("__empty__"));
+    assertThat(keyGen1.getRecordKey(rowData3), is(instantTime + "_" + taskId + 
"_2"));
   }
 
   @Test

Reply via email to