Repository: flink Updated Branches: refs/heads/master f025c455f -> 07ab9f453
[FLINK-4984] [checkpointing] Add Cancellation Barriers as a way to signal aborted checkpoints Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a79dd5f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a79dd5f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a79dd5f Branch: refs/heads/master Commit: 0a79dd5fb165c714bdac8de770b45d9406177401 Parents: f025c45 Author: Stephan Ewen <[email protected]> Authored: Sun Oct 23 18:41:32 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Tue Nov 8 21:14:39 2016 +0100 ---------------------------------------------------------------------- .../runtime/checkpoint/CheckpointMetrics.java | 13 +- .../io/network/api/CancelCheckpointMarker.java | 77 +++ .../api/serialization/EventSerializer.java | 57 +- .../runtime/io/network/netty/NettyMessage.java | 2 +- .../partition/PipelinedSubpartition.java | 3 +- .../runtime/jobgraph/tasks/StatefulTask.java | 11 + .../api/serialization/EventSerializerTest.java | 45 +- .../jobmanager/JobManagerHARecoveryTest.java | 37 +- .../runtime/taskmanager/TaskAsyncCallTest.java | 5 + .../streaming/runtime/io/BarrierBuffer.java | 215 +++++-- .../streaming/runtime/io/BarrierTracker.java | 147 +++-- .../streaming/runtime/io/BufferSpiller.java | 3 +- .../runtime/io/RecordWriterOutput.java | 4 +- .../streaming/runtime/tasks/OperatorChain.java | 32 +- .../streaming/runtime/tasks/StreamTask.java | 12 +- .../streaming/runtime/io/BarrierBufferTest.java | 572 +++++++++++++++++-- .../runtime/io/BarrierTrackerTest.java | 130 ++++- 17 files changed, 1144 insertions(+), 221 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java index 4155290..f72b00e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java @@ -20,16 +20,17 @@ package org.apache.flink.runtime.checkpoint; import java.io.Serializable; +/** + * A collection of simple metrics, around the triggering of a checkpoint. + */ public class CheckpointMetrics implements Serializable { - /** - * The number of bytes that were buffered during the checkpoint alignment phase - */ + private static final long serialVersionUID = 1L; + + /** The number of bytes that were buffered during the checkpoint alignment phase */ private long bytesBufferedInAlignment; - /** - * The duration (in nanoseconds) that the stream alignment for the checkpoint took - */ + /** The duration (in nanoseconds) that the stream alignment for the checkpoint took */ private long alignmentDurationNanos; /* The duration (in milliseconds) of the synchronous part of the operator checkpoint */ http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java new file mode 100644 index 0000000..52a2517 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.api; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.event.RuntimeEvent; + +import java.io.IOException; + +/** + * The CancelCheckpointMarker travels through the data streams, similar to the {@link CheckpointBarrier}, + * but signals that a certain checkpoint should be canceled. Any in-progress alignment for that + * checkpoint needs to be canceled and regular processing should be resumed. 
+ */ +public class CancelCheckpointMarker extends RuntimeEvent { + + /** The id of the checkpoint to be canceled */ + private final long checkpointId; + + public CancelCheckpointMarker(long checkpointId) { + this.checkpointId = checkpointId; + } + + public long getCheckpointId() { + return checkpointId; + } + + // ------------------------------------------------------------------------ + // These known and common event go through special code paths, rather than + // through generic serialization + + @Override + public void write(DataOutputView out) throws IOException { + throw new UnsupportedOperationException("this method should never be called"); + } + + @Override + public void read(DataInputView in) throws IOException { + throw new UnsupportedOperationException("this method should never be called"); + } + + // ------------------------------------------------------------------------ + + @Override + public int hashCode() { + return (int) (checkpointId ^ (checkpointId >>> 32)); + } + + @Override + public boolean equals(Object other) { + return other != null && + other.getClass() == CancelCheckpointMarker.class && + this.checkpointId == ((CancelCheckpointMarker) other).checkpointId; + } + + @Override + public String toString() { + return "CancelCheckpointMarker " + checkpointId; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java index a34f8cf..0bc3b28 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.api.serialization; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.event.AbstractEvent; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent; @@ -38,7 +39,7 @@ import java.nio.ByteOrder; * Utility class to serialize and deserialize task events. */ public class EventSerializer { - + private static final int END_OF_PARTITION_EVENT = 0; private static final int CHECKPOINT_BARRIER_EVENT = 1; @@ -46,17 +47,19 @@ public class EventSerializer { private static final int END_OF_SUPERSTEP_EVENT = 2; private static final int OTHER_EVENT = 3; - + + private static final int CANCEL_CHECKPOINT_MARKER_EVENT = 4; + // ------------------------------------------------------------------------ - - public static ByteBuffer toSerializedEvent(AbstractEvent event) { + + public static ByteBuffer toSerializedEvent(AbstractEvent event) throws IOException { final Class<?> eventClass = event.getClass(); if (eventClass == EndOfPartitionEvent.class) { return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_PARTITION_EVENT }); } else if (eventClass == CheckpointBarrier.class) { CheckpointBarrier barrier = (CheckpointBarrier) event; - + ByteBuffer buf = ByteBuffer.allocate(20); buf.putInt(0, CHECKPOINT_BARRIER_EVENT); buf.putLong(4, barrier.getId()); @@ -66,32 +69,39 @@ public class EventSerializer { else if (eventClass == EndOfSuperstepEvent.class) { return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_SUPERSTEP_EVENT }); } + else if (eventClass == CancelCheckpointMarker.class) { + CancelCheckpointMarker marker = 
(CancelCheckpointMarker) event; + + ByteBuffer buf = ByteBuffer.allocate(12); + buf.putInt(0, CANCEL_CHECKPOINT_MARKER_EVENT); + buf.putLong(4, marker.getCheckpointId()); + return buf; + } else { try { final DataOutputSerializer serializer = new DataOutputSerializer(128); serializer.writeInt(OTHER_EVENT); serializer.writeUTF(event.getClass().getName()); event.write(serializer); - return serializer.wrapAsByteBuffer(); } catch (IOException e) { - throw new RuntimeException("Error while serializing event.", e); + throw new IOException("Error while serializing event.", e); } } } - public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader classLoader) { + public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader classLoader) throws IOException { if (buffer.remaining() < 4) { - throw new RuntimeException("Incomplete event"); + throw new IOException("Incomplete event"); } - + final ByteOrder bufferOrder = buffer.order(); buffer.order(ByteOrder.BIG_ENDIAN); - + try { int type = buffer.getInt(); - + if (type == END_OF_PARTITION_EVENT) { return EndOfPartitionEvent.INSTANCE; } @@ -103,35 +113,38 @@ public class EventSerializer { else if (type == END_OF_SUPERSTEP_EVENT) { return EndOfSuperstepEvent.INSTANCE; } + else if (type == CANCEL_CHECKPOINT_MARKER_EVENT) { + long id = buffer.getLong(); + return new CancelCheckpointMarker(id); + } else if (type == OTHER_EVENT) { try { final DataInputDeserializer deserializer = new DataInputDeserializer(buffer); - final String className = deserializer.readUTF(); - + final Class<? 
extends AbstractEvent> clazz; try { clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class); } catch (ClassNotFoundException e) { - throw new RuntimeException("Could not load event class '" + className + "'.", e); + throw new IOException("Could not load event class '" + className + "'.", e); } catch (ClassCastException e) { - throw new RuntimeException("The class '" + className + "' is not a valid subclass of '" + throw new IOException("The class '" + className + "' is not a valid subclass of '" + AbstractEvent.class.getName() + "'.", e); } - + final AbstractEvent event = InstantiationUtil.instantiate(clazz, AbstractEvent.class); event.read(deserializer); - + return event; } catch (Exception e) { - throw new RuntimeException("Error while deserializing or instantiating event.", e); + throw new IOException("Error while deserializing or instantiating event.", e); } } else { - throw new RuntimeException("Corrupt byte stream for event"); + throw new IOException("Corrupt byte stream for event"); } } finally { @@ -143,7 +156,7 @@ public class EventSerializer { // Buffer helpers // ------------------------------------------------------------------------ - public static Buffer toBuffer(AbstractEvent event) { + public static Buffer toBuffer(AbstractEvent event) throws IOException { final ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(event); MemorySegment data = MemorySegmentFactory.wrap(serializedEvent.array()); @@ -154,7 +167,7 @@ public class EventSerializer { return buffer; } - public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader classLoader) { + public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader classLoader) throws IOException { return fromSerializedEvent(buffer.getNioBuffer(), classLoader); } } http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---------------------------------------------------------------------- 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java index b97bd82..93775df 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java @@ -440,7 +440,7 @@ abstract class NettyMessage { } @Override - public void readFrom(ByteBuf buffer) { + public void readFrom(ByteBuf buffer) throws IOException { // TODO Directly deserialize fromNetty's buffer int length = buffer.readInt(); ByteBuffer serializedEvent = ByteBuffer.allocate(length); http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index 266f581..3981a26 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.util.event.NotificationListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.ArrayDeque; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -88,7 +89,7 @@ class PipelinedSubpartition extends ResultSubpartition { } @Override - public void finish() { + public void finish() throws IOException { final NotificationListener listener; synchronized (buffers) { 
http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java index b0c3730..c91dcf2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java @@ -61,6 +61,17 @@ public interface StatefulTask { void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception; /** + * Aborts a checkpoint as the result of receiving possibly some checkpoint barriers, + * but at least one {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker}. + * + * <p>This requires implementing tasks to forward a + * {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker} to their outputs. + * + * @param checkpointId The ID of the checkpoint to be aborted. + */ + void abortCheckpointOnBarrier(long checkpointId) throws Exception; + + /** * Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received * the notification from all participating tasks. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java index ddfd758..d47a0b5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.api.serialization; import org.apache.flink.runtime.event.AbstractEvent; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent; @@ -28,34 +29,30 @@ import org.junit.Test; import java.nio.ByteBuffer; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class EventSerializerTest { @Test - public void testSerializeDeserializeEvent() { - try { - AbstractEvent[] events = { - EndOfPartitionEvent.INSTANCE, - EndOfSuperstepEvent.INSTANCE, - new CheckpointBarrier(1678L, 4623784L), - new TestTaskEvent(Math.random(), 12361231273L) - }; - - for (AbstractEvent evt : events) { - ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(evt); - assertTrue(serializedEvent.hasRemaining()); - - AbstractEvent deserialized = - EventSerializer.fromSerializedEvent(serializedEvent, getClass().getClassLoader()); - assertNotNull(deserialized); - assertEquals(evt, 
deserialized); - } - - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + public void testSerializeDeserializeEvent() throws Exception { + AbstractEvent[] events = { + EndOfPartitionEvent.INSTANCE, + EndOfSuperstepEvent.INSTANCE, + new CheckpointBarrier(1678L, 4623784L), + new TestTaskEvent(Math.random(), 12361231273L), + new CancelCheckpointMarker(287087987329842L) + }; + + for (AbstractEvent evt : events) { + ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(evt); + assertTrue(serializedEvent.hasRemaining()); + + AbstractEvent deserialized = + EventSerializer.fromSerializedEvent(serializedEvent, getClass().getClassLoader()); + assertNotNull(deserialized); + assertEquals(evt, deserialized); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index b195858..e40b2df 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -454,24 +454,20 @@ public class JobManagerHARecoveryTest { } @Override - public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) { - try { - ByteStreamStateHandle byteStreamStateHandle = new TestByteStreamStateHandleDeepCompare( - String.valueOf(UUID.randomUUID()), - InstantiationUtil.serializeObject(checkpointMetaData.getCheckpointId())); - - ChainedStateHandle<StreamStateHandle> chainedStateHandle = - new ChainedStateHandle<StreamStateHandle>(Collections.singletonList(byteStreamStateHandle)); - SubtaskState checkpointStateHandles = - new SubtaskState(chainedStateHandle, 
null, null, null, null, 0L); - - getEnvironment().acknowledgeCheckpoint( - new CheckpointMetaData(checkpointMetaData.getCheckpointId(), -1, 0L, 0L, 0L, 0L), - checkpointStateHandles); - return true; - } catch (Exception ex) { - throw new RuntimeException(ex); - } + public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception { + ByteStreamStateHandle byteStreamStateHandle = new TestByteStreamStateHandleDeepCompare( + String.valueOf(UUID.randomUUID()), + InstantiationUtil.serializeObject(checkpointMetaData.getCheckpointId())); + + ChainedStateHandle<StreamStateHandle> chainedStateHandle = + new ChainedStateHandle<StreamStateHandle>(Collections.singletonList(byteStreamStateHandle)); + SubtaskState checkpointStateHandles = + new SubtaskState(chainedStateHandle, null, null, null, null, 0L); + + getEnvironment().acknowledgeCheckpoint( + new CheckpointMetaData(checkpointMetaData.getCheckpointId(), -1, 0L, 0L, 0L, 0L), + checkpointStateHandles); + return true; } @Override @@ -480,6 +476,11 @@ public class JobManagerHARecoveryTest { } @Override + public void abortCheckpointOnBarrier(long checkpointId) { + throw new UnsupportedOperationException("should not be called!"); + } + + @Override public void notifyCheckpointComplete(long checkpointId) { if (completedCheckpoints++ > NUM_CHECKPOINTS_TO_COMPLETE) { completedCheckpointsLatch.countDown(); http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index ed107c7..ab29bb0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -232,6 +232,11 @@ public class TaskAsyncCallTest { } @Override + public void abortCheckpointOnBarrier(long checkpointId) { + throw new UnsupportedOperationException("Should not be called"); + } + + @Override public void notifyCheckpointComplete(long checkpointId) { if (checkpointId != lastCheckpointId && this.error == null) { this.error = new Exception("calls out of order"); http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java index 7f01129..3ccb575 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java @@ -20,11 +20,13 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,7 +72,8 @@ public class BarrierBuffer implements CheckpointBarrierHandler { /** The ID of the checkpoint for which we expect barriers */ private long 
currentCheckpointId = -1L; - /** The number of received barriers (= number of blocked/buffered channels) */ + /** The number of received barriers (= number of blocked/buffered channels) + * IMPORTANT: A canceled checkpoint must always have 0 barriers */ private int numBarriersReceived; /** The number of already closed channels */ @@ -96,7 +99,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { this.inputGate = inputGate; this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels(); this.blockedChannels = new boolean[this.totalNumberOfInputChannels]; - + this.bufferSpiller = new BufferSpiller(ioManager, inputGate.getPageSize()); this.queuedBuffered = new ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence>(); } @@ -135,11 +138,12 @@ public class BarrierBuffer implements CheckpointBarrierHandler { processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex()); } } + else if (next.getEvent().getClass() == CancelCheckpointMarker.class) { + processCancellationBarrier((CancelCheckpointMarker) next.getEvent()); + } else { if (next.getEvent().getClass() == EndOfPartitionEvent.class) { - numClosedChannels++; - // no chance to complete this checkpoint - releaseBlocks(); + processEndOfPartition(next.getChannelIndex()); } return next; } @@ -147,7 +151,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { else if (!endOfStream) { // end of input stream. 
stream continues with the buffered data endOfStream = true; - releaseBlocks(); + releaseBlocksAndResetBarriers(); return getNextNonBlocked(); } else { @@ -156,7 +160,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } } } - + private void completeBufferedSequence() throws IOException { currentBuffered.cleanup(); currentBuffered = queuedBuffered.pollFirst(); @@ -164,72 +168,171 @@ public class BarrierBuffer implements CheckpointBarrierHandler { currentBuffered.open(); } } - + private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception { final long barrierId = receivedBarrier.getId(); + // fast path for single channel cases + if (totalNumberOfInputChannels == 1) { + if (barrierId > currentCheckpointId) { + // new checkpoint + currentCheckpointId = barrierId; + notifyCheckpoint(receivedBarrier); + } + return; + } + + // -- general code path for multiple input channels -- + if (numBarriersReceived > 0) { - // subsequent barrier of a checkpoint. + // this is only true if some alignment is already progress and was not canceled + if (barrierId == currentCheckpointId) { // regular case onBarrier(channelIndex); } else if (barrierId > currentCheckpointId) { - // we did not complete the current checkpoint + // we did not complete the current checkpoint, another started before LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. 
" + "Skipping current checkpoint.", barrierId, currentCheckpointId); - releaseBlocks(); - currentCheckpointId = barrierId; - onBarrier(channelIndex); + // let the task know we are not completing this + notifyAbort(currentCheckpointId); - if (LOG.isDebugEnabled()) { - LOG.debug("Starting stream alignment for checkpoint {}", barrierId); - } - startOfAlignmentTimestamp = System.nanoTime(); + // abort the current checkpoint + releaseBlocksAndResetBarriers(); + + // begin a the new checkpoint + beginNewAlignment(barrierId, channelIndex); } else { - // ignore trailing barrier from aborted checkpoint + // ignore trailing barrier from an earlier checkpoint (obsolete now) return; } - } else if (barrierId > currentCheckpointId) { // first barrier of a new checkpoint - currentCheckpointId = barrierId; - onBarrier(channelIndex); - - if (LOG.isDebugEnabled()) { - LOG.debug("Starting stream alignment for checkpoint {}", barrierId); - } - startOfAlignmentTimestamp = System.nanoTime(); + beginNewAlignment(barrierId, channelIndex); } else { - // trailing barrier from previous (skipped) checkpoint + // either the current checkpoint was canceled (numBarriers == 0) or + // this barrier is from an old subsumed checkpoint return; } - // check if we have all barriers + // check if we have all barriers - since canceled checkpoints always have zero barriers + // this can only happen on a non canceled checkpoint if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) { + // actually trigger checkpoint if (LOG.isDebugEnabled()) { - LOG.debug("Received all barrier, triggering checkpoint {} at {}", + LOG.debug("Received all barriers, triggering checkpoint {} at {}", receivedBarrier.getId(), receivedBarrier.getTimestamp()); } - releaseBlocks(); + releaseBlocksAndResetBarriers(); + notifyCheckpoint(receivedBarrier); + } + } + + private void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception { + final long barrierId = 
cancelBarrier.getCheckpointId(); + + // fast path for single channel cases + if (totalNumberOfInputChannels == 1) { + if (barrierId > currentCheckpointId) { + // new checkpoint + currentCheckpointId = barrierId; + notifyAbort(barrierId); + } + return; + } + + // -- general code path for multiple input channels -- + + if (numBarriersReceived > 0) { + // this is only true if some alignment is in progress and nothing was canceled + + if (barrierId == currentCheckpointId) { + // cancel this alignment + if (LOG.isDebugEnabled()) { + LOG.debug("Checkpoint {} canceled, aborting alignment", barrierId); + } + + releaseBlocksAndResetBarriers(); + notifyAbort(barrierId); + } + else if (barrierId > currentCheckpointId) { + // we canceled the next which also cancels the current + LOG.warn("Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " + + "Skipping current checkpoint.", barrierId, currentCheckpointId); + + // this stops the current alignment + releaseBlocksAndResetBarriers(); + + // the next checkpoint starts as canceled + currentCheckpointId = barrierId; + startOfAlignmentTimestamp = 0L; + latestAlignmentDurationNanos = 0L; + notifyAbort(barrierId); + } + + // else: ignore trailing (cancellation) barrier from an earlier checkpoint (obsolete now) + + } + else if (barrierId > currentCheckpointId) { + // first barrier of a new checkpoint is directly a cancellation + + // by setting the currentCheckpointId to this checkpoint while keeping the numBarriers + // at zero means that no checkpoint barrier can start a new alignment + currentCheckpointId = barrierId; - if (toNotifyOnCheckpoint != null) { - CheckpointMetaData checkpointMetaData = - new CheckpointMetaData(receivedBarrier.getId(), receivedBarrier.getTimestamp()); - checkpointMetaData. - setBytesBufferedInAlignment(bufferSpiller.getBytesWritten()). 
- setAlignmentDurationNanos(latestAlignmentDurationNanos); + startOfAlignmentTimestamp = 0L; + latestAlignmentDurationNanos = 0L; - toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData); + if (LOG.isDebugEnabled()) { + LOG.debug("Checkpoint {} canceled, skipping alignment", barrierId); } + + notifyAbort(barrierId); } + + // else: trailing barrier from either + // - a previous (subsumed) checkpoint + // - the current checkpoint if it was already canceled } - + + private void processEndOfPartition(int channel) throws Exception { + numClosedChannels++; + + if (numBarriersReceived > 0) { + // let the task know we skip a checkpoint + notifyAbort(currentCheckpointId); + + // no chance to complete this checkpoint + releaseBlocksAndResetBarriers(); + } + } + + private void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws Exception { + if (toNotifyOnCheckpoint != null) { + CheckpointMetaData checkpointMetaData = + new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp()); + + checkpointMetaData + .setBytesBufferedInAlignment(bufferSpiller.getBytesWritten()) + .setAlignmentDurationNanos(latestAlignmentDurationNanos); + + toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData); + } + } + + private void notifyAbort(long checkpointId) throws Exception { + if (toNotifyOnCheckpoint != null) { + toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId); + } + } + + @Override public void registerCheckpointEventHandler(StatefulTask toNotifyOnCheckpoint) { if (this.toNotifyOnCheckpoint == null) { @@ -239,7 +342,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { throw new IllegalStateException("BarrierBuffer already has a registered checkpoint notifyee"); } } - + @Override public boolean isEmpty() { return currentBuffered == null; @@ -254,8 +357,20 @@ public class BarrierBuffer implements CheckpointBarrierHandler { for (BufferSpiller.SpilledBufferOrEventSequence seq : queuedBuffered) { seq.cleanup(); 
} + queuedBuffered.clear(); } - + + private void beginNewAlignment(long checkpointId, int channelIndex) throws IOException { + currentCheckpointId = checkpointId; + onBarrier(channelIndex); + + startOfAlignmentTimestamp = System.nanoTime(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Starting stream alignment for checkpoint " + checkpointId); + } + } + /** * Checks whether the channel with the given index is blocked. * @@ -265,7 +380,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { private boolean isBlocked(int channelIndex) { return blockedChannels[channelIndex]; } - + /** * Blocks the given channel index, from which a barrier has been received. * @@ -274,28 +389,28 @@ public class BarrierBuffer implements CheckpointBarrierHandler { private void onBarrier(int channelIndex) throws IOException { if (!blockedChannels[channelIndex]) { blockedChannels[channelIndex] = true; + numBarriersReceived++; - + if (LOG.isDebugEnabled()) { LOG.debug("Received barrier from channel " + channelIndex); } } else { - throw new IOException("Stream corrupt: Repeated barrier for same checkpoint and input stream"); + throw new IOException("Stream corrupt: Repeated barrier for same checkpoint on input " + channelIndex); } } /** - * Releases the blocks on all channels. Makes sure the just written data - * is the next to be consumed. + * Releases the blocks on all channels and resets the barrier count. + * Makes sure the just written data is the next to be consumed. 
*/ - private void releaseBlocks() throws IOException { + private void releaseBlocksAndResetBarriers() throws IOException { LOG.debug("End of stream alignment, feeding buffered data back"); for (int i = 0; i < blockedChannels.length; i++) { blockedChannels[i] = false; } - numBarriersReceived = 0; if (currentBuffered == null) { // common case: no more buffered data @@ -317,9 +432,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } } - final long now = System.nanoTime(); + // the next barrier that comes must assume it is the first + numBarriersReceived = 0; + if (startOfAlignmentTimestamp > 0) { - latestAlignmentDurationNanos = now - startOfAlignmentTimestamp; + latestAlignmentDurationNanos = System.nanoTime() - startOfAlignmentTimestamp; startOfAlignmentTimestamp = 0; } } http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java index 86945a8..cce7cb4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; @@ -34,9 +35,9 @@ import java.util.ArrayDeque; * * <p>Unlike the {@link 
BarrierBuffer}, the BarrierTracker does not block the input * channels that have sent barriers, so it cannot be used to gain "exactly-once" processing - * guarantees. It can, however, be used to gain "at least once" processing guarantees.</p> + * guarantees. It can, however, be used to gain "at least once" processing guarantees. * - * <p>NOTE: This implementation strictly assumes that newer checkpoints have higher checkpoint IDs.</p> + * <p>NOTE: This implementation strictly assumes that newer checkpoints have higher checkpoint IDs. */ @Internal public class BarrierTracker implements CheckpointBarrierHandler { @@ -74,15 +75,20 @@ public class BarrierTracker implements CheckpointBarrierHandler { public BufferOrEvent getNextNonBlocked() throws Exception { while (true) { BufferOrEvent next = inputGate.getNextBufferOrEvent(); - if (next == null) { - return null; - } - else if (next.isBuffer() || next.getEvent().getClass() != CheckpointBarrier.class) { + if (next == null || next.isBuffer()) { + // buffer or input exhausted return next; } - else { + else if (next.getEvent().getClass() == CheckpointBarrier.class) { processBarrier((CheckpointBarrier) next.getEvent()); } + else if (next.getEvent().getClass() == CancelCheckpointMarker.class) { + processCheckpointAbortBarrier((CancelCheckpointMarker) next.getEvent()); + } + else { + // some other event + return next; + } } } @@ -113,23 +119,15 @@ public class BarrierTracker implements CheckpointBarrierHandler { } private void processBarrier(CheckpointBarrier receivedBarrier) throws Exception { + final long barrierId = receivedBarrier.getId(); + // fast path for single channel trackers if (totalNumberOfInputChannels == 1) { - if (toNotifyOnCheckpoint != null) { - CheckpointMetaData checkpointMetaData = - new CheckpointMetaData(receivedBarrier.getId(), receivedBarrier.getTimestamp()); - - checkpointMetaData. - setBytesBufferedInAlignment(0L). 
- setAlignmentDurationNanos(0L); - - toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData); - } + notifyCheckpoint(barrierId, receivedBarrier.getTimestamp()); return; } // general path for multiple input channels - final long barrierId = receivedBarrier.getId(); // find the checkpoint barrier in the queue of bending barriers CheckpointBarrierCount cbc = null; @@ -147,22 +145,16 @@ public class BarrierTracker implements CheckpointBarrierHandler { // add one to the count to that barrier and check for completion int numBarriersNew = cbc.incrementBarrierCount(); if (numBarriersNew == totalNumberOfInputChannels) { - // checkpoint can be triggered + // checkpoint can be triggered (or is aborted and all barriers have been seen) // first, remove this checkpoint and all all prior pending // checkpoints (which are now subsumed) for (int i = 0; i <= pos; i++) { pendingCheckpoints.pollFirst(); } - + // notify the listener - if (toNotifyOnCheckpoint != null) { - CheckpointMetaData checkpointMetaData = - new CheckpointMetaData(receivedBarrier.getId(), receivedBarrier.getTimestamp()); - checkpointMetaData. - setBytesBufferedInAlignment(0L). 
- setAlignmentDurationNanos(0L); - - toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData); + if (!cbc.isAborted()) { + notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp()); } } } @@ -183,45 +175,110 @@ public class BarrierTracker implements CheckpointBarrierHandler { } } + private void processCheckpointAbortBarrier(CancelCheckpointMarker barrier) throws Exception { + final long checkpointId = barrier.getCheckpointId(); + + // fast path for single channel trackers + if (totalNumberOfInputChannels == 1) { + notifyAbort(checkpointId); + return; + } + + // -- general path for multiple input channels -- + + // find the checkpoint barrier in the queue of pending barriers + // while doing this we "abort" all checkpoints before that one + CheckpointBarrierCount cbc; + while ((cbc = pendingCheckpoints.peekFirst()) != null && cbc.checkpointId() < checkpointId) { + pendingCheckpoints.removeFirst(); + } + + if (cbc != null && cbc.checkpointId() == checkpointId) { + // make sure the checkpoint is remembered as aborted + if (cbc.markAborted()) { + // this was the first time the checkpoint was aborted - notify + notifyAbort(checkpointId); + } + + // we still count the barriers to be able to remove the entry once all barriers have been seen + if (cbc.incrementBarrierCount() == totalNumberOfInputChannels) { + // we can remove this entry + pendingCheckpoints.removeFirst(); + } + } + else { + notifyAbort(checkpointId); + + // first barrier for this checkpoint - remember it as aborted + // since we polled away all entries with lower checkpoint IDs + // this entry will become the new first entry + if (pendingCheckpoints.size() < MAX_CHECKPOINTS_TO_TRACK) { + CheckpointBarrierCount abortedMarker = new CheckpointBarrierCount(checkpointId); + abortedMarker.markAborted(); + pendingCheckpoints.addFirst(abortedMarker); + } + } + } + + private void notifyCheckpoint(long checkpointId, long timestamp) throws Exception { + if (toNotifyOnCheckpoint != null) { + 
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); + + checkpointMetaData + .setBytesBufferedInAlignment(0L) + .setAlignmentDurationNanos(0L); + + toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData); + } + } + + private void notifyAbort(long checkpointId) throws Exception { + if (toNotifyOnCheckpoint != null) { + toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId); + } + } + // ------------------------------------------------------------------------ /** * Simple class for a checkpoint ID with a barrier counter. */ private static final class CheckpointBarrierCount { - + private final long checkpointId; - + private int barrierCount; - - private CheckpointBarrierCount(long checkpointId) { + + private boolean aborted; + + CheckpointBarrierCount(long checkpointId) { this.checkpointId = checkpointId; this.barrierCount = 1; } + public long checkpointId() { + return checkpointId; + } + public int incrementBarrierCount() { return ++barrierCount; } - @Override - public int hashCode() { - return (int) ((checkpointId >>> 32) ^ checkpointId) + 17 * barrierCount; + public boolean isAborted() { + return aborted; } - @Override - public boolean equals(Object obj) { - if (obj instanceof CheckpointBarrierCount) { - CheckpointBarrierCount that = (CheckpointBarrierCount) obj; - return this.checkpointId == that.checkpointId && this.barrierCount == that.barrierCount; - } - else { - return false; - } + public boolean markAborted() { + boolean firstAbort = !this.aborted; + this.aborted = true; + return firstAbort; } @Override public String toString() { - return String.format("checkpointID=%d, count=%d", checkpointId, barrierCount); + return isAborted() ? 
+ String.format("checkpointID=%d - ABORTED", checkpointId) : + String.format("checkpointID=%d, count=%d", checkpointId, barrierCount); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java index 5a8a4cd..45a330b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java @@ -132,9 +132,8 @@ public class BufferSpiller { } else { contents = EventSerializer.toSerializedEvent(boe.getEvent()); - } - + headBuffer.clear(); headBuffer.putInt(boe.getChannelIndex()); headBuffer.putInt(contents.remaining()); http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java index c3ef464..2625031 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java @@ -100,8 +100,8 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> { } } - public void broadcastEvent(AbstractEvent barrier) throws IOException, InterruptedException { - recordWriter.broadcastEvent(barrier); + public void broadcastEvent(AbstractEvent event) throws IOException, 
InterruptedException { + recordWriter.broadcastEvent(event); } http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 30bc377..5ea84fb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; @@ -134,15 +135,32 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> { } } - - - public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException, InterruptedException { - CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp); - for (RecordWriterOutput<?> streamOutput : streamOutputs) { - streamOutput.broadcastEvent(barrier); + + + public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException { + try { + CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp); + for (RecordWriterOutput<?> streamOutput : streamOutputs) { + streamOutput.broadcastEvent(barrier); + } + } + catch (InterruptedException e) { + throw new IOException("Interrupted while broadcasting checkpoint barrier"); } } - 
+ + public void broadcastCheckpointCancelMarker(long id) throws IOException { + try { + CancelCheckpointMarker barrier = new CancelCheckpointMarker(id); + for (RecordWriterOutput<?> streamOutput : streamOutputs) { + streamOutput.broadcastEvent(barrier); + } + } + catch (InterruptedException e) { + throw new IOException("Interrupted while broadcasting checkpoint cancellation"); + } + } + public RecordWriterOutput<?>[] getStreamOutputs() { return streamOutputs; } http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index c75458e..1e03a96 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -528,7 +528,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> @Override public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception { - try { performCheckpoint(checkpointMetaData); } @@ -540,6 +539,17 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } } + @Override + public void abortCheckpointOnBarrier(long checkpointId) throws Exception { + LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", checkpointId, getName()); + + synchronized (lock) { + if (isRunning) { + operatorChain.broadcastCheckpointCancelMarker(checkpointId); + } + } + } + private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception { LOG.debug("Starting checkpoint {} on task {}", checkpointMetaData.getCheckpointId(), getName()); 
http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java index f2fc876..8754e10 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java @@ -18,29 +18,30 @@ package org.apache.flink.streaming.runtime.io; +import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; -import org.apache.flink.runtime.state.ChainedStateHandle; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; -import org.apache.flink.runtime.state.OperatorStateHandle; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateHandles; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; + import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import java.io.File; import 
java.util.Arrays; -import java.util.Collection; -import java.util.List; +import java.util.Random; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -48,15 +49,24 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + /** * Tests for the behavior of the {@link BarrierBuffer}. */ public class BarrierBufferTest { + private static final Random RND = new Random(); + private static final int PAGE_SIZE = 512; - + private static int SIZE_COUNTER = 0; - + private static IOManager IO_MANAGER; @BeforeClass @@ -528,36 +538,53 @@ public class BarrierBufferTest { MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); - ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); - buffer.registerCheckpointEventHandler(handler); - handler.setNextExpectedCheckpointId(1L); + StatefulTask toNotify = mock(StatefulTask.class); + buffer.registerCheckpointEventHandler(toNotify); - // checkpoint 1 + long startTs; + + // initial data check(sequence[0], buffer.getNextNonBlocked()); check(sequence[1], buffer.getNextNonBlocked()); check(sequence[2], buffer.getNextNonBlocked()); + + // align checkpoint 1 + startTs = System.nanoTime(); check(sequence[7], buffer.getNextNonBlocked()); assertEquals(1L, buffer.getCurrentCheckpointId()); - + + // checkpoint done - replay buffered check(sequence[5], buffer.getNextNonBlocked()); + validateAlignmentTime(startTs, buffer); + verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L))); check(sequence[6], buffer.getNextNonBlocked()); + check(sequence[9], 
buffer.getNextNonBlocked()); check(sequence[10], buffer.getNextNonBlocked()); // alignment of checkpoint 2 + startTs = System.nanoTime(); check(sequence[13], buffer.getNextNonBlocked()); - assertEquals(2L, buffer.getCurrentCheckpointId()); check(sequence[15], buffer.getNextNonBlocked()); // checkpoint 2 aborted, checkpoint 3 started check(sequence[12], buffer.getNextNonBlocked()); assertEquals(3L, buffer.getCurrentCheckpointId()); + validateAlignmentTime(startTs, buffer); + verify(toNotify).abortCheckpointOnBarrier(2L); check(sequence[16], buffer.getNextNonBlocked()); + + // checkpoint 3 alignment in progress check(sequence[19], buffer.getNextNonBlocked()); - check(sequence[20], buffer.getNextNonBlocked()); - + // checkpoint 3 aborted (end of partition) + check(sequence[20], buffer.getNextNonBlocked()); + verify(toNotify).abortCheckpointOnBarrier(3L); + + // replay buffered data from checkpoint 3 check(sequence[18], buffer.getNextNonBlocked()); + + // all the remaining messages check(sequence[21], buffer.getNextNonBlocked()); check(sequence[22], buffer.getNextNonBlocked()); check(sequence[23], buffer.getNextNonBlocked()); @@ -887,9 +914,9 @@ public class BarrierBufferTest { assertNull(buffer.getNextNonBlocked()); assertNull(buffer.getNextNonBlocked()); - + buffer.cleanup(); - + checkNoTempFilesRemain(); } catch (Exception e) { @@ -899,26 +926,480 @@ public class BarrierBufferTest { } @Test - public void testEndOfStreamWhileCheckpoint() { + public void testEndOfStreamWhileCheckpoint() throws Exception { + BufferOrEvent[] sequence = { + // one checkpoint + createBarrier(1, 0), createBarrier(1, 1), createBarrier(1, 2), + + // some buffers + createBuffer(0), createBuffer(0), createBuffer(2), + + // start the checkpoint that will be incomplete + createBarrier(2, 2), createBarrier(2, 0), + createBuffer(0), createBuffer(2), createBuffer(1), + + // close one after the barrier one before the barrier + createEndOfPartition(2), createEndOfPartition(1), + createBuffer(0), + + 
// final end of stream + createEndOfPartition(0) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + + // data after first checkpoint + check(sequence[3], buffer.getNextNonBlocked()); + check(sequence[4], buffer.getNextNonBlocked()); + check(sequence[5], buffer.getNextNonBlocked()); + assertEquals(1L, buffer.getCurrentCheckpointId()); + + // alignment of second checkpoint + check(sequence[10], buffer.getNextNonBlocked()); + assertEquals(2L, buffer.getCurrentCheckpointId()); + + // first end-of-partition encountered: checkpoint will not be completed + check(sequence[12], buffer.getNextNonBlocked()); + check(sequence[8], buffer.getNextNonBlocked()); + check(sequence[9], buffer.getNextNonBlocked()); + check(sequence[11], buffer.getNextNonBlocked()); + check(sequence[13], buffer.getNextNonBlocked()); + check(sequence[14], buffer.getNextNonBlocked()); + + // all done + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + + checkNoTempFilesRemain(); + } + + @Test + public void testSingleChannelAbortCheckpoint() throws Exception { + BufferOrEvent[] sequence = { + createBuffer(0), + createBarrier(1, 0), + createBuffer(0), + createBarrier(2, 0), + createCancellationBarrier(4, 0), + createBarrier(5, 0), + createBuffer(0), + createCancellationBarrier(6, 0), + createBuffer(0) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence)); + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + + StatefulTask toNotify = mock(StatefulTask.class); + buffer.registerCheckpointEventHandler(toNotify); + + check(sequence[0], buffer.getNextNonBlocked()); + check(sequence[2], buffer.getNextNonBlocked()); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L))); + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + check(sequence[6], buffer.getNextNonBlocked()); + 
assertEquals(5L, buffer.getCurrentCheckpointId()); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L))); + verify(toNotify, times(1)).abortCheckpointOnBarrier(4L); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L))); + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + check(sequence[8], buffer.getNextNonBlocked()); + assertEquals(6L, buffer.getCurrentCheckpointId()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(6L); + assertEquals(0L, buffer.getAlignmentDurationNanos()); + buffer.cleanup(); + checkNoTempFilesRemain(); + } + + @Test + public void testMultiChannelAbortCheckpoint() throws Exception { + BufferOrEvent[] sequence = { + // some buffers and a successful checkpoint + /* 0 */ createBuffer(0), createBuffer(2), createBuffer(0), + /* 3 */ createBarrier(1, 1), createBarrier(1, 2), + /* 5 */ createBuffer(2), createBuffer(1), + /* 7 */ createBarrier(1, 0), + /* 8 */ createBuffer(0), createBuffer(2), + + // aborted on last barrier + /* 10 */ createBarrier(2, 0), createBarrier(2, 2), + /* 12 */ createBuffer(0), createBuffer(2), + /* 14 */ createCancellationBarrier(2, 1), + + // successful checkpoint + /* 15 */ createBuffer(2), createBuffer(1), + /* 17 */ createBarrier(3, 1), createBarrier(3, 2), createBarrier(3, 0), + + // abort on first barrier + /* 20 */ createBuffer(0), createBuffer(1), + /* 22 */ createCancellationBarrier(4, 1), createBarrier(4, 2), + /* 24 */ createBuffer(0), + /* 25 */ createBarrier(4, 0), + + // another successful checkpoint + /* 26 */ createBuffer(0), createBuffer(1), createBuffer(2), + /* 29 */ createBarrier(5, 2), createBarrier(5, 1), createBarrier(5, 0), + /* 32 */ createBuffer(0), createBuffer(1), + + // abort multiple cancellations and a barrier after the cancellations + /* 34 */ createCancellationBarrier(6, 1), createCancellationBarrier(6, 2), + /* 36 */ createBarrier(6, 0), + + /* 37 */ createBuffer(0) + }; + + MockInputGate gate = new 
MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + + StatefulTask toNotify = mock(StatefulTask.class); + buffer.registerCheckpointEventHandler(toNotify); + + long startTs; + + // successful first checkpoint, with some aligned buffers + check(sequence[0], buffer.getNextNonBlocked()); + check(sequence[1], buffer.getNextNonBlocked()); + check(sequence[2], buffer.getNextNonBlocked()); + startTs = System.nanoTime(); + check(sequence[5], buffer.getNextNonBlocked()); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L))); + validateAlignmentTime(startTs, buffer); + + check(sequence[6], buffer.getNextNonBlocked()); + check(sequence[8], buffer.getNextNonBlocked()); + check(sequence[9], buffer.getNextNonBlocked()); + + // canceled checkpoint on last barrier + startTs = System.nanoTime(); + check(sequence[12], buffer.getNextNonBlocked()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(2L); + validateAlignmentTime(startTs, buffer); + check(sequence[13], buffer.getNextNonBlocked()); + + // one more successful checkpoint + check(sequence[15], buffer.getNextNonBlocked()); + check(sequence[16], buffer.getNextNonBlocked()); + startTs = System.nanoTime(); + check(sequence[20], buffer.getNextNonBlocked()); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L))); + validateAlignmentTime(startTs, buffer); + check(sequence[21], buffer.getNextNonBlocked()); + + // this checkpoint gets immediately canceled + check(sequence[24], buffer.getNextNonBlocked()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(4L); + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + // some buffers + check(sequence[26], buffer.getNextNonBlocked()); + check(sequence[27], buffer.getNextNonBlocked()); + check(sequence[28], buffer.getNextNonBlocked()); + + // a simple successful checkpoint + startTs = System.nanoTime(); + check(sequence[32], 
buffer.getNextNonBlocked()); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L))); + validateAlignmentTime(startTs, buffer); + check(sequence[33], buffer.getNextNonBlocked()); + + check(sequence[37], buffer.getNextNonBlocked()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(6L); + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + // all done + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + checkNoTempFilesRemain(); + } + + @Test + public void testAbortViaQueuedBarriers() throws Exception { + BufferOrEvent[] sequence = { + // starting a checkpoint + /* 0 */ createBuffer(1), + /* 1 */ createBarrier(1, 1), createBarrier(1, 2), + /* 3 */ createBuffer(2), createBuffer(0), createBuffer(1), + + // queued barrier and cancellation barrier + /* 6 */ createCancellationBarrier(2, 2), + /* 7 */ createBarrier(2, 1), + + // some intermediate buffers (some queued) + /* 8 */ createBuffer(0), createBuffer(1), createBuffer(2), + + // complete initial checkpoint + /* 11 */ createBarrier(1, 0), + + // some buffers (none queued, since checkpoint is aborted) + /* 12 */ createBuffer(2), createBuffer(1), createBuffer(0), + + // final barrier of aborted checkpoint + /* 15 */ createBarrier(2, 0), + + // some more buffers + /* 16 */ createBuffer(0), createBuffer(1), createBuffer(2) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + + StatefulTask toNotify = mock(StatefulTask.class); + buffer.registerCheckpointEventHandler(toNotify); + + long startTs; + + check(sequence[0], buffer.getNextNonBlocked()); + + // starting first checkpoint + startTs = System.nanoTime(); + check(sequence[4], buffer.getNextNonBlocked()); + check(sequence[8], buffer.getNextNonBlocked()); + + // finished first checkpoint + check(sequence[3], buffer.getNextNonBlocked()); + verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L))); + validateAlignmentTime(startTs, buffer); + + check(sequence[5], buffer.getNextNonBlocked()); + + // re-read the queued cancellation barriers + check(sequence[9], buffer.getNextNonBlocked()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(2L); + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + check(sequence[10], buffer.getNextNonBlocked()); + check(sequence[12], buffer.getNextNonBlocked()); + check(sequence[13], buffer.getNextNonBlocked()); + check(sequence[14], buffer.getNextNonBlocked()); + + check(sequence[16], buffer.getNextNonBlocked()); + check(sequence[17], buffer.getNextNonBlocked()); + check(sequence[18], buffer.getNextNonBlocked()); + + // no further alignment should have happened + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + // no further checkpoint (abort) notifications + verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class)); + verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong()); + + // all done + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + checkNoTempFilesRemain(); + } + + /** + * This tests the case where a replay of queued checkpoint barriers meets + * a canceled checkpoint. + * + * The replayed newer checkpoint barrier must not try to cancel the + * already canceled checkpoint. 
+ */ + @Test + public void testAbortWhileHavingQueuedBarriers() throws Exception { + BufferOrEvent[] sequence = { + // starting a checkpoint + /* 0 */ createBuffer(1), + /* 1 */ createBarrier(1, 1), + /* 2 */ createBuffer(2), createBuffer(0), createBuffer(1), + + // queued barrier and cancellation barrier + /* 5 */ createBarrier(2, 1), + + // some queued buffers + /* 6 */ createBuffer(2), createBuffer(1), + + // cancel the initial checkpoint + /* 8 */ createCancellationBarrier(1, 0), + + // some more buffers + /* 9 */ createBuffer(2), createBuffer(1), createBuffer(0), + + // ignored barrier - already canceled and moved to next checkpoint + /* 12 */ createBarrier(1, 2), + + // some more buffers + /* 13 */ createBuffer(0), createBuffer(1), createBuffer(2), + + // complete next checkpoint regularly + /* 16 */ createBarrier(2, 0), createBarrier(2, 2), + + // some more buffers + /* 18 */ createBuffer(0), createBuffer(1), createBuffer(2) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + + StatefulTask toNotify = mock(StatefulTask.class); + buffer.registerCheckpointEventHandler(toNotify); + + long startTs; + + check(sequence[0], buffer.getNextNonBlocked()); + + // starting first checkpoint + startTs = System.nanoTime(); + check(sequence[2], buffer.getNextNonBlocked()); + check(sequence[3], buffer.getNextNonBlocked()); + check(sequence[6], buffer.getNextNonBlocked()); + + // cancelled by cancellation barrier + check(sequence[4], buffer.getNextNonBlocked()); + validateAlignmentTime(startTs, buffer); + verify(toNotify, times(1)).abortCheckpointOnBarrier(1L); + + // the next checkpoint alignment starts now + startTs = System.nanoTime(); + check(sequence[9], buffer.getNextNonBlocked()); + check(sequence[11], buffer.getNextNonBlocked()); + check(sequence[13], buffer.getNextNonBlocked()); + check(sequence[15], buffer.getNextNonBlocked()); + + // checkpoint done + 
check(sequence[7], buffer.getNextNonBlocked()); + validateAlignmentTime(startTs, buffer); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L))); + + // queued data + check(sequence[10], buffer.getNextNonBlocked()); + check(sequence[14], buffer.getNextNonBlocked()); + + // trailing data + check(sequence[18], buffer.getNextNonBlocked()); + check(sequence[19], buffer.getNextNonBlocked()); + check(sequence[20], buffer.getNextNonBlocked()); + + // all done + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + checkNoTempFilesRemain(); + + // check overall notifications + verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class)); + verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong()); + } + + /** + * This tests the case where a cancellation barrier is received for a checkpoint already + * canceled due to receiving a newer checkpoint barrier. + */ + @Test + public void testIgnoreCancelBarrierIfCheckpointSubsumed() throws Exception { + BufferOrEvent[] sequence = { + // starting a checkpoint + /* 0 */ createBuffer(2), + /* 1 */ createBarrier(3, 1), createBarrier(3, 0), + /* 3 */ createBuffer(0), createBuffer(1), createBuffer(2), + + // newer checkpoint barrier cancels/subsumes pending checkpoint + /* 6 */ createBarrier(5, 2), + + // some queued buffers + /* 7 */ createBuffer(2), createBuffer(1), createBuffer(0), + + // cancel barrier for the initial checkpoint (it is already canceled) + /* 10 */ createCancellationBarrier(3, 2), + + // some more buffers + /* 11 */ createBuffer(2), createBuffer(0), createBuffer(1), + + // complete next checkpoint regularly + /* 14 */ createBarrier(5, 0), createBarrier(5, 1), + + // some more buffers + /* 16 */ createBuffer(0), createBuffer(1), createBuffer(2) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + + StatefulTask 
toNotify = mock(StatefulTask.class); + buffer.registerCheckpointEventHandler(toNotify); + + long startTs; + + // validate the sequence + + check(sequence[0], buffer.getNextNonBlocked()); + + // beginning of first checkpoint + check(sequence[5], buffer.getNextNonBlocked()); + + // future barrier aborts checkpoint + startTs = System.nanoTime(); + check(sequence[3], buffer.getNextNonBlocked()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(3L); + check(sequence[4], buffer.getNextNonBlocked()); + + // alignment of next checkpoint + check(sequence[8], buffer.getNextNonBlocked()); + check(sequence[9], buffer.getNextNonBlocked()); + check(sequence[12], buffer.getNextNonBlocked()); + check(sequence[13], buffer.getNextNonBlocked()); + + // checkpoint finished + check(sequence[7], buffer.getNextNonBlocked()); + validateAlignmentTime(startTs, buffer); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L))); + check(sequence[11], buffer.getNextNonBlocked()); + + // remaining data + check(sequence[16], buffer.getNextNonBlocked()); + check(sequence[17], buffer.getNextNonBlocked()); + check(sequence[18], buffer.getNextNonBlocked()); + + // all done + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + checkNoTempFilesRemain(); + + // check overall notifications + verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class)); + verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong()); } // ------------------------------------------------------------------------ // Utils // ------------------------------------------------------------------------ - private static BufferOrEvent createBarrier(long id, int channel) { - return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel); + private static BufferOrEvent createBarrier(long checkpointId, int channel) { + return new BufferOrEvent(new CheckpointBarrier(checkpointId, 
System.currentTimeMillis()), channel); + } + + private static BufferOrEvent createCancellationBarrier(long checkpointId, int channel) { + return new BufferOrEvent(new CancelCheckpointMarker(checkpointId), channel); } private static BufferOrEvent createBuffer(int channel) { - // since we have no access to the contents, we need to use the size as an - // identifier to validate correctness here - Buffer buf = new Buffer( - MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE), - FreeingBufferRecycler.INSTANCE); - - buf.setSize(SIZE_COUNTER++); + final int size = SIZE_COUNTER++; + byte[] bytes = new byte[size]; + RND.nextBytes(bytes); + + MemorySegment memory = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE); + memory.put(0, bytes); + + Buffer buf = new Buffer(memory, FreeingBufferRecycler.INSTANCE); + buf.setSize(size); + + // retain an additional time so it does not get disposed after being read by the input gate + buf.retain(); + return new BufferOrEvent(buf, channel); } @@ -932,15 +1413,16 @@ public class BarrierBufferTest { assertEquals(expected.isBuffer(), present.isBuffer()); if (expected.isBuffer()) { - // since we have no access to the contents, we need to use the size as an - // identifier to validate correctness here assertEquals(expected.getBuffer().getSize(), present.getBuffer().getSize()); + MemorySegment expectedMem = expected.getBuffer().getMemorySegment(); + MemorySegment presentMem = present.getBuffer().getMemorySegment(); + assertTrue("memory contents differs", expectedMem.compare(presentMem, 0, 0, PAGE_SIZE) == 0); } else { assertEquals(expected.getEvent(), present.getEvent()); } } - + private static void checkNoTempFilesRemain() { // validate that all temp files have been removed for (File dir : IO_MANAGER.getSpillingDirectories()) { @@ -985,8 +1467,10 @@ public class BarrierBufferTest { @Override public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception { + assertTrue("wrong checkpoint id", + 
nextExpectedCheckpointId == -1L || + nextExpectedCheckpointId == checkpointMetaData.getCheckpointId()); - assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == checkpointMetaData.getCheckpointId()); assertTrue(checkpointMetaData.getTimestamp() > 0); assertTrue(checkpointMetaData.getBytesBufferedInAlignment() >= 0); assertTrue(checkpointMetaData.getAlignmentDurationNanos() >= 0); @@ -995,8 +1479,32 @@ public class BarrierBufferTest { } @Override + public void abortCheckpointOnBarrier(long checkpointId) {} + + @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { throw new UnsupportedOperationException("should never be called"); } } + + private static class CheckpointMatcher extends BaseMatcher<CheckpointMetaData> { + + private final long checkpointId; + + CheckpointMatcher(long checkpointId) { + this.checkpointId = checkpointId; + } + + @Override + public boolean matches(Object o) { + return o != null && + o.getClass() == CheckpointMetaData.class && + ((CheckpointMetaData) o).getCheckpointId() == checkpointId; + } + + @Override + public void describeTo(Description description) { + description.appendText("CheckpointMetaData - id = " + checkpointId); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java index 7cfbb66..fa3363e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java @@ -20,21 +20,17 @@ package org.apache.flink.streaming.runtime.io; import 
org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; -import org.apache.flink.runtime.state.ChainedStateHandle; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; -import org.apache.flink.runtime.state.OperatorStateHandle; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateHandles; + import org.junit.Test; import java.util.Arrays; -import java.util.Collection; -import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -45,9 +41,9 @@ import static org.junit.Assert.fail; * Tests for the behavior of the barrier tracker. */ public class BarrierTrackerTest { - + private static final int PAGE_SIZE = 512; - + @Test public void testSingleChannelNoBarriers() { try { @@ -339,6 +335,98 @@ public class BarrierTrackerTest { } } + @Test + public void testSingleChannelAbortCheckpoint() throws Exception { + BufferOrEvent[] sequence = { + createBuffer(0), + createBarrier(1, 0), + createBuffer(0), + createBarrier(2, 0), + createCancellationBarrier(4, 0), + createBarrier(5, 0), + createBuffer(0), + createCancellationBarrier(6, 0), + createBuffer(0) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence)); + BarrierTracker tracker = new BarrierTracker(gate); + + // negative values mean an expected cancellation call! 
+ CheckpointSequenceValidator validator = + new CheckpointSequenceValidator(1, 2, -4, 5, -6); + tracker.registerCheckpointEventHandler(validator); + + for (BufferOrEvent boe : sequence) { + if (boe.isBuffer()) { + assertEquals(boe, tracker.getNextNonBlocked()); + } + assertTrue(tracker.isEmpty()); + } + + assertNull(tracker.getNextNonBlocked()); + assertNull(tracker.getNextNonBlocked()); + } + + @Test + public void testMultiChannelAbortCheckpoint() throws Exception { + BufferOrEvent[] sequence = { + // some buffers and a successful checkpoint + createBuffer(0), createBuffer(2), createBuffer(0), + createBarrier(1, 1), createBarrier(1, 2), + createBuffer(2), createBuffer(1), + createBarrier(1, 0), + + // aborted on last barrier + createBuffer(0), createBuffer(2), + createBarrier(2, 0), createBarrier(2, 2), + createBuffer(0), createBuffer(2), + createCancellationBarrier(2, 1), + + // successful checkpoint + createBuffer(2), createBuffer(1), + createBarrier(3, 1), createBarrier(3, 2), createBarrier(3, 0), + + // abort on first barrier + createBuffer(0), createBuffer(1), + createCancellationBarrier(4, 1), createBarrier(4, 2), + createBuffer(0), + createBarrier(4, 0), + + // another successful checkpoint + createBuffer(0), createBuffer(1), createBuffer(2), + createBarrier(5, 2), createBarrier(5, 1), createBarrier(5, 0), + + // abort multiple cancellations and a barrier after the cancellations + createBuffer(0), createBuffer(1), + createCancellationBarrier(6, 1), createCancellationBarrier(6, 2), + createBarrier(6, 0), + + createBuffer(0) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierTracker tracker = new BarrierTracker(gate); + + // negative values mean an expected cancellation call! 
+ CheckpointSequenceValidator validator = + new CheckpointSequenceValidator(1, -2, 3, -4, 5, -6); + tracker.registerCheckpointEventHandler(validator); + + for (BufferOrEvent boe : sequence) { + if (boe.isBuffer()) { + assertEquals(boe, tracker.getNextNonBlocked()); + } + } + + assertTrue(tracker.isEmpty()); + + assertNull(tracker.getNextNonBlocked()); + assertNull(tracker.getNextNonBlocked()); + + assertTrue(tracker.isEmpty()); + } + // ------------------------------------------------------------------------ // Utils // ------------------------------------------------------------------------ @@ -347,6 +435,10 @@ public class BarrierTrackerTest { return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel); } + private static BufferOrEvent createCancellationBarrier(long id, int channel) { + return new BufferOrEvent(new CancelCheckpointMarker(id), channel); + } + private static BufferOrEvent createBuffer(int channel) { return new BufferOrEvent( new Buffer(MemorySegmentFactory.wrap(new byte[]{1, 2}), FreeingBufferRecycler.INSTANCE), channel); @@ -368,7 +460,6 @@ public class BarrierTrackerTest { @Override public void setInitialState(TaskStateHandles taskStateHandles) throws Exception { - throw new UnsupportedOperationException("should never be called"); } @@ -379,10 +470,27 @@ public class BarrierTrackerTest { @Override public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception { + assertTrue("More checkpoints than expected", i < checkpointIDs.length); + + final long expectedId = checkpointIDs[i++]; + if (expectedId >= 0) { + assertEquals("wrong checkpoint id", expectedId, checkpointMetaData.getCheckpointId()); + assertTrue(checkpointMetaData.getTimestamp() > 0); + } else { + fail("got 'triggerCheckpointOnBarrier()' when expecting an 'abortCheckpointOnBarrier()'"); + } + } + @Override + public void abortCheckpointOnBarrier(long checkpointId) { assertTrue("More checkpoints than expected", i < 
checkpointIDs.length); - assertEquals("wrong checkpoint id", checkpointIDs[i++], checkpointMetaData.getCheckpointId()); - assertTrue(checkpointMetaData.getTimestamp() > 0); + + final long expectedId = checkpointIDs[i++]; + if (expectedId < 0) { + assertEquals("wrong checkpoint id for checkoint abort", -expectedId, checkpointId); + } else { + fail("got 'abortCheckpointOnBarrier()' when expecting an 'triggerCheckpointOnBarrier()'"); + } } @Override
