This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 86d1e39fb4e [HUDI-3661] Flink async compaction is not thread safe when use watermark (#7399)
86d1e39fb4e is described below
commit 86d1e39fb4e971b11e8c6394f6611b7bd7089bd4
Author: Danny Chan <[email protected]>
AuthorDate: Wed Dec 7 18:31:26 2022 +0800
[HUDI-3661] Flink async compaction is not thread safe when use watermark (#7399)
---
.../hudi/sink/clustering/ClusteringOperator.java | 8 +++-
.../{CompactFunction.java => CompactOperator.java} | 31 ++++++++++---
.../hudi/sink/compact/CompactionCommitEvent.java | 2 +-
.../hudi/sink/compact/HoodieFlinkCompactor.java | 3 +-
.../java/org/apache/hudi/sink/utils/Pipelines.java | 5 +-
.../sink/compact/ITTestHoodieFlinkCompactor.java | 5 +-
.../hudi/sink/utils/ClusteringFunctionWrapper.java | 2 +-
.../hudi/sink/utils/CompactFunctionWrapper.java | 53 +++++++++++-----------
.../sink/utils/StreamWriteFunctionWrapper.java | 2 +-
9 files changed, 66 insertions(+), 45 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index ca1cd54c1fe..e7bde41ca8b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -60,6 +60,7 @@ import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
@@ -159,7 +160,12 @@ public class ClusteringOperator extends
TableStreamOperator<ClusteringCommitEven
this.executor = NonThrownExecutor.builder(LOG).build();
}
- collector = new StreamRecordCollector<>(output);
+ this.collector = new StreamRecordCollector<>(output);
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) {
+ // no need to propagate the watermark
}
@Override
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
similarity index 82%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
rename to hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
index db05b0dbabe..65f70ad6aaf 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
@@ -32,7 +32,11 @@ import org.apache.hudi.util.FlinkWriteClients;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,11 +45,12 @@ import java.io.IOException;
import java.util.List;
/**
- * Function to execute the actual compaction task assigned by the compaction
plan task.
+ * Operator to execute the actual compaction task assigned by the compaction
plan task.
* In order to execute scalable, the input should shuffle by the compact event
{@link CompactionPlanEvent}.
*/
-public class CompactFunction extends ProcessFunction<CompactionPlanEvent,
CompactionCommitEvent> {
- private static final Logger LOG =
LoggerFactory.getLogger(CompactFunction.class);
+public class CompactOperator extends TableStreamOperator<CompactionCommitEvent>
+ implements OneInputStreamOperator<CompactionPlanEvent,
CompactionCommitEvent> {
+ private static final Logger LOG =
LoggerFactory.getLogger(CompactOperator.class);
/**
* Config options.
@@ -72,22 +77,34 @@ public class CompactFunction extends
ProcessFunction<CompactionPlanEvent, Compac
*/
private transient NonThrownExecutor executor;
- public CompactFunction(Configuration conf) {
+ /**
+ * Output records collector.
+ */
+ private transient StreamRecordCollector<CompactionCommitEvent> collector;
+
+ public CompactOperator(Configuration conf) {
this.conf = conf;
this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
}
@Override
- public void open(Configuration parameters) throws Exception {
+ public void open() throws Exception {
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.writeClient = FlinkWriteClients.createWriteClient(conf,
getRuntimeContext());
if (this.asyncCompaction) {
this.executor = NonThrownExecutor.builder(LOG).build();
}
+ this.collector = new StreamRecordCollector<>(output);
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) {
+ // no need to propagate the watermark
}
@Override
- public void processElement(CompactionPlanEvent event, Context context,
Collector<CompactionCommitEvent> collector) throws Exception {
+ public void processElement(StreamRecord<CompactionPlanEvent> record) throws
Exception {
+ final CompactionPlanEvent event = record.getValue();
final String instantTime = event.getCompactionInstantTime();
final CompactionOperation compactionOperation = event.getOperation();
if (asyncCompaction) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java
index 398dfcf6195..faad4c2338d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java
@@ -24,7 +24,7 @@ import java.io.Serializable;
import java.util.List;
/**
- * Represents a commit event from the compaction task {@link CompactFunction}.
+ * Represents a commit event from the compaction task {@link CompactOperator}.
*/
public class CompactionCommitEvent implements Serializable {
private static final long serialVersionUID = 1L;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 34576cfb017..1475a493c1a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -41,7 +41,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import
org.apache.flink.client.deployment.application.ApplicationExecutionException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -311,7 +310,7 @@ public class HoodieFlinkCompactor {
.rebalance()
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
- new ProcessOperator<>(new CompactFunction(conf)))
+ new CompactOperator(conf))
.setParallelism(compactionParallelism)
.addSink(new CompactionCommitSink(conf))
.name("compaction_commit")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index a045a9276c5..d17213dcc04 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -39,7 +39,7 @@ import org.apache.hudi.sink.clustering.ClusteringOperator;
import org.apache.hudi.sink.clustering.ClusteringPlanEvent;
import org.apache.hudi.sink.clustering.ClusteringPlanOperator;
import org.apache.hudi.sink.common.WriteOperatorFactory;
-import org.apache.hudi.sink.compact.CompactFunction;
+import org.apache.hudi.sink.compact.CompactOperator;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
import org.apache.hudi.sink.compact.CompactionPlanEvent;
@@ -57,7 +57,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -372,7 +371,7 @@ public class Pipelines {
.keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
- new ProcessOperator<>(new CompactFunction(conf)))
+ new CompactOperator(conf))
.setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
.addSink(new CompactionCommitSink(conf))
.name("compact_commit")
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index 711b738288f..6157b5e9011 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -39,7 +39,6 @@ import org.apache.hudi.utils.TestSQL;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
@@ -158,7 +157,7 @@ public class ITTestHoodieFlinkCompactor {
.rebalance()
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
- new ProcessOperator<>(new CompactFunction(conf)))
+ new CompactOperator(conf))
.setParallelism(FlinkMiniCluster.DEFAULT_PARALLELISM)
.addSink(new CompactionCommitSink(conf))
.name("clean_commits")
@@ -282,7 +281,7 @@ public class ITTestHoodieFlinkCompactor {
.rebalance()
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
- new ProcessOperator<>(new CompactFunction(conf)))
+ new CompactOperator(conf))
.setParallelism(1)
.addSink(new CompactionCommitSink(conf))
.name("compaction_commit")
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java
index 55a79915d47..e3b75cbf637 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java
@@ -69,6 +69,7 @@ public class ClusteringFunctionWrapper {
private ClusteringCommitSink commitSink;
public ClusteringFunctionWrapper(Configuration conf, StreamTask<?, ?>
streamTask, StreamConfig streamConfig) {
+ this.conf = conf;
this.ioManager = new IOManagerAsync();
MockEnvironment environment = new MockEnvironmentBuilder()
.setTaskName("mockTask")
@@ -76,7 +77,6 @@ public class ClusteringFunctionWrapper {
.setIOManager(ioManager)
.build();
this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0,
environment);
- this.conf = conf;
this.streamTask = streamTask;
this.streamConfig = streamConfig;
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
index 1dba81ce2b7..78a8305c9c5 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
@@ -19,7 +19,7 @@
package org.apache.hudi.sink.utils;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.sink.compact.CompactFunction;
+import org.apache.hudi.sink.compact.CompactOperator;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
import org.apache.hudi.sink.compact.CompactionPlanEvent;
@@ -33,14 +33,14 @@ import org.apache.flink.runtime.memory.MemoryManager;
import
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
/**
- * A wrapper class to manipulate the {@link
org.apache.hudi.sink.compact.CompactFunction} instance for testing.
+ * A wrapper class to manipulate the {@link CompactOperator} instance for
testing.
*/
public class CompactFunctionWrapper {
private final Configuration conf;
@@ -48,20 +48,28 @@ public class CompactFunctionWrapper {
private final IOManager ioManager;
private final StreamingRuntimeContext runtimeContext;
+ private final StreamTask<?, ?> streamTask;
+ private final StreamConfig streamConfig;
+
/**
* Function that generates the {@link HoodieCompactionPlan}.
*/
private CompactionPlanOperator compactionPlanOperator;
+ /**
+ * Output to collect the compaction commit events.
+ */
+ private CollectorOutput<CompactionCommitEvent> commitEventOutput;
/**
* Function that executes the compaction task.
*/
- private CompactFunction compactFunction;
+ private CompactOperator compactOperator;
/**
* Stream sink to handle compaction commits.
*/
private CompactionCommitSink commitSink;
- public CompactFunctionWrapper(Configuration conf) throws Exception {
+ public CompactFunctionWrapper(Configuration conf, StreamTask<?, ?>
streamTask, StreamConfig streamConfig) {
+ this.conf = conf;
this.ioManager = new IOManagerAsync();
MockEnvironment environment = new MockEnvironmentBuilder()
.setTaskName("mockTask")
@@ -69,19 +77,23 @@ public class CompactFunctionWrapper {
.setIOManager(ioManager)
.build();
this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0,
environment);
- this.conf = conf;
+ this.streamTask = streamTask;
+ this.streamConfig = streamConfig;
}
public void openFunction() throws Exception {
compactionPlanOperator = new CompactionPlanOperator(conf);
compactionPlanOperator.open();
- compactFunction = new CompactFunction(conf);
- compactFunction.setRuntimeContext(runtimeContext);
- compactFunction.open(conf);
+ compactOperator = new CompactOperator(conf);
+ // CAUTION: deprecated API used.
+ compactOperator.setProcessingTimeService(new TestProcessingTimeService());
+ commitEventOutput = new CollectorOutput<>();
+ compactOperator.setup(streamTask, streamConfig, commitEventOutput);
+ compactOperator.open();
final NonThrownExecutor syncExecutor = new MockCoordinatorExecutor(
new MockOperatorCoordinatorContext(new OperatorID(), 1));
- compactFunction.setExecutor(syncExecutor);
+ compactOperator.setExecutor(syncExecutor);
commitSink = new CompactionCommitSink(conf);
commitSink.setRuntimeContext(runtimeContext);
@@ -94,22 +106,11 @@ public class CompactFunctionWrapper {
compactionPlanOperator.setOutput(output);
compactionPlanOperator.notifyCheckpointComplete(checkpointID);
// collect the CompactCommitEvents
- List<CompactionCommitEvent> compactCommitEvents = new ArrayList<>();
for (CompactionPlanEvent event : output.getRecords()) {
- compactFunction.processElement(event, null, new
Collector<CompactionCommitEvent>() {
- @Override
- public void collect(CompactionCommitEvent event) {
- compactCommitEvents.add(event);
- }
-
- @Override
- public void close() {
-
- }
- });
+ compactOperator.processElement(new StreamRecord<>(event));
}
// handle and commit the compaction
- for (CompactionCommitEvent event : compactCommitEvents) {
+ for (CompactionCommitEvent event : commitEventOutput.getRecords()) {
commitSink.invoke(event, null);
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index a1a14456e3c..db8ff36962b 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -117,7 +117,6 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
// one function
this.coordinatorContext = new MockOperatorCoordinatorContext(new
OperatorID(), 1);
this.coordinator = new StreamWriteOperatorCoordinator(conf,
this.coordinatorContext);
- this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
this.bucketAssignFunctionContext = new MockBucketAssignFunctionContext();
this.stateInitializationContext = new MockStateInitializationContext();
this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
@@ -127,6 +126,7 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
.setConfig(new StreamConfig(conf))
.setExecutionConfig(new ExecutionConfig().enableObjectReuse())
.build();
+ this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf,
this.streamTask, this.streamConfig);
}
public void openFunction() throws Exception {