This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9b01d2f  [HUDI-1915] Fix the file id for write data buffer before flushing (#2966)
9b01d2f is described below
commit 9b01d2f864e5cc4a559cfd4199136bca0979b095
Author: Danny Chan <[email protected]>
AuthorDate: Thu May 20 10:20:08 2021 +0800
[HUDI-1915] Fix the file id for write data buffer before flushing (#2966)
---
.../hudi/table/HoodieFlinkMergeOnReadTable.java | 8 +++
.../FlinkMergeOnReadRollbackActionExecutor.java | 77 ++++++++++++++++++++++
.../apache/hudi/configuration/FlinkOptions.java | 2 +-
.../org/apache/hudi/sink/StreamWriteFunction.java | 24 ++++---
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 64 ++++++++++++++++++
.../test/java/org/apache/hudi/utils/TestData.java | 7 ++
6 files changed, 171 insertions(+), 11 deletions(-)
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
index b7f177b..8bcb979 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
@@ -19,11 +19,13 @@
package org.apache.hudi.table;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -34,6 +36,7 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.delta.FlinkUpsertDeltaCommitActionExecutor;
import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
import org.apache.hudi.table.action.compact.FlinkScheduleCompactionActionExecutor;
+import org.apache.hudi.table.action.rollback.FlinkMergeOnReadRollbackActionExecutor;
import java.util.List;
import java.util.Map;
@@ -87,5 +90,10 @@ public class HoodieFlinkMergeOnReadTable<T extends HoodieRecordPayload>
throw new HoodieNotSupportedException("Compaction is supported as a separate pipeline, "
+ "should not invoke directly through HoodieFlinkMergeOnReadTable");
}
+
+ @Override
+ public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) {
+ return new FlinkMergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
+ }
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java
new file mode 100644
index 0000000..25b20a5
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+
+import java.io.IOException;
+import java.util.List;
+
+@SuppressWarnings("checkstyle:LineLength")
+public class FlinkMergeOnReadRollbackActionExecutor<T extends HoodieRecordPayload> extends
+ BaseMergeOnReadRollbackActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+ public FlinkMergeOnReadRollbackActionExecutor(HoodieEngineContext context,
+ HoodieWriteConfig config,
+ HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+ String instantTime,
+ HoodieInstant commitInstant,
+ boolean deleteInstants) {
+ super(context, config, table, instantTime, commitInstant, deleteInstants);
+ }
+
+ public FlinkMergeOnReadRollbackActionExecutor(HoodieEngineContext context,
+ HoodieWriteConfig config,
+ HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+ String instantTime,
+ HoodieInstant commitInstant,
+ boolean deleteInstants,
+ boolean skipTimelinePublish,
+ boolean useMarkerBasedStrategy) {
+ super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
+ }
+
+ @Override
+ protected RollbackStrategy getRollbackStrategy() {
+ if (useMarkerBasedStrategy) {
+ return new FlinkMarkerBasedRollbackStrategy(table, context, config, instantTime);
+ } else {
+ return this::executeRollbackUsingFileListing;
+ }
+ }
+
+ @Override
+ protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant resolvedInstant) {
+ List<ListingBasedRollbackRequest> rollbackRequests;
+ try {
+ rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(resolvedInstant, table, context);
+ } catch (IOException e) {
+ throw new HoodieIOException("Error generating rollback requests by file listing.", e);
+ }
+ return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, resolvedInstant, rollbackRequests);
+ }
+}
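A side note on the getRollbackStrategy() override above: returning this::executeRollbackUsingFileListing works because RollbackStrategy is used here as a single-method (functional) interface, so the listing-based fallback needs no wrapper class. Below is a minimal, self-contained sketch of that pattern with simplified stand-in types; none of these names are Hudi's real ones.

    // Sketch only: simplified stand-ins for Hudi's rollback types.
    import java.util.Collections;
    import java.util.List;

    class RollbackStrategyDemo {
      // analogous to the RollbackStrategy returned by the executor above
      interface RollbackStrategy {
        List<String> execute(String instantToRollback);
      }

      boolean useMarkerBasedStrategy = false;

      List<String> executeRollbackUsingFileListing(String instant) {
        // stand-in for the listing-based rollback path
        return Collections.singletonList("rolled back " + instant + " via file listing");
      }

      RollbackStrategy getRollbackStrategy() {
        if (useMarkerBasedStrategy) {
          // stand-in for the marker-based strategy branch
          return instant -> Collections.singletonList("rolled back " + instant + " via markers");
        }
        // a method reference satisfies the functional interface directly
        return this::executeRollbackUsingFileListing;
      }

      public static void main(String[] args) {
        System.out.println(new RollbackStrategyDemo().getRollbackStrategy().execute("001"));
      }
    }

In the real executor the marker-based branch returns a FlinkMarkerBasedRollbackStrategy instance instead of a lambda.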
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 3f1b17b..4f1be0b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -91,7 +91,7 @@ public class FlinkOptions {
.booleanType()
.defaultValue(false)
.withDescription("Whether to update index for the old partition path\n"
- + "if same key record with different partition path came in, default
true");
+ + "if same key record with different partition path came in, default
false");
// ------------------------------------------------------------------------
// Read Options
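The one-word doc fix above brings the description in line with the option's declared default, which the context lines show is .defaultValue(false). For orientation, the surrounding declaration presumably has the shape sketched below; the constant name and config key are guesses for illustration, since only the builder calls appear in the hunk.

    // Hypothetical reconstruction; constant and key names are assumptions.
    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;

    public class FlinkOptionsSketch {
      public static final ConfigOption<Boolean> INDEX_GLOBAL_ENABLED = ConfigOptions
          .key("index.global.enabled")
          .booleanType()
          .defaultValue(false)
          .withDescription("Whether to update index for the old partition path\n"
              + "if same key record with different partition path came in, default false");
    }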
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 2a38f31..a1b3346 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -308,26 +308,28 @@ public class StreamWriteFunction<K, I, O>
}
/**
- * Prepare the write data buffer:
- *
- * <ul>
- * <li>Patch up all the records with correct partition path;</li>
- * <li>Patch up the first record with correct partition path and fileID.</li>
- * </ul>
+ * Prepare the write data buffer: patch up all the records with correct partition path.
*/
public List<HoodieRecord> writeBuffer() {
// rewrite all the records with new record key
- List<HoodieRecord> recordList = records.stream()
+ return records.stream()
.map(record -> record.toHoodieRecord(partitionPath))
.collect(Collectors.toList());
+ }
+
+ /**
+ * Sets up before flush: patch up the first record with correct partition path and fileID.
+ *
+ * <p>Note: this method may modify the given {@code records} list in place.
+ */
+ public void preWrite(List<HoodieRecord> records) {
// rewrite the first record with expected fileID
- HoodieRecord<?> first = recordList.get(0);
+ HoodieRecord<?> first = records.get(0);
HoodieRecord<?> record = new HoodieRecord<>(first.getKey(), first.getData());
HoodieRecordLocation newLoc = new HoodieRecordLocation(first.getCurrentLocation().getInstantTime(), fileID);
record.setCurrentLocation(newLoc);
- recordList.set(0, record);
- return recordList;
+ records.set(0, record);
}
public void reset() {
@@ -469,6 +471,7 @@ public class StreamWriteFunction<K, I, O>
if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
}
+ bucket.preWrite(records);
final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder()
.taskID(taskID)
@@ -500,6 +503,7 @@ public class StreamWriteFunction<K, I, O>
if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
}
+ bucket.preWrite(records);
writeStatus.addAll(writeFunction.apply(records, currentInstant));
bucket.reset();
}
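Both flush paths above call bucket.preWrite(records) only after deduplication, which appears to be the point of splitting preWrite() out of writeBuffer(): had the fileID been patched inside writeBuffer(), the deduplicator could drop or replace the patched record and the flushed batch would lose its fileID. A self-contained sketch of that ordering follows; the types and the keep-last dedup policy are simplified stand-ins, not Hudi's (Hudi combines duplicate payloads via precombine).

    // Shows why the fileID patch must run on the final, deduplicated batch.
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;

    class PreWriteOrderDemo {
      static class Record {
        final String key;
        String fileId;
        Record(String key) { this.key = key; }
      }

      // keep the last record per key, dropping earlier duplicates
      static List<Record> deduplicate(List<Record> records) {
        LinkedHashMap<String, Record> byKey = new LinkedHashMap<>();
        records.forEach(r -> byKey.put(r.key, r));
        return new ArrayList<>(byKey.values());
      }

      // analogous to the bucket's preWrite(...) above: patch the batch's first record
      static void preWrite(List<Record> records, String fileId) {
        records.get(0).fileId = fileId;
      }

      public static void main(String[] args) {
        List<Record> buffer = new ArrayList<>();
        buffer.add(new Record("id1"));
        buffer.add(new Record("id1")); // duplicate key: dedup keeps this one
        List<Record> batch = deduplicate(buffer);
        preWrite(batch, "file-0001");  // patch after dedup, as in the hunks above
        System.out.println(batch.get(0).fileId); // prints file-0001
      }
    }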
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index b962afb..d2d04ee 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -437,6 +437,70 @@ public class TestWriteCopyOnWrite {
}
@Test
+ public void testInsertWithDeduplication() throws Exception {
+ // reset the config option
+ conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
+ conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
+ funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
+
+ // open the function and ingest data
+ funcWrapper.openFunction();
+ // Each record is 208 bytes, so 4 records are expected to trigger a mini-batch write
+ for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
+ funcWrapper.invoke(rowData);
+ }
+
+ Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
+ assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
+ assertThat("3 records expect to flush out as a mini-batch",
+ dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
+ is(2));
+
+ // this triggers the data write and event send
+ funcWrapper.checkpointFunction(1);
+ dataBuffer = funcWrapper.getDataBuffer();
+ assertThat("All data should be flushed out", dataBuffer.size(), is(0));
+
+ final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first
+ final OperatorEvent event2 = funcWrapper.getNextEvent();
+ assertThat("The operator expect to send an event", event2,
instanceOf(BatchWriteSuccessEvent.class));
+
+ funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
+ funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
+ assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+ String instant = funcWrapper.getWriteClient()
+ .getLastPendingInstant(getTableType());
+
+ funcWrapper.checkpointComplete(1);
+
+ Map<String, String> expected = new HashMap<>();
+ expected.put("par1", "[id1,par1,id1,Danny,23,4,par1]");
+
+ checkWrittenData(tempFile, expected, 1);
+
+ // started a new instant already
+ checkInflightInstant(funcWrapper.getWriteClient());
+ checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
+
+ // insert duplicates again
+ for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
+ funcWrapper.invoke(rowData);
+ }
+
+ funcWrapper.checkpointFunction(2);
+
+ final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first
+ final OperatorEvent event4 = funcWrapper.getNextEvent();
+ funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
+ funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
+ funcWrapper.checkpointComplete(2);
+
+ // Same as the original base file content.
+ checkWrittenData(tempFile, expected, 1);
+ }
+
+ @Test
public void testInsertWithSmallBufferSize() throws Exception {
// reset the config option
conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index bb67661..fae0765 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -121,6 +121,13 @@ public class TestData {
TimestampData.fromEpochMillis(1), StringData.fromString("par1"))));
}
+ public static List<RowData> DATA_SET_INSERT_SAME_KEY = new ArrayList<>();
+ static {
+ IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_SAME_KEY.add(
+ insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(i), StringData.fromString("par1"))));
+ }
+
// data set of test_source.data
public static List<RowData> DATA_SET_SOURCE_INSERT = Arrays.asList(
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,