[FLINK-1638] [streaming] Vertex level fault tolerance and state monitor

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5061edb8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5061edb8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5061edb8

Branch: refs/heads/master
Commit: 5061edb80df092a2f8719054b0d2bce8c670265c
Parents: b4e8350
Author: Gyula Fora <gyf...@apache.org>
Authored: Fri Feb 20 22:24:27 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 10 14:58:48 2015 +0100

----------------------------------------------------------------------
 .../runtime/event/task/StreamingSuperstep.java  |  51 +++++++
 .../io/network/api/reader/AbstractReader.java   |   4 +
 .../api/reader/AbstractRecordReader.java        |  58 +++++---
 .../io/network/api/reader/BarrierBuffer.java    | 143 +++++++++++++++++++
 .../jobgraph/tasks/BarrierTransceiver.java      |  27 ++++
 .../runtime/jobmanager/StreamStateMonitor.scala |  96 +++++++++++++
 .../flink/streaming/api/StreamConfig.java       |   9 ++
 .../api/StreamingJobGraphGenerator.java         |   4 +-
 .../streaming/api/collector/StreamOutput.java   |   4 +
 .../datastream/SingleOutputStreamOperator.java  |   4 +-
 .../api/invokable/StreamInvokable.java          |   2 +-
 .../api/streamvertex/InputHandler.java          |  24 ++--
 .../api/streamvertex/OutputHandler.java         |   8 ++
 .../api/streamvertex/StreamVertex.java          |  61 +++++++-
 .../flink/streaming/io/CoRecordReader.java      |   8 +-
 .../streaming/io/IndexedMutableReader.java      |   2 +-
 .../flink/streaming/util/MockContext.java       |   2 +-
 17 files changed, 464 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java
new file mode 100644
index 0000000..e35eb28
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.event.task;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class StreamingSuperstep extends TaskEvent {
+
+       protected long id;
+
+       public StreamingSuperstep() {
+
+       }
+
+       public StreamingSuperstep(long id) {
+               this.id = id;
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeLong(id);
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               id = in.readLong();
+       }
+
+       public long getId() {
+               return id;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
index 1bfca84..96b6f99 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
@@ -110,6 +110,10 @@ public abstract class AbstractReader implements ReaderBase {
                        throw new IOException("Error while handling event of type " + eventType + ": " + t.getMessage(), t);
                }
        }
+       
+       public void publish(TaskEvent event){
+               taskEventHandler.publish(event);
+       }
 
        // ------------------------------------------------------------------------
        // Iterations

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index e70b6ee..cc36438 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -18,24 +18,37 @@
 
 package org.apache.flink.runtime.io.network.api.reader;
 
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer;
-
-import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A record-oriented reader.
  * <p>
- * This abstract base class is used by both the mutable and immutable record readers.
- *
- * @param <T> The type of the record that can be read with this record reader.
+ * This abstract base class is used by both the mutable and immutable record
+ * readers.
+ * 
+ * @param <T>
+ *            The type of the record that can be read with this record reader.
  */
-abstract class AbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements ReaderBase {
+abstract class AbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements
+               ReaderBase {
+
+       private static final Logger LOG = LoggerFactory.getLogger(AbstractRecordReader.class);
 
        private final RecordDeserializer<T>[] recordDeserializers;
 
@@ -43,11 +56,15 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
 
        private boolean isFinished;
 
+       private final BarrierBuffer barrierBuffer;
+
        protected AbstractRecordReader(InputGate inputGate) {
                super(inputGate);
+               barrierBuffer = new BarrierBuffer(inputGate, this);
 
                // Initialize one deserializer per input channel
-               this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
+               this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate
+                               .getNumberOfInputChannels()];
                for (int i = 0; i < recordDeserializers.length; i++) {
                        recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>();
                }
@@ -72,22 +89,27 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
                                }
                        }
 
-                       final BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent();
+                       final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
 
                        if (bufferOrEvent.isBuffer()) {
                                currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()];
                                currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
-                       }
-                       else if (handleEvent(bufferOrEvent.getEvent())) {
-                               if (inputGate.isFinished()) {
-                                       isFinished = true;
-
-                                       return false;
+                       } else {
+                               // Event received
+                               final AbstractEvent event = bufferOrEvent.getEvent();
+
+                               if (event instanceof StreamingSuperstep) {
+                                       barrierBuffer.processSuperstep(bufferOrEvent);
+                               } else {
+                                       if (handleEvent(event)) {
+                                               if (inputGate.isFinished()) {
+                                                       isFinished = true;
+                                                       return false;
+                                               } else if (hasReachedEndOfSuperstep()) {
+                                                       return false;
+                                               } // else: More data is coming...
+                                       }
                                }
-                               else if (hasReachedEndOfSuperstep()) {
-
-                                       return false;
-                               } // else: More data is coming...
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java
new file mode 100644
index 0000000..ee317cd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.reader;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BarrierBuffer {
+
+       private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
+
+       private Queue<BufferOrEvent> bufferOrEvents = new LinkedList<BufferOrEvent>();
+       private Queue<BufferOrEvent> unprocessed = new LinkedList<BufferOrEvent>();
+
+       private Set<Integer> blockedChannels = new HashSet<Integer>();
+       private int totalNumberOfInputChannels;
+
+       private StreamingSuperstep currentSuperstep;
+       private boolean receivedSuperstep;
+
+       private boolean blockAll = false;
+
+       private AbstractReader reader;
+
+       private InputGate inputGate;
+
+       public BarrierBuffer(InputGate inputGate, AbstractReader reader) {
+               this.inputGate = inputGate;
+               totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
+               this.reader = reader;
+       }
+
+       private void startSuperstep(StreamingSuperstep superstep) {
+               this.currentSuperstep = superstep;
+               this.receivedSuperstep = true;
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Superstep started with id: " + superstep.getId());
+               }
+       }
+
+       private void store(BufferOrEvent bufferOrEvent) {
+               bufferOrEvents.add(bufferOrEvent);
+       }
+
+       private BufferOrEvent getNonProcessed() {
+               return unprocessed.poll();
+       }
+
+       private boolean isBlocked(int channelIndex) {
+               return blockAll || blockedChannels.contains(channelIndex);
+       }
+       
+       private boolean containsNonprocessed() {
+               return !unprocessed.isEmpty();
+       }
+
+       private boolean receivedSuperstep() {
+               return receivedSuperstep;
+       }
+
+       public BufferOrEvent getNextNonBlocked() throws IOException,
+                       InterruptedException {
+               BufferOrEvent bufferOrEvent = null;
+
+               if (containsNonprocessed()) {
+                       bufferOrEvent = getNonProcessed();
+               } else {
+                       while (bufferOrEvent == null) {
+                               BufferOrEvent nextBufferOrEvent = inputGate.getNextBufferOrEvent();
+                               if (isBlocked(nextBufferOrEvent.getChannelIndex())) {
+                                       store(nextBufferOrEvent);
+                               } else {
+                                       bufferOrEvent = nextBufferOrEvent;
+                               }
+                       }
+               }
+               return bufferOrEvent;
+       }
+
+       private void blockChannel(int channelIndex) {
+               if (!blockedChannels.contains(channelIndex)) {
+                       blockedChannels.add(channelIndex);
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Channel blocked with index: " + channelIndex);
+                       }
+                       if (blockedChannels.size() == totalNumberOfInputChannels) {
+                               reader.publish(currentSuperstep);
+                               unprocessed.addAll(bufferOrEvents);
+                               bufferOrEvents.clear();
+                               blockedChannels.clear();
+                               receivedSuperstep = false;
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("All barriers received, blocks released");
+                               }
+                       }
+
+               } else {
+                       throw new RuntimeException("Tried to block an already blocked channel");
+               }
+       }
+
+       public String toString() {
+               return blockedChannels.toString();
+       }
+
+       public void processSuperstep(BufferOrEvent bufferOrEvent) {
+               int channelIndex = bufferOrEvent.getChannelIndex();
+               if (isBlocked(channelIndex)) {
+                       store(bufferOrEvent);
+               } else {
+                       StreamingSuperstep superstep = (StreamingSuperstep) bufferOrEvent.getEvent();
+                       if (!receivedSuperstep()) {
+                               startSuperstep(superstep);
+                       }
+                       blockChannel(channelIndex);
+               }
+       }
+
+}
\ No newline at end of file
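
A condensed, standalone sketch of the same alignment idea, stripped of the Flink I/O types (illustrative only; the names below are simplified stand-ins, not the class committed above):

    import java.util.ArrayDeque;
    import java.util.HashSet;
    import java.util.Queue;
    import java.util.Set;

    // Simplified barrier alignment: once a channel delivers the barrier it is
    // blocked and its subsequent data is parked; when every channel has
    // delivered the barrier, the blocks are released and the parked data is
    // replayed before new input is consumed.
    class SimpleBarrierAligner {

        private final int numChannels;
        private final Set<Integer> blocked = new HashSet<Integer>();
        private final Queue<String> parked = new ArrayDeque<String>();

        SimpleBarrierAligner(int numChannels) {
            this.numChannels = numChannels;
        }

        // Returns the record if it may be emitted now, or null if it was parked.
        String onRecord(int channel, String record) {
            if (blocked.contains(channel)) {
                parked.add(record);
                return null;
            }
            return record;
        }

        // Returns true when all channels delivered the barrier and the parked
        // records may be drained.
        boolean onBarrier(int channel) {
            if (!blocked.add(channel)) {
                throw new IllegalStateException("Tried to block an already blocked channel");
            }
            if (blocked.size() == numChannels) {
                blocked.clear(); // release everything for the next superstep
                return true;
            }
            return false;
        }

        Queue<String> parkedRecords() {
            return parked;
        }
    }

In the committed class, onBarrier corresponds to processSuperstep()/blockChannel(), and parkedRecords() to the unprocessed queue that getNextNonBlocked() drains before asking the input gate for more data.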

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
new file mode 100644
index 0000000..c56da62
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+
+public interface BarrierTransceiver {
+
+       public void broadcastBarrier(long barrierID);
+       
+       public void confirmBarrier(long barrierID);
+       
+}
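
The interface covers both directions of the protocol: broadcastBarrier is invoked on a task when a barrier should be injected or forwarded, and confirmBarrier reports a processed barrier back towards the JobManager. A minimal sketch of an implementer (illustrative skeleton only; the wiring this commit actually ships is the StreamVertex change further below):

    import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;

    public class LoggingBarrierTransceiver implements BarrierTransceiver {

        @Override
        public void broadcastBarrier(long barrierID) {
            // Forward the barrier downstream, e.g. as a StreamingSuperstep event.
            System.out.println("broadcasting barrier " + barrierID);
        }

        @Override
        public void confirmBarrier(long barrierID) {
            // Acknowledge the barrier, e.g. with a BarrierAck message.
            System.out.println("confirming barrier " + barrierID);
        }
    }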

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
new file mode 100644
index 0000000..a37ddb5
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager
+
+import akka.actor._
+import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph, ExecutionVertex}
+import org.apache.flink.runtime.jobgraph.{JobID, JobVertexID}
+
+import scala.collection.JavaConversions.mapAsScalaMap
+import scala.collection.immutable.TreeMap
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.{FiniteDuration, _}
+
+
+object StreamStateMonitor {
+
+  def props(context: ActorContext, executionGraph: ExecutionGraph,
+            interval: FiniteDuration = 5 seconds): ActorRef = {
+
+    val vertices: Iterable[ExecutionVertex] = getExecutionVertices(executionGraph)
+    val monitor = context.system.actorOf(Props(new StreamStateMonitor(executionGraph,
+      vertices, vertices.map(x => ((x.getJobVertex.getJobVertexId, x.getParallelSubtaskIndex), List.empty[Long])).toMap, interval, 0L, -1L)))
+    monitor ! InitBarrierScheduler
+    monitor
+  }
+
+  private def getExecutionVertices(executionGraph: ExecutionGraph): Iterable[ExecutionVertex] = {
+    for ((_, execJobVertex) <- executionGraph.getAllVertices;
+         execVertex: ExecutionVertex <- execJobVertex.getTaskVertices)
+    yield execVertex
+  }
+}
+
+class StreamStateMonitor(val executionGraph: ExecutionGraph,
+                         val vertices: Iterable[ExecutionVertex], var acks: Map[(JobVertexID, Int), List[Long]],
+                         val interval: FiniteDuration, var curId: Long, var ackId: Long)
+        extends Actor with ActorLogMessages with ActorLogging {
+
+  override def receiveWithLogMessages: Receive = {
+    case InitBarrierScheduler =>
+      context.system.scheduler.schedule(interval, interval, self, BarrierTimeout)
+      context.system.scheduler.schedule(2 * interval, 2 * interval, self, UpdateCurrentBarrier)
+      log.debug("[FT-MONITOR] Started Stream State Monitor for job {}{}",
+        executionGraph.getJobID, executionGraph.getJobName)
+    case BarrierTimeout =>
+      curId += 1
+      log.debug("[FT-MONITOR] Sending Barrier to vertices of Job " + executionGraph.getJobName)
+      vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex).foreach(vertex
+      => vertex.getCurrentAssignedResource.getInstance.getTaskManager
+                ! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId, curId))
+    case BarrierAck(_, jobVertexID, instanceID, checkpointID) =>
+      acks.get(jobVertexID, instanceID) match {
+        case Some(acklist) =>
+          acks += (jobVertexID, instanceID) -> (checkpointID :: acklist)
+        case None =>
+      }
+      log.info(acks.toString)
+    case UpdateCurrentBarrier =>
+      val barrierCount = acks.values.foldLeft(TreeMap[Long, Int]().withDefaultValue(0))((dict, myList)
+      => myList.foldLeft(dict)((dict2, elem) => dict2.updated(elem, dict2(elem) + 1)))
+      val keysToKeep = barrierCount.filter(_._2 == acks.size).keys
+      ackId = if (!keysToKeep.isEmpty) keysToKeep.max else ackId
+      acks.keys.foreach(x => acks = acks.updated(x, acks(x).filter(_ >= ackId)))
+      log.debug("[FT-MONITOR] Last global barrier is " + ackId)
+  }
+}
+
+case class BarrierTimeout()
+
+case class InitBarrierScheduler()
+
+case class UpdateCurrentBarrier()
+
+case class BarrierReq(attemptID: ExecutionAttemptID, checkpointID: Long)
+
+case class BarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Int, checkpointID: Long)
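
The UpdateCurrentBarrier handler computes the newest checkpoint id that every tracked subtask has acknowledged. The same bookkeeping, spelled out in plain Java for readability (illustrative only, not part of the commit; the actor keys its map by (JobVertexID, subtask index) rather than a string):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class LastGlobalBarrierExample {

        // Greatest checkpoint id present in every subtask's ack list,
        // falling back to the previous value when there is none.
        static long lastGlobalBarrier(Map<String, List<Long>> acks, long previous) {
            Map<Long, Integer> counts = new HashMap<Long, Integer>();
            for (List<Long> ackList : acks.values()) {
                for (Long id : ackList) {
                    Integer c = counts.get(id);
                    counts.put(id, c == null ? 1 : c + 1);
                }
            }
            long result = previous;
            for (Map.Entry<Long, Integer> e : counts.entrySet()) {
                if (e.getValue() == acks.size() && e.getKey() > result) {
                    result = e.getKey();
                }
            }
            return result;
        }

        public static void main(String[] args) {
            Map<String, List<Long>> acks = new HashMap<String, List<Long>>();
            acks.put("source-0", Arrays.asList(3L, 2L, 1L));
            acks.put("map-0", Arrays.asList(2L, 1L));
            // Barrier 2 is the newest id acknowledged by both subtasks.
            System.out.println(lastGlobalBarrier(acks, -1L)); // prints 2
        }
    }

After this value is updated, the actor prunes each ack list to ids at or above the new ackId, which is what the acks.updated(...) line above does.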
+
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index e1362c4..d464ef1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -48,6 +48,7 @@ public class StreamConfig implements Serializable {
        private static final String OUTPUT_NAME = "outputName_";
        private static final String PARTITIONER_OBJECT = "partitionerObject_";
        private static final String VERTEX_NAME = "vertexID";
+       private static final String OPERATOR_NAME = "operatorName";
        private static final String ITERATION_ID = "iteration-id";
        private static final String OUTPUT_SELECTOR = "outputSelector";
        private static final String DIRECTED_EMIT = "directedEmit";
@@ -87,6 +88,14 @@ public class StreamConfig implements Serializable {
                return config.getInteger(VERTEX_NAME, -1);
        }
 
+       public void setOperatorName(String name) {
+               config.setString(OPERATOR_NAME, name);
+       }
+
+       public String getOperatorName() {
+               return config.getString(OPERATOR_NAME, "Missing");
+       }
+
        public void setTypeSerializerIn1(StreamRecordSerializer<?> serializer) {
                setTypeSerializer(TYPE_SERIALIZER_IN_1, serializer);
        }
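
Since the new entry is just a string key in the vertex Configuration, it can be exercised in isolation; a small sketch (illustrative only, the operator name used here is an arbitrary example):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.StreamConfig;

    public class OperatorNameConfigExample {
        public static void main(String[] args) {
            // StreamConfig is a thin wrapper around a vertex Configuration.
            StreamConfig cfg = new StreamConfig(new Configuration());
            System.out.println(cfg.getOperatorName()); // "Missing" (default when never set)
            cfg.setOperatorName("Map -> Filter");      // the generator stores the chained name here
            System.out.println(cfg.getOperatorName()); // "Map -> Filter"
        }
    }

The StreamingJobGraphGenerator change below is the producer side of this: it writes the chained operator name into each vertex configuration so that the running StreamVertex can report it in toString().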

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index b999c27..c9698e3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -188,7 +188,9 @@ public class StreamingJobGraphGenerator {
                builtVertices.add(vertexID);
                jobGraph.addVertex(vertex);
 
-               return new StreamConfig(vertex.getConfiguration());
+               StreamConfig retConfig = new StreamConfig(vertex.getConfiguration());
+               retConfig.setOperatorName(chainedNames.get(vertexID));
+               return retConfig;
        }
 
        private void setVertexConfig(Integer vertexID, StreamConfig config,

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
index a497119..c3f694e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.collector;
 
 import java.io.IOException;
 
+import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
@@ -87,4 +88,7 @@ public class StreamOutput<OUT> implements Collector<OUT> {
                output.clearBuffers();
        }
 
+       public void broadcastEvent(TaskEvent barrier) throws IOException, InterruptedException {
+               output.broadcastEvent(barrier);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index dcfd6fe..cdf43ee 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -112,7 +112,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
         *            The state to be registered for this name.
         * @return The data stream with state registered.
         */
-       protected SingleOutputStreamOperator<OUT, O> registerState(String name, OperatorState<?> state) {
+       public SingleOutputStreamOperator<OUT, O> registerState(String name, OperatorState<?> state) {
                streamGraph.addOperatorState(getId(), name, state);
                return this;
        }
@@ -128,7 +128,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
         *            The map containing the states that will be registered.
         * @return The data stream with states registered.
         */
-       protected SingleOutputStreamOperator<OUT, O> registerState(Map<String, OperatorState<?>> states) {
+       public SingleOutputStreamOperator<OUT, O> registerState(Map<String, OperatorState<?>> states) {
                for (Entry<String, OperatorState<?>> entry : states.entrySet()) {
                        streamGraph.addOperatorState(getId(), entry.getKey(), entry.getValue());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index abe31d4..6281de3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -96,7 +96,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
         * Reads the next record from the reader iterator and stores it in the
         * nextRecord variable
         */
-       protected StreamRecord<IN> readNext() {
+       protected StreamRecord<IN> readNext() throws IOException {
                this.nextRecord = inSerializer.createInstance();
                try {
                        nextRecord = recordIterator.next(nextRecord);

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
index e8a2ce1..a95965c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
@@ -18,7 +18,9 @@
 package org.apache.flink.streaming.api.streamvertex;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.streaming.api.StreamConfig;
@@ -51,25 +53,19 @@ public class InputHandler<IN> {
                inputSerializer = configuration.getTypeSerializerIn1(streamVertex.userClassLoader);
 
                int numberOfInputs = configuration.getNumberOfInputs();
-               if (numberOfInputs > 0) {
 
-                       if (numberOfInputs < 2) {
-                               inputs = new IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>>(
-                                               streamVertex.getEnvironment().getInputGate(0));
+               if (numberOfInputs > 0) {
+                       InputGate inputGate = numberOfInputs < 2 ? streamVertex.getEnvironment()
+                                       .getInputGate(0) : new UnionInputGate(streamVertex.getEnvironment()
+                                       .getAllInputGates());
 
-                       } else {
-                               inputs = new IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>>(
-                                               new UnionInputGate(streamVertex.getEnvironment().getAllInputGates()));
-                       }
+                       inputs = new IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>>(inputGate);
+                       inputs.registerTaskEventListener(streamVertex.getSuperstepListener(),
+                                       StreamingSuperstep.class);
 
-                       inputIter = createInputIterator();
+                       inputIter = new IndexedReaderIterator<StreamRecord<IN>>(inputs, inputSerializer);
                }
-       }
 
-       private IndexedReaderIterator<StreamRecord<IN>> createInputIterator() {
-               final IndexedReaderIterator<StreamRecord<IN>> iter = new IndexedReaderIterator<StreamRecord<IN>>(
-                               inputs, inputSerializer);
-               return iter;
        }
 
        protected static <T> IndexedReaderIterator<StreamRecord<T>> staticCreateInputIterator(

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index 359675d..82f1329 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.StreamConfig;
@@ -84,6 +85,13 @@ public class OutputHandler<OUT> {
 
        }
 
+       public void broadcastBarrier(long id) throws IOException, InterruptedException {
+               StreamingSuperstep barrier = new StreamingSuperstep(id);
+               for (StreamOutput<?> streamOutput : outputMap.values()) {
+                       streamOutput.broadcastEvent(barrier);
+               }
+       }
+
        public Collection<StreamOutput<?>> getOutputs() {
                return outputMap.values();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 99ca098..e2cdc34 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -17,10 +17,16 @@
 
 package org.apache.flink.streaming.api.streamvertex;
 
+import java.io.IOException;
 import java.util.Map;
 
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
+import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
+import org.apache.flink.runtime.jobmanager.BarrierAck;
+import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.StreamConfig;
 import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
@@ -34,7 +40,10 @@ import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT> {
+import akka.actor.ActorRef;
+
+public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT>,
+               BarrierTransceiver {
 
        private static final Logger LOG = LoggerFactory.getLogger(StreamVertex.class);
 
@@ -53,10 +62,13 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 
        protected ClassLoader userClassLoader;
 
+       private EventListener<TaskEvent> superstepListener;
+
        public StreamVertex() {
                userInvokable = null;
                numTasks = newVertex();
                instanceID = numTasks;
+               superstepListener = new SuperstepEventListener();
        }
 
        protected static int newVertex() {
@@ -78,6 +90,22 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
                this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states);
        }
 
+       @Override
+       public void broadcastBarrier(long id) {
+               // Only called at input vertices
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Received barrier from jobmanager: " + id);
+               }
+               actOnBarrier(id);
+       }
+
+       @Override
+       public void confirmBarrier(long barrierID) {
+               getEnvironment().getJobManager().tell(
+                               new BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(),
+                                               context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender());
+       }
+
        public void setInputsOutputs() {
                inputHandler = new InputHandler<IN>(this);
                outputHandler = new OutputHandler<OUT>(this);
@@ -205,4 +233,35 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
                throw new IllegalArgumentException("CoReader not available");
        }
 
+       public EventListener<TaskEvent> getSuperstepListener() {
+               return this.superstepListener;
+       }
+
+       private void actOnBarrier(long id) {
+               try {
+                       outputHandler.broadcastBarrier(id);
+                       System.out.println("Superstep " + id + " processed: " + StreamVertex.this);
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Superstep " + id + " processed: " + StreamVertex.this);
+                       }
+               } catch (IOException e) {
+                       e.printStackTrace();
+               } catch (InterruptedException e) {
+                       e.printStackTrace();
+               }
+       }
+
+       @Override
+       public String toString() {
+               return configuration.getOperatorName() + " (" + context.getIndexOfThisSubtask() + ")";
+       }
+
+       private class SuperstepEventListener implements EventListener<TaskEvent> {
+
+               @Override
+               public void onEvent(TaskEvent event) {
+                       actOnBarrier(((StreamingSuperstep) event).getId());
+               }
+
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
index bb20ecb..79f09c4 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
@@ -17,6 +17,10 @@
 
 package org.apache.flink.streaming.io;
 
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
@@ -28,10 +32,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
 import org.apache.flink.runtime.util.event.EventListener;
 
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
 /**
  * A CoRecordReader wraps {@link MutableRecordReader}s of two different input
  * types to read records effectively.

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
index 175dba2..025393d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
@@ -34,4 +34,4 @@ public class IndexedMutableReader<T extends IOReadableWritable> extends MutableR
        public int getNumberOfInputChannels() {
                return reader.getNumberOfInputChannels();
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 4b13165..af836e2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -79,7 +79,7 @@ public class MockContext<IN, OUT> implements StreamTaskContext<OUT> {
                @Override
                public StreamRecord<IN> next() throws IOException {
                        if (listIterator.hasNext()) {
-                               StreamRecord<IN> result = new StreamRecord<IN>();
+                               StreamRecord<IN> result = inDeserializer.createInstance();
                                result.setObject(listIterator.next());
                                return result;
                        } else {
