[FLINK-5040] [taskmanager] Adjust partition request backoffs

The back offs were hard coded before, which would have made it
impossible to react to any potential problems with them.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bd8e027
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bd8e027
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bd8e027

Branch: refs/heads/release-1.1
Commit: 0bd8e027934fc34302c5ddb48f9e9aa448a58721
Parents: 55c506f
Author: Ufuk Celebi <[email protected]>
Authored: Thu Nov 10 11:15:47 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Thu Nov 10 21:53:31 2016 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    | 11 +++
 .../ResultPartitionDeploymentDescriptor.java    | 10 +--
 .../runtime/io/network/NetworkEnvironment.java  |  2 +-
 .../partition/consumer/SingleInputGate.java     |  8 ++
 .../apache/flink/runtime/taskmanager/Task.java  |  2 +-
 .../NetworkEnvironmentConfiguration.scala       | 12 +--
 .../flink/runtime/taskmanager/TaskManager.scala |  8 ++
 ...ResultPartitionDeploymentDescriptorTest.java |  2 +-
 .../ExecutionVertexDeploymentTest.java          |  2 +-
 .../io/network/NetworkEnvironmentTest.java      |  3 +-
 .../partition/consumer/SingleInputGateTest.java | 83 ++++++++++++++++++++
 ...askManagerComponentsStartupShutdownTest.java | 10 +--
 .../runtime/taskmanager/TaskManagerTest.java    |  4 +
 13 files changed, 135 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index d9ccb35..1431eae 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -216,6 +216,11 @@ public final class ConfigConstants {
         */
        public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = 
"taskmanager.network.numberOfBuffers";
 
+       /** Minimum backoff for partition requests of input channels. */
+       public static final String NETWORK_REQUEST_BACKOFF_INITIAL_KEY = 
"taskmanager.net.request-backoff.initial";
+
+       public static final String NETWORK_REQUEST_BACKOFF_MAX_KEY = 
"taskmanager.net.request-backoff.max";
+
        /**
         * Config parameter defining the size of memory buffers used by the 
network stack and the memory manager.
         */
@@ -823,6 +828,12 @@ public final class ConfigConstants {
         */
        public static final int DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS = 2048;
 
+       /** Initial backoff for partition requests of input channels. */
+       public static final int DEFAULT_NETWORK_REQUEST_BACKOFF_INITIAL = 100;
+
+       /** Maximum backoff for partition requests of input channels. */
+       public static final int DEFAULT_NETWORK_REQUEST_BACKOFF_MAX = 10000;
+
        /**
         * Default size of memory segments in the network stack and the memory 
manager.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index 2ecde80..3edd279 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -49,14 +49,14 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
        private final int numberOfSubpartitions;
        
        /** Flag whether the result partition should send 
scheduleOrUpdateConsumer messages. */
-       private final boolean lazyScheduling;
+       private final boolean sendScheduleOrUpdateConsumersMessage;
 
        public ResultPartitionDeploymentDescriptor(
                        IntermediateDataSetID resultId,
                        IntermediateResultPartitionID partitionId,
                        ResultPartitionType partitionType,
                        int numberOfSubpartitions,
-                       boolean lazyScheduling) {
+                       boolean sendScheduleOrUpdateConsumersMessage) {
 
                this.resultId = checkNotNull(resultId);
                this.partitionId = checkNotNull(partitionId);
@@ -64,7 +64,7 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
 
                checkArgument(numberOfSubpartitions >= 1);
                this.numberOfSubpartitions = numberOfSubpartitions;
-               this.lazyScheduling = lazyScheduling;
+               this.sendScheduleOrUpdateConsumersMessage = 
sendScheduleOrUpdateConsumersMessage;
        }
 
        public IntermediateDataSetID getResultId() {
@@ -83,8 +83,8 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
                return numberOfSubpartitions;
        }
 
-       public boolean allowLazyScheduling() {
-               return lazyScheduling;
+       public boolean sendScheduleOrUpdateConsumersMessage() {
+               return sendScheduleOrUpdateConsumersMessage;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 11661cc..d3715ed 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -148,7 +148,7 @@ public class NetworkEnvironment {
        }
 
        public Tuple2<Integer, Integer> 
getPartitionRequestInitialAndMaxBackoff() {
-               return configuration.partitionRequestInitialAndMaxBackoff();
+               return configuration.partitionRequestInitialMaxBackoff();
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 351181a..212aade 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
@@ -496,6 +497,13 @@ public class SingleInputGate implements InputGate {
 
        // 
------------------------------------------------------------------------
 
+       @VisibleForTesting
+       Map<IntermediateResultPartitionID, InputChannel> getInputChannels() {
+               return inputChannels;
+       }
+
+       // 
------------------------------------------------------------------------
+
        /**
         * Creates an input gate and all of its input channels.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 2179fc1..56aea1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -337,7 +337,7 @@ public class Task implements Runnable {
                                        
networkEnvironment.getPartitionConsumableNotifier(),
                                        ioManager,
                                        networkEnvironment.getDefaultIOMode(),
-                                       desc.allowLazyScheduling());
+                                       
desc.sendScheduleOrUpdateConsumersMessage());
 
                        writers[counter] = new 
ResultPartitionWriter(producedPartitions[counter]);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
index 619da96..b7fa140 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
@@ -24,9 +24,9 @@ import 
org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 
 case class NetworkEnvironmentConfiguration(
-  numNetworkBuffers: Int,
-  networkBufferSize: Int,
-  memoryType: MemoryType,
-  ioMode: IOMode,
-  nettyConfig: Option[NettyConfig] = None,
-  partitionRequestInitialAndMaxBackoff: (Integer, Integer) = (500, 3000))
+    numNetworkBuffers: Int,
+    networkBufferSize: Int,
+    memoryType: MemoryType,
+    ioMode: IOMode,
+    partitionRequestInitialMaxBackoff : (Integer, Integer),
+    nettyConfig: Option[NettyConfig] = None)

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index c6759c1..40ae234 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -2137,11 +2137,19 @@ object TaskManager {
 
     val ioMode : IOMode = if (syncOrAsync == "async") IOMode.ASYNC else 
IOMode.SYNC
 
+    val initialRequestBackoff = configuration.getInteger(
+      ConfigConstants.NETWORK_REQUEST_BACKOFF_INITIAL_KEY,
+      ConfigConstants.DEFAULT_NETWORK_REQUEST_BACKOFF_INITIAL)
+    val maxRequestBackoff = configuration.getInteger(
+      ConfigConstants.NETWORK_REQUEST_BACKOFF_MAX_KEY,
+      ConfigConstants.DEFAULT_NETWORK_REQUEST_BACKOFF_MAX)
+
     val networkConfig = NetworkEnvironmentConfiguration(
       numNetworkBuffers,
       pageSize,
       memType,
       ioMode,
+      (initialRequestBackoff, maxRequestBackoff),
       nettyConfig)
 
     // ----> timeouts, library caching, profiling

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 4223b49..3ed8236 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -55,6 +55,6 @@ public class ResultPartitionDeploymentDescriptorTest {
                assertEquals(partitionId, copy.getPartitionId());
                assertEquals(partitionType, copy.getPartitionType());
                assertEquals(numberOfSubpartitions, 
copy.getNumberOfSubpartitions());
-               assertTrue(copy.allowLazyScheduling());
+               assertTrue(copy.sendScheduleOrUpdateConsumersMessage());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 1f5c915..b3e6b63 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -374,7 +374,7 @@ public class ExecutionVertexDeploymentTest {
 
                        assertEquals(1, producedPartitions.size());
                        ResultPartitionDeploymentDescriptor desc = 
producedPartitions.get(0);
-                       assertEquals(mode.allowLazyDeployment(), 
desc.allowLazyScheduling());
+                       assertEquals(mode.allowLazyDeployment(), 
desc.sendScheduleOrUpdateConsumersMessage());
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index a659be3..ca4b7fa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -62,8 +62,7 @@ public class NetworkEnvironmentTest {
                        NettyConfig nettyConf = new 
NettyConfig(InetAddress.getLocalHost(), port, BUFFER_SIZE, 1, new 
Configuration());
                        NetworkEnvironmentConfiguration config = new 
NetworkEnvironmentConfiguration(
                                        NUM_BUFFERS, BUFFER_SIZE, 
MemoryType.HEAP,
-                                       IOManager.IOMode.SYNC, new 
Some<>(nettyConf),
-                                       new Tuple2<>(0, 0));
+                                       IOManager.IOMode.SYNC, new Tuple2<>(0, 
0), new Some<>(nettyConf));
 
                        NetworkEnvironment env = new NetworkEnvironment(
                                TestingUtils.defaultExecutionContext(),

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 05427a1..9c8be81 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -21,11 +21,14 @@ package 
org.apache.flink.runtime.io.network.partition.consumer;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -43,9 +46,12 @@ import org.junit.Test;
 import scala.Tuple2;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -269,6 +275,83 @@ public class SingleInputGateTest {
        }
 
        /**
+        * Tests request back off configuration is correctly forwarded to the 
channels.
+        */
+       @Test
+       public void testRequestBackoffConfiguration() throws Exception {
+               ResultPartitionID[] partitionIds = new ResultPartitionID[] {
+                       new ResultPartitionID(),
+                       new ResultPartitionID(),
+                       new ResultPartitionID()
+               };
+
+               InputChannelDeploymentDescriptor[] channelDescs = new 
InputChannelDeploymentDescriptor[]{
+                       // Local
+                       new InputChannelDeploymentDescriptor(
+                               partitionIds[0],
+                               ResultPartitionLocation.createLocal()),
+                       // Remote
+                       new InputChannelDeploymentDescriptor(
+                               partitionIds[1],
+                               ResultPartitionLocation.createRemote(new 
ConnectionID(new InetSocketAddress("localhost", 5000), 0))),
+                       // Unknown
+                       new InputChannelDeploymentDescriptor(
+                               partitionIds[2],
+                               ResultPartitionLocation.createUnknown())};
+
+               InputGateDeploymentDescriptor gateDesc = new 
InputGateDeploymentDescriptor(new IntermediateDataSetID(), 0, channelDescs);
+
+               int initialBackoff = 137;
+               int maxBackoff = 1001;
+
+               NetworkEnvironment netEnv = mock(NetworkEnvironment.class);
+               when(netEnv.getPartitionManager()).thenReturn(new 
ResultPartitionManager());
+               when(netEnv.getTaskEventDispatcher()).thenReturn(new 
TaskEventDispatcher());
+               
when(netEnv.getPartitionStateChecker()).thenReturn(mock(PartitionStateChecker.class));
+               
when(netEnv.getPartitionRequestInitialAndMaxBackoff()).thenReturn(new 
Tuple2<>(initialBackoff, maxBackoff));
+               when(netEnv.getConnectionManager()).thenReturn(new 
LocalConnectionManager());
+
+               SingleInputGate gate = SingleInputGate.create(
+                       "TestTask",
+                       new JobID(),
+                       new ExecutionAttemptID(),
+                       gateDesc,
+                       netEnv,
+                       new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+
+               Map<IntermediateResultPartitionID, InputChannel> channelMap = 
gate.getInputChannels();
+
+               assertEquals(3, channelMap.size());
+               InputChannel localChannel = 
channelMap.get(partitionIds[0].getPartitionId());
+               assertEquals(LocalInputChannel.class, localChannel.getClass());
+
+               InputChannel remoteChannel = 
channelMap.get(partitionIds[1].getPartitionId());
+               assertEquals(RemoteInputChannel.class, 
remoteChannel.getClass());
+
+               InputChannel unknownChannel = 
channelMap.get(partitionIds[2].getPartitionId());
+               assertEquals(UnknownInputChannel.class, 
unknownChannel.getClass());
+
+               InputChannel[] channels = new InputChannel[]{localChannel, 
remoteChannel, unknownChannel};
+               for (InputChannel ch : channels) {
+                       assertEquals(0, ch.getCurrentBackoff());
+
+                       assertTrue(ch.increaseBackoff());
+                       assertEquals(initialBackoff, ch.getCurrentBackoff());
+
+                       assertTrue(ch.increaseBackoff());
+                       assertEquals(initialBackoff * 2, 
ch.getCurrentBackoff());
+
+                       assertTrue(ch.increaseBackoff());
+                       assertEquals(initialBackoff * 2 * 2, 
ch.getCurrentBackoff());
+
+                       assertTrue(ch.increaseBackoff());
+                       assertEquals(maxBackoff, ch.getCurrentBackoff());
+
+                       assertFalse(ch.increaseBackoff());
+               }
+       }
+
+       /**
         * Returns whether the stack trace represents a Thread in a blocking 
queue
         * poll call.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 60bf8e7..b4c456c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import static org.junit.Assert.*;
-
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Kill;
@@ -42,7 +40,6 @@ import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
-
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
@@ -54,6 +51,10 @@ import scala.concurrent.duration.FiniteDuration;
 import java.net.InetAddress;
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class TaskManagerComponentsStartupShutdownTest {
 
        /**
@@ -98,8 +99,7 @@ public class TaskManagerComponentsStartupShutdownTest {
                                        config);
 
                        final NetworkEnvironmentConfiguration netConf = new 
NetworkEnvironmentConfiguration(
-                                       32, BUFFER_SIZE, MemoryType.HEAP, 
IOManager.IOMode.SYNC, Option.<NettyConfig>empty(),
-                                       new Tuple2<Integer, Integer>(0, 0));
+                                       32, BUFFER_SIZE, MemoryType.HEAP, 
IOManager.IOMode.SYNC, new Tuple2<>(0, 0), Option.<NettyConfig>empty());
 
                        final InstanceConnectionInfo connectionInfo = new 
InstanceConnectionInfo(InetAddress.getLocalHost(), 10000);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 431cbb8..f2fd859 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -903,6 +903,8 @@ public class TaskManagerTest extends TestLogger {
                                final int dataPort = 
NetUtils.getAvailablePort();
                                Configuration config = new Configuration();
                                
config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
+                               
config.setInteger(ConfigConstants.NETWORK_REQUEST_BACKOFF_INITIAL_KEY, 100);
+                               
config.setInteger(ConfigConstants.NETWORK_REQUEST_BACKOFF_MAX_KEY, 200);
 
                                taskManager = TestingUtils.createTaskManager(
                                                system,
@@ -999,6 +1001,8 @@ public class TaskManagerTest extends TestLogger {
 
                                final int dataPort = 
NetUtils.getAvailablePort();
                                final Configuration config = new 
Configuration();
+                               
config.setInteger(ConfigConstants.NETWORK_REQUEST_BACKOFF_INITIAL_KEY, 100);
+                               
config.setInteger(ConfigConstants.NETWORK_REQUEST_BACKOFF_MAX_KEY, 200);
 
                                
config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
 

Reply via email to