This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d1831b03f8a [HUDI-6344] Flink MDT bulk_insert for initial commit 
(#8914)
d1831b03f8a is described below

commit d1831b03f8a6388e99edd8b67c53143ee95ab8cd
Author: Danny Chan <[email protected]>
AuthorDate: Mon Jun 12 14:55:51 2023 +0800

    [HUDI-6344] Flink MDT bulk_insert for initial commit (#8914)
---
 .../apache/hudi/client/HoodieFlinkWriteClient.java | 16 ++++++-
 .../FlinkHoodieBackedTableMetadataWriter.java      | 18 ++++++--
 .../hudi/table/ExplicitWriteHandleTable.java       | 33 +++++++++++---
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    | 36 ++++++++++++---
 ...FlinkBulkInsertPreppedCommitActionExecutor.java | 51 ++++++++++++++++++++++
 5 files changed, 136 insertions(+), 18 deletions(-)

diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 0abe0815187..49d873719b6 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -246,7 +246,21 @@ public class HoodieFlinkWriteClient<T> extends
 
   @Override
   public List<WriteStatus> bulkInsertPreppedRecords(List<HoodieRecord<T>> 
preppedRecords, String instantTime, Option<BulkInsertPartitioner> 
bulkInsertPartitioner) {
-    throw new HoodieNotSupportedException("BulkInsertPrepped operation is not 
supported yet");
+    // only used for metadata table, the bulk_insert happens in single JVM 
process
+    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> 
table =
+        initTable(WriteOperationType.BULK_INSERT_PREPPED, 
Option.ofNullable(instantTime));
+    table.validateInsertSchema();
+    preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED, 
table.getMetaClient());
+    Map<String, List<HoodieRecord<T>>> preppedRecordsByFileId = 
preppedRecords.stream().parallel()
+        .collect(Collectors.groupingBy(r -> 
r.getCurrentLocation().getFileId()));
+    return preppedRecordsByFileId.values().stream().parallel().map(records -> {
+      HoodieWriteMetadata<List<WriteStatus>> result;
+      records.get(0).getCurrentLocation().setInstantTime("I");
+      try (AutoCloseableWriteHandle closeableHandle = new 
AutoCloseableWriteHandle(records, instantTime, table, true)) {
+        result = ((HoodieFlinkTable<T>) table).bulkInsertPrepped(context, 
closeableHandle.getWriteHandle(), instantTime, records);
+      }
+      return postWrite(result, instantTime, table);
+    }).flatMap(Collection::stream).collect(Collectors.toList());
   }
 
   @Override
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index d779eea4cc6..8f13315245b 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -107,6 +108,17 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
 
   @Override
   protected void commit(String instantTime, Map<MetadataPartitionType, 
HoodieData<HoodieRecord>> partitionRecordsMap) {
+    doCommit(instantTime, partitionRecordsMap, false);
+  }
+
+  @Override
+  protected void bulkCommit(String instantTime, MetadataPartitionType 
partitionType, HoodieData<HoodieRecord> records, int fileGroupCount) {
+    Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = 
new HashMap<>();
+    partitionRecordsMap.put(partitionType, records);
+    doCommit(instantTime, partitionRecordsMap, true);
+  }
+
+  private void doCommit(String instantTime, Map<MetadataPartitionType, 
HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing) {
     ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is 
not fully initialized yet.");
     HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap);
     List<HoodieRecord> preppedRecordList = preppedRecords.collectAsList();
@@ -149,9 +161,9 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
         writeClient.getHeartbeatClient().start(instantTime);
       }
 
-      List<WriteStatus> statuses = preppedRecordList.size() > 0
-          ? writeClient.upsertPreppedRecords(preppedRecordList, instantTime)
-          : Collections.emptyList();
+      List<WriteStatus> statuses = isInitializing
+          ? writeClient.bulkInsertPreppedRecords(preppedRecordList, 
instantTime, Option.empty())
+          : writeClient.upsertPreppedRecords(preppedRecordList, instantTime);
       // flink does not support auto-commit yet, also the auto commit logic is 
not complete as BaseHoodieWriteClient now.
       writeClient.commit(instantTime, statuses, Option.empty(), 
HoodieActiveTimeline.DELTA_COMMIT_ACTION, Collections.emptyMap());
 
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java
index 4145c9630cd..0e71b852ca0 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java
@@ -35,7 +35,7 @@ public interface ExplicitWriteHandleTable<T> {
   /**
    * Upsert a batch of new records into Hoodie table at the supplied 
instantTime.
    *
-   * <p>Specifies the write handle explicitly in order to have fine grained 
control with
+   * <p>Specifies the write handle explicitly in order to have fine-grained 
control with
    * the underneath file.
    *
    * @param context     HoodieEngineContext
@@ -53,7 +53,7 @@ public interface ExplicitWriteHandleTable<T> {
   /**
    * Insert a batch of new records into Hoodie table at the supplied 
instantTime.
    *
-   * <p>Specifies the write handle explicitly in order to have fine grained 
control with
+   * <p>Specifies the write handle explicitly in order to have fine-grained 
control with
    * the underneath file.
    *
    * @param context     HoodieEngineContext
@@ -72,7 +72,7 @@ public interface ExplicitWriteHandleTable<T> {
    * Deletes a list of {@link HoodieKey}s from the Hoodie table, at the 
supplied instantTime {@link HoodieKey}s will be
    * de-duped and non existent keys will be removed before deleting.
    *
-   * <p>Specifies the write handle explicitly in order to have fine grained 
control with
+   * <p>Specifies the write handle explicitly in order to have fine-grained 
control with
    * the underneath file.
    *
    * @param context     HoodieEngineContext
@@ -92,12 +92,12 @@ public interface ExplicitWriteHandleTable<T> {
    *
    * <p>This implementation requires that the input records are already 
tagged, and de-duped if needed.
    *
-   * <p>Specifies the write handle explicitly in order to have fine grained 
control with
+   * <p>Specifies the write handle explicitly in order to have fine-grained 
control with
    * the underneath file.
    *
    * @param context        HoodieEngineContext
    * @param instantTime    Instant Time for the action
-   * @param preppedRecords hoodieRecords to upsert
+   * @param preppedRecords Hoodie records to upsert
    * @return HoodieWriteMetadata
    */
   HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(
@@ -111,12 +111,12 @@ public interface ExplicitWriteHandleTable<T> {
    *
    * <p>This implementation requires that the input records are already 
tagged, and de-duped if needed.
    *
-   * <p>Specifies the write handle explicitly in order to have fine grained 
control with
+   * <p>Specifies the write handle explicitly in order to have fine-grained 
control with
    * the underneath file.
    *
    * @param context        HoodieEngineContext
    * @param instantTime    Instant Time for the action
-   * @param preppedRecords hoodieRecords to upsert
+   * @param preppedRecords Hoodie records to insert
    * @return HoodieWriteMetadata
    */
   HoodieWriteMetadata<List<WriteStatus>> insertPrepped(
@@ -125,6 +125,25 @@ public interface ExplicitWriteHandleTable<T> {
       String instantTime,
       List<HoodieRecord<T>> preppedRecords);
 
+  /**
+   * Bulk inserts the given prepared records into the Hoodie table, at the 
supplied instantTime.
+   *
+   * <p>This implementation requires that the input records are already 
tagged, and de-duped if needed.
+   *
+   * <p>Specifies the write handle explicitly in order to have fine-grained 
control with
+   * the underneath file.
+   *
+   * @param context        HoodieEngineContext
+   * @param instantTime    Instant Time for the action
+   * @param preppedRecords Hoodie records to bulk_insert
+   * @return HoodieWriteMetadata
+   */
+  HoodieWriteMetadata<List<WriteStatus>> bulkInsertPrepped(
+      HoodieEngineContext context,
+      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+      String instantTime,
+      List<HoodieRecord<T>> preppedRecords);
+
   /**
    * Replaces all the existing records and inserts the specified new records 
into Hoodie table at the supplied instantTime,
    * for the partition paths contained in input records.
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index cb046e2e91f..e07eff4cf3d 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -54,6 +54,7 @@ import 
org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
 import org.apache.hudi.table.action.clean.CleanActionExecutor;
 import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
 import org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor;
+import 
org.apache.hudi.table.action.commit.FlinkBulkInsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkDeleteCommitActionExecutor;
 import 
org.apache.hudi.table.action.commit.FlinkDeletePartitionCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor;
@@ -94,7 +95,7 @@ public class HoodieFlinkCopyOnWriteTable<T>
   /**
    * Upsert a batch of new records into Hoodie table at the supplied 
instantTime.
    *
-   * <p>Specifies the write handle explicitly in order to have fine grained 
control with
+   * <p>Specifies the write handle explicitly in order to have fine-grained 
control with
    * the underneath file.
    *
    * @param context     HoodieEngineContext
@@ -114,7 +115,7 @@ public class HoodieFlinkCopyOnWriteTable<T>
   /**
    * Insert a batch of new records into Hoodie table at the supplied 
instantTime.
    *
-   * <p>Specifies the write handle explicitly in order to have fine grained 
control with
+   * <p>Specifies the write handle explicitly in order to have fine-grained 
control with
    * the underneath file.
    *
    * @param context     HoodieEngineContext
@@ -135,7 +136,7 @@ public class HoodieFlinkCopyOnWriteTable<T>
    * Deletes a list of {@link HoodieKey}s from the Hoodie table, at the 
supplied instantTime {@link HoodieKey}s will be
    * de-duped and non existent keys will be removed before deleting.
    *
-   * <p>Specifies the write handle explicitly in order to have fine grained 
control with
+   * <p>Specifies the write handle explicitly in order to have fine-grained 
control with
    * the underneath file.
    *
    * @param context     HoodieEngineContext
@@ -157,12 +158,12 @@ public class HoodieFlinkCopyOnWriteTable<T>
    *
    * <p>This implementation requires that the input records are already 
tagged, and de-duped if needed.
    *
-   * <p>Specifies the write handle explicitly in order to have fine grained 
control with
+   * <p>Specifies the write handle explicitly in order to have fine-grained 
control with
    * the underneath file.
    *
    * @param context        HoodieEngineContext
    * @param instantTime    Instant Time for the action
-   * @param preppedRecords hoodieRecords to upsert
+   * @param preppedRecords Hoodie records to upsert
    * @return HoodieWriteMetadata
    */
   public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(
@@ -178,12 +179,12 @@ public class HoodieFlinkCopyOnWriteTable<T>
    *
    * <p>This implementation requires that the input records are already 
tagged, and de-duped if needed.
    *
-   * <p>Specifies the write handle explicitly in order to have fine grained 
control with
+   * <p>Specifies the write handle explicitly in order to have fine-grained 
control with
    * the underneath file.
    *
    * @param context        HoodieEngineContext
    * @param instantTime    Instant Time for the action
-   * @param preppedRecords hoodieRecords to upsert
+   * @param preppedRecords Hoodie records to insert
    * @return HoodieWriteMetadata
    */
   public HoodieWriteMetadata<List<WriteStatus>> insertPrepped(
@@ -194,6 +195,27 @@ public class HoodieFlinkCopyOnWriteTable<T>
     return new FlinkInsertPreppedCommitActionExecutor<>(context, writeHandle, 
config, this, instantTime, preppedRecords).execute();
   }
 
+  /**
+   * Bulk inserts the given prepared records into the Hoodie table, at the 
supplied instantTime.
+   *
+   * <p>This implementation requires that the input records are already 
tagged, and de-duped if needed.
+   *
+   * <p>Specifies the write handle explicitly in order to have fine-grained 
control with
+   * the underneath file.
+   *
+   * @param context        HoodieEngineContext
+   * @param instantTime    Instant Time for the action
+   * @param preppedRecords Hoodie records to bulk_insert
+   * @return HoodieWriteMetadata
+   */
+  public HoodieWriteMetadata<List<WriteStatus>> bulkInsertPrepped(
+      HoodieEngineContext context,
+      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+      String instantTime,
+      List<HoodieRecord<T>> preppedRecords) {
+    return new FlinkBulkInsertPreppedCommitActionExecutor<>(context, 
writeHandle, config, this, instantTime, preppedRecords).execute();
+  }
+
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> insertOverwrite(
       HoodieEngineContext context,
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkBulkInsertPreppedCommitActionExecutor.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkBulkInsertPreppedCommitActionExecutor.java
new file mode 100644
index 00000000000..70c35419970
--- /dev/null
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkBulkInsertPreppedCommitActionExecutor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.util.List;
+
+/**
+ * Flink bulk_insert prepped commit action executor.
+ */
+public class FlinkBulkInsertPreppedCommitActionExecutor<T> extends 
BaseFlinkCommitActionExecutor<T> {
+
+  private final List<HoodieRecord<T>> preppedRecords;
+
+  public FlinkBulkInsertPreppedCommitActionExecutor(HoodieEngineContext 
context,
+                                                    HoodieWriteHandle<?, ?, ?, 
?> writeHandle,
+                                                    HoodieWriteConfig config, 
HoodieTable table,
+                                                    String instantTime, 
List<HoodieRecord<T>> preppedRecords) {
+    super(context, writeHandle, config, table, instantTime, 
WriteOperationType.BULK_INSERT_PREPPED);
+    this.preppedRecords = preppedRecords;
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> execute() {
+    return super.execute(preppedRecords);
+  }
+}

Reply via email to