This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7281f2b73c50c2c6c90894e67f2b78e61986b4f7 Author: Yun Gao <[email protected]> AuthorDate: Mon May 31 20:40:25 2021 +0800 [hotfix] Refactor the initialization of StreamTask to expose CheckpointBarrierHandler --- .../io/checkpointing/InputProcessorUtil.java | 54 ---------------------- .../runtime/tasks/MultipleInputStreamTask.java | 5 ++ .../runtime/tasks/OneInputStreamTask.java | 45 ++++++++++++++---- .../flink/streaming/runtime/tasks/StreamTask.java | 4 ++ .../runtime/tasks/TwoInputStreamTask.java | 30 ++++++++++-- .../io/checkpointing/InputProcessorUtilTest.java | 19 +++++--- 6 files changed, 82 insertions(+), 75 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.java index 16c42b4..da1d2a2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.java @@ -36,12 +36,9 @@ import org.apache.flink.streaming.runtime.tasks.TimerService; import org.apache.flink.util.clock.Clock; import org.apache.flink.util.clock.SystemClock; -import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; - import java.time.Duration; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.concurrent.Callable; @@ -55,57 +52,6 @@ import java.util.stream.Stream; */ @Internal public class InputProcessorUtil { - @SuppressWarnings("unchecked") - public static CheckpointedInputGate createCheckpointedInputGate( - AbstractInvokable toNotifyOnCheckpoint, - StreamConfig config, - SubtaskCheckpointCoordinator checkpointCoordinator, - IndexedInputGate[] inputGates, - TaskIOMetricGroup taskIOMetricGroup, - String taskName, - MailboxExecutor mailboxExecutor, - TimerService timerService) { - CheckpointedInputGate[] checkpointedInputGates = - createCheckpointedMultipleInputGate( - toNotifyOnCheckpoint, - config, - checkpointCoordinator, - taskIOMetricGroup, - taskName, - mailboxExecutor, - new List[] {Arrays.asList(inputGates)}, - Collections.emptyList(), - timerService); - return Iterables.getOnlyElement(Arrays.asList(checkpointedInputGates)); - } - - /** - * @return an array of {@link CheckpointedInputGate} created for corresponding {@link - * InputGate}s supplied as parameters. - */ - public static CheckpointedInputGate[] createCheckpointedMultipleInputGate( - AbstractInvokable toNotifyOnCheckpoint, - StreamConfig config, - SubtaskCheckpointCoordinator checkpointCoordinator, - TaskIOMetricGroup taskIOMetricGroup, - String taskName, - MailboxExecutor mailboxExecutor, - List<IndexedInputGate>[] inputGates, - List<StreamTaskSourceInput<?>> sourceInputs, - TimerService timerService) { - CheckpointBarrierHandler barrierHandler = - createCheckpointBarrierHandler( - toNotifyOnCheckpoint, - config, - checkpointCoordinator, - taskName, - inputGates, - sourceInputs, - mailboxExecutor, - timerService); - return createCheckpointedMultipleInputGate( - mailboxExecutor, inputGates, taskIOMetricGroup, barrierHandler, config); - } public static CheckpointedInputGate[] createCheckpointedMultipleInputGate( MailboxExecutor mailboxExecutor, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java index 20d775a..1ec5906 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java @@ -49,6 +49,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.function.Function; @@ -175,6 +176,10 @@ public class MultipleInputStreamTask<OUT> getEnvironment().getTaskInfo()); } + protected Optional<CheckpointBarrierHandler> getCheckpointBarrierHandler() { + return Optional.ofNullable(checkpointBarrierHandler); + } + @Override public Future<Boolean> triggerCheckpointAsync( CheckpointMetaData metadata, CheckpointOptions options) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index c1296f3..c114ceb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -35,6 +35,7 @@ import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor; import org.apache.flink.streaming.runtime.io.StreamTaskInput; import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput; import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInputFactory; +import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler; import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate; import org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil; import org.apache.flink.streaming.runtime.metrics.WatermarkGauge; @@ -43,8 +44,15 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve; import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; +import org.apache.flink.shaded.curator4.com.google.common.collect.Iterables; + import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + import static org.apache.flink.streaming.api.graph.StreamConfig.requiresSorting; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -53,6 +61,8 @@ import static org.apache.flink.util.Preconditions.checkState; @Internal public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> { + @Nullable private CheckpointBarrierHandler checkpointBarrierHandler; + private final WatermarkGauge inputWatermarkGauge = new WatermarkGauge(); /** @@ -117,6 +127,11 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO .gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, inputWatermarkGauge::getValue); } + @Override + protected Optional<CheckpointBarrierHandler> getCheckpointBarrierHandler() { + return Optional.ofNullable(checkpointBarrierHandler); + } + private StreamTaskInput<IN> wrapWithSorted(StreamTaskInput<IN> input) { ClassLoader userCodeClassLoader = getUserCodeClassLoader(); return new SortingDataInput<>( @@ -133,18 +148,30 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO this); } + @SuppressWarnings("unchecked") private CheckpointedInputGate createCheckpointedInputGate() { IndexedInputGate[] inputGates = getEnvironment().getAllInputGates(); - return InputProcessorUtil.createCheckpointedInputGate( - this, - configuration, - getCheckpointCoordinator(), - inputGates, - getEnvironment().getMetricGroup().getIOMetricGroup(), - getTaskNameWithSubtaskAndId(), - mainMailboxExecutor, - systemTimerService); + checkpointBarrierHandler = + InputProcessorUtil.createCheckpointBarrierHandler( + this, + configuration, + getCheckpointCoordinator(), + getTaskNameWithSubtaskAndId(), + new List[] {Arrays.asList(inputGates)}, + Collections.emptyList(), + mainMailboxExecutor, + systemTimerService); + + CheckpointedInputGate[] checkpointedInputGates = + InputProcessorUtil.createCheckpointedMultipleInputGate( + mainMailboxExecutor, + new List[] {Arrays.asList(inputGates)}, + getEnvironment().getMetricGroup().getIOMetricGroup(), + checkpointBarrierHandler, + configuration); + + return Iterables.getOnlyElement(Arrays.asList(checkpointedInputGates)); } private DataOutput<IN> createDataOutput(Counter numRecordsIn) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 23fa66f..29ad693 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -1012,6 +1012,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab } } + protected Optional<CheckpointBarrierHandler> getCheckpointBarrierHandler() { + return Optional.empty(); + } + @Override public void triggerCheckpointOnBarrier( CheckpointMetaData checkpointMetaData, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index 6f15b8b..87e9ee6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -22,12 +22,16 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory; +import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler; import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate; import org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; +import javax.annotation.Nullable; + import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.function.Function; import static org.apache.flink.util.Preconditions.checkState; @@ -39,28 +43,44 @@ import static org.apache.flink.util.Preconditions.checkState; @Internal public class TwoInputStreamTask<IN1, IN2, OUT> extends AbstractTwoInputStreamTask<IN1, IN2, OUT> { + @Nullable private CheckpointBarrierHandler checkpointBarrierHandler; + public TwoInputStreamTask(Environment env) throws Exception { super(env); } @Override + protected Optional<CheckpointBarrierHandler> getCheckpointBarrierHandler() { + return Optional.ofNullable(checkpointBarrierHandler); + } + + @SuppressWarnings("unchecked") + @Override protected void createInputProcessor( List<IndexedInputGate> inputGates1, List<IndexedInputGate> inputGates2, Function<Integer, StreamPartitioner<?>> gatePartitioners) { // create an input instance for each input - CheckpointedInputGate[] checkpointedInputGates = - InputProcessorUtil.createCheckpointedMultipleInputGate( + checkpointBarrierHandler = + InputProcessorUtil.createCheckpointBarrierHandler( this, - getConfiguration(), + configuration, getCheckpointCoordinator(), - getEnvironment().getMetricGroup().getIOMetricGroup(), getTaskNameWithSubtaskAndId(), - mainMailboxExecutor, new List[] {inputGates1, inputGates2}, Collections.emptyList(), + mainMailboxExecutor, systemTimerService); + + CheckpointedInputGate[] checkpointedInputGates = + InputProcessorUtil.createCheckpointedMultipleInputGate( + mainMailboxExecutor, + new List[] {inputGates1, inputGates2}, + getEnvironment().getMetricGroup().getIOMetricGroup(), + checkpointBarrierHandler, + configuration); + checkState(checkpointedInputGates.length == 2); inputProcessor = diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtilTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtilTest.java index ff4cdbf..76b67e2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtilTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtilTest.java @@ -69,24 +69,29 @@ public class InputProcessorUtilTest { Collections.singletonList(getGate(0, 2)), }; - CheckpointedInputGate[] checkpointedMultipleInputGate = - InputProcessorUtil.createCheckpointedMultipleInputGate( + CheckpointBarrierHandler barrierHandler = + InputProcessorUtil.createCheckpointBarrierHandler( streamTask, streamConfig, new TestSubtaskCheckpointCoordinator(new MockChannelStateWriter()), - environment.getMetricGroup().getIOMetricGroup(), streamTask.getName(), - new SyncMailboxExecutor(), inputGates, Collections.emptyList(), + new SyncMailboxExecutor(), new TestProcessingTimeService()); + + CheckpointedInputGate[] checkpointedMultipleInputGate = + InputProcessorUtil.createCheckpointedMultipleInputGate( + new SyncMailboxExecutor(), + inputGates, + environment.getMetricGroup().getIOMetricGroup(), + barrierHandler, + streamConfig); + for (CheckpointedInputGate checkpointedInputGate : checkpointedMultipleInputGate) { registry.registerCloseable(checkpointedInputGate); } - CheckpointBarrierHandler barrierHandler = - checkpointedMultipleInputGate[0].getCheckpointBarrierHandler(); - List<IndexedInputGate> allInputGates = Arrays.stream(inputGates) .flatMap(gates -> gates.stream())
