Repository: flink
Updated Branches:
  refs/heads/master 47db9cb1a -> a078666d4


http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 8177bf7..7953ceb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.CancelTaskException;
@@ -37,7 +39,7 @@ import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
@@ -46,6 +48,7 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.messages.TaskMessages;
@@ -68,6 +71,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -258,14 +262,14 @@ public class TaskTest extends TestLogger {
                        // mock a network manager that rejects registration
                        ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
                        ResultPartitionConsumableNotifier consumableNotifier = 
mock(ResultPartitionConsumableNotifier.class);
-                       PartitionStateChecker partitionStateChecker = 
mock(PartitionStateChecker.class);
+                       PartitionProducerStateChecker 
partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
                        Executor executor = mock(Executor.class);
                        NetworkEnvironment network = 
mock(NetworkEnvironment.class);
                        
when(network.getResultPartitionManager()).thenReturn(partitionManager);
                        
when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
                        doThrow(new 
RuntimeException("buffers")).when(network).registerTask(any(Task.class));
 
-                       Task task = createTask(TestInvokableCorrect.class, 
libCache, network, consumableNotifier, partitionStateChecker, executor);
+                       Task task = createTask(TestInvokableCorrect.class, 
libCache, network, consumableNotifier, partitionProducerStateChecker, executor);
 
                        task.registerExecutionListener(listener);
 
@@ -544,7 +548,7 @@ public class TaskTest extends TestLogger {
                // Set the mock input gate
                setInputGate(task, inputGate);
 
-               // Expected task state for each partition state
+               // Expected task state for each producer state
                final Map<ExecutionState, ExecutionState> expected = new 
HashMap<>(ExecutionState.values().length);
 
                // Fail the task for unexpected states
@@ -556,7 +560,7 @@ public class TaskTest extends TestLogger {
                expected.put(ExecutionState.SCHEDULED, ExecutionState.RUNNING);
                expected.put(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
                expected.put(ExecutionState.FINISHED, ExecutionState.RUNNING);
-
+               
                expected.put(ExecutionState.CANCELED, ExecutionState.CANCELING);
                expected.put(ExecutionState.CANCELING, 
ExecutionState.CANCELING);
                expected.put(ExecutionState.FAILED, ExecutionState.CANCELING);
@@ -564,7 +568,7 @@ public class TaskTest extends TestLogger {
                for (ExecutionState state : ExecutionState.values()) {
                        setState(task, ExecutionState.RUNNING);
 
-                       task.onPartitionStateUpdate(resultId, 
partitionId.getPartitionId(), state);
+                       task.onPartitionStateUpdate(resultId, partitionId, 
state);
 
                        ExecutionState newTaskState = task.getExecutionState();
 
@@ -575,6 +579,126 @@ public class TaskTest extends TestLogger {
        }
 
        /**
+        * Tests the trigger partition state update future completions.
+        */
+       @Test
+       public void testTriggerPartitionStateUpdate() throws Exception {
+               IntermediateDataSetID resultId = new IntermediateDataSetID();
+               ResultPartitionID partitionId = new ResultPartitionID();
+
+               LibraryCacheManager libCache = mock(LibraryCacheManager.class);
+               
when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
+
+               PartitionProducerStateChecker partitionChecker = 
mock(PartitionProducerStateChecker.class);
+
+               ResultPartitionConsumableNotifier consumableNotifier = 
mock(ResultPartitionConsumableNotifier.class);
+               NetworkEnvironment network = mock(NetworkEnvironment.class);
+               
when(network.getResultPartitionManager()).thenReturn(mock(ResultPartitionManager.class));
+               
when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
+               when(network.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
+                       .thenReturn(mock(TaskKvStateRegistry.class));
+
+               createTask(InvokableBlockingInInvoke.class, libCache, network, 
consumableNotifier, partitionChecker, Executors.directExecutor());
+
+               // Test all branches of trigger partition state check
+
+               {
+                       // Reset latches
+                       createQueuesAndActors();
+
+                       // PartitionProducerDisposedException
+                       Task task = createTask(InvokableBlockingInInvoke.class, 
libCache, network, consumableNotifier, partitionChecker, 
Executors.directExecutor());
+
+                       FlinkCompletableFuture<ExecutionState> promise = new 
FlinkCompletableFuture<>();
+                       
when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), 
eq(resultId), eq(partitionId))).thenReturn(promise);
+
+                       
task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId);
+
+                       promise.completeExceptionally(new 
PartitionProducerDisposedException(partitionId));
+                       assertEquals(ExecutionState.CANCELING, 
task.getExecutionState());
+               }
+
+               {
+                       // Reset latches
+                       createQueuesAndActors();
+
+                       // Any other exception
+                       Task task = createTask(InvokableBlockingInInvoke.class, 
libCache, network, consumableNotifier, partitionChecker, 
Executors.directExecutor());
+
+                       FlinkCompletableFuture<ExecutionState> promise = new 
FlinkCompletableFuture<>();
+                       
when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), 
eq(resultId), eq(partitionId))).thenReturn(promise);
+
+                       
task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId);
+
+                       promise.completeExceptionally(new RuntimeException("Any 
other exception"));
+
+                       assertEquals(ExecutionState.FAILED, 
task.getExecutionState());
+               }
+
+               {
+                       // Reset latches
+                       createQueuesAndActors();
+
+                       // TimeoutException handled special => retry
+                       Task task = createTask(InvokableBlockingInInvoke.class, 
libCache, network, consumableNotifier, partitionChecker, 
Executors.directExecutor());
+                       SingleInputGate inputGate = mock(SingleInputGate.class);
+                       
when(inputGate.getConsumedResultId()).thenReturn(resultId);
+
+                       try {
+                               task.startTaskThread();
+                               awaitLatch.await();
+
+                               setInputGate(task, inputGate);
+
+                               FlinkCompletableFuture<ExecutionState> promise 
= new FlinkCompletableFuture<>();
+                               
when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), 
eq(resultId), eq(partitionId))).thenReturn(promise);
+
+                               
task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId);
+
+                               promise.completeExceptionally(new 
TimeoutException());
+
+                               assertEquals(ExecutionState.RUNNING, 
task.getExecutionState());
+
+                               verify(inputGate, 
times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
+                       } finally {
+                               task.getExecutingThread().interrupt();
+                               task.getExecutingThread().join();
+                       }
+               }
+
+               {
+                       // Reset latches
+                       createQueuesAndActors();
+
+                       // Success
+                       Task task = createTask(InvokableBlockingInInvoke.class, 
libCache, network, consumableNotifier, partitionChecker, 
Executors.directExecutor());
+                       SingleInputGate inputGate = mock(SingleInputGate.class);
+                       
when(inputGate.getConsumedResultId()).thenReturn(resultId);
+
+                       try {
+                               task.startTaskThread();
+                               awaitLatch.await();
+
+                               setInputGate(task, inputGate);
+
+                               FlinkCompletableFuture<ExecutionState> promise 
= new FlinkCompletableFuture<>();
+                               
when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), 
eq(resultId), eq(partitionId))).thenReturn(promise);
+
+                               
task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId);
+
+                               promise.complete(ExecutionState.RUNNING);
+
+                               assertEquals(ExecutionState.RUNNING, 
task.getExecutionState());
+
+                               verify(inputGate, 
times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
+                       } finally {
+                               task.getExecutingThread().interrupt();
+                               task.getExecutingThread().join();
+                       }
+               }
+       }
+
+       /**
         * Tests that interrupt happens via watch dog if canceller is stuck in 
cancel.
         * Task cancellation blocks the task canceller. Interrupt after cancel 
via
         * cancellation watch dog.
@@ -753,7 +877,7 @@ public class TaskTest extends TestLogger {
 
                ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
                ResultPartitionConsumableNotifier consumableNotifier = 
mock(ResultPartitionConsumableNotifier.class);
-               PartitionStateChecker partitionStateChecker = 
mock(PartitionStateChecker.class);
+               PartitionProducerStateChecker partitionProducerStateChecker = 
mock(PartitionProducerStateChecker.class);
                Executor executor = mock(Executor.class);
                NetworkEnvironment network = mock(NetworkEnvironment.class);
                
when(network.getResultPartitionManager()).thenReturn(partitionManager);
@@ -761,7 +885,7 @@ public class TaskTest extends TestLogger {
                when(network.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
                                .thenReturn(mock(TaskKvStateRegistry.class));
 
-               return createTask(invokable, libCache, network, 
consumableNotifier, partitionStateChecker, executor, config, execConfig);
+               return createTask(invokable, libCache, network, 
consumableNotifier, partitionProducerStateChecker, executor, config, 
execConfig);
        }
 
        private Task createTask(
@@ -769,9 +893,9 @@ public class TaskTest extends TestLogger {
                        LibraryCacheManager libCache,
                        NetworkEnvironment networkEnvironment,
                        ResultPartitionConsumableNotifier consumableNotifier,
-                       PartitionStateChecker partitionStateChecker,
+                       PartitionProducerStateChecker 
partitionProducerStateChecker,
                        Executor executor) throws IOException {
-               return createTask(invokable, libCache, networkEnvironment, 
consumableNotifier, partitionStateChecker, executor, new Configuration(), new 
ExecutionConfig());
+               return createTask(invokable, libCache, networkEnvironment, 
consumableNotifier, partitionProducerStateChecker, executor, new 
Configuration(), new ExecutionConfig());
        }
        
        private Task createTask(
@@ -779,7 +903,7 @@ public class TaskTest extends TestLogger {
                LibraryCacheManager libCache,
                NetworkEnvironment networkEnvironment,
                ResultPartitionConsumableNotifier consumableNotifier,
-               PartitionStateChecker partitionStateChecker,
+               PartitionProducerStateChecker partitionProducerStateChecker,
                Executor executor,
                Configuration taskManagerConfig,
                ExecutionConfig execConfig) throws IOException {
@@ -837,7 +961,7 @@ public class TaskTest extends TestLogger {
                        new TaskManagerRuntimeInfo("localhost", 
taskManagerConfig, System.getProperty("java.io.tmpdir")),
                        mock(TaskMetricGroup.class),
                        consumableNotifier,
-                       partitionStateChecker,
+                       partitionProducerStateChecker,
                        executor);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
index 6e96400..291fd5f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -39,7 +39,7 @@ import 
org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -157,9 +157,8 @@ public class BlockingCheckpointsTest {
                                                "localhost", new 
Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
                                new UnregisteredTaskMetricsGroup(),
                                mock(ResultPartitionConsumableNotifier.class),
-                               mock(PartitionStateChecker.class),
+                               mock(PartitionProducerStateChecker.class),
                                Executors.directExecutor());
-
        }
 
        // 
------------------------------------------------------------------------
@@ -297,4 +296,4 @@ public class BlockingCheckpointsTest {
                @Override
                protected void cancelTask() {}
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 1f79384..6cde30f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -35,7 +35,7 @@ import 
org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -180,7 +180,7 @@ public class InterruptSensitiveRestoreTest {
                                        "localhost", new Configuration(), 
EnvironmentInformation.getTemporaryFileDirectory()),
                        new UnregisteredTaskMetricsGroup(),
                        mock(ResultPartitionConsumableNotifier.class),
-                       mock(PartitionStateChecker.class),
+                       mock(PartitionProducerStateChecker.class),
                        mock(Executor.class));
 
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 3fe8a37..d04c456 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -38,7 +38,7 @@ import 
org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -256,7 +256,7 @@ public class StreamTaskTest {
                
                ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
                ResultPartitionConsumableNotifier consumableNotifier = 
mock(ResultPartitionConsumableNotifier.class);
-               PartitionStateChecker partitionStateChecker = 
mock(PartitionStateChecker.class);
+               PartitionProducerStateChecker partitionProducerStateChecker = 
mock(PartitionProducerStateChecker.class);
                Executor executor = mock(Executor.class);
 
                NetworkEnvironment network = mock(NetworkEnvironment.class);
@@ -303,7 +303,7 @@ public class StreamTaskTest {
                        new TaskManagerRuntimeInfo("localhost", 
taskManagerConfig, System.getProperty("java.io.tmpdir")),
                        new UnregisteredTaskMetricsGroup(),
                        consumableNotifier,
-                       partitionStateChecker,
+                       partitionProducerStateChecker,
                        executor);
        }
        

Reply via email to