This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2e5c499d6263b59183dc93d56b2bb27684eeecb1
Author: Piotr Nowojski <[email protected]>
AuthorDate: Tue Jun 18 16:57:19 2019 +0200

    [hotfix][network] Split InputProcessorUtil into smaller methods
---
 .../streaming/runtime/io/InputProcessorUtil.java   | 79 ++++++++++++++--------
 1 file changed, 49 insertions(+), 30 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 7eda06c..419cf16 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -24,8 +24,8 @@ import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 import java.io.IOException;
 
@@ -37,44 +37,63 @@ import java.io.IOException;
 public class InputProcessorUtil {
 
        public static CheckpointedInputGate createCheckpointedInputGate(
-                       StreamTask<?, ?> checkpointedTask,
+                       AbstractInvokable toNotifyOnCheckpoint,
                        CheckpointingMode checkpointMode,
                        IOManager ioManager,
                        InputGate inputGate,
                        Configuration taskManagerConfig,
                        String taskName) throws IOException {
 
-               CheckpointedInputGate checkpointedInputGate;
-               if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
-                       long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
-                       if (!(maxAlign == -1 || maxAlign > 0)) {
-                               throw new IllegalConfigurationException(
-                                       TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
-                                       + " must be positive or -1 (infinite)");
-                       }
+               BufferStorage bufferStorage = createBufferStorage(
+                       checkpointMode, ioManager, inputGate.getPageSize(), taskManagerConfig, taskName);
+               CheckpointBarrierHandler barrierHandler = createCheckpointBarrierHandler(
+                       checkpointMode, inputGate.getNumberOfInputChannels(), taskName, toNotifyOnCheckpoint);
+               return new CheckpointedInputGate(inputGate, bufferStorage, barrierHandler);
+       }
 
-                       if (taskManagerConfig.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_CREDIT_MODEL)) {
-                               checkpointedInputGate = new CheckpointedInputGate(
-                                       inputGate,
-                                       new CachedBufferStorage(inputGate.getPageSize(), maxAlign, taskName),
-                                       taskName,
-                                       checkpointedTask);
-                       } else {
-                               checkpointedInputGate = new CheckpointedInputGate(
-                                       inputGate,
-                                       new BufferSpiller(ioManager, inputGate.getPageSize(), maxAlign, taskName),
+       private static CheckpointBarrierHandler createCheckpointBarrierHandler(
+                       CheckpointingMode checkpointMode,
+                       int numberOfInputChannels,
+                       String taskName,
+                       AbstractInvokable toNotifyOnCheckpoint) {
+               switch (checkpointMode) {
+                       case EXACTLY_ONCE:
+                               return new CheckpointBarrierAligner(
+                                       numberOfInputChannels,
                                        taskName,
-                                       checkpointedTask);
-                       }
-               } else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
-                       checkpointedInputGate = new CheckpointedInputGate(
-                               inputGate,
-                               new EmptyBufferStorage(),
-                               new CheckpointBarrierTracker(inputGate.getNumberOfInputChannels(), checkpointedTask));
-               } else {
-                       throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + checkpointMode);
+                                       toNotifyOnCheckpoint);
+                       case AT_LEAST_ONCE:
+                               return new CheckpointBarrierTracker(numberOfInputChannels, toNotifyOnCheckpoint);
+                       default:
+                               throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + checkpointMode);
                }
+       }
 
-               return checkpointedInputGate;
+       private static BufferStorage createBufferStorage(
+                       CheckpointingMode checkpointMode,
+                       IOManager ioManager,
+                       int pageSize,
+                       Configuration taskManagerConfig,
+                       String taskName) throws IOException {
+               switch (checkpointMode) {
+                       case EXACTLY_ONCE: {
+                               long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
+                               if (!(maxAlign == -1 || maxAlign > 0)) {
+                                       throw new IllegalConfigurationException(
+                                               TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
+                                                       + " must be positive or -1 (infinite)");
+                               }
+
+                               if (taskManagerConfig.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_CREDIT_MODEL)) {
+                                       return new CachedBufferStorage(pageSize, maxAlign, taskName);
+                               } else {
+                                       return new BufferSpiller(ioManager, pageSize, maxAlign, taskName);
+                               }
+                       }
+                       case AT_LEAST_ONCE:
+                               return new EmptyBufferStorage();
+                       default:
+                               throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + checkpointMode);
+               }
        }
 }

Reply via email to