Repository: flink Updated Branches: refs/heads/master 47db9cb1a -> a078666d4
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index 8177bf7..7953ceb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -25,6 +25,8 @@ import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.execution.CancelTaskException; @@ -37,7 +39,7 @@ import org.apache.flink.runtime.filecache.FileCache; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; +import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; @@ -46,6 +48,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; +import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.messages.TaskManagerMessages; import org.apache.flink.runtime.messages.TaskMessages; @@ -68,6 +71,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -258,14 +262,14 @@ public class TaskTest extends TestLogger { // mock a network manager that rejects registration ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class); - PartitionStateChecker partitionStateChecker = mock(PartitionStateChecker.class); + PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class); Executor executor = mock(Executor.class); NetworkEnvironment network = mock(NetworkEnvironment.class); when(network.getResultPartitionManager()).thenReturn(partitionManager); when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC); doThrow(new RuntimeException("buffers")).when(network).registerTask(any(Task.class)); - Task task = createTask(TestInvokableCorrect.class, libCache, network, consumableNotifier, partitionStateChecker, executor); + Task task = createTask(TestInvokableCorrect.class, libCache, network, consumableNotifier, partitionProducerStateChecker, executor); task.registerExecutionListener(listener); @@ -544,7 +548,7 @@ public class TaskTest extends TestLogger { // Set the mock input gate setInputGate(task, inputGate); - // Expected task state for each partition state + // Expected task state for each producer state final Map<ExecutionState, ExecutionState> expected = new HashMap<>(ExecutionState.values().length); // Fail the task for unexpected states @@ -556,7 +560,7 @@ public class TaskTest extends TestLogger { expected.put(ExecutionState.SCHEDULED, ExecutionState.RUNNING); expected.put(ExecutionState.DEPLOYING, ExecutionState.RUNNING); expected.put(ExecutionState.FINISHED, ExecutionState.RUNNING); - + expected.put(ExecutionState.CANCELED, ExecutionState.CANCELING); expected.put(ExecutionState.CANCELING, ExecutionState.CANCELING); expected.put(ExecutionState.FAILED, ExecutionState.CANCELING); @@ -564,7 +568,7 @@ public class TaskTest extends TestLogger { for (ExecutionState state : ExecutionState.values()) { setState(task, ExecutionState.RUNNING); - task.onPartitionStateUpdate(resultId, partitionId.getPartitionId(), state); + task.onPartitionStateUpdate(resultId, partitionId, state); ExecutionState newTaskState = task.getExecutionState(); @@ -575,6 +579,126 @@ public class TaskTest extends TestLogger { } /** + * Tests the trigger partition state update future completions. + */ + @Test + public void testTriggerPartitionStateUpdate() throws Exception { + IntermediateDataSetID resultId = new IntermediateDataSetID(); + ResultPartitionID partitionId = new ResultPartitionID(); + + LibraryCacheManager libCache = mock(LibraryCacheManager.class); + when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader()); + + PartitionProducerStateChecker partitionChecker = mock(PartitionProducerStateChecker.class); + + ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class); + NetworkEnvironment network = mock(NetworkEnvironment.class); + when(network.getResultPartitionManager()).thenReturn(mock(ResultPartitionManager.class)); + when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC); + when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))) + .thenReturn(mock(TaskKvStateRegistry.class)); + + createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor()); + + // Test all branches of trigger partition state check + + { + // Reset latches + createQueuesAndActors(); + + // PartitionProducerDisposedException + Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor()); + + FlinkCompletableFuture<ExecutionState> promise = new FlinkCompletableFuture<>(); + when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise); + + task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId); + + promise.completeExceptionally(new PartitionProducerDisposedException(partitionId)); + assertEquals(ExecutionState.CANCELING, task.getExecutionState()); + } + + { + // Reset latches + createQueuesAndActors(); + + // Any other exception + Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor()); + + FlinkCompletableFuture<ExecutionState> promise = new FlinkCompletableFuture<>(); + when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise); + + task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId); + + promise.completeExceptionally(new RuntimeException("Any other exception")); + + assertEquals(ExecutionState.FAILED, task.getExecutionState()); + } + + { + // Reset latches + createQueuesAndActors(); + + // TimeoutException handled special => retry + Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor()); + SingleInputGate inputGate = mock(SingleInputGate.class); + when(inputGate.getConsumedResultId()).thenReturn(resultId); + + try { + task.startTaskThread(); + awaitLatch.await(); + + setInputGate(task, inputGate); + + FlinkCompletableFuture<ExecutionState> promise = new FlinkCompletableFuture<>(); + when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise); + + task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId); + + promise.completeExceptionally(new TimeoutException()); + + assertEquals(ExecutionState.RUNNING, task.getExecutionState()); + + verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId())); + } finally { + task.getExecutingThread().interrupt(); + task.getExecutingThread().join(); + } + } + + { + // Reset latches + createQueuesAndActors(); + + // Success + Task task = createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor()); + SingleInputGate inputGate = mock(SingleInputGate.class); + when(inputGate.getConsumedResultId()).thenReturn(resultId); + + try { + task.startTaskThread(); + awaitLatch.await(); + + setInputGate(task, inputGate); + + FlinkCompletableFuture<ExecutionState> promise = new FlinkCompletableFuture<>(); + when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise); + + task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId); + + promise.complete(ExecutionState.RUNNING); + + assertEquals(ExecutionState.RUNNING, task.getExecutionState()); + + verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId())); + } finally { + task.getExecutingThread().interrupt(); + task.getExecutingThread().join(); + } + } + } + + /** * Tests that interrupt happens via watch dog if canceller is stuck in cancel. * Task cancellation blocks the task canceller. Interrupt after cancel via * cancellation watch dog. @@ -753,7 +877,7 @@ public class TaskTest extends TestLogger { ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class); - PartitionStateChecker partitionStateChecker = mock(PartitionStateChecker.class); + PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class); Executor executor = mock(Executor.class); NetworkEnvironment network = mock(NetworkEnvironment.class); when(network.getResultPartitionManager()).thenReturn(partitionManager); @@ -761,7 +885,7 @@ public class TaskTest extends TestLogger { when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))) .thenReturn(mock(TaskKvStateRegistry.class)); - return createTask(invokable, libCache, network, consumableNotifier, partitionStateChecker, executor, config, execConfig); + return createTask(invokable, libCache, network, consumableNotifier, partitionProducerStateChecker, executor, config, execConfig); } private Task createTask( @@ -769,9 +893,9 @@ public class TaskTest extends TestLogger { LibraryCacheManager libCache, NetworkEnvironment networkEnvironment, ResultPartitionConsumableNotifier consumableNotifier, - PartitionStateChecker partitionStateChecker, + PartitionProducerStateChecker partitionProducerStateChecker, Executor executor) throws IOException { - return createTask(invokable, libCache, networkEnvironment, consumableNotifier, partitionStateChecker, executor, new Configuration(), new ExecutionConfig()); + return createTask(invokable, libCache, networkEnvironment, consumableNotifier, partitionProducerStateChecker, executor, new Configuration(), new ExecutionConfig()); } private Task createTask( @@ -779,7 +903,7 @@ public class TaskTest extends TestLogger { LibraryCacheManager libCache, NetworkEnvironment networkEnvironment, ResultPartitionConsumableNotifier consumableNotifier, - PartitionStateChecker partitionStateChecker, + PartitionProducerStateChecker partitionProducerStateChecker, Executor executor, Configuration taskManagerConfig, ExecutionConfig execConfig) throws IOException { @@ -837,7 +961,7 @@ public class TaskTest extends TestLogger { new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")), mock(TaskMetricGroup.class), consumableNotifier, - partitionStateChecker, + partitionProducerStateChecker, executor); } http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java index 6e96400..291fd5f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java @@ -39,7 +39,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation; import org.apache.flink.runtime.filecache.FileCache; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; +import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; @@ -157,9 +157,8 @@ public class BlockingCheckpointsTest { "localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()), new UnregisteredTaskMetricsGroup(), mock(ResultPartitionConsumableNotifier.class), - mock(PartitionStateChecker.class), + mock(PartitionProducerStateChecker.class), Executors.directExecutor()); - } // ------------------------------------------------------------------------ @@ -297,4 +296,4 @@ public class BlockingCheckpointsTest { @Override protected void cancelTask() {} } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java index 1f79384..6cde30f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java @@ -35,7 +35,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation; import org.apache.flink.runtime.filecache.FileCache; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; +import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; @@ -180,7 +180,7 @@ public class InterruptSensitiveRestoreTest { "localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()), new UnregisteredTaskMetricsGroup(), mock(ResultPartitionConsumableNotifier.class), - mock(PartitionStateChecker.class), + mock(PartitionProducerStateChecker.class), mock(Executor.class)); } http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 3fe8a37..d04c456 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -38,7 +38,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation; import org.apache.flink.runtime.filecache.FileCache; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; +import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -256,7 +256,7 @@ public class StreamTaskTest { ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class); - PartitionStateChecker partitionStateChecker = mock(PartitionStateChecker.class); + PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class); Executor executor = mock(Executor.class); NetworkEnvironment network = mock(NetworkEnvironment.class); @@ -303,7 +303,7 @@ public class StreamTaskTest { new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")), new UnregisteredTaskMetricsGroup(), consumableNotifier, - partitionStateChecker, + partitionProducerStateChecker, executor); }
