[FLINK-7748][network] Properly use the TaskEventDispatcher for subscribing to 
events

Previously, the ResultPartitionWriter implemented the EventListener interface
and was used for event registration, although event publishing was already
handled via the TaskEventDispatcher. Now, we use the TaskEventDispatcher for
both event registration and publishing.

This change also exposes the TaskEventDispatcher via a task's Environment so
that tasks can subscribe to events (only IterationHeadTask does so far).

This closes #4761.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/175e1b38
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/175e1b38
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/175e1b38

Branch: refs/heads/master
Commit: 175e1b3871b13fee3e423aef87cb45ceed409783
Parents: c5efb1f
Author: Nico Kruber <[email protected]>
Authored: Tue Aug 29 18:24:00 2017 +0200
Committer: zentol <[email protected]>
Committed: Tue Dec 12 17:01:14 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/execution/Environment.java    |   3 +
 .../runtime/io/network/NetworkEnvironment.java  |   4 +-
 .../runtime/io/network/TaskEventDispatcher.java | 110 +++++++++---
 .../api/writer/ResultPartitionWriter.java       |  20 +--
 .../iterative/task/IterationHeadTask.java       |   8 +-
 .../runtime/taskmanager/RuntimeEnvironment.java |  10 ++
 .../apache/flink/runtime/taskmanager/Task.java  |  28 ++-
 .../io/network/TaskEventDispatcherTest.java     | 180 +++++++++++++++++++
 .../operators/testutils/DummyEnvironment.java   |   5 +
 .../operators/testutils/MockEnvironment.java    |   8 +
 .../taskexecutor/TaskExecutorITCase.java        |   3 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |   3 +
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   3 +
 .../flink/runtime/taskmanager/TaskTest.java     |   7 +
 .../runtime/util/JvmExitOnFatalErrorTest.java   |   3 +
 .../tasks/InterruptSensitiveRestoreTest.java    |   3 +
 .../runtime/tasks/StreamMockEnvironment.java    |   8 +
 .../tasks/StreamTaskTerminationTest.java        |   3 +
 .../streaming/runtime/tasks/StreamTaskTest.java |   3 +
 .../tasks/TaskCheckpointingBehaviourTest.java   |   3 +
 20 files changed, 358 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 203ee85..ad66c57 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -209,4 +210,6 @@ public interface Environment {
        InputGate getInputGate(int index);
 
        InputGate[] getAllInputGates();
+
+       TaskEventDispatcher getTaskEventDispatcher();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 71d0386..f2619e8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -214,7 +214,7 @@ public class NetworkEnvironment {
                                }
 
                                // Register writer with task event dispatcher
-                               
taskEventDispatcher.registerWriterForIncomingTaskEvents(writer.getPartitionId(),
 writer);
+                               
taskEventDispatcher.registerPartition(writer.getPartitionId());
                        }
 
                        // Setup the buffer pool for each buffer reader
@@ -266,7 +266,7 @@ public class NetworkEnvironment {
                        ResultPartitionWriter[] writers = task.getAllWriters();
                        if (writers != null) {
                                for (ResultPartitionWriter writer : writers) {
-                                       
taskEventDispatcher.unregisterWriter(writer);
+                                       
taskEventDispatcher.unregisterPartition(writer.getPartitionId());
                                }
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
index 8816e32..1ec4ade 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
@@ -19,70 +19,126 @@
 package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.api.TaskEventHandler;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.util.event.EventListener;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The task event dispatcher dispatches events flowing backwards from a 
consuming task to the task
  * producing the consumed result.
  *
- * <p> Backwards events only work for tasks, which produce pipelined results, 
where both the
+ * <p>Backwards events only work for tasks, which produce pipelined results, 
where both the
  * producing and consuming task are running at the same time.
  */
 public class TaskEventDispatcher {
+       private static final Logger LOG = 
LoggerFactory.getLogger(TaskEventDispatcher.class);
 
-       private final Map<ResultPartitionID, ResultPartitionWriter> 
registeredWriters = Maps.newHashMap();
+       private final Map<ResultPartitionID, TaskEventHandler> 
registeredHandlers = new HashMap<>();
 
-       public void registerWriterForIncomingTaskEvents(ResultPartitionID 
partitionId, ResultPartitionWriter writer) {
-               synchronized (registeredWriters) {
-                       if (registeredWriters.put(partitionId, writer) != null) 
{
-                               throw new IllegalStateException("Already 
registered at task event dispatcher.");
+       /**
+        * Registers the given partition for incoming task events allowing 
calls to {@link
+        * #subscribeToEvent(ResultPartitionID, EventListener, Class)}.
+        *
+        * @param partitionId
+        *              the partition ID
+        */
+       public void registerPartition(ResultPartitionID partitionId) {
+               checkNotNull(partitionId);
+
+               synchronized (registeredHandlers) {
+                       LOG.debug("registering {}", partitionId);
+                       if (registeredHandlers.put(partitionId, new 
TaskEventHandler()) != null) {
+                               throw new IllegalStateException(
+                                       "Partition " + partitionId + " already 
registered at task event dispatcher.");
                        }
                }
        }
 
-       public void unregisterWriter(ResultPartitionWriter writer) {
-               synchronized (registeredWriters) {
-                       registeredWriters.remove(writer.getPartitionId());
+       /**
+        * Removes the given partition from listening to incoming task events, 
thus forbidding calls to
+        * {@link #subscribeToEvent(ResultPartitionID, EventListener, Class)}.
+        *
+        * @param partitionId
+        *              the partition ID
+        */
+       public void unregisterPartition(ResultPartitionID partitionId) {
+               checkNotNull(partitionId);
+
+               synchronized (registeredHandlers) {
+                       LOG.debug("unregistering {}", partitionId);
+                       // NOTE: tolerate un-registration of non-registered 
task (unregister is always called
+                       //       in the cleanup phase of a task even if it 
never came to the registration - see
+                       //       Task.java)
+                       registeredHandlers.remove(partitionId);
                }
        }
 
        /**
-        * Publishes the event to the registered {@link ResultPartitionWriter} 
instances.
-        * <p>
-        * This method is either called directly from a {@link 
LocalInputChannel} or the network I/O
+        * Subscribes a listener to this dispatcher for events on a partition.
+        *
+        * @param partitionId
+        *              ID of the partition to subscribe for (must be 
registered via {@link
+        *              #registerPartition(ResultPartitionID)} first!)
+        * @param eventListener
+        *              the event listener to subscribe
+        * @param eventType
+        *              event type to subscribe to
+        */
+       public void subscribeToEvent(
+                       ResultPartitionID partitionId,
+                       EventListener<TaskEvent> eventListener,
+                       Class<? extends TaskEvent> eventType) {
+               checkNotNull(partitionId);
+               checkNotNull(eventListener);
+               checkNotNull(eventType);
+
+               TaskEventHandler taskEventHandler = 
registeredHandlers.get(partitionId);
+               if (taskEventHandler == null) {
+                       throw new IllegalStateException(
+                               "Partition " + partitionId + " not registered 
at task event dispatcher.");
+               }
+               taskEventHandler.subscribe(eventListener, eventType);
+       }
+
+       /**
+        * Publishes the event to the registered {@link EventListener} 
instances.
+        *
+        * <p>This method is either called directly from a {@link 
LocalInputChannel} or the network I/O
         * thread on behalf of a {@link RemoteInputChannel}.
+        *
+        * @return whether the event was published to a registered event 
handler (initiated via {@link
+        * #registerPartition(ResultPartitionID)}) or not
         */
        public boolean publish(ResultPartitionID partitionId, TaskEvent event) {
-               EventListener<TaskEvent> listener = 
registeredWriters.get(partitionId);
+               checkNotNull(partitionId);
+               checkNotNull(event);
 
-               if (listener != null) {
-                       listener.onEvent(event);
+               TaskEventHandler taskEventHandler = 
registeredHandlers.get(partitionId);
+
+               if (taskEventHandler != null) {
+                       taskEventHandler.publish(event);
                        return true;
                }
 
                return false;
        }
 
-       public void clearAll() {
-               synchronized (registeredWriters) {
-                       registeredWriters.clear();
-               }
-       }
-
        /**
-        * Returns the number of currently registered writers.
+        * Removes all registered event handlers.
         */
-       int getNumberOfRegisteredWriters() {
-               synchronized (registeredWriters) {
-                       return registeredWriters.size();
+       public void clearAll() {
+               synchronized (registeredHandlers) {
+                       registeredHandlers.clear();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 57c7098..777c7ad 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -18,13 +18,10 @@
 
 package org.apache.flink.runtime.io.network.api.writer;
 
-import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.io.network.api.TaskEventHandler;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.util.event.EventListener;
 
 import java.io.IOException;
 
@@ -34,12 +31,10 @@ import java.io.IOException;
  * The {@link ResultPartitionWriter} is the runtime API for producing results. 
It
  * supports two kinds of data to be sent: buffers and events.
  */
-public class ResultPartitionWriter implements EventListener<TaskEvent> {
+public class ResultPartitionWriter {
 
        private final ResultPartition partition;
 
-       private final TaskEventHandler taskEventHandler = new 
TaskEventHandler();
-
        public ResultPartitionWriter(ResultPartition partition) {
                this.partition = partition;
        }
@@ -94,17 +89,4 @@ public class ResultPartitionWriter implements 
EventListener<TaskEvent> {
                        eventBuffer.recycle();
                }
        }
-
-       // 
------------------------------------------------------------------------
-       // Event handling
-       // 
------------------------------------------------------------------------
-
-       public void subscribeToEvent(EventListener<TaskEvent> eventListener, 
Class<? extends TaskEvent> eventType) {
-               taskEventHandler.subscribe(eventListener, eventType);
-       }
-
-       @Override
-       public void onEvent(TaskEvent event) {
-               taskEventHandler.publish(event);
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
index b673ba0..65dd8ac 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
@@ -27,10 +27,12 @@ import 
org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
 import org.apache.flink.runtime.iterative.concurrent.Broker;
@@ -223,8 +225,10 @@ public class IterationHeadTask<X, Y, S extends Function, 
OT> extends AbstractIte
 
        private SuperstepBarrier initSuperstepBarrier() {
                SuperstepBarrier barrier = new 
SuperstepBarrier(getUserCodeClassLoader());
-               this.toSync.subscribeToEvent(barrier, 
AllWorkersDoneEvent.class);
-               this.toSync.subscribeToEvent(barrier, TerminationEvent.class);
+               TaskEventDispatcher taskEventDispatcher = 
getEnvironment().getTaskEventDispatcher();
+               ResultPartitionID partitionId = toSync.getPartitionId();
+               taskEventDispatcher.subscribeToEvent(partitionId, barrier, 
AllWorkersDoneEvent.class);
+               taskEventDispatcher.subscribeToEvent(partitionId, barrier, 
TerminationEvent.class);
                return barrier;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 92b5886..60738f0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -69,6 +70,8 @@ public class RuntimeEnvironment implements Environment {
 
        private final ResultPartitionWriter[] writers;
        private final InputGate[] inputGates;
+
+       private final TaskEventDispatcher taskEventDispatcher;
        
        private final CheckpointResponder checkpointResponder;
 
@@ -101,6 +104,7 @@ public class RuntimeEnvironment implements Environment {
                        Map<String, Future<Path>> distCacheEntries,
                        ResultPartitionWriter[] writers,
                        InputGate[] inputGates,
+                       TaskEventDispatcher taskEventDispatcher,
                        CheckpointResponder checkpointResponder,
                        TaskManagerRuntimeInfo taskManagerInfo,
                        TaskMetricGroup metrics,
@@ -123,6 +127,7 @@ public class RuntimeEnvironment implements Environment {
                this.distCacheEntries = checkNotNull(distCacheEntries);
                this.writers = checkNotNull(writers);
                this.inputGates = checkNotNull(inputGates);
+               this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
                this.checkpointResponder = checkNotNull(checkpointResponder);
                this.taskManagerInfo = checkNotNull(taskManagerInfo);
                this.containingTask = containingTask;
@@ -237,6 +242,11 @@ public class RuntimeEnvironment implements Environment {
        }
 
        @Override
+       public TaskEventDispatcher getTaskEventDispatcher() {
+               return taskEventDispatcher;
+       }
+
+       @Override
        public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics 
checkpointMetrics) {
                acknowledgeCheckpoint(checkpointId, checkpointMetrics, null);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 2cb356c..e54adb9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -667,12 +667,28 @@ public class Task implements Runnable, TaskActions {
                                        .createKvStateTaskRegistry(jobId, 
getJobVertexId());
 
                        Environment env = new RuntimeEnvironment(
-                               jobId, vertexId, executionId, executionConfig, 
taskInfo,
-                               jobConfiguration, taskConfiguration, 
userCodeClassLoader,
-                               memoryManager, ioManager, 
broadcastVariableManager,
-                               accumulatorRegistry, kvStateRegistry, 
inputSplitProvider,
-                               distributedCacheEntries, writers, inputGates,
-                               checkpointResponder, taskManagerConfig, 
metrics, this);
+                               jobId,
+                               vertexId,
+                               executionId,
+                               executionConfig,
+                               taskInfo,
+                               jobConfiguration,
+                               taskConfiguration,
+                               userCodeClassLoader,
+                               memoryManager,
+                               ioManager,
+                               broadcastVariableManager,
+                               accumulatorRegistry,
+                               kvStateRegistry,
+                               inputSplitProvider,
+                               distributedCacheEntries,
+                               writers,
+                               inputGates,
+                               network.getTaskEventDispatcher(),
+                               checkpointResponder,
+                               taskManagerConfig,
+                               metrics,
+                               this);
 
                        // let the task code create its readers and writers
                        invokable.setEnvironment(env);

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TaskEventDispatcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TaskEventDispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TaskEventDispatcherTest.java
new file mode 100644
index 0000000..41201cf
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TaskEventDispatcherTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
+import org.apache.flink.runtime.iterative.event.TerminationEvent;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Basic tests for {@link TaskEventDispatcher}.
+ */
+public class TaskEventDispatcherTest extends TestLogger {
+
+       @Rule
+       public ExpectedException expectedException = ExpectedException.none();
+
+       @Test
+       public void registerPartitionTwice() throws Exception {
+               ResultPartitionID partitionId = new ResultPartitionID();
+               TaskEventDispatcher ted = new TaskEventDispatcher();
+               ted.registerPartition(partitionId);
+
+               expectedException.expect(IllegalStateException.class);
+               expectedException.expectMessage("already registered at task 
event dispatcher");
+
+               ted.registerPartition(partitionId);
+       }
+
+       @Test
+       public void subscribeToEventNotRegistered() throws Exception {
+               TaskEventDispatcher ted = new TaskEventDispatcher();
+
+               expectedException.expect(IllegalStateException.class);
+               expectedException.expectMessage("not registered at task event 
dispatcher");
+
+               ted.subscribeToEvent(new ResultPartitionID(), new 
ZeroShotEventListener(), TaskEvent.class);
+       }
+
+       /**
+        * Tests {@link TaskEventDispatcher#publish(ResultPartitionID, 
TaskEvent)} and {@link TaskEventDispatcher#subscribeToEvent(ResultPartitionID, 
EventListener, Class)} methods.
+        */
+       @Test
+       public void publishSubscribe() throws Exception {
+               ResultPartitionID partitionId1 = new ResultPartitionID();
+               ResultPartitionID partitionId2 = new ResultPartitionID();
+               TaskEventDispatcher ted = new TaskEventDispatcher();
+
+               AllWorkersDoneEvent event1 = new AllWorkersDoneEvent();
+               TerminationEvent event2 = new TerminationEvent();
+               assertFalse(ted.publish(partitionId1, event1));
+
+               ted.registerPartition(partitionId1);
+               ted.registerPartition(partitionId2);
+
+               // no event listener subscribed yet, but the event is forwarded 
to a TaskEventHandler
+               assertTrue(ted.publish(partitionId1, event1));
+
+               OneShotEventListener eventListener1a = new 
OneShotEventListener(event1);
+               ZeroShotEventListener eventListener1b = new 
ZeroShotEventListener();
+               ZeroShotEventListener eventListener2 = new 
ZeroShotEventListener();
+               OneShotEventListener eventListener3 = new 
OneShotEventListener(event2);
+               ted.subscribeToEvent(partitionId1, eventListener1a, 
AllWorkersDoneEvent.class);
+               ted.subscribeToEvent(partitionId2, eventListener1b, 
AllWorkersDoneEvent.class);
+               ted.subscribeToEvent(partitionId1, eventListener2, 
TaskEvent.class);
+               ted.subscribeToEvent(partitionId1, eventListener3, 
TerminationEvent.class);
+
+               assertTrue(ted.publish(partitionId1, event1));
+               assertTrue("listener should have fired for 
AllWorkersDoneEvent", eventListener1a.fired);
+               assertFalse("listener should not have fired for 
AllWorkersDoneEvent", eventListener3.fired);
+
+               // publish another event, verify that only the right subscriber 
is called
+               assertTrue(ted.publish(partitionId1, event2));
+               assertTrue("listener should have fired for TerminationEvent", 
eventListener3.fired);
+       }
+
+       @Test
+       public void unregisterPartition() throws Exception {
+               ResultPartitionID partitionId1 = new ResultPartitionID();
+               ResultPartitionID partitionId2 = new ResultPartitionID();
+               TaskEventDispatcher ted = new TaskEventDispatcher();
+
+               AllWorkersDoneEvent event = new AllWorkersDoneEvent();
+               assertFalse(ted.publish(partitionId1, event));
+
+               ted.registerPartition(partitionId1);
+               ted.registerPartition(partitionId2);
+
+               OneShotEventListener eventListener1a = new 
OneShotEventListener(event);
+               ZeroShotEventListener eventListener1b = new 
ZeroShotEventListener();
+               OneShotEventListener eventListener2 = new 
OneShotEventListener(event);
+               ted.subscribeToEvent(partitionId1, eventListener1a, 
AllWorkersDoneEvent.class);
+               ted.subscribeToEvent(partitionId2, eventListener1b, 
AllWorkersDoneEvent.class);
+               ted.subscribeToEvent(partitionId1, eventListener2, 
AllWorkersDoneEvent.class);
+
+               ted.unregisterPartition(partitionId2);
+
+               // publish something for partitionId1 triggering all according 
listeners
+               assertTrue(ted.publish(partitionId1, event));
+               assertTrue("listener should have fired for 
AllWorkersDoneEvent", eventListener1a.fired);
+               assertTrue("listener should have fired for 
AllWorkersDoneEvent", eventListener2.fired);
+
+               // now publish something for partitionId2 which should not 
trigger any listeners
+               assertFalse(ted.publish(partitionId2, event));
+       }
+
+       @Test
+       public void clearAll() throws Exception {
+               ResultPartitionID partitionId = new ResultPartitionID();
+               TaskEventDispatcher ted = new TaskEventDispatcher();
+               ted.registerPartition(partitionId);
+
+               //noinspection unchecked
+               ZeroShotEventListener eventListener1 = new 
ZeroShotEventListener();
+               ted.subscribeToEvent(partitionId, eventListener1, 
AllWorkersDoneEvent.class);
+
+               ted.clearAll();
+
+               assertFalse(ted.publish(partitionId, new 
AllWorkersDoneEvent()));
+       }
+
+       /**
+        * Event listener that expects a given {@link TaskEvent} once in its 
{@link #onEvent(TaskEvent)}
+        * call and will fail for any subsequent call.
+        *
+        * <p>Be sure to check that {@link #fired} is <tt>true</tt> to ensure 
that this handle has been
+        * called once.
+        */
+       private static class OneShotEventListener implements 
EventListener<TaskEvent> {
+               private final TaskEvent expected;
+               boolean fired = false;
+
+               OneShotEventListener(TaskEvent expected) {
+                       this.expected = expected;
+               }
+
+               public void onEvent(TaskEvent actual) {
+                       checkState(!fired, "Should only fire once");
+                       fired = true;
+                       checkArgument(actual == expected,
+                               "Fired on unexpected event: %s (expected: %s)", 
actual, expected);
+               }
+       }
+
+       /**
+        * Event listener which ensures that its {@link #onEvent(TaskEvent)} 
method is never called.
+        */
+       private static class ZeroShotEventListener implements 
EventListener<TaskEvent> {
+               public void onEvent(TaskEvent actual) {
+                       throw new IllegalStateException("Should never fire");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 0125a5e..718ecfe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -190,4 +191,8 @@ public class DummyEnvironment implements Environment {
                return null;
        }
 
+       @Override
+       public TaskEventDispatcher getTaskEventDispatcher() {
+               throw new UnsupportedOperationException();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 7514cc4..c8ca654 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import 
org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -100,6 +101,8 @@ public class MockEnvironment implements Environment {
 
        private final ClassLoader userCodeClassLoader;
 
+       private TaskEventDispatcher taskEventDispatcher = 
mock(TaskEventDispatcher.class);
+
        public MockEnvironment(String taskName, long memorySize, 
MockInputSplitProvider inputSplitProvider, int bufferSize) {
                this(taskName, memorySize, inputSplitProvider, bufferSize, new 
Configuration(), new ExecutionConfig());
        }
@@ -324,6 +327,11 @@ public class MockEnvironment implements Environment {
        }
 
        @Override
+       public TaskEventDispatcher getTaskEventDispatcher() {
+               return taskEventDispatcher;
+       }
+
+       @Override
        public JobVertexID getJobVertexId() {
                return new JobVertexID(new byte[16]);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 1f1d09d..9c12fff 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.MockNetworkEnvironment;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -119,7 +120,7 @@ public class TaskExecutorITCase extends TestLogger {
                final TaskManagerLocation taskManagerLocation = new 
TaskManagerLocation(taskManagerResourceId, InetAddress.getLocalHost(), 1234);
                final MemoryManager memoryManager = mock(MemoryManager.class);
                final IOManager ioManager = mock(IOManager.class);
-               final NetworkEnvironment networkEnvironment = 
mock(NetworkEnvironment.class);
+               final NetworkEnvironment networkEnvironment = 
MockNetworkEnvironment.getMock();
                final TaskManagerMetricGroup taskManagerMetricGroup = 
mock(TaskManagerMetricGroup.class);
                final BroadcastVariableManager broadcastVariableManager = 
mock(BroadcastVariableManager.class);
                final FileCache fileCache = mock(FileCache.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 776bdf9..6372792 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -51,6 +51,7 @@ import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -723,9 +724,11 @@ public class TaskExecutorTest extends TestLogger {
                when(taskSlotTable.existsActiveSlot(eq(jobId), 
eq(allocationId))).thenReturn(true);
                when(taskSlotTable.addTask(any(Task.class))).thenReturn(true);
 
+               TaskEventDispatcher taskEventDispatcher = new 
TaskEventDispatcher();
                final NetworkEnvironment networkEnvironment = 
mock(NetworkEnvironment.class);
 
                when(networkEnvironment.createKvStateTaskRegistry(eq(jobId), 
eq(jobVertexId))).thenReturn(mock(TaskKvStateRegistry.class));
+               
when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
                final TaskManagerMetricGroup taskManagerMetricGroup = 
mock(TaskManagerMetricGroup.class);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 5045606..f2c60de 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -41,6 +41,7 @@ import 
org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
@@ -217,11 +218,13 @@ public class TaskAsyncCallTest {
                ResultPartitionConsumableNotifier consumableNotifier = 
mock(ResultPartitionConsumableNotifier.class);
                PartitionProducerStateChecker partitionProducerStateChecker = 
mock(PartitionProducerStateChecker.class);
                Executor executor = mock(Executor.class);
+               TaskEventDispatcher taskEventDispatcher = 
mock(TaskEventDispatcher.class);
                NetworkEnvironment networkEnvironment = 
mock(NetworkEnvironment.class);
                
when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
                
when(networkEnvironment.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
                
when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
                                .thenReturn(mock(TaskKvStateRegistry.class));
+               
when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
                TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class);
                
when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index b089997..4fa36bb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -273,10 +274,12 @@ public class TaskTest extends TestLogger {
                        ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
                        ResultPartitionConsumableNotifier consumableNotifier = 
mock(ResultPartitionConsumableNotifier.class);
                        PartitionProducerStateChecker 
partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
+                       TaskEventDispatcher taskEventDispatcher = 
mock(TaskEventDispatcher.class);
                        Executor executor = mock(Executor.class);
                        NetworkEnvironment network = 
mock(NetworkEnvironment.class);
                        
when(network.getResultPartitionManager()).thenReturn(partitionManager);
                        
when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
+                       
when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
                        doThrow(new 
RuntimeException("buffers")).when(network).registerTask(any(Task.class));
 
                        Task task = createTask(TestInvokableCorrect.class, 
blobService, libCache, network, consumableNotifier, 
partitionProducerStateChecker, executor);
@@ -629,6 +632,7 @@ public class TaskTest extends TestLogger {
                
when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
 
                PartitionProducerStateChecker partitionChecker = 
mock(PartitionProducerStateChecker.class);
+               TaskEventDispatcher taskEventDispatcher = 
mock(TaskEventDispatcher.class);
 
                ResultPartitionConsumableNotifier consumableNotifier = 
mock(ResultPartitionConsumableNotifier.class);
                NetworkEnvironment network = mock(NetworkEnvironment.class);
@@ -636,6 +640,7 @@ public class TaskTest extends TestLogger {
                
when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
                when(network.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
                        .thenReturn(mock(TaskKvStateRegistry.class));
+               
when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
                createTask(InvokableBlockingInInvoke.class, blobService, 
libCache, network, consumableNotifier, partitionChecker, 
Executors.directExecutor());
 
@@ -933,12 +938,14 @@ public class TaskTest extends TestLogger {
                ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
                ResultPartitionConsumableNotifier consumableNotifier = 
mock(ResultPartitionConsumableNotifier.class);
                PartitionProducerStateChecker partitionProducerStateChecker = 
mock(PartitionProducerStateChecker.class);
+               TaskEventDispatcher taskEventDispatcher = 
mock(TaskEventDispatcher.class);
                Executor executor = mock(Executor.class);
                NetworkEnvironment network = mock(NetworkEnvironment.class);
                
when(network.getResultPartitionManager()).thenReturn(partitionManager);
                
when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
                when(network.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
                                .thenReturn(mock(TaskKvStateRegistry.class));
+               
when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
                return createTask(invokable, blobService, libCache, network, 
consumableNotifier, partitionProducerStateChecker, executor, config, 
execConfig);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index 8072295..38238cd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -157,6 +158,8 @@ public class JvmExitOnFatalErrorTest {
 
                                final NetworkEnvironment networkEnvironment = 
mock(NetworkEnvironment.class);
                                
when(networkEnvironment.createKvStateTaskRegistry(jid, 
jobVertexId)).thenReturn(mock(TaskKvStateRegistry.class));
+                               TaskEventDispatcher taskEventDispatcher = 
mock(TaskEventDispatcher.class);
+                               
when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
                                final TaskManagerRuntimeInfo tmInfo = 
TaskManagerConfiguration.fromConfiguration(taskManagerConfig);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index c641aa8..eacded6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -42,6 +42,7 @@ import 
org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -175,9 +176,11 @@ public class InterruptSensitiveRestoreTest {
                        StreamStateHandle state,
                        int mode) throws IOException {
 
+               TaskEventDispatcher taskEventDispatcher = new 
TaskEventDispatcher();
                NetworkEnvironment networkEnvironment = 
mock(NetworkEnvironment.class);
                
when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
                                .thenReturn(mock(TaskKvStateRegistry.class));
+               
when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
                Collection<KeyedStateHandle> keyedStateFromBackend = 
Collections.emptyList();
                Collection<KeyedStateHandle> keyedStateFromStream = 
Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 231f59e..6b6506a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import 
org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
@@ -107,6 +108,8 @@ public class StreamMockEnvironment implements Environment {
 
        private volatile boolean wasFailedExternally = false;
 
+       private TaskEventDispatcher taskEventDispatcher = 
mock(TaskEventDispatcher.class);
+
        public StreamMockEnvironment(Configuration jobConfig, Configuration 
taskConfig, ExecutionConfig executionConfig,
                                                                long 
memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
                this.taskInfo = new TaskInfo(
@@ -304,6 +307,11 @@ public class StreamMockEnvironment implements Environment {
        }
 
        @Override
+       public TaskEventDispatcher getTaskEventDispatcher() {
+               return taskEventDispatcher;
+       }
+
+       @Override
        public JobVertexID getJobVertexId() {
                return new JobVertexID(new byte[16]);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 4c73e72..5480ce7 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -42,6 +42,7 @@ import 
org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -134,8 +135,10 @@ public class StreamTaskTerminationTest extends TestLogger {
 
                final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new 
TestingTaskManagerRuntimeInfo();
 
+               TaskEventDispatcher taskEventDispatcher = new 
TaskEventDispatcher();
                final NetworkEnvironment networkEnv = 
mock(NetworkEnvironment.class);
                when(networkEnv.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class))).thenReturn(mock(TaskKvStateRegistry.class));
+               
when(networkEnv.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
                BlobCacheService blobService =
                        new BlobCacheService(mock(PermanentBlobCache.class), 
mock(TransientBlobCache.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index b31fb41..d0ea714 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -49,6 +49,7 @@ import 
org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
@@ -887,12 +888,14 @@ public class StreamTaskTest extends TestLogger {
                ResultPartitionConsumableNotifier consumableNotifier = 
mock(ResultPartitionConsumableNotifier.class);
                PartitionProducerStateChecker partitionProducerStateChecker = 
mock(PartitionProducerStateChecker.class);
                Executor executor = mock(Executor.class);
+               TaskEventDispatcher taskEventDispatcher = new 
TaskEventDispatcher();
 
                NetworkEnvironment network = mock(NetworkEnvironment.class);
                
when(network.getResultPartitionManager()).thenReturn(partitionManager);
                
when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
                when(network.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
                                .thenReturn(mock(TaskKvStateRegistry.class));
+               
when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
                JobInformation jobInformation = new JobInformation(
                        new JobID(),

http://git-wip-us.apache.org/repos/asf/flink/blob/175e1b38/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index d755c56..d61b95d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -45,6 +45,7 @@ import 
org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -210,8 +211,10 @@ public class TaskCheckpointingBehaviourTest extends 
TestLogger {
                                taskConfig);
 
                TaskKvStateRegistry mockKvRegistry = 
mock(TaskKvStateRegistry.class);
+               TaskEventDispatcher taskEventDispatcher = new 
TaskEventDispatcher();
                NetworkEnvironment network = mock(NetworkEnvironment.class);
                when(network.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class))).thenReturn(mockKvRegistry);
+               
when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
                BlobCacheService blobService =
                        new BlobCacheService(mock(PermanentBlobCache.class), 
mock(TransientBlobCache.class));

Reply via email to