This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d1831b03f8a [HUDI-6344] Flink MDT bulk_insert for initial commit
(#8914)
d1831b03f8a is described below
commit d1831b03f8a6388e99edd8b67c53143ee95ab8cd
Author: Danny Chan <[email protected]>
AuthorDate: Mon Jun 12 14:55:51 2023 +0800
[HUDI-6344] Flink MDT bulk_insert for initial commit (#8914)
---
.../apache/hudi/client/HoodieFlinkWriteClient.java | 16 ++++++-
.../FlinkHoodieBackedTableMetadataWriter.java | 18 ++++++--
.../hudi/table/ExplicitWriteHandleTable.java | 33 +++++++++++---
.../hudi/table/HoodieFlinkCopyOnWriteTable.java | 36 ++++++++++++---
...FlinkBulkInsertPreppedCommitActionExecutor.java | 51 ++++++++++++++++++++++
5 files changed, 136 insertions(+), 18 deletions(-)
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 0abe0815187..49d873719b6 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -246,7 +246,21 @@ public class HoodieFlinkWriteClient<T> extends
@Override
public List<WriteStatus> bulkInsertPreppedRecords(List<HoodieRecord<T>>
preppedRecords, String instantTime, Option<BulkInsertPartitioner>
bulkInsertPartitioner) {
- throw new HoodieNotSupportedException("BulkInsertPrepped operation is not
supported yet");
+ // only used for the metadata table, the bulk_insert happens in a single JVM
process
+ HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>
table =
+ initTable(WriteOperationType.BULK_INSERT_PREPPED,
Option.ofNullable(instantTime));
+ table.validateInsertSchema();
+ preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED,
table.getMetaClient());
+ Map<String, List<HoodieRecord<T>>> preppedRecordsByFileId =
preppedRecords.stream().parallel()
+ .collect(Collectors.groupingBy(r ->
r.getCurrentLocation().getFileId()));
+ return preppedRecordsByFileId.values().stream().parallel().map(records -> {
+ HoodieWriteMetadata<List<WriteStatus>> result;
+ records.get(0).getCurrentLocation().setInstantTime("I");
+ try (AutoCloseableWriteHandle closeableHandle = new
AutoCloseableWriteHandle(records, instantTime, table, true)) {
+ result = ((HoodieFlinkTable<T>) table).bulkInsertPrepped(context,
closeableHandle.getWriteHandle(), instantTime, records);
+ }
+ return postWrite(result, instantTime, table);
+ }).flatMap(Collection::stream).collect(Collectors.toList());
}
@Override
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index d779eea4cc6..8f13315245b 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -107,6 +108,17 @@ public class FlinkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
@Override
protected void commit(String instantTime, Map<MetadataPartitionType,
HoodieData<HoodieRecord>> partitionRecordsMap) {
+ doCommit(instantTime, partitionRecordsMap, false);
+ }
+
+ @Override
+ protected void bulkCommit(String instantTime, MetadataPartitionType
partitionType, HoodieData<HoodieRecord> records, int fileGroupCount) {
+ Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap =
new HashMap<>();
+ partitionRecordsMap.put(partitionType, records);
+ doCommit(instantTime, partitionRecordsMap, true);
+ }
+
+ private void doCommit(String instantTime, Map<MetadataPartitionType,
HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing) {
ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is
not fully initialized yet.");
HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap);
List<HoodieRecord> preppedRecordList = preppedRecords.collectAsList();
@@ -149,9 +161,9 @@ public class FlinkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
writeClient.getHeartbeatClient().start(instantTime);
}
- List<WriteStatus> statuses = preppedRecordList.size() > 0
- ? writeClient.upsertPreppedRecords(preppedRecordList, instantTime)
- : Collections.emptyList();
+ List<WriteStatus> statuses = isInitializing
+ ? writeClient.bulkInsertPreppedRecords(preppedRecordList,
instantTime, Option.empty())
+ : writeClient.upsertPreppedRecords(preppedRecordList, instantTime);
// flink does not support auto-commit yet, also the auto commit logic is
not complete as BaseHoodieWriteClient now.
writeClient.commit(instantTime, statuses, Option.empty(),
HoodieActiveTimeline.DELTA_COMMIT_ACTION, Collections.emptyMap());
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java
index 4145c9630cd..0e71b852ca0 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java
@@ -35,7 +35,7 @@ public interface ExplicitWriteHandleTable<T> {
/**
* Upsert a batch of new records into Hoodie table at the supplied
instantTime.
*
- * <p>Specifies the write handle explicitly in order to have fine grained
control with
+ * <p>Specifies the write handle explicitly in order to have fine-grained
control with
* the underneath file.
*
* @param context HoodieEngineContext
@@ -53,7 +53,7 @@ public interface ExplicitWriteHandleTable<T> {
/**
* Insert a batch of new records into Hoodie table at the supplied
instantTime.
*
- * <p>Specifies the write handle explicitly in order to have fine grained
control with
+ * <p>Specifies the write handle explicitly in order to have fine-grained
control with
* the underneath file.
*
* @param context HoodieEngineContext
@@ -72,7 +72,7 @@ public interface ExplicitWriteHandleTable<T> {
* Deletes a list of {@link HoodieKey}s from the Hoodie table, at the
supplied instantTime {@link HoodieKey}s will be
* de-duped and non existent keys will be removed before deleting.
*
- * <p>Specifies the write handle explicitly in order to have fine grained
control with
+ * <p>Specifies the write handle explicitly in order to have fine-grained
control with
* the underneath file.
*
* @param context HoodieEngineContext
@@ -92,12 +92,12 @@ public interface ExplicitWriteHandleTable<T> {
*
* <p>This implementation requires that the input records are already
tagged, and de-duped if needed.
*
- * <p>Specifies the write handle explicitly in order to have fine grained
control with
+ * <p>Specifies the write handle explicitly in order to have fine-grained
control with
* the underneath file.
*
* @param context HoodieEngineContext
* @param instantTime Instant Time for the action
- * @param preppedRecords hoodieRecords to upsert
+ * @param preppedRecords Hoodie records to upsert
* @return HoodieWriteMetadata
*/
HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(
@@ -111,12 +111,12 @@ public interface ExplicitWriteHandleTable<T> {
*
* <p>This implementation requires that the input records are already
tagged, and de-duped if needed.
*
- * <p>Specifies the write handle explicitly in order to have fine grained
control with
+ * <p>Specifies the write handle explicitly in order to have fine-grained
control with
* the underneath file.
*
* @param context HoodieEngineContext
* @param instantTime Instant Time for the action
- * @param preppedRecords hoodieRecords to upsert
+ * @param preppedRecords Hoodie records to insert
* @return HoodieWriteMetadata
*/
HoodieWriteMetadata<List<WriteStatus>> insertPrepped(
@@ -125,6 +125,25 @@ public interface ExplicitWriteHandleTable<T> {
String instantTime,
List<HoodieRecord<T>> preppedRecords);
+ /**
+ * Bulk inserts the given prepared records into the Hoodie table, at the
supplied instantTime.
+ *
+ * <p>This implementation requires that the input records are already
tagged, and de-duped if needed.
+ *
+ * <p>Specifies the write handle explicitly in order to have fine-grained
control with
+ * the underneath file.
+ *
+ * @param context HoodieEngineContext
+ * @param instantTime Instant Time for the action
+ * @param preppedRecords Hoodie records to bulk_insert
+ * @return HoodieWriteMetadata
+ */
+ HoodieWriteMetadata<List<WriteStatus>> bulkInsertPrepped(
+ HoodieEngineContext context,
+ HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+ String instantTime,
+ List<HoodieRecord<T>> preppedRecords);
+
/**
* Replaces all the existing records and inserts the specified new records
into Hoodie table at the supplied instantTime,
* for the partition paths contained in input records.
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index cb046e2e91f..e07eff4cf3d 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -54,6 +54,7 @@ import
org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.hudi.table.action.clean.CleanActionExecutor;
import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
import org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor;
+import
org.apache.hudi.table.action.commit.FlinkBulkInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkDeleteCommitActionExecutor;
import
org.apache.hudi.table.action.commit.FlinkDeletePartitionCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor;
@@ -94,7 +95,7 @@ public class HoodieFlinkCopyOnWriteTable<T>
/**
* Upsert a batch of new records into Hoodie table at the supplied
instantTime.
*
- * <p>Specifies the write handle explicitly in order to have fine grained
control with
+ * <p>Specifies the write handle explicitly in order to have fine-grained
control with
* the underneath file.
*
* @param context HoodieEngineContext
@@ -114,7 +115,7 @@ public class HoodieFlinkCopyOnWriteTable<T>
/**
* Insert a batch of new records into Hoodie table at the supplied
instantTime.
*
- * <p>Specifies the write handle explicitly in order to have fine grained
control with
+ * <p>Specifies the write handle explicitly in order to have fine-grained
control with
* the underneath file.
*
* @param context HoodieEngineContext
@@ -135,7 +136,7 @@ public class HoodieFlinkCopyOnWriteTable<T>
* Deletes a list of {@link HoodieKey}s from the Hoodie table, at the
supplied instantTime {@link HoodieKey}s will be
* de-duped and non existent keys will be removed before deleting.
*
- * <p>Specifies the write handle explicitly in order to have fine grained
control with
+ * <p>Specifies the write handle explicitly in order to have fine-grained
control with
* the underneath file.
*
* @param context HoodieEngineContext
@@ -157,12 +158,12 @@ public class HoodieFlinkCopyOnWriteTable<T>
*
* <p>This implementation requires that the input records are already
tagged, and de-duped if needed.
*
- * <p>Specifies the write handle explicitly in order to have fine grained
control with
+ * <p>Specifies the write handle explicitly in order to have fine-grained
control with
* the underneath file.
*
* @param context HoodieEngineContext
* @param instantTime Instant Time for the action
- * @param preppedRecords hoodieRecords to upsert
+ * @param preppedRecords Hoodie records to upsert
* @return HoodieWriteMetadata
*/
public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(
@@ -178,12 +179,12 @@ public class HoodieFlinkCopyOnWriteTable<T>
*
* <p>This implementation requires that the input records are already
tagged, and de-duped if needed.
*
- * <p>Specifies the write handle explicitly in order to have fine grained
control with
+ * <p>Specifies the write handle explicitly in order to have fine-grained
control with
* the underneath file.
*
* @param context HoodieEngineContext
* @param instantTime Instant Time for the action
- * @param preppedRecords hoodieRecords to upsert
+ * @param preppedRecords Hoodie records to insert
* @return HoodieWriteMetadata
*/
public HoodieWriteMetadata<List<WriteStatus>> insertPrepped(
@@ -194,6 +195,27 @@ public class HoodieFlinkCopyOnWriteTable<T>
return new FlinkInsertPreppedCommitActionExecutor<>(context, writeHandle,
config, this, instantTime, preppedRecords).execute();
}
+ /**
+ * Bulk inserts the given prepared records into the Hoodie table, at the
supplied instantTime.
+ *
+ * <p>This implementation requires that the input records are already
tagged, and de-duped if needed.
+ *
+ * <p>Specifies the write handle explicitly in order to have fine-grained
control with
+ * the underneath file.
+ *
+ * @param context HoodieEngineContext
+ * @param instantTime Instant Time for the action
+ * @param preppedRecords Hoodie records to bulk_insert
+ * @return HoodieWriteMetadata
+ */
+ public HoodieWriteMetadata<List<WriteStatus>> bulkInsertPrepped(
+ HoodieEngineContext context,
+ HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+ String instantTime,
+ List<HoodieRecord<T>> preppedRecords) {
+ return new FlinkBulkInsertPreppedCommitActionExecutor<>(context,
writeHandle, config, this, instantTime, preppedRecords).execute();
+ }
+
@Override
public HoodieWriteMetadata<List<WriteStatus>> insertOverwrite(
HoodieEngineContext context,
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkBulkInsertPreppedCommitActionExecutor.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkBulkInsertPreppedCommitActionExecutor.java
new file mode 100644
index 00000000000..70c35419970
--- /dev/null
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkBulkInsertPreppedCommitActionExecutor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.util.List;
+
+/**
+ * Flink bulk insert prepped commit action executor.
+ */
+public class FlinkBulkInsertPreppedCommitActionExecutor<T> extends
BaseFlinkCommitActionExecutor<T> {
+
+ private final List<HoodieRecord<T>> preppedRecords;
+
+ public FlinkBulkInsertPreppedCommitActionExecutor(HoodieEngineContext
context,
+ HoodieWriteHandle<?, ?, ?,
?> writeHandle,
+ HoodieWriteConfig config,
HoodieTable table,
+ String instantTime,
List<HoodieRecord<T>> preppedRecords) {
+ super(context, writeHandle, config, table, instantTime,
WriteOperationType.BULK_INSERT_PREPPED);
+ this.preppedRecords = preppedRecords;
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>> execute() {
+ return super.execute(preppedRecords);
+ }
+}