[FLINK-5040] [taskmanager] Adjust partition request backoffs

The backoffs were previously hard-coded, which made it impossible to react to any potential problems with them.
This closes #2784. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5d5637b0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5d5637b0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5d5637b0 Branch: refs/heads/master Commit: 5d5637b01031746b2dfadf6d7fcd59155f7de653 Parents: 2742d5c Author: Ufuk Celebi <[email protected]> Authored: Thu Nov 10 11:15:47 2016 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Fri Nov 11 09:41:39 2016 +0100 ---------------------------------------------------------------------- .../flink/configuration/TaskManagerOptions.java | 24 ++++-- .../ResultPartitionDeploymentDescriptor.java | 8 +- .../partition/consumer/SingleInputGate.java | 10 ++- .../apache/flink/runtime/taskmanager/Task.java | 2 +- .../NetworkEnvironmentConfiguration.scala | 14 ++-- .../flink/runtime/taskmanager/TaskManager.scala | 9 ++- ...ResultPartitionDeploymentDescriptorTest.java | 2 +- .../partition/consumer/SingleInputGateTest.java | 84 ++++++++++++++++++++ ...askManagerComponentsStartupShutdownTest.java | 4 +- .../runtime/taskmanager/TaskManagerTest.java | 5 ++ 10 files changed, 140 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java index e5d36aa..6f6238b 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java @@ -35,6 +35,20 @@ public class TaskManagerOptions { // @TODO Migrate 'taskmanager.*' config options from 
ConfigConstants // ------------------------------------------------------------------------ + // Network Options + // ------------------------------------------------------------------------ + + /** Minimum backoff for partition requests of input channels. */ + public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL = + key("taskmanager.net.request-backoff.initial") + .defaultValue(100); + + /** Maximum backoff for partition requests of input channels. */ + public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX = + key("taskmanager.net.request-backoff.max") + .defaultValue(10000); + + // ------------------------------------------------------------------------ // Task Options // ------------------------------------------------------------------------ @@ -44,8 +58,8 @@ public class TaskManagerOptions { */ public static final ConfigOption<Long> TASK_CANCELLATION_INTERVAL = key("task.cancellation.interval") - .defaultValue(30000L) - .withDeprecatedKeys("task.cancellation-interval"); + .defaultValue(30000L) + .withDeprecatedKeys("task.cancellation-interval"); /** * Timeout in milliseconds after which a task cancellation times out and @@ -54,19 +68,19 @@ public class TaskManagerOptions { */ public static final ConfigOption<Long> TASK_CANCELLATION_TIMEOUT = key("task.cancellation.timeout") - .defaultValue(180000L); + .defaultValue(180000L); /** * The maximum number of bytes that a checkpoint alignment may buffer. * If the checkpoint alignment buffers more than the configured amount of * data, the checkpoint is aborted (skipped). - * + * * <p>The default value of {@code -1} indicates that there is no limit. 
*/ public static final ConfigOption<Long> TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT = key("task.checkpoint.alignment.max-size") .defaultValue(-1L); - + // ------------------------------------------------------------------------ /** Not intended to be instantiated */ http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java index 2ecde80..14c7d2a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java @@ -49,7 +49,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { private final int numberOfSubpartitions; /** Flag whether the result partition should send scheduleOrUpdateConsumer messages. 
*/ - private final boolean lazyScheduling; + private final boolean sendScheduleOrUpdateConsumersMessage; public ResultPartitionDeploymentDescriptor( IntermediateDataSetID resultId, @@ -64,7 +64,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { checkArgument(numberOfSubpartitions >= 1); this.numberOfSubpartitions = numberOfSubpartitions; - this.lazyScheduling = lazyScheduling; + this.sendScheduleOrUpdateConsumersMessage = lazyScheduling; } public IntermediateDataSetID getResultId() { @@ -83,8 +83,8 @@ public class ResultPartitionDeploymentDescriptor implements Serializable { return numberOfSubpartitions; } - public boolean allowLazyScheduling() { - return lazyScheduling; + public boolean sendScheduleOrUpdateConsumersMessage() { + return sendScheduleOrUpdateConsumersMessage; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index af5fd89..8f57542 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; import com.google.common.collect.Maps; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.taskmanager.TaskActions; @@ -520,6 +521,13 @@ public class SingleInputGate implements InputGate { // 
------------------------------------------------------------------------ + @VisibleForTesting + Map<IntermediateResultPartitionID, InputChannel> getInputChannels() { + return inputChannels; + } + + // ------------------------------------------------------------------------ + /** * Creates an input gate and all of its input channels. */ @@ -565,7 +573,7 @@ public class SingleInputGate implements InputGate { partitionLocation.getConnectionId(), networkEnvironment.getConnectionManager(), networkEnvironment.getPartitionRequestInitialBackoff(), - networkEnvironment.getPartitionRequestInitialBackoff(), + networkEnvironment.getPartitionRequestMaxBackoff(), metrics ); } http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 4f3dd54..b960e68 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -346,7 +346,7 @@ public class Task implements Runnable, TaskActions { resultPartitionConsumableNotifier, ioManager, networkEnvironment.getDefaultIOMode(), - desc.allowLazyScheduling()); + desc.sendScheduleOrUpdateConsumersMessage()); writers[counter] = new ResultPartitionWriter(producedPartitions[counter]); http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala index 14589a1..6a59665 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala @@ -23,10 +23,10 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode import org.apache.flink.runtime.io.network.netty.NettyConfig case class NetworkEnvironmentConfiguration( - numNetworkBuffers: Int, - networkBufferSize: Int, - memoryType: MemoryType, - ioMode: IOMode, - nettyConfig: Option[NettyConfig] = None, - partitionRequestInitialBackoff: Int = 500, - partitinRequestMaxBackoff: Int = 3000) + numNetworkBuffers: Int, + networkBufferSize: Int, + memoryType: MemoryType, + ioMode: IOMode, + partitionRequestInitialBackoff : Int, + partitionRequestMaxBackoff : Int, + nettyConfig: Option[NettyConfig] = None) http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 4bb2da4..dd5d218 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -1982,7 +1982,7 @@ object TaskManager { kvStateServer, netConfig.ioMode, netConfig.partitionRequestInitialBackoff, - netConfig.partitinRequestMaxBackoff) + netConfig.partitionRequestMaxBackoff) network.start() @@ -2258,11 +2258,18 @@ object TaskManager { val ioMode : IOMode = if (syncOrAsync == "async") IOMode.ASYNC else IOMode.SYNC + val initialRequestBackoff = configuration.getInteger( + 
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL) + val maxRequestBackoff = configuration.getInteger( + TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX) + val networkConfig = NetworkEnvironmentConfiguration( numNetworkBuffers, pageSize, memType, ioMode, + initialRequestBackoff, + maxRequestBackoff, nettyConfig) // ----> timeouts, library caching, profiling http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java index 4223b49..3ed8236 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java @@ -55,6 +55,6 @@ public class ResultPartitionDeploymentDescriptorTest { assertEquals(partitionId, copy.getPartitionId()); assertEquals(partitionType, copy.getPartitionType()); assertEquals(numberOfSubpartitions, copy.getNumberOfSubpartitions()); - assertTrue(copy.allowLazyScheduling()); + assertTrue(copy.sendScheduleOrUpdateConsumersMessage()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index 8f9ea9e..0b7b10d 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -21,11 +21,14 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.api.common.JobID; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionLocation; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.LocalConnectionManager; +import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferPool; @@ -42,9 +45,12 @@ import org.apache.flink.runtime.taskmanager.TaskActions; import org.junit.Test; import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -272,6 +278,84 @@ public class SingleInputGateTest { } /** + * Tests request back off configuration is correctly forwarded to the channels. 
+ */ + @Test + public void testRequestBackoffConfiguration() throws Exception { + ResultPartitionID[] partitionIds = new ResultPartitionID[] { + new ResultPartitionID(), + new ResultPartitionID(), + new ResultPartitionID() + }; + + InputChannelDeploymentDescriptor[] channelDescs = new InputChannelDeploymentDescriptor[]{ + // Local + new InputChannelDeploymentDescriptor( + partitionIds[0], + ResultPartitionLocation.createLocal()), + // Remote + new InputChannelDeploymentDescriptor( + partitionIds[1], + ResultPartitionLocation.createRemote(new ConnectionID(new InetSocketAddress("localhost", 5000), 0))), + // Unknown + new InputChannelDeploymentDescriptor( + partitionIds[2], + ResultPartitionLocation.createUnknown())}; + + InputGateDeploymentDescriptor gateDesc = new InputGateDeploymentDescriptor(new IntermediateDataSetID(), 0, channelDescs); + + int initialBackoff = 137; + int maxBackoff = 1001; + + NetworkEnvironment netEnv = mock(NetworkEnvironment.class); + when(netEnv.getResultPartitionManager()).thenReturn(new ResultPartitionManager()); + when(netEnv.getTaskEventDispatcher()).thenReturn(new TaskEventDispatcher()); + when(netEnv.getPartitionRequestInitialBackoff()).thenReturn(initialBackoff); + when(netEnv.getPartitionRequestMaxBackoff()).thenReturn(maxBackoff); + when(netEnv.getConnectionManager()).thenReturn(new LocalConnectionManager()); + + SingleInputGate gate = SingleInputGate.create( + "TestTask", + new JobID(), + new ExecutionAttemptID(), + gateDesc, + netEnv, + mock(TaskActions.class), + new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + + Map<IntermediateResultPartitionID, InputChannel> channelMap = gate.getInputChannels(); + + assertEquals(3, channelMap.size()); + InputChannel localChannel = channelMap.get(partitionIds[0].getPartitionId()); + assertEquals(LocalInputChannel.class, localChannel.getClass()); + + InputChannel remoteChannel = channelMap.get(partitionIds[1].getPartitionId()); + assertEquals(RemoteInputChannel.class, 
remoteChannel.getClass()); + + InputChannel unknownChannel = channelMap.get(partitionIds[2].getPartitionId()); + assertEquals(UnknownInputChannel.class, unknownChannel.getClass()); + + InputChannel[] channels = new InputChannel[]{localChannel, remoteChannel, unknownChannel}; + for (InputChannel ch : channels) { + assertEquals(0, ch.getCurrentBackoff()); + + assertTrue(ch.increaseBackoff()); + assertEquals(initialBackoff, ch.getCurrentBackoff()); + + assertTrue(ch.increaseBackoff()); + assertEquals(initialBackoff * 2, ch.getCurrentBackoff()); + + assertTrue(ch.increaseBackoff()); + assertEquals(initialBackoff * 2 * 2, ch.getCurrentBackoff()); + + assertTrue(ch.increaseBackoff()); + assertEquals(maxBackoff, ch.getCurrentBackoff()); + + assertFalse(ch.increaseBackoff()); + } + } + + /** * Returns whether the stack trace represents a Thread in a blocking queue * poll call. * http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 0bcd1ce..f9434e2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -104,7 +104,7 @@ public class TaskManagerComponentsStartupShutdownTest { config); final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration( - 32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, Option.<NettyConfig>empty(), 0, 0); + 32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, 0, 0, Option.<NettyConfig>empty()); ResourceID 
taskManagerId = ResourceID.generate(); @@ -121,7 +121,7 @@ public class TaskManagerComponentsStartupShutdownTest { null, netConf.ioMode(), netConf.partitionRequestInitialBackoff(), - netConf.partitinRequestMaxBackoff()); + netConf.partitionRequestMaxBackoff()); network.start(); http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index 22f0c60..fd9ff05 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -30,6 +30,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.FlinkUntypedActor; import org.apache.flink.runtime.blob.BlobKey; @@ -903,6 +904,8 @@ public class TaskManagerTest extends TestLogger { final int dataPort = NetUtils.getAvailablePort(); Configuration config = new Configuration(); config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort); + config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100); + config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200); taskManager = TestingUtils.createTaskManager( system, @@ -998,6 +1001,8 @@ public class TaskManagerTest extends TestLogger { jobManager = new AkkaActorGateway(jm, leaderSessionID); final Configuration config = new Configuration(); + 
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100); + config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200); taskManager = TestingUtils.createTaskManager( system,
