Repository: flink
Updated Branches:
  refs/heads/release-1.5 9f2b3c332 -> f0e6ae6eb


[FLINK-8984][network] Drop taskmanager.exactly-once.blocking.data.enabled config option

Previously there were two options:

taskmanager.network.credit-based-flow-control.enabled

and

taskmanager.exactly-once.blocking.data.enabled

If we disabled the first one but kept the default value for the second one,
the blocking BarrierBuffer was used without credit-based flow control and
deadlocks would occur.

By dropping taskmanager.exactly-once.blocking.data.enabled we can always use:
 - blocking BarrierBuffer for credit based flow control
 - spilling BarrierBuffer for non credit based flow control.
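
A minimal, self-contained sketch of the selection logic before and after this
change, for illustration only. It does not use Flink classes: the class name
BarrierBufferSelection, the BlockerKind enum, and the use of
java.util.Properties as a stand-in for the task manager configuration are
assumptions made for this example. The option keys and their default value of
true are taken from the diff.

```java
import java.util.Properties;

public class BarrierBufferSelection {

    /** Hypothetical stand-in for the two buffer blocker implementations named in the diff. */
    public enum BlockerKind { CACHED_BUFFER_BLOCKER, BUFFER_SPILLER }

    public static final String CREDIT_KEY =
            "taskmanager.network.credit-based-flow-control.enabled";
    public static final String LEGACY_KEY =
            "taskmanager.exactly-once.blocking.data.enabled";

    /** Old behaviour: the dropped option (default true) decides, ignoring flow control. */
    public static BlockerKind selectBefore(Properties conf) {
        boolean blocking = Boolean.parseBoolean(conf.getProperty(LEGACY_KEY, "true"));
        return blocking ? BlockerKind.CACHED_BUFFER_BLOCKER : BlockerKind.BUFFER_SPILLER;
    }

    /** New behaviour: the credit-based flow control option (default true) decides. */
    public static BlockerKind selectAfter(Properties conf) {
        boolean creditBased = Boolean.parseBoolean(conf.getProperty(CREDIT_KEY, "true"));
        return creditBased ? BlockerKind.CACHED_BUFFER_BLOCKER : BlockerKind.BUFFER_SPILLER;
    }

    public static void main(String[] args) {
        // Credit-based flow control disabled, legacy option left at its default.
        Properties conf = new Properties();
        conf.setProperty(CREDIT_KEY, "false");

        // Old code still picks the blocking variant, the combination that could deadlock.
        System.out.println("before: " + selectBefore(conf));
        // New code falls back to the spilling variant.
        System.out.println("after:  " + selectAfter(conf));
    }
}
```

With credit-based flow control disabled and everything else at its default,
selectBefore returns the blocking variant while selectAfter returns the
spilling one, which is the mismatch this commit removes.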

This closes #5708.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23005ee1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23005ee1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23005ee1

Branch: refs/heads/release-1.5
Commit: 23005ee1f653a3fcddac0f710d56f3e0d6157a48
Parents: 0fa76e5
Author: Piotr Nowojski <[email protected]>
Authored: Fri Mar 16 13:28:08 2018 +0100
Committer: zentol <[email protected]>
Committed: Wed Mar 21 21:01:31 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/configuration/TaskManagerOptions.java  | 12 ------------
 .../flink/streaming/runtime/io/InputProcessorUtil.java  |  2 +-
 2 files changed, 1 insertion(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/23005ee1/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 4e08fda..c7b0782 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -326,18 +326,6 @@ public class TaskManagerOptions {
                        .defaultValue(true)
                        .withDescription("Boolean flag to enable/disable network credit-based flow control.");
 
-       /**
-        * Config parameter defining whether to spill data for channels with barrier or not in exactly-once
-        * mode based on credit-based flow control.
-        *
-        * @deprecated Will be removed for Flink 1.6 when the old code will be dropped in favour of
-        * credit-based flow control.
-        */
-       @Deprecated
-       public static final ConfigOption<Boolean> EXACTLY_ONCE_BLOCKING_DATA_ENABLED =
-                       key("taskmanager.exactly-once.blocking.data.enabled")
-                       .defaultValue(true);
-
        // ------------------------------------------------------------------------
        //  Task Options
        // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/23005ee1/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index cb56eee..1ae34b3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -51,7 +51,7 @@ public class InputProcessorUtil {
                                        + " must be positive or -1 (infinite)");
                        }
 
-                       if (taskManagerConfig.getBoolean(TaskManagerOptions.EXACTLY_ONCE_BLOCKING_DATA_ENABLED)) {
+                       if (taskManagerConfig.getBoolean(TaskManagerOptions.NETWORK_CREDIT_BASED_FLOW_CONTROL_ENABLED)) {
                                barrierHandler = new BarrierBuffer(inputGate, new CachedBufferBlocker(inputGate.getPageSize()), maxAlign);
                        } else {
                                barrierHandler = new BarrierBuffer(inputGate, new BufferSpiller(ioManager, inputGate.getPageSize()), maxAlign);
