[FLINK-5040] [taskmanager] Adjust partition request backoffs

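The partition request backoffs were previously hard-coded, which made it
impossible to react to any potential problems with them. They are now
configurable via `taskmanager.net.request-backoff.initial` and
`taskmanager.net.request-backoff.max`. This change also fixes the maximum
backoff of remote input channels, which was mistakenly set to the initial
backoff value.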

This closes #2784.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5d5637b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5d5637b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5d5637b0

Branch: refs/heads/master
Commit: 5d5637b01031746b2dfadf6d7fcd59155f7de653
Parents: 2742d5c
Author: Ufuk Celebi <[email protected]>
Authored: Thu Nov 10 11:15:47 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Fri Nov 11 09:41:39 2016 +0100

----------------------------------------------------------------------
 .../flink/configuration/TaskManagerOptions.java | 24 ++++--
 .../ResultPartitionDeploymentDescriptor.java    |  8 +-
 .../partition/consumer/SingleInputGate.java     | 10 ++-
 .../apache/flink/runtime/taskmanager/Task.java  |  2 +-
 .../NetworkEnvironmentConfiguration.scala       | 14 ++--
 .../flink/runtime/taskmanager/TaskManager.scala |  9 ++-
 ...ResultPartitionDeploymentDescriptorTest.java |  2 +-
 .../partition/consumer/SingleInputGateTest.java | 84 ++++++++++++++++++++
 ...askManagerComponentsStartupShutdownTest.java |  4 +-
 .../runtime/taskmanager/TaskManagerTest.java    |  5 ++
 10 files changed, 140 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index e5d36aa..6f6238b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -35,6 +35,20 @@ public class TaskManagerOptions {
        // @TODO Migrate 'taskmanager.*' config options from ConfigConstants
 
        // 
------------------------------------------------------------------------
+       //  Network Options
+       // 
------------------------------------------------------------------------
+
+       /** Minimum backoff for partition requests of input channels. */
+       public static final ConfigOption<Integer> 
NETWORK_REQUEST_BACKOFF_INITIAL =
+                       key("taskmanager.net.request-backoff.initial")
+                       .defaultValue(100);
+
+       /** Maximum backoff for partition requests of input channels. */
+       public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
+                       key("taskmanager.net.request-backoff.max")
+                       .defaultValue(10000);
+
+       // 
------------------------------------------------------------------------
        //  Task Options
        // 
------------------------------------------------------------------------
 
@@ -44,8 +58,8 @@ public class TaskManagerOptions {
         */
        public static final ConfigOption<Long> TASK_CANCELLATION_INTERVAL =
                        key("task.cancellation.interval")
-                                       .defaultValue(30000L)
-                                       
.withDeprecatedKeys("task.cancellation-interval");
+                       .defaultValue(30000L)
+                       .withDeprecatedKeys("task.cancellation-interval");
 
        /**
         * Timeout in milliseconds after which a task cancellation times out and
@@ -54,19 +68,19 @@ public class TaskManagerOptions {
         */
        public static final ConfigOption<Long> TASK_CANCELLATION_TIMEOUT =
                        key("task.cancellation.timeout")
-                                       .defaultValue(180000L);
+                       .defaultValue(180000L);
 
        /**
         * The maximum number of bytes that a checkpoint alignment may buffer.
         * If the checkpoint alignment buffers more than the configured amount 
of
         * data, the checkpoint is aborted (skipped).
-        * 
+        *
         * <p>The default value of {@code -1} indicates that there is no limit.
         */
        public static final ConfigOption<Long> 
TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT =
                        key("task.checkpoint.alignment.max-size")
                        .defaultValue(-1L);
-       
+
        // 
------------------------------------------------------------------------
 
        /** Not intended to be instantiated */

http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index 2ecde80..14c7d2a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -49,7 +49,7 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
        private final int numberOfSubpartitions;
        
        /** Flag whether the result partition should send 
scheduleOrUpdateConsumer messages. */
-       private final boolean lazyScheduling;
+       private final boolean sendScheduleOrUpdateConsumersMessage;
 
        public ResultPartitionDeploymentDescriptor(
                        IntermediateDataSetID resultId,
@@ -64,7 +64,7 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
 
                checkArgument(numberOfSubpartitions >= 1);
                this.numberOfSubpartitions = numberOfSubpartitions;
-               this.lazyScheduling = lazyScheduling;
+               this.sendScheduleOrUpdateConsumersMessage = lazyScheduling;
        }
 
        public IntermediateDataSetID getResultId() {
@@ -83,8 +83,8 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
                return numberOfSubpartitions;
        }
 
-       public boolean allowLazyScheduling() {
-               return lazyScheduling;
+       public boolean sendScheduleOrUpdateConsumersMessage() {
+               return sendScheduleOrUpdateConsumersMessage;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index af5fd89..8f57542 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import com.google.common.collect.Maps;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.taskmanager.TaskActions;
@@ -520,6 +521,13 @@ public class SingleInputGate implements InputGate {
 
        // 
------------------------------------------------------------------------
 
+       @VisibleForTesting
+       Map<IntermediateResultPartitionID, InputChannel> getInputChannels() {
+               return inputChannels;
+       }
+
+       // 
------------------------------------------------------------------------
+
        /**
         * Creates an input gate and all of its input channels.
         */
@@ -565,7 +573,7 @@ public class SingleInputGate implements InputGate {
                                        partitionLocation.getConnectionId(),
                                        
networkEnvironment.getConnectionManager(),
                                        
networkEnvironment.getPartitionRequestInitialBackoff(),
-                                       
networkEnvironment.getPartitionRequestInitialBackoff(),
+                                       
networkEnvironment.getPartitionRequestMaxBackoff(),
                                        metrics
                                );
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 4f3dd54..b960e68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -346,7 +346,7 @@ public class Task implements Runnable, TaskActions {
                                resultPartitionConsumableNotifier,
                                ioManager,
                                networkEnvironment.getDefaultIOMode(),
-                               desc.allowLazyScheduling());
+                               desc.sendScheduleOrUpdateConsumersMessage());
 
                        writers[counter] = new 
ResultPartitionWriter(producedPartitions[counter]);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
index 14589a1..6a59665 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
@@ -23,10 +23,10 @@ import 
org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 
 case class NetworkEnvironmentConfiguration(
-  numNetworkBuffers: Int,
-  networkBufferSize: Int,
-  memoryType: MemoryType,
-  ioMode: IOMode,
-  nettyConfig: Option[NettyConfig] = None,
-  partitionRequestInitialBackoff: Int = 500,
-  partitinRequestMaxBackoff: Int = 3000)
+    numNetworkBuffers: Int,
+    networkBufferSize: Int,
+    memoryType: MemoryType,
+    ioMode: IOMode,
+    partitionRequestInitialBackoff : Int,
+    partitionRequestMaxBackoff : Int,
+    nettyConfig: Option[NettyConfig] = None)

http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 4bb2da4..dd5d218 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1982,7 +1982,7 @@ object TaskManager {
       kvStateServer,
       netConfig.ioMode,
       netConfig.partitionRequestInitialBackoff,
-      netConfig.partitinRequestMaxBackoff)
+      netConfig.partitionRequestMaxBackoff)
 
     network.start()
 
@@ -2258,11 +2258,18 @@ object TaskManager {
 
     val ioMode : IOMode = if (syncOrAsync == "async") IOMode.ASYNC else 
IOMode.SYNC
 
+    val initialRequestBackoff = configuration.getInteger(
+      TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL)
+    val maxRequestBackoff = configuration.getInteger(
+      TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX)
+
     val networkConfig = NetworkEnvironmentConfiguration(
       numNetworkBuffers,
       pageSize,
       memType,
       ioMode,
+      initialRequestBackoff,
+      maxRequestBackoff,
       nettyConfig)
 
     // ----> timeouts, library caching, profiling

http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 4223b49..3ed8236 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -55,6 +55,6 @@ public class ResultPartitionDeploymentDescriptorTest {
                assertEquals(partitionId, copy.getPartitionId());
                assertEquals(partitionType, copy.getPartitionType());
                assertEquals(numberOfSubpartitions, 
copy.getNumberOfSubpartitions());
-               assertTrue(copy.allowLazyScheduling());
+               assertTrue(copy.sendScheduleOrUpdateConsumersMessage());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 8f9ea9e..0b7b10d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -21,11 +21,14 @@ package 
org.apache.flink.runtime.io.network.partition.consumer;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -42,9 +45,12 @@ import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -272,6 +278,84 @@ public class SingleInputGateTest {
        }
 
        /**
+        * Tests request back off configuration is correctly forwarded to the 
channels.
+        */
+       @Test
+       public void testRequestBackoffConfiguration() throws Exception {
+               ResultPartitionID[] partitionIds = new ResultPartitionID[] {
+                       new ResultPartitionID(),
+                       new ResultPartitionID(),
+                       new ResultPartitionID()
+               };
+
+               InputChannelDeploymentDescriptor[] channelDescs = new 
InputChannelDeploymentDescriptor[]{
+                       // Local
+                       new InputChannelDeploymentDescriptor(
+                               partitionIds[0],
+                               ResultPartitionLocation.createLocal()),
+                       // Remote
+                       new InputChannelDeploymentDescriptor(
+                               partitionIds[1],
+                               ResultPartitionLocation.createRemote(new 
ConnectionID(new InetSocketAddress("localhost", 5000), 0))),
+                       // Unknown
+                       new InputChannelDeploymentDescriptor(
+                               partitionIds[2],
+                               ResultPartitionLocation.createUnknown())};
+
+               InputGateDeploymentDescriptor gateDesc = new 
InputGateDeploymentDescriptor(new IntermediateDataSetID(), 0, channelDescs);
+
+               int initialBackoff = 137;
+               int maxBackoff = 1001;
+
+               NetworkEnvironment netEnv = mock(NetworkEnvironment.class);
+               when(netEnv.getResultPartitionManager()).thenReturn(new 
ResultPartitionManager());
+               when(netEnv.getTaskEventDispatcher()).thenReturn(new 
TaskEventDispatcher());
+               
when(netEnv.getPartitionRequestInitialBackoff()).thenReturn(initialBackoff);
+               
when(netEnv.getPartitionRequestMaxBackoff()).thenReturn(maxBackoff);
+               when(netEnv.getConnectionManager()).thenReturn(new 
LocalConnectionManager());
+
+               SingleInputGate gate = SingleInputGate.create(
+                       "TestTask",
+                       new JobID(),
+                       new ExecutionAttemptID(),
+                       gateDesc,
+                       netEnv,
+                       mock(TaskActions.class),
+                       new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+
+               Map<IntermediateResultPartitionID, InputChannel> channelMap = 
gate.getInputChannels();
+
+               assertEquals(3, channelMap.size());
+               InputChannel localChannel = 
channelMap.get(partitionIds[0].getPartitionId());
+               assertEquals(LocalInputChannel.class, localChannel.getClass());
+
+               InputChannel remoteChannel = 
channelMap.get(partitionIds[1].getPartitionId());
+               assertEquals(RemoteInputChannel.class, 
remoteChannel.getClass());
+
+               InputChannel unknownChannel = 
channelMap.get(partitionIds[2].getPartitionId());
+               assertEquals(UnknownInputChannel.class, 
unknownChannel.getClass());
+
+               InputChannel[] channels = new InputChannel[]{localChannel, 
remoteChannel, unknownChannel};
+               for (InputChannel ch : channels) {
+                       assertEquals(0, ch.getCurrentBackoff());
+
+                       assertTrue(ch.increaseBackoff());
+                       assertEquals(initialBackoff, ch.getCurrentBackoff());
+
+                       assertTrue(ch.increaseBackoff());
+                       assertEquals(initialBackoff * 2, 
ch.getCurrentBackoff());
+
+                       assertTrue(ch.increaseBackoff());
+                       assertEquals(initialBackoff * 2 * 2, 
ch.getCurrentBackoff());
+
+                       assertTrue(ch.increaseBackoff());
+                       assertEquals(maxBackoff, ch.getCurrentBackoff());
+
+                       assertFalse(ch.increaseBackoff());
+               }
+       }
+
+       /**
         * Returns whether the stack trace represents a Thread in a blocking 
queue
         * poll call.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 0bcd1ce..f9434e2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -104,7 +104,7 @@ public class TaskManagerComponentsStartupShutdownTest {
                                        config);
 
                        final NetworkEnvironmentConfiguration netConf = new 
NetworkEnvironmentConfiguration(
-                                       32, BUFFER_SIZE, MemoryType.HEAP, 
IOManager.IOMode.SYNC, Option.<NettyConfig>empty(), 0, 0);
+                                       32, BUFFER_SIZE, MemoryType.HEAP, 
IOManager.IOMode.SYNC, 0, 0, Option.<NettyConfig>empty());
 
                        ResourceID taskManagerId = ResourceID.generate();
                        
@@ -121,7 +121,7 @@ public class TaskManagerComponentsStartupShutdownTest {
                                null,
                                netConf.ioMode(),
                                netConf.partitionRequestInitialBackoff(),
-                               netConf.partitinRequestMaxBackoff());
+                               netConf.partitionRequestMaxBackoff());
 
                        network.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 22f0c60..fd9ff05 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -903,6 +904,8 @@ public class TaskManagerTest extends TestLogger {
                                final int dataPort = 
NetUtils.getAvailablePort();
                                Configuration config = new Configuration();
                                
config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
+                               
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+                               
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
 
                                taskManager = TestingUtils.createTaskManager(
                                                system,
@@ -998,6 +1001,8 @@ public class TaskManagerTest extends TestLogger {
                                jobManager = new AkkaActorGateway(jm, 
leaderSessionID);
 
                                final Configuration config = new 
Configuration();
+                               
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+                               
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
 
                                taskManager = TestingUtils.createTaskManager(
                                                system,

Reply via email to