This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2e5c499d6263b59183dc93d56b2bb27684eeecb1 Author: Piotr Nowojski <[email protected]> AuthorDate: Tue Jun 18 16:57:19 2019 +0200 [hotfix][network] Split InputProcessorUtil into smaller methods --- .../streaming/runtime/io/InputProcessorUtil.java | 79 ++++++++++++++-------- 1 file changed, 49 insertions(+), 30 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java index 7eda06c..419cf16 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java @@ -24,8 +24,8 @@ import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.streaming.api.CheckpointingMode; -import org.apache.flink.streaming.runtime.tasks.StreamTask; import java.io.IOException; @@ -37,44 +37,63 @@ import java.io.IOException; public class InputProcessorUtil { public static CheckpointedInputGate createCheckpointedInputGate( - StreamTask<?, ?> checkpointedTask, + AbstractInvokable toNotifyOnCheckpoint, CheckpointingMode checkpointMode, IOManager ioManager, InputGate inputGate, Configuration taskManagerConfig, String taskName) throws IOException { - CheckpointedInputGate checkpointedInputGate; - if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) { - long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT); - if (!(maxAlign == -1 || maxAlign > 0)) { - throw new IllegalConfigurationException( - TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key() - + " must be positive or -1 (infinite)"); - } + BufferStorage bufferStorage = createBufferStorage( + checkpointMode, ioManager, inputGate.getPageSize(), taskManagerConfig, taskName); + CheckpointBarrierHandler barrierHandler = createCheckpointBarrierHandler( + checkpointMode, inputGate.getNumberOfInputChannels(), taskName, toNotifyOnCheckpoint); + return new CheckpointedInputGate(inputGate, bufferStorage, barrierHandler); + } - if (taskManagerConfig.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_CREDIT_MODEL)) { - checkpointedInputGate = new CheckpointedInputGate( - inputGate, - new CachedBufferStorage(inputGate.getPageSize(), maxAlign, taskName), - taskName, - checkpointedTask); - } else { - checkpointedInputGate = new CheckpointedInputGate( - inputGate, - new BufferSpiller(ioManager, inputGate.getPageSize(), maxAlign, taskName), + private static CheckpointBarrierHandler createCheckpointBarrierHandler( + CheckpointingMode checkpointMode, + int numberOfInputChannels, + String taskName, + AbstractInvokable toNotifyOnCheckpoint) { + switch (checkpointMode) { + case EXACTLY_ONCE: + return new CheckpointBarrierAligner( + numberOfInputChannels, taskName, - checkpointedTask); - } - } else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) { - checkpointedInputGate = new CheckpointedInputGate( - inputGate, - new EmptyBufferStorage(), - new CheckpointBarrierTracker(inputGate.getNumberOfInputChannels(), checkpointedTask)); - } else { - throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + checkpointMode); + toNotifyOnCheckpoint); + case AT_LEAST_ONCE: + return new CheckpointBarrierTracker(numberOfInputChannels, toNotifyOnCheckpoint); + default: + throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + checkpointMode); } + } - return checkpointedInputGate; + private static BufferStorage createBufferStorage( + CheckpointingMode checkpointMode, + IOManager ioManager, + int pageSize, + Configuration taskManagerConfig, + String taskName) throws IOException { + switch (checkpointMode) { + case EXACTLY_ONCE: { + long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT); + if (!(maxAlign == -1 || maxAlign > 0)) { + throw new IllegalConfigurationException( + TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key() + + " must be positive or -1 (infinite)"); + } + + if (taskManagerConfig.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_CREDIT_MODEL)) { + return new CachedBufferStorage(pageSize, maxAlign, taskName); + } else { + return new BufferSpiller(ioManager, pageSize, maxAlign, taskName); + } + } + case AT_LEAST_ONCE: + return new EmptyBufferStorage(); + default: + throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + checkpointMode); + } } }
