This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ab4a7b0 [HUDI-1788] Insert overwrite (table) for Flink writer (#2808)
ab4a7b0 is described below
commit ab4a7b0b4afc66d2123c7f63fdab77b925a8a7f1
Author: Danny Chan <[email protected]>
AuthorDate: Wed Apr 14 10:23:37 2021 +0800
[HUDI-1788] Insert overwrite (table) for Flink writer (#2808)
Supports `INSERT OVERWRITE` and `INSERT OVERWRITE TABLE` for Flink
writer.
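
    For context, the feature is driven from Flink SQL the same way the added
    HoodieDataSourceITCase#testInsertOverwrite exercises it. The sketch below is
    illustrative only (the table name t1, the path /tmp/t1 and the batch
    TableEnvironment setup are assumptions, not code from this patch):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class InsertOverwriteQuickstart {
      public static void main(String[] args) throws Exception {
        TableEnvironment tableEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inBatchMode().build());
        tableEnv.executeSql("CREATE TABLE t1 (uuid VARCHAR(20), name VARCHAR(10), age INT,"
            + " ts TIMESTAMP(3), `partition` VARCHAR(20)) PARTITIONED BY (`partition`)"
            + " WITH ('connector' = 'hudi', 'path' = '/tmp/t1')");
        // a static partition clause maps to WriteOperationType.INSERT_OVERWRITE
        tableEnv.executeSql("INSERT OVERWRITE t1 PARTITION (`partition` = 'par1') VALUES"
            + " ('id1', 'Danny', 24, TIMESTAMP '1970-01-01 00:00:01')").await();
        // no partition clause maps to WriteOperationType.INSERT_OVERWRITE_TABLE
        tableEnv.executeSql("INSERT OVERWRITE t1 VALUES"
            + " ('id2', 'Stephen', 34, TIMESTAMP '1970-01-01 00:00:02', 'par2')").await();
      }
    }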
---
.../apache/hudi/client/HoodieFlinkWriteClient.java | 122 +++++++++++++++------
.../hudi/table/ExplicitWriteHandleTable.java | 32 ++++++
.../hudi/table/HoodieFlinkCopyOnWriteTable.java | 24 +++-
.../FlinkInsertOverwriteCommitActionExecutor.java | 69 ++++++++++++
...nkInsertOverwriteTableCommitActionExecutor.java | 50 +++++++++
.../hudi/common/model/WriteOperationType.java | 4 +
.../org/apache/hudi/common/util/CommitUtils.java | 11 ++
.../apache/hudi/sink/InstantGenerateOperator.java | 4 +-
.../org/apache/hudi/sink/StreamWriteFunction.java | 21 +++-
.../hudi/sink/StreamWriteOperatorCoordinator.java | 52 ++++++++-
.../sink/partitioner/BucketAssignFunction.java | 5 +-
.../hudi/sink/partitioner/BucketAssigners.java | 13 ++-
...Assigners.java => OverwriteBucketAssigner.java} | 43 +++-----
.../org/apache/hudi/table/HoodieTableSink.java | 23 +++-
.../sink/TestStreamWriteOperatorCoordinator.java | 10 +-
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 27 +++--
.../org/apache/hudi/sink/TestWriteMergeOnRead.java | 4 +-
.../hudi/sink/TestWriteMergeOnReadWithCompact.java | 4 +-
.../apache/hudi/table/HoodieDataSourceITCase.java | 50 ++++++++-
.../test/java/org/apache/hudi/utils/TestData.java | 20 ++++
.../main/java/org/apache/hudi/DataSourceUtils.java | 11 --
.../internal/DataSourceInternalWriterHelper.java | 4 +-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 5 +-
23 files changed, 490 insertions(+), 118 deletions(-)
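
    At the API level, the patch routes both statements through new HoodieFlinkWriteClient
    entry points (insertOverwrite / insertOverwriteTable) and lets the operator coordinator
    commit a replacecommit carrying the replaced file ids. A rough sketch of that flow,
    using only methods added or kept by this patch; the surrounding conf, runtime context
    and location-tagged records are assumed to be prepared by the caller:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.configuration.Configuration;

    import org.apache.hudi.client.HoodieFlinkWriteClient;
    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.common.model.WriteOperationType;
    import org.apache.hudi.common.util.CommitUtils;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.util.StreamerUtil;

    public class InsertOverwriteFlowSketch {
      // Sketch of the write/commit flow behind INSERT OVERWRITE; not code from the patch.
      // Assumes `records` already carry their bucket locations, as StreamWriteFunction prepares them.
      @SuppressWarnings({"unchecked", "rawtypes"})
      static void run(Configuration conf, RuntimeContext runtimeContext, List<HoodieRecord> records) {
        HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, runtimeContext);
        // resolve the replacecommit action the same way StreamWriteFunction#open now does
        String actionType = CommitUtils.getCommitActionType(
            WriteOperationType.INSERT_OVERWRITE, HoodieTableType.COPY_ON_WRITE);
        String instant = writeClient.getLastPendingInstant(actionType);
        List<WriteStatus> statuses = writeClient.insertOverwrite(records, instant);
        // the operator coordinator later collects the replaced file ids and commits
        Map<String, List<String>> replaced =
            writeClient.getPartitionToReplacedFileIds(WriteOperationType.INSERT_OVERWRITE, statuses);
        writeClient.commit(instant, statuses, Option.of(new HashMap<>()), actionType, replaced);
      }
    }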
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 7eeec4c..a84e116 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -22,6 +22,8 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -139,10 +141,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
- final HoodieRecord<T> record = records.get(0);
- final boolean isDelta = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ);
- final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(record, isDelta, getConfig(),
- instantTime, table, record.getPartitionPath(), records.listIterator());
+ final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
+ instantTime, table, records.listIterator());
HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).upsert(context, writeHandle, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -162,10 +162,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
// create the write handle if not exists
- final HoodieRecord<T> record = records.get(0);
- final boolean isDelta = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ);
- final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(record, isDelta, getConfig(),
- instantTime, table, record.getPartitionPath(), records.listIterator());
+ final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
+ instantTime, table, records.listIterator());
HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).insert(context, writeHandle, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -173,6 +171,45 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
return postWrite(result, instantTime, table);
}
+ /**
+ * Removes all existing records from the partitions affected and inserts the given HoodieRecords, into the table.
+ *
+ * @param records HoodieRecords to insert
+ * @param instantTime Instant time of the commit
+ * @return list of WriteStatus to inspect errors and counts
+ */
+ public List<WriteStatus> insertOverwrite(
+ List<HoodieRecord<T>> records, final String instantTime) {
+ HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
+ getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE, instantTime);
+ table.validateInsertSchema();
+ preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE, table.getMetaClient());
+ // create the write handle if not exists
+ final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
+ instantTime, table, records.listIterator());
+ HoodieWriteMetadata result = ((HoodieFlinkTable<T>) table).insertOverwrite(context, writeHandle, instantTime, records);
+ return postWrite(result, instantTime, table);
+ }
+
+ /**
+ * Removes all existing records of the Hoodie table and inserts the given HoodieRecords, into the table.
+ *
+ * @param records HoodieRecords to insert
+ * @param instantTime Instant time of the commit
+ * @return list of WriteStatus to inspect errors and counts
+ */
+ public List<WriteStatus> insertOverwriteTable(
+ List<HoodieRecord<T>> records, final String instantTime) {
+ HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE_TABLE, instantTime);
+ table.validateInsertSchema();
+ preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE, table.getMetaClient());
+ // create the write handle if not exists
+ final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
+ instantTime, table, records.listIterator());
+ HoodieWriteMetadata result = ((HoodieFlinkTable<T>) table).insertOverwriteTable(context, writeHandle, instantTime, records);
+ return postWrite(result, instantTime, table);
+ }
+
@Override
public List<WriteStatus> insertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime) {
throw new HoodieNotSupportedException("InsertPrepped operation is not supported yet");
@@ -353,27 +390,25 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
* Get or create a new write handle in order to reuse the file handles.
*
* @param record The first record in the bucket
- * @param isDelta Whether the table is in MOR mode
* @param config Write config
* @param instantTime The instant time
* @param table The table
- * @param partitionPath Partition path
* @param recordItr Record iterator
* @return Existing write handle or create a new one
*/
private HoodieWriteHandle<?, ?, ?, ?> getOrCreateWriteHandle(
HoodieRecord<T> record,
- boolean isDelta,
HoodieWriteConfig config,
String instantTime,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
- String partitionPath,
Iterator<HoodieRecord<T>> recordItr) {
final HoodieRecordLocation loc = record.getCurrentLocation();
final String fileID = loc.getFileId();
if (bucketToHandles.containsKey(fileID)) {
return bucketToHandles.get(fileID);
}
+ final String partitionPath = record.getPartitionPath();
+ final boolean isDelta = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ);
final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
if (isDelta) {
writeHandle = new FlinkAppendHandle<>(config, instantTime, table, partitionPath, fileID, recordItr,
@@ -409,19 +444,23 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
.collect(Collectors.toList());
}
- public String getInflightAndRequestedInstant(String tableType) {
- final String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
+ public String getLastPendingInstant(HoodieTableType tableType) {
+ final String actionType = CommitUtils.getCommitActionType(tableType);
+ return getLastPendingInstant(actionType);
+ }
+
+ public String getLastPendingInstant(String actionType) {
HoodieTimeline unCompletedTimeline = getHoodieTable().getMetaClient().getCommitsTimeline().filterInflightsAndRequested();
return unCompletedTimeline.getInstants()
- .filter(x -> x.getAction().equals(commitType))
+ .filter(x -> x.getAction().equals(actionType))
.map(HoodieInstant::getTimestamp)
.collect(Collectors.toList()).stream()
.max(Comparator.naturalOrder())
.orElse(null);
}
- public String getLastCompletedInstant(String tableType) {
- final String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
+ public String getLastCompletedInstant(HoodieTableType tableType) {
+ final String commitType = CommitUtils.getCommitActionType(tableType);
HoodieTimeline completedTimeline = getHoodieTable().getMetaClient().getCommitsTimeline().filterCompletedInstants();
return completedTimeline.getInstants()
.filter(x -> x.getAction().equals(commitType))
@@ -431,32 +470,49 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
.orElse(null);
}
- public void deletePendingInstant(String tableType, String instant) {
- HoodieFlinkTable<T> table = getHoodieTable();
- String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
- HoodieActiveTimeline activeTimeline = table.getMetaClient().getActiveTimeline();
- activeTimeline.deletePendingIfExists(HoodieInstant.State.INFLIGHT, commitType, instant);
- activeTimeline.deletePendingIfExists(HoodieInstant.State.REQUESTED, commitType, instant);
- }
-
- public void transitionRequestedToInflight(String tableType, String inFlightInstant) {
+ public void transitionRequestedToInflight(String commitType, String inFlightInstant) {
HoodieFlinkTable<T> table = getHoodieTable();
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
- String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, inFlightInstant);
activeTimeline.transitionRequestedToInflight(requested, Option.empty(), config.shouldAllowMultiWriteOnSameInstant());
}
- public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
+ public HoodieFlinkTable<T> getHoodieTable() {
+ return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
+ }
+
+ public Map<String, List<String>> getPartitionToReplacedFileIds(
+ WriteOperationType writeOperationType,
+ List<WriteStatus> writeStatuses) {
HoodieFlinkTable<T> table = getHoodieTable();
- HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
- if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
- rollbackInflightCompaction(inflightInstant, table);
+ switch (writeOperationType) {
+ case INSERT_OVERWRITE:
+ return writeStatuses.stream().map(status -> status.getStat().getPartitionPath()).distinct()
+ .collect(
+ Collectors.toMap(
+ partition -> partition,
+ partitionPath -> getAllExistingFileIds(table, partitionPath)));
+ case INSERT_OVERWRITE_TABLE:
+ Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
+ List<String> partitionPaths =
+ FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
+ if (partitionPaths != null && partitionPaths.size() > 0) {
+ context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions");
+ partitionToExistingFileIds = partitionPaths.stream().parallel()
+ .collect(
+ Collectors.toMap(
+ partition -> partition,
+ partition -> getAllExistingFileIds(table, partition)));
+ }
+ return partitionToExistingFileIds;
+ default:
+ throw new AssertionError();
}
}
- public HoodieFlinkTable<T> getHoodieTable() {
- return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
+ private List<String> getAllExistingFileIds(HoodieFlinkTable<T> table, String partitionPath) {
+ // because new commit is not complete. it is safe to mark all existing file Ids as old files
+ return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
}
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java
index c0699ff..f799919 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java
@@ -125,4 +125,36 @@ public interface ExplicitWriteHandleTable<T extends HoodieRecordPayload> {
HoodieWriteHandle<?, ?, ?, ?> writeHandle,
String instantTime,
List<HoodieRecord<T>> preppedRecords);
+
+ /**
+ * Replaces all the existing records and inserts the specified new records into Hoodie table at the supplied instantTime,
+ * for the partition paths contained in input records.
+ *
+ * @param context HoodieEngineContext
+ * @param writeHandle The write handle
+ * @param instantTime Instant time for the replace action
+ * @param records input records
+ * @return HoodieWriteMetadata
+ */
+ HoodieWriteMetadata<List<WriteStatus>> insertOverwrite(
+ HoodieEngineContext context,
+ HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+ String instantTime,
+ List<HoodieRecord<T>> records);
+
+ /**
+ * Deletes all the existing records of the Hoodie table and inserts the specified new records into Hoodie table at the supplied instantTime,
+ * for the partition paths contained in input records.
+ *
+ * @param context HoodieEngineContext
+ * @param writeHandle The write handle
+ * @param instantTime Instant time for the replace action
+ * @param records input records
+ * @return HoodieWriteMetadata
+ */
+ HoodieWriteMetadata<List<WriteStatus>> insertOverwriteTable(
+ HoodieEngineContext context,
+ HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+ String instantTime,
+ List<HoodieRecord<T>> records);
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 167b376..d8bdb9f 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -47,6 +47,8 @@ import org.apache.hudi.table.action.clean.FlinkCleanActionExecutor;
import org.apache.hudi.table.action.clean.FlinkScheduleCleanActionExecutor;
import org.apache.hudi.table.action.commit.FlinkDeleteCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor;
+import org.apache.hudi.table.action.commit.FlinkInsertOverwriteCommitActionExecutor;
+import org.apache.hudi.table.action.commit.FlinkInsertOverwriteTableCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkMergeHelper;
import org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor;
@@ -182,6 +184,24 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
}
@Override
+ public HoodieWriteMetadata<List<WriteStatus>> insertOverwrite(
+ HoodieEngineContext context,
+ HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+ String instantTime,
+ List<HoodieRecord<T>> records) {
+ return new FlinkInsertOverwriteCommitActionExecutor(context, writeHandle, config, this, instantTime, records).execute();
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>> insertOverwriteTable(
+ HoodieEngineContext context,
+ HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+ String instantTime,
+ List<HoodieRecord<T>> records) {
+ return new FlinkInsertOverwriteTableCommitActionExecutor(context, writeHandle, config, this, instantTime, records).execute();
+ }
+
+ @Override
public HoodieWriteMetadata<List<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> records) {
throw new HoodieNotSupportedException("This method should not be invoked");
}
@@ -229,12 +249,12 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
@Override
public HoodieWriteMetadata<List<WriteStatus>> insertOverwrite(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> records) {
- throw new HoodieNotSupportedException("InsertOverWrite is not supported yet");
+ throw new HoodieNotSupportedException("This method should not be invoked");
}
@Override
public HoodieWriteMetadata<List<WriteStatus>> insertOverwriteTable(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> records) {
- throw new HoodieNotSupportedException("insertOverwriteTable is not supported yet");
+ throw new HoodieNotSupportedException("This method should not be invoked");
+ throw new HoodieNotSupportedException("This method should not be invoked");
}
@Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertOverwriteCommitActionExecutor.java
new file mode 100644
index 0000000..583e0b6
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertOverwriteCommitActionExecutor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.util.List;
+
+public class FlinkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayload<T>>
+ extends BaseFlinkCommitActionExecutor<T> {
+
+ protected List<HoodieRecord<T>> inputRecords;
+
+ public FlinkInsertOverwriteCommitActionExecutor(HoodieEngineContext context,
+ HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+ HoodieWriteConfig config,
+ HoodieTable table,
+ String instantTime,
+ List<HoodieRecord<T>> inputRecords) {
+ this(context, writeHandle, config, table, instantTime, inputRecords, WriteOperationType.INSERT_OVERWRITE);
+ }
+
+ public FlinkInsertOverwriteCommitActionExecutor(HoodieEngineContext context,
+ HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+ HoodieWriteConfig config,
+ HoodieTable table,
+ String instantTime,
+ List<HoodieRecord<T>> inputRecords,
+ WriteOperationType writeOperationType) {
+ super(context, writeHandle, config, table, instantTime, writeOperationType);
+ this.inputRecords = inputRecords;
+ }
+
+ @Override
+ protected String getCommitActionType() {
+ return HoodieTimeline.REPLACE_COMMIT_ACTION;
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>> execute() {
+ return FlinkWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
+ config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
+ }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertOverwriteTableCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertOverwriteTableCommitActionExecutor.java
new file mode 100644
index 0000000..a31679b
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertOverwriteTableCommitActionExecutor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.util.List;
+
+public class FlinkInsertOverwriteTableCommitActionExecutor<T extends HoodieRecordPayload<T>>
+ extends FlinkInsertOverwriteCommitActionExecutor<T> {
+
+ public FlinkInsertOverwriteTableCommitActionExecutor(HoodieEngineContext context,
+ HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+ HoodieWriteConfig config,
+ HoodieTable table,
+ String instantTime,
+ List<HoodieRecord<T>> inputRecords) {
+ super(context, writeHandle, config, table, instantTime, inputRecords, WriteOperationType.INSERT_OVERWRITE_TABLE);
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>> execute() {
+ return FlinkWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
+ config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
+ }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
index 41cfc8a..b5a3cc0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
@@ -104,4 +104,8 @@ public enum WriteOperationType {
public static boolean isChangingRecords(WriteOperationType operationType) {
return operationType == UPSERT || operationType == UPSERT_PREPPED || operationType == DELETE;
}
+
+ public static boolean isOverwrite(WriteOperationType operationType) {
+ return operationType == INSERT_OVERWRITE || operationType == INSERT_OVERWRITE_TABLE;
+ }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
index ea36f67..4956c14 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
@@ -41,6 +41,17 @@ public class CommitUtils {
private static final Logger LOG = LogManager.getLogger(CommitUtils.class);
/**
+ * Gets the commit action type for given write operation and table type.
+ */
+ public static String getCommitActionType(WriteOperationType operation, HoodieTableType tableType) {
+ if (operation == WriteOperationType.INSERT_OVERWRITE || operation == WriteOperationType.INSERT_OVERWRITE_TABLE) {
+ return HoodieTimeline.REPLACE_COMMIT_ACTION;
+ } else {
+ return getCommitActionType(tableType);
+ }
+ }
+
+ /**
* Gets the commit action type for given table type.
*/
public static String getCommitActionType(HoodieTableType tableType) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/InstantGenerateOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/InstantGenerateOperator.java
index 9413797..3c4fb1e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/InstantGenerateOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/InstantGenerateOperator.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.streamer.FlinkStreamerConfig;
@@ -188,7 +189,8 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
*/
private String startNewInstant(long checkpointId) {
String newTime = writeClient.startCommit();
- this.writeClient.transitionRequestedToInflight(this.cfg.tableType, newTime);
+ final String actionType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(this.cfg.tableType));
+ this.writeClient.transitionRequestedToInflight(actionType, newTime);
LOG.info("create instant [{}], at checkpoint [{}]", newTime, checkpointId);
return newTime;
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index b0321ac..8244226 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -21,7 +21,9 @@ package org.apache.hudi.sink;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.ObjectSizeCalculator;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
@@ -129,6 +131,11 @@ public class StreamWriteFunction<K, I, O>
private transient OperatorEventGateway eventGateway;
/**
+ * Commit action type.
+ */
+ private transient String actionType;
+
+ /**
* Constructs a StreamingSinkFunction.
*
* @param config The config options
@@ -141,6 +148,9 @@ public class StreamWriteFunction<K, I, O>
public void open(Configuration parameters) throws IOException {
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
+ this.actionType = CommitUtils.getCommitActionType(
+ WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
+ HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
initBuffer();
initWriteFunction();
}
@@ -166,6 +176,7 @@ public class StreamWriteFunction<K, I, O>
@Override
public void close() {
if (this.writeClient != null) {
+ this.writeClient.cleanHandles();
this.writeClient.close();
}
}
@@ -224,6 +235,12 @@ public class StreamWriteFunction<K, I, O>
case UPSERT:
this.writeFunction = (records, instantTime) -> this.writeClient.upsert(records, instantTime);
break;
+ case INSERT_OVERWRITE:
+ this.writeFunction = (records, instantTime) -> this.writeClient.insertOverwrite(records, instantTime);
+ break;
+ case INSERT_OVERWRITE_TABLE:
+ this.writeFunction = (records, instantTime) -> this.writeClient.insertOverwriteTable(records, instantTime);
+ break;
default:
throw new RuntimeException("Unsupported write operation : " + writeOperation);
}
@@ -315,7 +332,7 @@ public class StreamWriteFunction<K, I, O>
@SuppressWarnings("unchecked, rawtypes")
private void flushBucket(DataBucket bucket) {
- this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
+ this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType);
if (this.currentInstant == null) {
// in case there are empty checkpoints that has no input data
LOG.info("No inflight instant when flushing data, cancel.");
@@ -339,7 +356,7 @@ public class StreamWriteFunction<K, I, O>
@SuppressWarnings("unchecked, rawtypes")
private void flushRemaining(boolean isEndInput) {
- this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
+ this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType);
if (this.currentInstant == null) {
// in case there are empty checkpoints that has no input data
LOG.info("No inflight instant when flushing data, cancel.");
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 54a7603..6244d65 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -20,7 +20,11 @@ package org.apache.hudi.sink;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
@@ -40,10 +44,14 @@ import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -117,6 +125,11 @@ public class StreamWriteOperatorCoordinator
private HiveSyncContext hiveSyncContext;
/**
+ * The table state.
+ */
+ private transient TableState tableState;
+
+ /**
* Constructs a StreamingSinkOperatorCoordinator.
*
* @param conf The config options
@@ -135,8 +148,8 @@ public class StreamWriteOperatorCoordinator
public void start() throws Exception {
// initialize event buffer
reset();
- // writeClient
this.writeClient = StreamerUtil.createWriteClient(conf, null);
+ this.tableState = TableState.create(conf);
// init table, create it if not exists.
initTableIfNotExists(this.conf);
// start a new instant
@@ -214,14 +227,16 @@ public class StreamWriteOperatorCoordinator
}
private void startInstant() {
- this.instant = this.writeClient.startCommit();
- this.writeClient.transitionRequestedToInflight(conf.getString(FlinkOptions.TABLE_TYPE), this.instant);
+ final String instant = HoodieActiveTimeline.createNewInstantTime();
+ this.writeClient.startCommitWithTime(instant, tableState.commitAction);
+ this.instant = instant;
+ this.writeClient.transitionRequestedToInflight(tableState.commitAction, this.instant);
LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
this.conf.getString(FlinkOptions.TABLE_NAME),
conf.getString(FlinkOptions.TABLE_TYPE));
}
@Override
- public void resetToCheckpoint(long checkpointID, @Nullable byte[] checkpointData) throws Exception {
+ public void resetToCheckpoint(long checkpointID, @Nullable byte[] checkpointData) {
// no operation
}
@@ -310,6 +325,7 @@ public class StreamWriteOperatorCoordinator
}
/** Performs the actual commit action. */
+ @SuppressWarnings("unchecked")
private void doCommit(List<WriteStatus> writeResults) {
// commit or rollback
long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
@@ -323,7 +339,11 @@ public class StreamWriteOperatorCoordinator
+ totalErrorRecords + "/" + totalRecords);
}
- boolean success = writeClient.commit(this.instant, writeResults, Option.of(checkpointCommitMetadata));
+ final Map<String, List<String>> partitionToReplacedFileIds = tableState.isOverwrite
+ ? writeClient.getPartitionToReplacedFileIds(tableState.operationType, writeResults)
+ : Collections.emptyMap();
+ boolean success = writeClient.commit(this.instant, writeResults, Option.of(checkpointCommitMetadata),
+ tableState.commitAction, partitionToReplacedFileIds);
if (success) {
reset();
LOG.info("Commit instant [{}] success!", this.instant);
@@ -401,4 +421,26 @@ public class StreamWriteOperatorCoordinator
return new StreamWriteOperatorCoordinator(this.conf, context);
}
}
+
+ /**
+ * Remember some table state variables.
+ */
+ private static class TableState implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final WriteOperationType operationType;
+ private final String commitAction;
+ private final boolean isOverwrite;
+
+ private TableState(Configuration conf) {
+ this.operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
+ this.commitAction = CommitUtils.getCommitActionType(this.operationType,
+ HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase(Locale.ROOT)));
+ this.isOverwrite = WriteOperationType.isOverwrite(this.operationType);
+ }
+
+ public static TableState create(Configuration conf) {
+ return new TableState(conf);
+ }
+ }
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 7e017cc..f765e9d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -127,6 +127,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
this.bucketAssigner = BucketAssigners.create(
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks(),
+ WriteOperationType.isOverwrite(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))),
HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
context,
writeConfig);
@@ -190,7 +191,9 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
default:
throw new AssertionError();
}
- this.indexState.put(hoodieKey, location);
+ if (isChangingRecords) {
+ this.indexState.put(hoodieKey, location);
+ }
}
record.unseal();
record.setCurrentLocation(location);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
index 237ec27..1c28e6d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
@@ -33,19 +33,24 @@ public abstract class BucketAssigners {
/**
* Creates a {@code BucketAssigner}.
*
- * @param taskID The task ID
- * @param numTasks The number of tasks
+ * @param taskID The task ID
+ * @param numTasks The number of tasks
+ * @param isOverwrite Whether the write operation is OVERWRITE
* @param tableType The table type
- * @param context The engine context
- * @param config The configuration
+ * @param context The engine context
+ * @param config The configuration
* @return the bucket assigner instance
*/
public static BucketAssigner create(
int taskID,
int numTasks,
+ boolean isOverwrite,
HoodieTableType tableType,
HoodieFlinkEngineContext context,
HoodieWriteConfig config) {
+ if (isOverwrite) {
+ return new OverwriteBucketAssigner(taskID, numTasks, context, config);
+ }
switch (tableType) {
case COPY_ON_WRITE:
return new BucketAssigner(taskID, numTasks, context, config);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/OverwriteBucketAssigner.java
similarity index 55%
copy from hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
copy to hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/OverwriteBucketAssigner.java
index 237ec27..7e2320e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/OverwriteBucketAssigner.java
@@ -19,40 +19,29 @@
package org.apache.hudi.sink.partitioner;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.sink.partitioner.delta.DeltaBucketAssigner;
+import org.apache.hudi.table.action.commit.SmallFile;
+
+import java.util.Collections;
+import java.util.List;
/**
- * Utilities for {@code BucketAssigner}.
+ * BucketAssigner for INSERT OVERWRITE and INSERT OVERWRITE TABLE operations,
+ * this assigner always skip the existing small files because of the 'OVERWRITE' semantics.
+ *
+ * <p>Note: assumes the index can always index log files for Flink write.
*/
-public abstract class BucketAssigners {
-
- private BucketAssigners() {}
-
- /**
- * Creates a {@code BucketAssigner}.
- *
- * @param taskID The task ID
- * @param numTasks The number of tasks
- * @param tableType The table type
- * @param context The engine context
- * @param config The configuration
- * @return the bucket assigner instance
- */
- public static BucketAssigner create(
+public class OverwriteBucketAssigner extends BucketAssigner {
+ public OverwriteBucketAssigner(
int taskID,
int numTasks,
- HoodieTableType tableType,
HoodieFlinkEngineContext context,
HoodieWriteConfig config) {
- switch (tableType) {
- case COPY_ON_WRITE:
- return new BucketAssigner(taskID, numTasks, context, config);
- case MERGE_ON_READ:
- return new DeltaBucketAssigner(taskID, numTasks, context, config);
- default:
- throw new AssertionError();
- }
+ super(taskID, numTasks, context, config);
+ }
+
+ @Override
+ protected List<SmallFile> getSmallFiles(String partitionPath) {
+ return Collections.emptyList();
}
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index b52e0ca..c3427f3 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -19,6 +19,7 @@
package org.apache.hudi.table;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperatorFactory;
@@ -40,6 +41,7 @@ import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
@@ -49,10 +51,11 @@ import java.util.Map;
/**
* Hoodie table sink.
*/
-public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning {
+public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
private final Configuration conf;
private final TableSchema schema;
+ private boolean overwrite = false;
public HoodieTableSink(Configuration conf, TableSchema schema) {
this.conf = conf;
@@ -129,7 +132,21 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning {
}
@Override
- public void applyStaticPartition(Map<String, String> map) {
- // no operation
+ public void applyStaticPartition(Map<String, String> partition) {
+ // #applyOverwrite should have been invoked.
+ if (this.overwrite) {
+ final String operationType;
+ if (partition.size() > 0) {
+ operationType = WriteOperationType.INSERT_OVERWRITE.value();
+ } else {
+ operationType = WriteOperationType.INSERT_OVERWRITE_TABLE.value();
+ }
+ this.conf.setString(FlinkOptions.OPERATION, operationType);
+ }
+ }
+
+ @Override
+ public void applyOverwrite(boolean b) {
+ this.overwrite = b;
}
}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 0afd414..a2fdf22 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -20,6 +20,7 @@ package org.apache.hudi.sink;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.configuration.FlinkOptions;
@@ -104,9 +105,8 @@ public class TestStreamWriteOperatorCoordinator {
coordinator.handleEventFromOperator(1, event1);
coordinator.notifyCheckpointComplete(1);
- String inflight = coordinator.getWriteClient()
- .getInflightAndRequestedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
- String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+ String inflight = coordinator.getWriteClient().getLastPendingInstant(HoodieTableType.COPY_ON_WRITE);
+ String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(HoodieTableType.COPY_ON_WRITE);
assertThat("Instant should be complete", lastCompleted, is(instant));
assertNotEquals("", inflight, "Should start a new instant");
assertNotEquals(instant, inflight, "Should start a new instant");
@@ -156,7 +156,7 @@ public class TestStreamWriteOperatorCoordinator {
assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1),
"Returns early for empty write results");
- String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+ String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(HoodieTableType.COPY_ON_WRITE);
assertNull(lastCompleted, "Returns early for empty write results");
assertNull(coordinator.getEventBuffer()[0]);
@@ -172,7 +172,7 @@ public class TestStreamWriteOperatorCoordinator {
coordinator.handleEventFromOperator(1, event1);
assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(2),
"Commits the instant with partial events anyway");
- lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+ lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(HoodieTableType.COPY_ON_WRITE);
assertThat("Commits the instant with partial events anyway", lastCompleted, is(instant));
}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index f373ab8..9e417e3 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -92,7 +92,7 @@ public class TestWriteCopyOnWrite {
public void before() throws Exception {
final String basePath = tempFile.getAbsolutePath();
conf = TestConfigurations.getDefaultConf(basePath);
- conf.setString(FlinkOptions.TABLE_TYPE, getTableType());
+ conf.setString(FlinkOptions.TABLE_TYPE, getTableType().name());
setUp(conf);
this.funcWrapper = new
StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
}
@@ -125,8 +125,7 @@ public class TestWriteCopyOnWrite {
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
- String instant = funcWrapper.getWriteClient()
- .getInflightAndRequestedInstant(getTableType());
+ String instant = funcWrapper.getWriteClient().getLastPendingInstant(getTableType());
final OperatorEvent nextEvent = funcWrapper.getNextEvent();
MatcherAssert.assertThat("The operator expect to send an event",
nextEvent, instanceOf(BatchWriteSuccessEvent.class));
@@ -152,7 +151,7 @@ public class TestWriteCopyOnWrite {
funcWrapper.checkpointFunction(2);
String instant2 = funcWrapper.getWriteClient()
- .getInflightAndRequestedInstant(getTableType());
+ .getLastPendingInstant(getTableType());
assertNotEquals(instant, instant2);
final OperatorEvent nextEvent2 = funcWrapper.getNextEvent();
@@ -181,7 +180,7 @@ public class TestWriteCopyOnWrite {
funcWrapper.checkpointFunction(1);
String instant = funcWrapper.getWriteClient()
- .getInflightAndRequestedInstant(getTableType());
+ .getLastPendingInstant(getTableType());
assertNotNull(instant);
final OperatorEvent nextEvent = funcWrapper.getNextEvent();
@@ -223,7 +222,7 @@ public class TestWriteCopyOnWrite {
funcWrapper.checkpointFunction(1);
String instant = funcWrapper.getWriteClient()
- .getInflightAndRequestedInstant(getTableType());
+ .getLastPendingInstant(getTableType());
final OperatorEvent nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent,
instanceOf(BatchWriteSuccessEvent.class));
@@ -309,7 +308,7 @@ public class TestWriteCopyOnWrite {
funcWrapper.checkpointFunction(2);
String instant = funcWrapper.getWriteClient()
- .getInflightAndRequestedInstant(getTableType());
+ .getLastPendingInstant(getTableType());
nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent,
instanceOf(BatchWriteSuccessEvent.class));
@@ -354,7 +353,7 @@ public class TestWriteCopyOnWrite {
funcWrapper.checkpointFunction(2);
String instant = funcWrapper.getWriteClient()
- .getInflightAndRequestedInstant(getTableType());
+ .getLastPendingInstant(getTableType());
nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent,
instanceOf(BatchWriteSuccessEvent.class));
@@ -409,7 +408,7 @@ public class TestWriteCopyOnWrite {
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the
event");
String instant = funcWrapper.getWriteClient()
- .getInflightAndRequestedInstant(getTableType());
+ .getLastPendingInstant(getTableType());
funcWrapper.checkpointComplete(1);
@@ -493,7 +492,7 @@ public class TestWriteCopyOnWrite {
funcWrapper.checkpointFunction(2);
String instant = funcWrapper.getWriteClient()
- .getInflightAndRequestedInstant(getTableType());
+ .getLastPendingInstant(getTableType());
nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent,
instanceOf(BatchWriteSuccessEvent.class));
@@ -516,7 +515,7 @@ public class TestWriteCopyOnWrite {
@SuppressWarnings("rawtypes")
private void checkInflightInstant(HoodieFlinkWriteClient writeClient) {
- final String instant = writeClient.getInflightAndRequestedInstant(getTableType());
+ final String instant = writeClient.getLastPendingInstant(getTableType());
assertNotNull(instant);
}
@@ -528,7 +527,7 @@ public class TestWriteCopyOnWrite {
final String instant;
switch (state) {
case REQUESTED:
- instant = writeClient.getInflightAndRequestedInstant(getTableType());
+ instant = writeClient.getLastPendingInstant(getTableType());
break;
case COMPLETED:
instant = writeClient.getLastCompletedInstant(getTableType());
@@ -539,8 +538,8 @@ public class TestWriteCopyOnWrite {
assertThat(instant, is(instantStr));
}
- protected String getTableType() {
- return HoodieTableType.COPY_ON_WRITE.name();
+ protected HoodieTableType getTableType() {
+ return HoodieTableType.COPY_ON_WRITE;
}
protected void checkWrittenData(File baseFile, Map<String, String> expected)
throws Exception {
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index 5ce8ae2..b24881a 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -90,7 +90,7 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
}
@Override
- protected String getTableType() {
- return HoodieTableType.MERGE_ON_READ.name();
+ protected HoodieTableType getTableType() {
+ return HoodieTableType.MERGE_ON_READ;
}
}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index 2821cae..98d1211 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -53,7 +53,7 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
}
@Override
- protected String getTableType() {
- return HoodieTableType.MERGE_ON_READ.name();
+ protected HoodieTableType getTableType() {
+ return HoodieTableType.MERGE_ON_READ;
}
}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 56cbb55..d2ba3e6 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.StreamerUtil;
@@ -223,7 +224,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);
String latestCommit = StreamerUtil.createWriteClient(conf, null)
- .getLastCompletedInstant(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+ .getLastCompletedInstant(HoodieTableType.MERGE_ON_READ);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
@@ -276,6 +277,53 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
+ "id8,Han,56,1970-01-01T00:00:08,par4]");
}
+ @ParameterizedTest
+ @EnumSource(value = ExecMode.class)
+ void testInsertOverwrite(ExecMode execMode) {
+ TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+ String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+ tableEnv.executeSql(hoodieTableDDL);
+
+ final String insertInto1 = "insert into t1 values\n"
+ + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
+ + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
+ + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
+ + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
+ + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
+ + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
+ + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
+ + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
+
+ execInsertSql(tableEnv, insertInto1);
+
+ // overwrite partition 'par1' and increase in age by 1
+ final String insertInto2 = "insert overwrite t1 partition(`partition`='par1') values\n"
+ + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01'),\n"
+ + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02')\n";
+
+ execInsertSql(tableEnv, insertInto2);
+
+ List<Row> result1 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT_OVERWRITE);
+
+ // overwrite the whole table
+ final String insertInto3 = "insert overwrite t1 values\n"
+ + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01', 'par1'),\n"
+ + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02', 'par2')\n";
+
+ execInsertSql(tableEnv, insertInto3);
+
+ List<Row> result2 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ final String expected = "["
+ + "id1,Danny,24,1970-01-01T00:00:01,par1, "
+ + "id2,Stephen,34,1970-01-01T00:00:02,par2]";
+ assertRowsEquals(result2, expected);
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 2201aeb..4a2466c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -167,6 +167,26 @@ public class TestData {
TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
);
+ // data set of test_source.data with partition 'par1' overwrite
+ public static List<RowData> DATA_SET_SOURCE_INSERT_OVERWRITE = Arrays.asList(
+ insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24,
+ TimestampData.fromEpochMillis(1000), StringData.fromString("par1")),
+ insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34,
+ TimestampData.fromEpochMillis(2000), StringData.fromString("par1")),
+ insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53,
+ TimestampData.fromEpochMillis(3000), StringData.fromString("par2")),
+ insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31,
+ TimestampData.fromEpochMillis(4000), StringData.fromString("par2")),
+ insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
+ TimestampData.fromEpochMillis(5000), StringData.fromString("par3")),
+ insertRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20,
+ TimestampData.fromEpochMillis(6000), StringData.fromString("par3")),
+ insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
+ TimestampData.fromEpochMillis(7000), StringData.fromString("par4")),
+ insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
+ TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
+ );
+
public static List<RowData> DATA_SET_UPDATE_DELETE = Arrays.asList(
// this is update
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 632a155..00b7116 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -29,10 +29,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
@@ -193,14 +190,6 @@ public class DataSourceUtils {
return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc),
createHoodieConfig(schemaStr, basePath, tblName, parameters));
}
- public static String getCommitActionType(WriteOperationType operation, HoodieTableType tableType) {
- if (operation == WriteOperationType.INSERT_OVERWRITE || operation == WriteOperationType.INSERT_OVERWRITE_TABLE) {
- return HoodieTimeline.REPLACE_COMMIT_ACTION;
- } else {
- return CommitUtils.getCommitActionType(tableType);
- }
- }
-
public static HoodieWriteResult doWriteOperation(SparkRDDWriteClient client,
JavaRDD<HoodieRecord> hoodieRecords,
String instantTime,
WriteOperationType operation) throws HoodieException {
switch (operation) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
index e804163..b2e7134 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
@@ -18,7 +18,6 @@
package org.apache.hudi.internal;
-import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -27,6 +26,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -79,7 +79,7 @@ public class DataSourceInternalWriterHelper {
public void commit(List<HoodieWriteStat> writeStatList) {
try {
writeClient.commitStats(instantTime, writeStatList, Option.of(new HashMap<>()),
- DataSourceUtils.getCommitActionType(operationType, metaClient.getTableType()));
+ CommitUtils.getCommitActionType(operationType, metaClient.getTableType()));
} catch (Exception ioe) {
throw new HoodieException(ioe.getMessage(), ioe);
} finally {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 5b87278..829690f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -19,7 +19,6 @@ package org.apache.hudi
import java.util
import java.util.Properties
-
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
@@ -33,7 +32,7 @@ import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
-import org.apache.hudi.common.util.ReflectionUtils
+import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP, DEFAULT_BOOTSTRAP_INDEX_CLASS}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
@@ -126,7 +125,7 @@ private[hudi] object HoodieSparkSqlWriter {
tableConfig = tableMetaClient.getTableConfig
}
- val commitActionType = DataSourceUtils.getCommitActionType(operation, tableConfig.getTableType)
+ val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType)
// short-circuit if bulk_insert via row is enabled.
// scalastyle:off