[FLINK-5040] [taskmanager] Adjust partition request backoffs The back offs were hard coded before, which would have made it impossible to react to any potential problems with them.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bd8e027 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bd8e027 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bd8e027 Branch: refs/heads/release-1.1 Commit: 0bd8e027934fc34302c5ddb48f9e9aa448a58721 Parents: 55c506f Author: Ufuk Celebi <[email protected]> Authored: Thu Nov 10 11:15:47 2016 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Thu Nov 10 21:53:31 2016 +0100 ---------------------------------------------------------------------- .../flink/configuration/ConfigConstants.java | 11 +++ .../ResultPartitionDeploymentDescriptor.java | 10 +-- .../runtime/io/network/NetworkEnvironment.java | 2 +- .../partition/consumer/SingleInputGate.java | 8 ++ .../apache/flink/runtime/taskmanager/Task.java | 2 +- .../NetworkEnvironmentConfiguration.scala | 12 +-- .../flink/runtime/taskmanager/TaskManager.scala | 8 ++ ...ResultPartitionDeploymentDescriptorTest.java | 2 +- .../ExecutionVertexDeploymentTest.java | 2 +- .../io/network/NetworkEnvironmentTest.java | 3 +- .../partition/consumer/SingleInputGateTest.java | 83 ++++++++++++++++++++ ...askManagerComponentsStartupShutdownTest.java | 10 +-- .../runtime/taskmanager/TaskManagerTest.java | 4 + 13 files changed, 135 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index d9ccb35..1431eae 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -216,6 +216,11 @@ public final class ConfigConstants { */ public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = "taskmanager.network.numberOfBuffers"; + /** Minimum backoff for partition requests of input channels. */ + public static final String NETWORK_REQUEST_BACKOFF_INITIAL_KEY = "taskmanager.net.request-backoff.initial"; + + public static final String NETWORK_REQUEST_BACKOFF_MAX_KEY = "taskmanager.net.request-backoff.max"; + /** * Config parameter defining the size of memory buffers used by the network stack and the memory manager. */ @@ -823,6 +828,12 @@ public final class ConfigConstants { */ public static final int DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS = 2048; + /** Initial backoff for partition requests of input channels. */ + public static final int DEFAULT_NETWORK_REQUEST_BACKOFF_INITIAL = 100; + + /** Maximum backoff for partition requests of input channels. */ + public static final int DEFAULT_NETWORK_REQUEST_BACKOFF_MAX = 10000; + /** * Default size of memory segments in the network stack and the memory manager. */ http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java index 2ecde80..3edd279 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java @@ -49,14 +49,14 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { private final int numberOfSubpartitions; /** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */ - private final boolean lazyScheduling; + private final boolean sendScheduleOrUpdateConsumersMessage; public ResultPartitionDeploymentDescriptor( IntermediateDataSetID resultId, IntermediateResultPartitionID partitionId, ResultPartitionType partitionType, int numberOfSubpartitions, - boolean lazyScheduling) { + boolean sendScheduleOrUpdateConsumersMessage) { this.resultId = checkNotNull(resultId); this.partitionId = checkNotNull(partitionId); @@ -64,7 +64,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { checkArgument(numberOfSubpartitions >= 1); this.numberOfSubpartitions = numberOfSubpartitions; - this.lazyScheduling = lazyScheduling; + this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage; } public IntermediateDataSetID getResultId() { @@ -83,8 +83,8 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { return numberOfSubpartitions; } - public boolean allowLazyScheduling() { - return lazyScheduling; + public boolean sendScheduleOrUpdateConsumersMessage() { + return sendScheduleOrUpdateConsumersMessage; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index 11661cc..d3715ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -148,7 +148,7 @@ public class NetworkEnvironment { } public Tuple2<Integer, Integer> getPartitionRequestInitialAndMaxBackoff() { - return configuration.partitionRequestInitialAndMaxBackoff(); + return configuration.partitionRequestInitialMaxBackoff(); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 351181a..212aade 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.metrics.groups.IOMetricGroup; @@ -496,6 +497,13 @@ public class SingleInputGate implements InputGate { // ------------------------------------------------------------------------ + @VisibleForTesting + Map<IntermediateResultPartitionID, InputChannel> getInputChannels() { + return inputChannels; + } + + // ------------------------------------------------------------------------ + /** * Creates an input gate and all of its input channels. */ http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 2179fc1..56aea1b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -337,7 +337,7 @@ public class Task implements Runnable { networkEnvironment.getPartitionConsumableNotifier(), ioManager, networkEnvironment.getDefaultIOMode(), - desc.allowLazyScheduling()); + desc.sendScheduleOrUpdateConsumersMessage()); writers[counter] = new ResultPartitionWriter(producedPartitions[counter]); http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala index 619da96..b7fa140 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala @@ -24,9 +24,9 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode import org.apache.flink.runtime.io.network.netty.NettyConfig case class NetworkEnvironmentConfiguration( - numNetworkBuffers: Int, - networkBufferSize: Int, - memoryType: MemoryType, - ioMode: IOMode, - nettyConfig: Option[NettyConfig] = None, - partitionRequestInitialAndMaxBackoff: (Integer, Integer) = (500, 3000)) + numNetworkBuffers: Int, + networkBufferSize: Int, + memoryType: MemoryType, + ioMode: IOMode, + partitionRequestInitialMaxBackoff : (Integer, Integer), + nettyConfig: Option[NettyConfig] = None) http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index c6759c1..40ae234 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -2137,11 +2137,19 @@ object TaskManager { val ioMode : IOMode = if (syncOrAsync == "async") IOMode.ASYNC else IOMode.SYNC + val initialRequestBackoff = configuration.getInteger( + ConfigConstants.NETWORK_REQUEST_BACKOFF_INITIAL_KEY, + ConfigConstants.DEFAULT_NETWORK_REQUEST_BACKOFF_INITIAL) + val maxRequestBackoff = configuration.getInteger( + ConfigConstants.NETWORK_REQUEST_BACKOFF_MAX_KEY, + ConfigConstants.DEFAULT_NETWORK_REQUEST_BACKOFF_MAX) + val networkConfig = NetworkEnvironmentConfiguration( numNetworkBuffers, pageSize, memType, ioMode, + (initialRequestBackoff, maxRequestBackoff), nettyConfig) // ----> timeouts, library caching, profiling http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java index 4223b49..3ed8236 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java @@ -55,6 +55,6 @@ public class ResultPartitionDeploymentDescriptorTest { assertEquals(partitionId, copy.getPartitionId()); assertEquals(partitionType, copy.getPartitionType()); assertEquals(numberOfSubpartitions, copy.getNumberOfSubpartitions()); - assertTrue(copy.allowLazyScheduling()); + assertTrue(copy.sendScheduleOrUpdateConsumersMessage()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java index 1f5c915..b3e6b63 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java @@ -374,7 +374,7 @@ public class ExecutionVertexDeploymentTest { assertEquals(1, producedPartitions.size()); ResultPartitionDeploymentDescriptor desc = producedPartitions.get(0); - assertEquals(mode.allowLazyDeployment(), desc.allowLazyScheduling()); + assertEquals(mode.allowLazyDeployment(), desc.sendScheduleOrUpdateConsumersMessage()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java index a659be3..ca4b7fa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java @@ -62,8 +62,7 @@ public class NetworkEnvironmentTest { NettyConfig nettyConf = new NettyConfig(InetAddress.getLocalHost(), port, BUFFER_SIZE, 1, new Configuration()); NetworkEnvironmentConfiguration config = new NetworkEnvironmentConfiguration( NUM_BUFFERS, BUFFER_SIZE, MemoryType.HEAP, - IOManager.IOMode.SYNC, new Some<>(nettyConf), - new Tuple2<>(0, 0)); + IOManager.IOMode.SYNC, new Tuple2<>(0, 0), new Some<>(nettyConf)); NetworkEnvironment env = new NetworkEnvironment( TestingUtils.defaultExecutionContext(), http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index 05427a1..9c8be81 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -21,11 +21,14 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.api.common.JobID; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionLocation; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.LocalConnectionManager; +import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferPool; @@ -43,9 +46,12 @@ import org.junit.Test; import scala.Tuple2; import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -269,6 +275,83 @@ public class SingleInputGateTest { } /** + * Tests request back off configuration is correctly forwarded to the channels. + */ + @Test + public void testRequestBackoffConfiguration() throws Exception { + ResultPartitionID[] partitionIds = new ResultPartitionID[] { + new ResultPartitionID(), + new ResultPartitionID(), + new ResultPartitionID() + }; + + InputChannelDeploymentDescriptor[] channelDescs = new InputChannelDeploymentDescriptor[]{ + // Local + new InputChannelDeploymentDescriptor( + partitionIds[0], + ResultPartitionLocation.createLocal()), + // Remote + new InputChannelDeploymentDescriptor( + partitionIds[1], + ResultPartitionLocation.createRemote(new ConnectionID(new InetSocketAddress("localhost", 5000), 0))), + // Unknown + new InputChannelDeploymentDescriptor( + partitionIds[2], + ResultPartitionLocation.createUnknown())}; + + InputGateDeploymentDescriptor gateDesc = new InputGateDeploymentDescriptor(new IntermediateDataSetID(), 0, channelDescs); + + int initialBackoff = 137; + int maxBackoff = 1001; + + NetworkEnvironment netEnv = mock(NetworkEnvironment.class); + when(netEnv.getPartitionManager()).thenReturn(new ResultPartitionManager()); + when(netEnv.getTaskEventDispatcher()).thenReturn(new TaskEventDispatcher()); + when(netEnv.getPartitionStateChecker()).thenReturn(mock(PartitionStateChecker.class)); + when(netEnv.getPartitionRequestInitialAndMaxBackoff()).thenReturn(new Tuple2<>(initialBackoff, maxBackoff)); + when(netEnv.getConnectionManager()).thenReturn(new LocalConnectionManager()); + + SingleInputGate gate = SingleInputGate.create( + "TestTask", + new JobID(), + new ExecutionAttemptID(), + gateDesc, + netEnv, + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + + Map<IntermediateResultPartitionID, InputChannel> channelMap = gate.getInputChannels(); + + assertEquals(3, channelMap.size()); + InputChannel localChannel = channelMap.get(partitionIds[0].getPartitionId()); + assertEquals(LocalInputChannel.class, localChannel.getClass()); + + InputChannel remoteChannel = channelMap.get(partitionIds[1].getPartitionId()); + assertEquals(RemoteInputChannel.class, remoteChannel.getClass()); + + InputChannel unknownChannel = channelMap.get(partitionIds[2].getPartitionId()); + assertEquals(UnknownInputChannel.class, unknownChannel.getClass()); + + InputChannel[] channels = new InputChannel[]{localChannel, remoteChannel, unknownChannel}; + for (InputChannel ch : channels) { + assertEquals(0, ch.getCurrentBackoff()); + + assertTrue(ch.increaseBackoff()); + assertEquals(initialBackoff, ch.getCurrentBackoff()); + + assertTrue(ch.increaseBackoff()); + assertEquals(initialBackoff * 2, ch.getCurrentBackoff()); + + assertTrue(ch.increaseBackoff()); + assertEquals(initialBackoff * 2 * 2, ch.getCurrentBackoff()); + + assertTrue(ch.increaseBackoff()); + assertEquals(maxBackoff, ch.getCurrentBackoff()); + + assertFalse(ch.increaseBackoff()); + } + } + + /** * Returns whether the stack trace represents a Thread in a blocking queue * poll call. * http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 60bf8e7..b4c456c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.taskmanager; -import static org.junit.Assert.*; - import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Kill; @@ -42,7 +40,6 @@ import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService; import org.apache.flink.runtime.memory.MemoryManager; - import org.apache.flink.runtime.messages.TaskManagerMessages; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.LeaderRetrievalUtils; @@ -54,6 +51,10 @@ import scala.concurrent.duration.FiniteDuration; import java.net.InetAddress; import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class TaskManagerComponentsStartupShutdownTest { /** @@ -98,8 +99,7 @@ public class TaskManagerComponentsStartupShutdownTest { config); final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration( - 32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, Option.<NettyConfig>empty(), - new Tuple2<Integer, Integer>(0, 0)); + 32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, new Tuple2<>(0, 0), Option.<NettyConfig>empty()); final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost(), 10000); http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index 431cbb8..f2fd859 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -903,6 +903,8 @@ public class TaskManagerTest extends TestLogger { final int dataPort = NetUtils.getAvailablePort(); Configuration config = new Configuration(); config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort); + config.setInteger(ConfigConstants.NETWORK_REQUEST_BACKOFF_INITIAL_KEY, 100); + config.setInteger(ConfigConstants.NETWORK_REQUEST_BACKOFF_MAX_KEY, 200); taskManager = TestingUtils.createTaskManager( system, @@ -999,6 +1001,8 @@ public class TaskManagerTest extends TestLogger { final int dataPort = NetUtils.getAvailablePort(); final Configuration config = new Configuration(); + config.setInteger(ConfigConstants.NETWORK_REQUEST_BACKOFF_INITIAL_KEY, 100); + config.setInteger(ConfigConstants.NETWORK_REQUEST_BACKOFF_MAX_KEY, 200); config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
