[FLINK-4984] [checkpointing] Add Cancellation Barriers as a way to signal aborted checkpoints
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1f028de Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1f028de Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1f028de Branch: refs/heads/release-1.1 Commit: a1f028dee49928ada014632bb27216b36e30250e Parents: 4dd3efe Author: Stephan Ewen <[email protected]> Authored: Sun Oct 23 18:41:32 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Tue Nov 8 18:27:47 2016 +0100 ---------------------------------------------------------------------- .../io/network/api/CancelCheckpointMarker.java | 77 +++ .../api/serialization/EventSerializer.java | 57 +- .../runtime/io/network/netty/NettyMessage.java | 2 +- .../partition/PipelinedSubpartition.java | 3 +- .../runtime/jobgraph/tasks/StatefulTask.java | 21 + .../api/serialization/EventSerializerTest.java | 45 +- .../jobmanager/JobManagerHARecoveryTest.java | 10 + .../runtime/taskmanager/TaskAsyncCallTest.java | 10 + .../streaming/runtime/io/BarrierBuffer.java | 265 ++++++-- .../streaming/runtime/io/BarrierTracker.java | 164 +++-- .../streaming/runtime/io/BufferSpiller.java | 2 +- .../runtime/io/CheckpointBarrierHandler.java | 22 +- .../runtime/io/StreamInputProcessor.java | 5 +- .../runtime/io/StreamTwoInputProcessor.java | 5 +- .../runtime/tasks/OneInputStreamTask.java | 2 +- .../streaming/runtime/tasks/OperatorChain.java | 32 +- .../streaming/runtime/tasks/StreamTask.java | 48 +- .../runtime/tasks/TwoInputStreamTask.java | 2 +- .../streaming/runtime/io/BarrierBufferTest.java | 617 +++++++++++++++++-- .../runtime/io/BarrierTrackerTest.java | 157 ++++- 20 files changed, 1291 insertions(+), 255 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java new file mode 100644 index 0000000..52a2517 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.api; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.event.RuntimeEvent; + +import java.io.IOException; + +/** + * The CancelCheckpointMarker travels through the data streams, similar to the {@link CheckpointBarrier}, + * but signals that a certain checkpoint should be canceled. Any in-progress alignment for that + * checkpoint needs to be canceled and regular processing should be resumed. + */ +public class CancelCheckpointMarker extends RuntimeEvent { + + /** The id of the checkpoint to be canceled */ + private final long checkpointId; + + public CancelCheckpointMarker(long checkpointId) { + this.checkpointId = checkpointId; + } + + public long getCheckpointId() { + return checkpointId; + } + + // ------------------------------------------------------------------------ + // These known and common event go through special code paths, rather than + // through generic serialization + + @Override + public void write(DataOutputView out) throws IOException { + throw new UnsupportedOperationException("this method should never be called"); + } + + @Override + public void read(DataInputView in) throws IOException { + throw new UnsupportedOperationException("this method should never be called"); + } + + // ------------------------------------------------------------------------ + + @Override + public int hashCode() { + return (int) (checkpointId ^ (checkpointId >>> 32)); + } + + @Override + public boolean equals(Object other) { + return other != null && + other.getClass() == CancelCheckpointMarker.class && + this.checkpointId == ((CancelCheckpointMarker) other).checkpointId; + } + + @Override + public String toString() { + return "CancelCheckpointMarker " + checkpointId; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java index a34f8cf..0bc3b28 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.api.serialization; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.event.AbstractEvent; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent; @@ -38,7 +39,7 @@ import java.nio.ByteOrder; * Utility class to serialize and deserialize task events. */ public class EventSerializer { - + private static final int END_OF_PARTITION_EVENT = 0; private static final int CHECKPOINT_BARRIER_EVENT = 1; @@ -46,17 +47,19 @@ public class EventSerializer { private static final int END_OF_SUPERSTEP_EVENT = 2; private static final int OTHER_EVENT = 3; - + + private static final int CANCEL_CHECKPOINT_MARKER_EVENT = 4; + // ------------------------------------------------------------------------ - - public static ByteBuffer toSerializedEvent(AbstractEvent event) { + + public static ByteBuffer toSerializedEvent(AbstractEvent event) throws IOException { final Class<?> eventClass = event.getClass(); if (eventClass == EndOfPartitionEvent.class) { return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_PARTITION_EVENT }); } else if (eventClass == CheckpointBarrier.class) { CheckpointBarrier barrier = (CheckpointBarrier) event; - + ByteBuffer buf = ByteBuffer.allocate(20); buf.putInt(0, CHECKPOINT_BARRIER_EVENT); buf.putLong(4, barrier.getId()); @@ -66,32 +69,39 @@ public class EventSerializer { else if (eventClass == EndOfSuperstepEvent.class) { return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_SUPERSTEP_EVENT }); } + else if (eventClass == CancelCheckpointMarker.class) { + CancelCheckpointMarker marker = (CancelCheckpointMarker) event; + + ByteBuffer buf = ByteBuffer.allocate(12); + buf.putInt(0, CANCEL_CHECKPOINT_MARKER_EVENT); + buf.putLong(4, marker.getCheckpointId()); + return buf; + } else { try { final DataOutputSerializer serializer = new DataOutputSerializer(128); serializer.writeInt(OTHER_EVENT); serializer.writeUTF(event.getClass().getName()); event.write(serializer); - return serializer.wrapAsByteBuffer(); } catch (IOException e) { - throw new RuntimeException("Error while serializing event.", e); + throw new IOException("Error while serializing event.", e); } } } - public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader classLoader) { + public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader classLoader) throws IOException { if (buffer.remaining() < 4) { - throw new RuntimeException("Incomplete event"); + throw new IOException("Incomplete event"); } - + final ByteOrder bufferOrder = buffer.order(); buffer.order(ByteOrder.BIG_ENDIAN); - + try { int type = buffer.getInt(); - + if (type == END_OF_PARTITION_EVENT) { return EndOfPartitionEvent.INSTANCE; } @@ -103,35 +113,38 @@ public class EventSerializer { else if (type == END_OF_SUPERSTEP_EVENT) { return EndOfSuperstepEvent.INSTANCE; } + else if (type == CANCEL_CHECKPOINT_MARKER_EVENT) { + long id = buffer.getLong(); + return new CancelCheckpointMarker(id); + } else if (type == OTHER_EVENT) { try { final DataInputDeserializer deserializer = new DataInputDeserializer(buffer); - final String className = deserializer.readUTF(); - + final Class<? extends AbstractEvent> clazz; try { clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class); } catch (ClassNotFoundException e) { - throw new RuntimeException("Could not load event class '" + className + "'.", e); + throw new IOException("Could not load event class '" + className + "'.", e); } catch (ClassCastException e) { - throw new RuntimeException("The class '" + className + "' is not a valid subclass of '" + throw new IOException("The class '" + className + "' is not a valid subclass of '" + AbstractEvent.class.getName() + "'.", e); } - + final AbstractEvent event = InstantiationUtil.instantiate(clazz, AbstractEvent.class); event.read(deserializer); - + return event; } catch (Exception e) { - throw new RuntimeException("Error while deserializing or instantiating event.", e); + throw new IOException("Error while deserializing or instantiating event.", e); } } else { - throw new RuntimeException("Corrupt byte stream for event"); + throw new IOException("Corrupt byte stream for event"); } } finally { @@ -143,7 +156,7 @@ public class EventSerializer { // Buffer helpers // ------------------------------------------------------------------------ - public static Buffer toBuffer(AbstractEvent event) { + public static Buffer toBuffer(AbstractEvent event) throws IOException { final ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(event); MemorySegment data = MemorySegmentFactory.wrap(serializedEvent.array()); @@ -154,7 +167,7 @@ public class EventSerializer { return buffer; } - public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader classLoader) { + public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader classLoader) throws IOException { return fromSerializedEvent(buffer.getNioBuffer(), classLoader); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java index 3a24181..6f6001b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java @@ -463,7 +463,7 @@ abstract class NettyMessage { } @Override - public void readFrom(ByteBuf buffer) { + public void readFrom(ByteBuf buffer) throws IOException { // TODO Directly deserialize fromNetty's buffer int length = buffer.readInt(); ByteBuffer serializedEvent = ByteBuffer.allocate(length); http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index 2d7097d..b703acb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.util.event.NotificationListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.ArrayDeque; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -88,7 +89,7 @@ class PipelinedSubpartition extends ResultSubpartition { } @Override - public void finish() { + public void finish() throws IOException { final NotificationListener listener; synchronized (buffers) { http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java index f8bba1a..7c581df 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java @@ -47,6 +47,27 @@ public interface StatefulTask<T extends StateHandle<?>> { */ boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception; + /** + * This method is called when a checkpoint is triggered as a result of receiving checkpoint + * barriers on all input streams. + * + * @param checkpointId The ID of the checkpoint, incrementing. + * @param timestamp The timestamp when the checkpoint was triggered at the JobManager. + * + * @throws Exception Exceptions thrown as the result of triggering a checkpoint are forwarded. + */ + void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception; + + /** + * Aborts a checkpoint as the result of receiving possibly some checkpoint barriers, + * but at least one {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker}. + * + * <p>This requires implementing tasks to forward a + * {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker} to their outputs. + * + * @param checkpointId The ID of the checkpoint to be aborted. + */ + void abortCheckpointOnBarrier(long checkpointId) throws Exception; /** * Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java index ddfd758..d47a0b5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.api.serialization; import org.apache.flink.runtime.event.AbstractEvent; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent; @@ -28,34 +29,30 @@ import org.junit.Test; import java.nio.ByteBuffer; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class EventSerializerTest { @Test - public void testSerializeDeserializeEvent() { - try { - AbstractEvent[] events = { - EndOfPartitionEvent.INSTANCE, - EndOfSuperstepEvent.INSTANCE, - new CheckpointBarrier(1678L, 4623784L), - new TestTaskEvent(Math.random(), 12361231273L) - }; - - for (AbstractEvent evt : events) { - ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(evt); - assertTrue(serializedEvent.hasRemaining()); - - AbstractEvent deserialized = - EventSerializer.fromSerializedEvent(serializedEvent, getClass().getClassLoader()); - assertNotNull(deserialized); - assertEquals(evt, deserialized); - } - - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + public void testSerializeDeserializeEvent() throws Exception { + AbstractEvent[] events = { + EndOfPartitionEvent.INSTANCE, + EndOfSuperstepEvent.INSTANCE, + new CheckpointBarrier(1678L, 4623784L), + new TestTaskEvent(Math.random(), 12361231273L), + new CancelCheckpointMarker(287087987329842L) + }; + + for (AbstractEvent evt : events) { + ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(evt); + assertTrue(serializedEvent.hasRemaining()); + + AbstractEvent deserialized = + EventSerializer.fromSerializedEvent(serializedEvent, getClass().getClassLoader()); + assertNotNull(deserialized); + assertEquals(evt, deserialized); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index f050e29..4dfaf95 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -450,6 +450,16 @@ public class JobManagerHARecoveryTest { } @Override + public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception { + throw new UnsupportedOperationException("should not be called!"); + } + + @Override + public void abortCheckpointOnBarrier(long checkpointId) { + throw new UnsupportedOperationException("should not be called!"); + } + + @Override public void notifyCheckpointComplete(long checkpointId) { if (completedCheckpoints++ > NUM_CHECKPOINTS_TO_COMPLETE) { completedCheckpointsLatch.countDown(); http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index 0c0d064..5b344eb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -221,6 +221,16 @@ public class TaskAsyncCallTest { } @Override + public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception { + throw new UnsupportedOperationException("Should not be called"); + } + + @Override + public void abortCheckpointOnBarrier(long checkpointId) { + throw new UnsupportedOperationException("Should not be called"); + } + + @Override public void notifyCheckpointComplete(long checkpointId) { if (checkpointId != lastCheckpointId && this.error == null) { this.error = new Exception("calls out of order"); http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java index dcd76c6..36de717 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java @@ -17,42 +17,43 @@ package org.apache.flink.streaming.runtime.io; -import java.io.IOException; -import java.util.ArrayDeque; - import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -import org.apache.flink.runtime.util.event.EventListener; -import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayDeque; + /** * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until * all inputs have received the barrier for a given checkpoint. * * <p>To avoid back-pressuring the input streams (which may cause distributed deadlocks), the * BarrierBuffer continues receiving buffers from the blocked channels and stores them internally until - * the blocks are released.</p> + * the blocks are released. */ @Internal public class BarrierBuffer implements CheckpointBarrierHandler { private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class); - + /** The gate that the buffer draws its input from */ private final InputGate inputGate; /** Flags that indicate whether a channel is currently blocked/buffered */ private final boolean[] blockedChannels; - + /** The total number of channels that this buffer handles data from */ private final int totalNumberOfInputChannels; - + /** To utility to write blocked data to a file channel */ private final BufferSpiller bufferSpiller; @@ -65,17 +66,24 @@ public class BarrierBuffer implements CheckpointBarrierHandler { private BufferSpiller.SpilledBufferOrEventSequence currentBuffered; /** Handler that receives the checkpoint notifications */ - private EventListener<CheckpointBarrier> checkpointHandler; + private StatefulTask<?> toNotifyOnCheckpoint; /** The ID of the checkpoint for which we expect barriers */ private long currentCheckpointId = -1L; - /** The number of received barriers (= number of blocked/buffered channels) */ + /** The number of received barriers (= number of blocked/buffered channels) + * IMPORTANT: A canceled checkpoint must always have 0 barriers */ private int numBarriersReceived; - + /** The number of already closed channels */ private int numClosedChannels; - + + /** The timestamp as in {@link System#nanoTime()} at which the last alignment started */ + private long startOfAlignmentTimestamp; + + /** The time (in nanoseconds) that the latest alignment took */ + private long latestAlignmentDurationNanos; + /** Flag to indicate whether we have drawn all available input */ private boolean endOfStream; @@ -90,7 +98,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { this.inputGate = inputGate; this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels(); this.blockedChannels = new boolean[this.totalNumberOfInputChannels]; - + this.bufferSpiller = new BufferSpiller(ioManager, inputGate.getPageSize()); this.queuedBuffered = new ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence>(); } @@ -100,7 +108,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { // ------------------------------------------------------------------------ @Override - public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException { + public BufferOrEvent getNextNonBlocked() throws Exception { while (true) { // process buffered BufferOrEvents before grabbing new ones BufferOrEvent next; @@ -114,7 +122,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { return getNextNonBlocked(); } } - + if (next != null) { if (isBlocked(next.getChannelIndex())) { // if the channel is blocked we, we just store the BufferOrEvent @@ -129,27 +137,29 @@ public class BarrierBuffer implements CheckpointBarrierHandler { processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex()); } } + else if (next.getEvent().getClass() == CancelCheckpointMarker.class) { + processCancellationBarrier((CancelCheckpointMarker) next.getEvent()); + } else { if (next.getEvent().getClass() == EndOfPartitionEvent.class) { - numClosedChannels++; - // no chance to complete this checkpoint - releaseBlocks(); + processEndOfPartition(next.getChannelIndex()); } return next; } } else if (!endOfStream) { - // end of stream. we feed the data that is still buffered + // end of input stream. stream continues with the buffered data endOfStream = true; - releaseBlocks(); + releaseBlocksAndResetBarriers(); return getNextNonBlocked(); } else { + // final end of both input and buffered data return null; } } } - + private void completeBufferedSequence() throws IOException { currentBuffered.cleanup(); currentBuffered = queuedBuffered.pollFirst(); @@ -157,66 +167,175 @@ public class BarrierBuffer implements CheckpointBarrierHandler { currentBuffered.open(); } } - - private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws IOException { + + private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception { final long barrierId = receivedBarrier.getId(); + // fast path for single channel cases + if (totalNumberOfInputChannels == 1) { + if (barrierId > currentCheckpointId) { + // new checkpoint + currentCheckpointId = barrierId; + notifyCheckpoint(receivedBarrier); + } + return; + } + + // -- general code path for multiple input channels -- + if (numBarriersReceived > 0) { - // subsequent barrier of a checkpoint. + // this is only true if some alignment is already progress and was not canceled + if (barrierId == currentCheckpointId) { // regular case onBarrier(channelIndex); } else if (barrierId > currentCheckpointId) { - // we did not complete the current checkpoint + // we did not complete the current checkpoint, another started before LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " + "Skipping current checkpoint.", barrierId, currentCheckpointId); - releaseBlocks(); - currentCheckpointId = barrierId; - onBarrier(channelIndex); + // let the task know we are not completing this + notifyAbort(currentCheckpointId); + + // abort the current checkpoint + releaseBlocksAndResetBarriers(); + + // begin a the new checkpoint + beginNewAlignment(barrierId, channelIndex); } else { - // ignore trailing barrier from aborted checkpoint + // ignore trailing barrier from an earlier checkpoint (obsolete now) return; } - } else if (barrierId > currentCheckpointId) { // first barrier of a new checkpoint - currentCheckpointId = barrierId; - onBarrier(channelIndex); + beginNewAlignment(barrierId, channelIndex); } else { - // trailing barrier from previous (skipped) checkpoint + // either the current checkpoint was canceled (numBarriers == 0) or + // this barrier is from an old subsumed checkpoint return; } - // check if we have all barriers + // check if we have all barriers - since canceled checkpoints always have zero barriers + // this can only happen on a non canceled checkpoint if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) { + // actually trigger checkpoint if (LOG.isDebugEnabled()) { - LOG.debug("Received all barrier, triggering checkpoint {} at {}", + LOG.debug("Received all barriers, triggering checkpoint {} at {}", receivedBarrier.getId(), receivedBarrier.getTimestamp()); } - if (checkpointHandler != null) { - checkpointHandler.onEvent(receivedBarrier); + releaseBlocksAndResetBarriers(); + notifyCheckpoint(receivedBarrier); + } + } + + private void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception { + final long barrierId = cancelBarrier.getCheckpointId(); + + // fast path for single channel cases + if (totalNumberOfInputChannels == 1) { + if (barrierId > currentCheckpointId) { + // new checkpoint + currentCheckpointId = barrierId; + notifyAbort(barrierId); } - - releaseBlocks(); + return; + } + + // -- general code path for multiple input channels -- + + if (numBarriersReceived > 0) { + // this is only true if some alignment is in progress and nothing was canceled + + if (barrierId == currentCheckpointId) { + // cancel this alignment + if (LOG.isDebugEnabled()) { + LOG.debug("Checkpoint {} canceled, aborting alignment", barrierId); + } + + releaseBlocksAndResetBarriers(); + notifyAbort(barrierId); + } + else if (barrierId > currentCheckpointId) { + // we canceled the next which also cancels the current + LOG.warn("Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " + + "Skipping current checkpoint.", barrierId, currentCheckpointId); + + // this stops the current alignment + releaseBlocksAndResetBarriers(); + + // the next checkpoint starts as canceled + currentCheckpointId = barrierId; + startOfAlignmentTimestamp = 0L; + latestAlignmentDurationNanos = 0L; + notifyAbort(barrierId); + } + + // else: ignore trailing (cancellation) barrier from an earlier checkpoint (obsolete now) + + } + else if (barrierId > currentCheckpointId) { + // first barrier of a new checkpoint is directly a cancellation + + // by setting the currentCheckpointId to this checkpoint while keeping the numBarriers + // at zero means that no checkpoint barrier can start a new alignment + currentCheckpointId = barrierId; + + startOfAlignmentTimestamp = 0L; + latestAlignmentDurationNanos = 0L; + + if (LOG.isDebugEnabled()) { + LOG.debug("Checkpoint {} canceled, skipping alignment", barrierId); + } + + notifyAbort(barrierId); } + + // else: trailing barrier from either + // - a previous (subsumed) checkpoint + // - the current checkpoint if it was already canceled } - + + private void processEndOfPartition(int channel) throws Exception { + numClosedChannels++; + + if (numBarriersReceived > 0) { + // let the task know we skip a checkpoint + notifyAbort(currentCheckpointId); + + // no chance to complete this checkpoint + releaseBlocksAndResetBarriers(); + } + } + + private void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws Exception { + if (toNotifyOnCheckpoint != null) { + toNotifyOnCheckpoint.triggerCheckpointOnBarrier( + checkpointBarrier.getId(), checkpointBarrier.getTimestamp()); + } + } + + private void notifyAbort(long checkpointId) throws Exception { + if (toNotifyOnCheckpoint != null) { + toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId); + } + } + + @Override - public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) { - if (this.checkpointHandler == null) { - this.checkpointHandler = checkpointHandler; + public void registerCheckpointEventHandler(StatefulTask<?> toNotifyOnCheckpoint) { + if (this.toNotifyOnCheckpoint == null) { + this.toNotifyOnCheckpoint = toNotifyOnCheckpoint; } else { - throw new IllegalStateException("BarrierBuffer already has a registered checkpoint handler"); + throw new IllegalStateException("BarrierBuffer already has a registered checkpoint notifyee"); } } - + @Override public boolean isEmpty() { return currentBuffered == null; @@ -231,8 +350,20 @@ public class BarrierBuffer implements CheckpointBarrierHandler { for (BufferSpiller.SpilledBufferOrEventSequence seq : queuedBuffered) { seq.cleanup(); } + queuedBuffered.clear(); } - + + private void beginNewAlignment(long checkpointId, int channelIndex) throws IOException { + currentCheckpointId = checkpointId; + onBarrier(channelIndex); + + startOfAlignmentTimestamp = System.nanoTime(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Starting stream alignment for checkpoint " + checkpointId); + } + } + /** * Checks whether the channel with the given index is blocked. * @@ -242,7 +373,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { private boolean isBlocked(int channelIndex) { return blockedChannels[channelIndex]; } - + /** * Blocks the given channel index, from which a barrier has been received. * @@ -251,30 +382,28 @@ public class BarrierBuffer implements CheckpointBarrierHandler { private void onBarrier(int channelIndex) throws IOException { if (!blockedChannels[channelIndex]) { blockedChannels[channelIndex] = true; + numBarriersReceived++; - + if (LOG.isDebugEnabled()) { LOG.debug("Received barrier from channel " + channelIndex); } } else { - throw new IOException("Stream corrupt: Repeated barrier for same checkpoint and input stream"); + throw new IOException("Stream corrupt: Repeated barrier for same checkpoint on input " + channelIndex); } } /** - * Releases the blocks on all channels. Makes sure the just written data - * is the next to be consumed. + * Releases the blocks on all channels and resets the barrier count. + * Makes sure the just written data is the next to be consumed. */ - private void releaseBlocks() throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Releasing blocks"); - } + private void releaseBlocksAndResetBarriers() throws IOException { + LOG.debug("End of stream alignment, feeding buffered data back"); for (int i = 0; i < blockedChannels.length; i++) { blockedChannels[i] = false; } - numBarriersReceived = 0; if (currentBuffered == null) { // common case: no more buffered data @@ -295,10 +424,18 @@ public class BarrierBuffer implements CheckpointBarrierHandler { currentBuffered = bufferedNow; } } + + // the next barrier that comes must assume it is the first + numBarriersReceived = 0; + + if (startOfAlignmentTimestamp > 0) { + latestAlignmentDurationNanos = System.nanoTime() - startOfAlignmentTimestamp; + startOfAlignmentTimestamp = 0; + } } // ------------------------------------------------------------------------ - // For Testing + // Properties // ------------------------------------------------------------------------ /** @@ -309,7 +446,17 @@ public class BarrierBuffer implements CheckpointBarrierHandler { public long getCurrentCheckpointId() { return this.currentCheckpointId; } - + + @Override + public long getAlignmentDurationNanos() { + long start = this.startOfAlignmentTimestamp; + if (start <= 0) { + return latestAlignmentDurationNanos; + } else { + return System.nanoTime() - start; + } + } + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java index 9c9ec4f..5157336 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java @@ -19,12 +19,12 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -import org.apache.flink.runtime.util.event.EventListener; -import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; -import java.io.IOException; import java.util.ArrayDeque; /** @@ -34,9 +34,9 @@ import java.util.ArrayDeque; * * <p>Unlike the {@link BarrierBuffer}, the BarrierTracker does not block the input * channels that have sent barriers, so it cannot be used to gain "exactly-once" processing - * guarantees. It can, however, be used to gain "at least once" processing guarantees.</p> + * guarantees. It can, however, be used to gain "at least once" processing guarantees. * - * <p>NOTE: This implementation strictly assumes that newer checkpoints have higher checkpoint IDs.</p> + * <p>NOTE: This implementation strictly assumes that newer checkpoints have higher checkpoint IDs. */ @Internal public class BarrierTracker implements CheckpointBarrierHandler { @@ -57,11 +57,12 @@ public class BarrierTracker implements CheckpointBarrierHandler { private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints; /** The listener to be notified on complete checkpoints */ - private EventListener<CheckpointBarrier> checkpointHandler; + private StatefulTask<?> toNotifyOnCheckpoint; /** The highest checkpoint ID encountered so far */ private long latestPendingCheckpointID = -1; - + + // ------------------------------------------------------------------------ public BarrierTracker(InputGate inputGate) { this.inputGate = inputGate; @@ -70,28 +71,33 @@ public class BarrierTracker implements CheckpointBarrierHandler { } @Override - public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException { + public BufferOrEvent getNextNonBlocked() throws Exception { while (true) { BufferOrEvent next = inputGate.getNextBufferOrEvent(); - if (next == null) { - return null; - } - else if (next.isBuffer() || next.getEvent().getClass() != CheckpointBarrier.class) { + if (next == null || next.isBuffer()) { + // buffer or input exhausted return next; } - else { + else if (next.getEvent().getClass() == CheckpointBarrier.class) { processBarrier((CheckpointBarrier) next.getEvent()); } + else if (next.getEvent().getClass() == CancelCheckpointMarker.class) { + processCheckpointAbortBarrier((CancelCheckpointMarker) next.getEvent()); + } + else { + // some other event + return next; + } } } @Override - public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) { - if (this.checkpointHandler == null) { - this.checkpointHandler = checkpointHandler; + public void registerCheckpointEventHandler(StatefulTask<?> toNotifyOnCheckpoint) { + if (this.toNotifyOnCheckpoint == null) { + this.toNotifyOnCheckpoint = toNotifyOnCheckpoint; } else { - throw new IllegalStateException("BarrierTracker already has a registered checkpoint handler"); + throw new IllegalStateException("BarrierTracker already has a registered checkpoint notifyee"); } } @@ -105,22 +111,27 @@ public class BarrierTracker implements CheckpointBarrierHandler { return pendingCheckpoints.isEmpty(); } - private void processBarrier(CheckpointBarrier receivedBarrier) { + @Override + public long getAlignmentDurationNanos() { + // this one does not do alignment at all + return 0L; + } + + private void processBarrier(CheckpointBarrier receivedBarrier) throws Exception { + final long barrierId = receivedBarrier.getId(); + // fast path for single channel trackers if (totalNumberOfInputChannels == 1) { - if (checkpointHandler != null) { - checkpointHandler.onEvent(receivedBarrier); - } + notifyCheckpoint(barrierId, receivedBarrier.getTimestamp()); return; } - + // general path for multiple input channels - final long barrierId = receivedBarrier.getId(); // find the checkpoint barrier in the queue of bending barriers CheckpointBarrierCount cbc = null; int pos = 0; - + for (CheckpointBarrierCount next : pendingCheckpoints) { if (next.checkpointId == barrierId) { cbc = next; @@ -128,21 +139,21 @@ public class BarrierTracker implements CheckpointBarrierHandler { } pos++; } - + if (cbc != null) { // add one to the count to that barrier and check for completion int numBarriersNew = cbc.incrementBarrierCount(); if (numBarriersNew == totalNumberOfInputChannels) { - // checkpoint can be triggered + // checkpoint can be triggered (or is aborted and all barriers have been seen) // first, remove this checkpoint and all all prior pending // checkpoints (which are now subsumed) for (int i = 0; i <= pos; i++) { pendingCheckpoints.pollFirst(); } - + // notify the listener - if (checkpointHandler != null) { - checkpointHandler.onEvent(receivedBarrier); + if (!cbc.isAborted()) { + notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp()); } } } @@ -163,45 +174,104 @@ public class BarrierTracker implements CheckpointBarrierHandler { } } + private void processCheckpointAbortBarrier(CancelCheckpointMarker barrier) throws Exception { + final long checkpointId = barrier.getCheckpointId(); + + // fast path for single channel trackers + if (totalNumberOfInputChannels == 1) { + notifyAbort(checkpointId); + return; + } + + // -- general path for multiple input channels -- + + // find the checkpoint barrier in the queue of pending barriers + // while doing this we "abort" all checkpoints before that one + CheckpointBarrierCount cbc; + while ((cbc = pendingCheckpoints.peekFirst()) != null && cbc.checkpointId() < checkpointId) { + pendingCheckpoints.removeFirst(); + } + + if (cbc != null && cbc.checkpointId() == checkpointId) { + // make sure the checkpoint is remembered as aborted + if (cbc.markAborted()) { + // this was the first time the checkpoint was aborted - notify + notifyAbort(checkpointId); + } + + // we still count the barriers to be able to remove the entry once all barriers have been seen + if (cbc.incrementBarrierCount() == totalNumberOfInputChannels) { + // we can remove this entry + pendingCheckpoints.removeFirst(); + } + } + else { + notifyAbort(checkpointId); + + // first barrier for this checkpoint - remember it as aborted + // since we polled away all entries with lower checkpoint IDs + // this entry will become the new first entry + if (pendingCheckpoints.size() < MAX_CHECKPOINTS_TO_TRACK) { + CheckpointBarrierCount abortedMarker = new CheckpointBarrierCount(checkpointId); + abortedMarker.markAborted(); + pendingCheckpoints.addFirst(abortedMarker); + } + } + } + + private void notifyCheckpoint(long checkpointId, long timestamp) throws Exception { + if (toNotifyOnCheckpoint != null) { + toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointId, timestamp); + } + } + + private void notifyAbort(long checkpointId) throws Exception { + if (toNotifyOnCheckpoint != null) { + toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId); + } + } + // ------------------------------------------------------------------------ /** * Simple class for a checkpoint ID with a barrier counter. */ private static final class CheckpointBarrierCount { - + private final long checkpointId; - + private int barrierCount; - - private CheckpointBarrierCount(long checkpointId) { + + private boolean aborted; + + CheckpointBarrierCount(long checkpointId) { this.checkpointId = checkpointId; this.barrierCount = 1; } + public long checkpointId() { + return checkpointId; + } + public int incrementBarrierCount() { return ++barrierCount; } - - @Override - public int hashCode() { - return (int) ((checkpointId >>> 32) ^ checkpointId) + 17 * barrierCount; + + public boolean isAborted() { + return aborted; } - @Override - public boolean equals(Object obj) { - if (obj instanceof CheckpointBarrierCount) { - CheckpointBarrierCount that = (CheckpointBarrierCount) obj; - return this.checkpointId == that.checkpointId && this.barrierCount == that.barrierCount; - } - else { - return false; - } + public boolean markAborted() { + boolean firstAbort = !this.aborted; + this.aborted = true; + return firstAbort; } @Override public String toString() { - return String.format("checkpointID=%d, count=%d", checkpointId, barrierCount); + return isAborted() ? + String.format("checkpointID=%d - ABORTED", checkpointId) : + String.format("checkpointID=%d, count=%d", checkpointId, barrierCount); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java index 1b38a56..dc8d245 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java @@ -134,7 +134,7 @@ public class BufferSpiller { else { contents = EventSerializer.toSerializedEvent(boe.getEvent()); } - + headBuffer.clear(); headBuffer.putInt(boe.getChannelIndex()); headBuffer.putInt(contents.remaining()); http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java index 5aa2030..ca23491 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java @@ -20,8 +20,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; -import org.apache.flink.runtime.util.event.EventListener; -import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import java.io.IOException; @@ -43,14 +42,14 @@ public interface CheckpointBarrierHandler { * @throws java.lang.InterruptedException Thrown if the thread is interrupted while blocking during * waiting for the next BufferOrEvent to become available. */ - BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException; + BufferOrEvent getNextNonBlocked() throws Exception; /** - * Registers the given event handler to be notified on successful checkpoints. - * - * @param checkpointHandler The handler to register. + * Registers the task be notified once all checkpoint barriers have been received for a checkpoint. + * + * @param task The task to notify */ - void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler); + void registerCheckpointEventHandler(StatefulTask<?> task); /** * Cleans up all internally held resources. @@ -64,4 +63,13 @@ public interface CheckpointBarrierHandler { * @return {@code True}, if no data is buffered internally, {@code false} otherwise. */ boolean isEmpty(); + + /** + * Gets the time that the latest alignment took, in nanoseconds. + * If there is currently an alignment in progress, it will return the time spent in the + * current alignment so far. + * + * @return The duration in nanoseconds + */ + long getAlignmentDurationNanos(); } http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index d11990e..7d9e4d2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -24,6 +24,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.AbstractEvent; @@ -37,7 +38,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.plugable.DeserializationDelegate; import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; -import org.apache.flink.runtime.util.event.EventListener; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; @@ -45,7 +45,6 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; -import org.apache.flink.runtime.io.network.api.CheckpointBarrier; /** * Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}. @@ -85,7 +84,7 @@ public class StreamInputProcessor<IN> { @SuppressWarnings("unchecked") public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer, - EventListener<CheckpointBarrier> checkpointListener, + StatefulTask<?> checkpointListener, CheckpointingMode checkpointMode, IOManager ioManager, boolean enableWatermarkMultiplexing) throws IOException { http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index ce764b7..a3ae077 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.metrics.Gauge; +import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.metrics.groups.IOMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.AbstractEvent; @@ -34,7 +35,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.plugable.DeserializationDelegate; import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; -import org.apache.flink.runtime.util.event.EventListener; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; @@ -42,7 +42,6 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; -import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import java.io.IOException; import java.util.Arrays; @@ -95,7 +94,7 @@ public class StreamTwoInputProcessor<IN1, IN2> { Collection<InputGate> inputGates2, TypeSerializer<IN1> inputSerializer1, TypeSerializer<IN2> inputSerializer2, - EventListener<CheckpointBarrier> checkpointListener, + StatefulTask<?> checkpointListener, CheckpointingMode checkpointMode, IOManager ioManager, boolean enableWatermarkMultiplexing) throws IOException { http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 938d8c1..d18ca16 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -43,7 +43,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO if (numberOfInputs > 0) { InputGate[] inputGates = getEnvironment().getAllInputGates(); inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer, - getCheckpointBarrierListener(), + this, configuration.getCheckpointMode(), getEnvironment().getIOManager(), isSerializingTimestamps()); http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 0e24516..351acaa 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -29,6 +29,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.plugable.SerializationDelegate; @@ -126,15 +127,32 @@ public class OperatorChain<OUT> { } } - - - public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException, InterruptedException { - CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp); - for (RecordWriterOutput<?> streamOutput : streamOutputs) { - streamOutput.broadcastEvent(barrier); + + + public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException { + try { + CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp); + for (RecordWriterOutput<?> streamOutput : streamOutputs) { + streamOutput.broadcastEvent(barrier); + } + } + catch (InterruptedException e) { + throw new IOException("Interrupted while broadcasting checkpoint barrier"); } } - + + public void broadcastCheckpointCancelMarker(long id) throws IOException { + try { + CancelCheckpointMarker barrier = new CancelCheckpointMarker(id); + for (RecordWriterOutput<?> streamOutput : streamOutputs) { + streamOutput.broadcastEvent(barrier); + } + } + catch (InterruptedException e) { + throw new IOException("Interrupted while broadcasting checkpoint cancellation"); + } + } + public RecordWriterOutput<?>[] getStreamOutputs() { return streamOutputs; } http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 8f28cef..d55a9c5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.execution.CancelTaskException; -import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.state.AbstractStateBackend; @@ -37,7 +36,6 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; -import org.apache.flink.runtime.util.event.EventListener; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; @@ -580,9 +578,34 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> } } - protected boolean performCheckpoint(final long checkpointId, final long timestamp) throws Exception { + @Override + public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception { + try { + performCheckpoint(checkpointId, timestamp); + } + catch (CancelTaskException e) { + throw e; + } + catch (Exception e) { + throw new Exception("Error while performing a checkpoint", e); + } + } + + @Override + public void abortCheckpointOnBarrier(long checkpointId) throws Exception { + LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", checkpointId, getName()); + + synchronized (lock) { + if (isRunning) { + operatorChain.broadcastCheckpointCancelMarker(checkpointId); + } + } + } + + private boolean performCheckpoint(final long checkpointId, final long timestamp) throws Exception { + LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName()); - + synchronized (lock) { if (isRunning) { @@ -759,23 +782,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> return getName(); } - protected final EventListener<CheckpointBarrier> getCheckpointBarrierListener() { - return new EventListener<CheckpointBarrier>() { - @Override - public void onEvent(CheckpointBarrier barrier) { - try { - performCheckpoint(barrier.getId(), barrier.getTimestamp()); - } - catch (CancelTaskException e) { - throw e; - } - catch (Exception e) { - throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e); - } - } - }; - } - // ------------------------------------------------------------------------ /** http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index c3305eb..9252063 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -68,7 +68,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2, inputDeserializer1, inputDeserializer2, - getCheckpointBarrierListener(), + this, configuration.getCheckpointMode(), getEnvironment().getIOManager(), isSerializingTimestamps());
