[FLINK-1638] [streaming] Vertex level fault tolerance and state monitor
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5061edb8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5061edb8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5061edb8 Branch: refs/heads/master Commit: 5061edb80df092a2f8719054b0d2bce8c670265c Parents: b4e8350 Author: Gyula Fora <gyf...@apache.org> Authored: Fri Feb 20 22:24:27 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Mar 10 14:58:48 2015 +0100 ---------------------------------------------------------------------- .../runtime/event/task/StreamingSuperstep.java | 51 +++++++ .../io/network/api/reader/AbstractReader.java | 4 + .../api/reader/AbstractRecordReader.java | 58 +++++--- .../io/network/api/reader/BarrierBuffer.java | 143 +++++++++++++++++++ .../jobgraph/tasks/BarrierTransceiver.java | 27 ++++ .../runtime/jobmanager/StreamStateMonitor.scala | 96 +++++++++++++ .../flink/streaming/api/StreamConfig.java | 9 ++ .../api/StreamingJobGraphGenerator.java | 4 +- .../streaming/api/collector/StreamOutput.java | 4 + .../datastream/SingleOutputStreamOperator.java | 4 +- .../api/invokable/StreamInvokable.java | 2 +- .../api/streamvertex/InputHandler.java | 24 ++-- .../api/streamvertex/OutputHandler.java | 8 ++ .../api/streamvertex/StreamVertex.java | 61 +++++++- .../flink/streaming/io/CoRecordReader.java | 8 +- .../streaming/io/IndexedMutableReader.java | 2 +- .../flink/streaming/util/MockContext.java | 2 +- 17 files changed, 464 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java new file mode 100644 index 0000000..e35eb28 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.event.task; + +import java.io.IOException; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +public class StreamingSuperstep extends TaskEvent { + + protected long id; + + public StreamingSuperstep() { + + } + + public StreamingSuperstep(long id) { + this.id = id; + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeLong(id); + } + + @Override + public void read(DataInputView in) throws IOException { + id = in.readLong(); + } + + public long getId() { + return id; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java index 1bfca84..96b6f99 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java @@ -110,6 +110,10 @@ public abstract class AbstractReader implements ReaderBase { throw new IOException("Error while handling event of type " + eventType + ": " + t.getMessage(), t); } } + + public void publish(TaskEvent event){ + taskEventHandler.publish(event); + } // ------------------------------------------------------------------------ // Iterations http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java index e70b6ee..cc36438 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java @@ -18,24 +18,37 @@ package org.apache.flink.runtime.io.network.api.reader; +import java.io.IOException; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Queue; +import java.util.Set; + import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.runtime.event.task.AbstractEvent; +import org.apache.flink.runtime.event.task.StreamingSuperstep; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer; - -import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A record-oriented reader. * <p> - * This abstract base class is used by both the mutable and immutable record readers. - * - * @param <T> The type of the record that can be read with this record reader. + * This abstract base class is used by both the mutable and immutable record + * readers. + * + * @param <T> + * The type of the record that can be read with this record reader. 
*/ -abstract class AbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements ReaderBase { +abstract class AbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements + ReaderBase { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractRecordReader.class); private final RecordDeserializer<T>[] recordDeserializers; @@ -43,11 +56,15 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra private boolean isFinished; + private final BarrierBuffer barrierBuffer; + protected AbstractRecordReader(InputGate inputGate) { super(inputGate); + barrierBuffer = new BarrierBuffer(inputGate, this); // Initialize one deserializer per input channel - this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()]; + this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate + .getNumberOfInputChannels()]; for (int i = 0; i < recordDeserializers.length; i++) { recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>(); } @@ -72,22 +89,27 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra } } - final BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent(); + final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked(); if (bufferOrEvent.isBuffer()) { currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()]; currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer()); - } - else if (handleEvent(bufferOrEvent.getEvent())) { - if (inputGate.isFinished()) { - isFinished = true; - - return false; + } else { + // Event received + final AbstractEvent event = bufferOrEvent.getEvent(); + + if (event instanceof StreamingSuperstep) { + barrierBuffer.processSuperstep(bufferOrEvent); + } else { + if (handleEvent(event)) { + if (inputGate.isFinished()) { + isFinished = true; + return false; + } else if (hasReachedEndOfSuperstep()) { + 
return false; + } // else: More data is coming... + } } - else if (hasReachedEndOfSuperstep()) { - - return false; - } // else: More data is coming... } } } http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java new file mode 100644 index 0000000..ee317cd --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.api.reader; + +import java.io.IOException; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Queue; +import java.util.Set; + +import org.apache.flink.runtime.event.task.StreamingSuperstep; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BarrierBuffer { + + private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class); + + private Queue<BufferOrEvent> bufferOrEvents = new LinkedList<BufferOrEvent>(); + private Queue<BufferOrEvent> unprocessed = new LinkedList<BufferOrEvent>(); + + private Set<Integer> blockedChannels = new HashSet<Integer>(); + private int totalNumberOfInputChannels; + + private StreamingSuperstep currentSuperstep; + private boolean receivedSuperstep; + + private boolean blockAll = false; + + private AbstractReader reader; + + private InputGate inputGate; + + public BarrierBuffer(InputGate inputGate, AbstractReader reader) { + this.inputGate = inputGate; + totalNumberOfInputChannels = inputGate.getNumberOfInputChannels(); + this.reader = reader; + } + + private void startSuperstep(StreamingSuperstep superstep) { + this.currentSuperstep = superstep; + this.receivedSuperstep = true; + if (LOG.isDebugEnabled()) { + LOG.debug("Superstep started with id: " + superstep.getId()); + } + } + + private void store(BufferOrEvent bufferOrEvent) { + bufferOrEvents.add(bufferOrEvent); + } + + private BufferOrEvent getNonProcessed() { + return unprocessed.poll(); + } + + private boolean isBlocked(int channelIndex) { + return blockAll || blockedChannels.contains(channelIndex); + } + + private boolean containsNonprocessed() { + return !unprocessed.isEmpty(); + } + + private boolean receivedSuperstep() { + return receivedSuperstep; + } + + public BufferOrEvent getNextNonBlocked() throws IOException, 
+ InterruptedException { + BufferOrEvent bufferOrEvent = null; + + if (containsNonprocessed()) { + bufferOrEvent = getNonProcessed(); + } else { + while (bufferOrEvent == null) { + BufferOrEvent nextBufferOrEvent = inputGate.getNextBufferOrEvent(); + if (isBlocked(nextBufferOrEvent.getChannelIndex())) { + store(nextBufferOrEvent); + } else { + bufferOrEvent = nextBufferOrEvent; + } + } + } + return bufferOrEvent; + } + + private void blockChannel(int channelIndex) { + if (!blockedChannels.contains(channelIndex)) { + blockedChannels.add(channelIndex); + if (LOG.isDebugEnabled()) { + LOG.debug("Channel blocked with index: " + channelIndex); + } + if (blockedChannels.size() == totalNumberOfInputChannels) { + reader.publish(currentSuperstep); + unprocessed.addAll(bufferOrEvents); + bufferOrEvents.clear(); + blockedChannels.clear(); + receivedSuperstep = false; + if (LOG.isDebugEnabled()) { + LOG.debug("All barriers received, blocks released"); + } + } + + } else { + throw new RuntimeException("Tried to block an already blocked channel"); + } + } + + public String toString() { + return blockedChannels.toString(); + } + + public void processSuperstep(BufferOrEvent bufferOrEvent) { + int channelIndex = bufferOrEvent.getChannelIndex(); + if (isBlocked(channelIndex)) { + store(bufferOrEvent); + } else { + StreamingSuperstep superstep = (StreamingSuperstep) bufferOrEvent.getEvent(); + if (!receivedSuperstep()) { + startSuperstep(superstep); + } + blockChannel(channelIndex); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java new file mode 100644 index 0000000..c56da62 --- 
/dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph.tasks; + + +public interface BarrierTransceiver { + + public void broadcastBarrier(long barrierID); + + public void confirmBarrier(long barrierID); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala new file mode 100644 index 0000000..a37ddb5 --- /dev/null +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager + +import akka.actor._ +import org.apache.flink.runtime.ActorLogMessages +import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph, ExecutionVertex} +import org.apache.flink.runtime.jobgraph.{JobID, JobVertexID} + +import scala.collection.JavaConversions.mapAsScalaMap +import scala.collection.immutable.TreeMap +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.{FiniteDuration, _} + + +object StreamStateMonitor { + + def props(context: ActorContext, executionGraph: ExecutionGraph, + interval: FiniteDuration = 5 seconds): ActorRef = { + + val vertices: Iterable[ExecutionVertex] = getExecutionVertices(executionGraph) + val monitor = context.system.actorOf(Props(new StreamStateMonitor(executionGraph, + vertices, vertices.map(x => ((x.getJobVertex.getJobVertexId, x.getParallelSubtaskIndex), List.empty[Long])).toMap, interval, 0L, -1L))) + monitor ! 
InitBarrierScheduler + monitor + } + + private def getExecutionVertices(executionGraph: ExecutionGraph): Iterable[ExecutionVertex] = { + for ((_, execJobVertex) <- executionGraph.getAllVertices; + execVertex: ExecutionVertex <- execJobVertex.getTaskVertices) + yield execVertex + } +} + +class StreamStateMonitor(val executionGraph: ExecutionGraph, + val vertices: Iterable[ExecutionVertex], var acks: Map[(JobVertexID, Int), List[Long]], + val interval: FiniteDuration, var curId: Long, var ackId: Long) + extends Actor with ActorLogMessages with ActorLogging { + + override def receiveWithLogMessages: Receive = { + case InitBarrierScheduler => + context.system.scheduler.schedule(interval, interval, self, BarrierTimeout) + context.system.scheduler.schedule(2 * interval, 2 * interval, self, UpdateCurrentBarrier) + log.debug("[FT-MONITOR] Started Stream State Monitor for job {}{}", + executionGraph.getJobID, executionGraph.getJobName) + case BarrierTimeout => + curId += 1 + log.debug("[FT-MONITOR] Sending Barrier to vertices of Job " + executionGraph.getJobName) + vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex).foreach(vertex + => vertex.getCurrentAssignedResource.getInstance.getTaskManager + ! 
BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId, curId)) + case BarrierAck(_, jobVertexID, instanceID, checkpointID) => + acks.get(jobVertexID, instanceID) match { + case Some(acklist) => + acks += (jobVertexID, instanceID) -> (checkpointID :: acklist) + case None => + } + log.info(acks.toString) + case UpdateCurrentBarrier => + val barrierCount = acks.values.foldLeft(TreeMap[Long, Int]().withDefaultValue(0))((dict, myList) + => myList.foldLeft(dict)((dict2, elem) => dict2.updated(elem, dict2(elem) + 1))) + val keysToKeep = barrierCount.filter(_._2 == acks.size).keys + ackId = if (!keysToKeep.isEmpty) keysToKeep.max else ackId + acks.keys.foreach(x => acks = acks.updated(x, acks(x).filter(_ >= ackId))) + log.debug("[FT-MONITOR] Last global barrier is " + ackId) + } +} + +case class BarrierTimeout() + +case class InitBarrierScheduler() + +case class UpdateCurrentBarrier() + +case class BarrierReq(attemptID: ExecutionAttemptID, checkpointID: Long) + +case class BarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Int, checkpointID: Long) + + + http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java index e1362c4..d464ef1 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java @@ -48,6 +48,7 @@ public class StreamConfig implements Serializable { private static final String OUTPUT_NAME = "outputName_"; private static final String 
PARTITIONER_OBJECT = "partitionerObject_"; private static final String VERTEX_NAME = "vertexID"; + private static final String OPERATOR_NAME = "operatorName"; private static final String ITERATION_ID = "iteration-id"; private static final String OUTPUT_SELECTOR = "outputSelector"; private static final String DIRECTED_EMIT = "directedEmit"; @@ -87,6 +88,14 @@ public class StreamConfig implements Serializable { return config.getInteger(VERTEX_NAME, -1); } + public void setOperatorName(String name) { + config.setString(OPERATOR_NAME, name); + } + + public String getOperatorName() { + return config.getString(OPERATOR_NAME, "Missing"); + } + public void setTypeSerializerIn1(StreamRecordSerializer<?> serializer) { setTypeSerializer(TYPE_SERIALIZER_IN_1, serializer); } http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java index b999c27..c9698e3 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java @@ -188,7 +188,9 @@ public class StreamingJobGraphGenerator { builtVertices.add(vertexID); jobGraph.addVertex(vertex); - return new StreamConfig(vertex.getConfiguration()); + StreamConfig retConfig = new StreamConfig(vertex.getConfiguration()); + retConfig.setOperatorName(chainedNames.get(vertexID)); + return retConfig; } private void setVertexConfig(Integer vertexID, StreamConfig config, 
http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java index a497119..c3f694e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.collector; import java.io.IOException; +import org.apache.flink.runtime.event.task.TaskEvent; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.api.streamrecord.StreamRecord; @@ -87,4 +88,7 @@ public class StreamOutput<OUT> implements Collector<OUT> { output.clearBuffers(); } + public void broadcastEvent(TaskEvent barrier) throws IOException, InterruptedException { + output.broadcastEvent(barrier); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index dcfd6fe..cdf43ee 100755 --- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -112,7 +112,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato * The state to be registered for this name. * @return The data stream with state registered. */ - protected SingleOutputStreamOperator<OUT, O> registerState(String name, OperatorState<?> state) { + public SingleOutputStreamOperator<OUT, O> registerState(String name, OperatorState<?> state) { streamGraph.addOperatorState(getId(), name, state); return this; } @@ -128,7 +128,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato * The map containing the states that will be registered. * @return The data stream with states registered. */ - protected SingleOutputStreamOperator<OUT, O> registerState(Map<String, OperatorState<?>> states) { + public SingleOutputStreamOperator<OUT, O> registerState(Map<String, OperatorState<?>> states) { for (Entry<String, OperatorState<?>> entry : states.entrySet()) { streamGraph.addOperatorState(getId(), entry.getKey(), entry.getValue()); } http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java index abe31d4..6281de3 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java @@ -96,7 +96,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable { * Reads the next record from the reader iterator and stores it in the * nextRecord variable */ - protected StreamRecord<IN> readNext() { + protected StreamRecord<IN> readNext() throws IOException { this.nextRecord = inSerializer.createInstance(); try { nextRecord = recordIterator.next(nextRecord); http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java index e8a2ce1..a95965c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java @@ -18,7 +18,9 @@ package org.apache.flink.streaming.api.streamvertex; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.event.task.StreamingSuperstep; import org.apache.flink.runtime.io.network.api.reader.MutableReader; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; import org.apache.flink.runtime.plugable.DeserializationDelegate; import org.apache.flink.streaming.api.StreamConfig; @@ -51,25 +53,19 @@ public class InputHandler<IN> { inputSerializer = 
configuration.getTypeSerializerIn1(streamVertex.userClassLoader); int numberOfInputs = configuration.getNumberOfInputs(); - if (numberOfInputs > 0) { - if (numberOfInputs < 2) { - inputs = new IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>>( - streamVertex.getEnvironment().getInputGate(0)); + if (numberOfInputs > 0) { + InputGate inputGate = numberOfInputs < 2 ? streamVertex.getEnvironment() + .getInputGate(0) : new UnionInputGate(streamVertex.getEnvironment() + .getAllInputGates()); - } else { - inputs = new IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>>( - new UnionInputGate(streamVertex.getEnvironment().getAllInputGates())); - } + inputs = new IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>>(inputGate); + inputs.registerTaskEventListener(streamVertex.getSuperstepListener(), + StreamingSuperstep.class); - inputIter = createInputIterator(); + inputIter = new IndexedReaderIterator<StreamRecord<IN>>(inputs, inputSerializer); } - } - private IndexedReaderIterator<StreamRecord<IN>> createInputIterator() { - final IndexedReaderIterator<StreamRecord<IN>> iter = new IndexedReaderIterator<StreamRecord<IN>>( - inputs, inputSerializer); - return iter; } protected static <T> IndexedReaderIterator<StreamRecord<T>> staticCreateInputIterator( http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java index 359675d..82f1329 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.event.task.StreamingSuperstep; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.api.StreamConfig; @@ -84,6 +85,13 @@ public class OutputHandler<OUT> { } + public void broadcastBarrier(long id) throws IOException, InterruptedException { + StreamingSuperstep barrier = new StreamingSuperstep(id); + for (StreamOutput<?> streamOutput : outputMap.values()) { + streamOutput.broadcastEvent(barrier); + } + } + public Collection<StreamOutput<?>> getOutputs() { return outputMap.values(); } http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java index 99ca098..e2cdc34 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java @@ -17,10 +17,16 @@ package org.apache.flink.streaming.api.streamvertex; +import java.io.IOException; import java.util.Map; +import org.apache.flink.runtime.event.task.StreamingSuperstep; +import org.apache.flink.runtime.event.task.TaskEvent; import org.apache.flink.runtime.execution.Environment; 
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
+import org.apache.flink.runtime.jobmanager.BarrierAck;
+import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.StreamConfig;
 import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
@@ -34,7 +40,10 @@ import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT> {
+import akka.actor.ActorRef;
+
+public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT>,
+		BarrierTransceiver {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamVertex.class);
 
@@ -53,10 +62,13 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 
 	protected ClassLoader userClassLoader;
 
+	private EventListener<TaskEvent> superstepListener;
+
 	public StreamVertex() {
 		userInvokable = null;
 		numTasks = newVertex();
 		instanceID = numTasks;
+		superstepListener = new SuperstepEventListener();
 	}
 
 	protected static int newVertex() {
@@ -78,6 +90,37 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 		this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states);
 	}
 
+	/**
+	 * Handles a barrier received from the job manager: logs the barrier id at
+	 * debug level and forwards the barrier to the outputs of this vertex.
+	 *
+	 * @param id
+	 *            id of the barrier to broadcast
+	 */
+	@Override
+	public void broadcastBarrier(long id) {
+		// Only called at input vertices
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Received barrier from jobmanager: " + id);
+		}
+		actOnBarrier(id);
+	}
+
+	/**
+	 * Acknowledges the given barrier by sending a {@link BarrierAck} with the
+	 * job id, the job vertex id, the index of this subtask and the barrier id
+	 * to the job manager.
+	 *
+	 * @param barrierID
+	 *            id of the barrier to acknowledge
+	 */
+	@Override
+	public void confirmBarrier(long barrierID) {
+		getEnvironment().getJobManager().tell(
+				new BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(),
+						context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender());
+	}
+
 	public void setInputsOutputs() {
 		inputHandler = new InputHandler<IN>(this);
 		outputHandler = new OutputHandler<OUT>(this);
@@ -205,4 +248,52 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 		throw new IllegalArgumentException("CoReader not available");
 	}
 
+	/**
+	 * Returns the listener that forwards received {@link StreamingSuperstep}
+	 * events to this vertex.
+	 */
+	public EventListener<TaskEvent> getSuperstepListener() {
+		return this.superstepListener;
+	}
+
+	/**
+	 * Broadcasts the barrier with the given id through the output handler.
+	 * Failures are logged instead of rethrown because the callers of this
+	 * method do not declare checked exceptions.
+	 *
+	 * @param id
+	 *            id of the barrier (superstep) to forward
+	 */
+	private void actOnBarrier(long id) {
+		try {
+			outputHandler.broadcastBarrier(id);
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Superstep " + id + " processed: " + StreamVertex.this);
+			}
+		} catch (IOException e) {
+			LOG.error("Broadcasting superstep " + id + " failed: " + StreamVertex.this, e);
+		} catch (InterruptedException e) {
+			// Restore the interrupt flag so the interrupted thread can still observe it
+			Thread.currentThread().interrupt();
+			LOG.warn("Interrupted while broadcasting superstep " + id + ": " + StreamVertex.this, e);
+		}
+	}
+
+	@Override
+	public String toString() {
+		return configuration.getOperatorName() + " (" + context.getIndexOfThisSubtask() + ")";
+	}
+
+	/**
+	 * Listener that passes the id of every received
+	 * {@link StreamingSuperstep} event to {@link #actOnBarrier(long)}.
+	 */
+	private class SuperstepEventListener implements EventListener<TaskEvent> {
+
+		@Override
+		public void onEvent(TaskEvent event) {
+			actOnBarrier(((StreamingSuperstep) event).getId());
+		}
+
+	}
 }
http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
index bb20ecb..79f09c4 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
@@ -17,6 +17,10 @@
 
 package org.apache.flink.streaming.io;
 
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
 import
org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.runtime.io.network.api.reader.AbstractReader; import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader; @@ -28,10 +32,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; import org.apache.flink.runtime.util.event.EventListener; -import java.io.IOException; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - /** * A CoRecordReader wraps {@link MutableRecordReader}s of two different input * types to read records effectively. http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java index 175dba2..025393d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java @@ -34,4 +34,4 @@ public class IndexedMutableReader<T extends IOReadableWritable> extends MutableR public int getNumberOfInputChannels() { return reader.getNumberOfInputChannels(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/5061edb8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java index 4b13165..af836e2 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java @@ -79,7 +79,7 @@ public class MockContext<IN, OUT> implements StreamTaskContext<OUT> { @Override public StreamRecord<IN> next() throws IOException { if (listIterator.hasNext()) { - StreamRecord<IN> result = new StreamRecord<IN>(); + StreamRecord<IN> result = inDeserializer.createInstance(); result.setObject(listIterator.next()); return result; } else {