Repository: flink Updated Branches: refs/heads/master b2e8792b8 -> 381bf5912
[FLINK-4939] WriteAheadSink: Decouple creation and commit of a pending checkpoint So far, the GenericWriteAheadSink assumed that the subtask that wrote a temporary buffer to the state backend would also be the one to commit it to the third-party storage system. This commit removes this assumption. To do this, it changes the CheckpointCommitter to dynamically take the subtaskIdx as a parameter when asking if a checkpoint was committed, and also changes the state kept by the GenericWriteAheadSink to include the index of the subtask that wrote the pending buffer. This closes #2707. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/381bf591 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/381bf591 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/381bf591 Branch: refs/heads/master Commit: 381bf5912237420b8e294c1fd38006a85403fd4f Parents: b2e8792 Author: kl0u <[email protected]> Authored: Wed Oct 26 17:19:12 2016 +0200 Committer: zentol <[email protected]> Committed: Mon Nov 14 13:41:24 2016 +0100 ---------------------------------------------------------------------- .../cassandra/CassandraCommitter.java | 54 +++-- .../cassandra/CassandraConnectorITCase.java | 26 ++- .../runtime/operators/CheckpointCommitter.java | 22 +-- .../operators/GenericWriteAheadSink.java | 196 ++++++++++++------- .../operators/GenericWriteAheadSinkTest.java | 14 +- .../operators/WriteAheadSinkTestBase.java | 2 +- 6 files changed, 191 insertions(+), 123 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/381bf591/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java index e83b1be..63b76da 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java @@ -18,11 +18,15 @@ package org.apache.flink.streaming.connectors.cassandra; import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + /** * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra * database. @@ -40,10 +44,11 @@ public class CassandraCommitter extends CheckpointCommitter { private String keySpace = "flink_auxiliary"; private String table = "checkpoints_"; - private transient PreparedStatement updateStatement; - private transient PreparedStatement selectStatement; - - private long lastCommittedCheckpointID = -1; + /** + * A cache of the last committed checkpoint ids per subtask index. This is used to + * avoid redundant round-trips to Cassandra (see {@link #isCheckpointCommitted(int, long)}. 
+ */ + private final Map<Integer, Long> lastCommittedCheckpoints = new HashMap<>(); public CassandraCommitter(ClusterBuilder builder) { this.builder = builder; @@ -95,16 +100,11 @@ public class CassandraCommitter extends CheckpointCommitter { } cluster = builder.getCluster(); session = cluster.connect(); - - updateStatement = session.prepare(String.format("UPDATE %s.%s set checkpoint_id=? where sink_id='%s' and sub_id=%d;", keySpace, table, operatorId, subtaskId)); - selectStatement = session.prepare(String.format("SELECT checkpoint_id FROM %s.%s where sink_id='%s' and sub_id=%d;", keySpace, table, operatorId, subtaskId)); - - session.execute(String.format("INSERT INTO %s.%s (sink_id, sub_id, checkpoint_id) values ('%s', %d, " + -1 + ") IF NOT EXISTS;", keySpace, table, operatorId, subtaskId)); } @Override public void close() throws Exception { - this.lastCommittedCheckpointID = -1; + this.lastCommittedCheckpoints.clear(); try { session.close(); } catch (Exception e) { @@ -118,16 +118,34 @@ public class CassandraCommitter extends CheckpointCommitter { } @Override - public void commitCheckpoint(long checkpointID) { - session.execute(updateStatement.bind(checkpointID)); - this.lastCommittedCheckpointID = checkpointID; + public void commitCheckpoint(int subtaskIdx, long checkpointId) { + String statement = String.format( + "UPDATE %s.%s set checkpoint_id=%d where sink_id='%s' and sub_id=%d;", + keySpace, table, checkpointId, operatorId, subtaskIdx); + + session.execute(statement); + lastCommittedCheckpoints.put(subtaskIdx, checkpointId); } @Override - public boolean isCheckpointCommitted(long checkpointID) { - if (this.lastCommittedCheckpointID == -1) { - this.lastCommittedCheckpointID = session.execute(selectStatement.bind()).one().getLong("checkpoint_id"); + public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) { + // Pending checkpointed buffers are committed in ascending order of their + // checkpoint id. 
This way we can tell if a checkpointed buffer was committed + // just by asking the third-party storage system for the last checkpoint id + // committed by the specified subtask. + + Long lastCommittedCheckpoint = lastCommittedCheckpoints.get(subtaskIdx); + if (lastCommittedCheckpoint == null) { + String statement = String.format( + "SELECT checkpoint_id FROM %s.%s where sink_id='%s' and sub_id=%d;", + keySpace, table, operatorId, subtaskIdx); + + Iterator<Row> resultIt = session.execute(statement).iterator(); + if (resultIt.hasNext()) { + lastCommittedCheckpoint = resultIt.next().getLong("checkpoint_id"); + lastCommittedCheckpoints.put(subtaskIdx, lastCommittedCheckpoint); + } } - return checkpointID <= this.lastCommittedCheckpointID; + return lastCommittedCheckpoint != null && checkpointId <= lastCommittedCheckpoint; } } http://git-wip-us.apache.org/repos/asf/flink/blob/381bf591/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index a29e881..2bb6fd1 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -320,17 +320,14 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri CassandraCommitter cc1 = new CassandraCommitter(builder); cc1.setJobId("job"); cc1.setOperatorId("operator"); - cc1.setOperatorSubtaskId(0); CassandraCommitter cc2 = new CassandraCommitter(builder); cc2.setJobId("job"); cc2.setOperatorId("operator"); - cc2.setOperatorSubtaskId(1); CassandraCommitter cc3 = new CassandraCommitter(builder); cc3.setJobId("job"); cc3.setOperatorId("operator1"); - cc3.setOperatorSubtaskId(0); cc1.createResource(); @@ -338,18 +335,18 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri cc2.open(); cc3.open(); - Assert.assertFalse(cc1.isCheckpointCommitted(1)); - Assert.assertFalse(cc2.isCheckpointCommitted(1)); - Assert.assertFalse(cc3.isCheckpointCommitted(1)); + Assert.assertFalse(cc1.isCheckpointCommitted(0, 1)); + Assert.assertFalse(cc2.isCheckpointCommitted(1, 1)); + Assert.assertFalse(cc3.isCheckpointCommitted(0, 1)); - cc1.commitCheckpoint(1); - Assert.assertTrue(cc1.isCheckpointCommitted(1)); + cc1.commitCheckpoint(0, 1); + Assert.assertTrue(cc1.isCheckpointCommitted(0, 1)); //verify that other sub-tasks aren't affected - Assert.assertFalse(cc2.isCheckpointCommitted(1)); + Assert.assertFalse(cc2.isCheckpointCommitted(1, 1)); //verify that other tasks aren't affected - Assert.assertFalse(cc3.isCheckpointCommitted(1)); + Assert.assertFalse(cc3.isCheckpointCommitted(0, 1)); - Assert.assertFalse(cc1.isCheckpointCommitted(2)); + Assert.assertFalse(cc1.isCheckpointCommitted(0, 2)); cc1.close(); cc2.close(); @@ -358,13 +355,12 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri cc1 = new CassandraCommitter(builder); cc1.setJobId("job"); cc1.setOperatorId("operator"); - cc1.setOperatorSubtaskId(0); cc1.open(); //verify that checkpoint data is not destroyed within open/close and not reliant on 
internally cached data - Assert.assertTrue(cc1.isCheckpointCommitted(1)); - Assert.assertFalse(cc1.isCheckpointCommitted(2)); + Assert.assertTrue(cc1.isCheckpointCommitted(0, 1)); + Assert.assertFalse(cc1.isCheckpointCommitted(0, 2)); cc1.close(); } http://git-wip-us.apache.org/repos/asf/flink/blob/381bf591/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/CheckpointCommitter.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/CheckpointCommitter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/CheckpointCommitter.java index 9ecc2ee..90e3a57 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/CheckpointCommitter.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/CheckpointCommitter.java @@ -41,9 +41,9 @@ import java.io.Serializable; */ public abstract class CheckpointCommitter implements Serializable { protected static final Logger LOG = LoggerFactory.getLogger(CheckpointCommitter.class); + protected String jobId; protected String operatorId; - protected int subtaskId; /** * Internally used to set the job ID after instantiation. @@ -66,16 +66,6 @@ public abstract class CheckpointCommitter implements Serializable { } /** - * Internally used to set the operator subtask ID after instantiation. - * - * @param id - * @throws Exception - */ - public void setOperatorSubtaskId(int id) throws Exception { - this.subtaskId = id; - } - - /** * Opens/connects to the resource, and possibly creates it beforehand. * * @throws Exception @@ -98,17 +88,19 @@ public abstract class CheckpointCommitter implements Serializable { /** * Mark the given checkpoint as completed in the resource. * - * @param checkpointID + * @param subtaskIdx the index of the subtask responsible for committing the checkpoint. 
+ * @param checkpointID the id of the checkpoint to be committed. * @throws Exception */ - public abstract void commitCheckpoint(long checkpointID) throws Exception; + public abstract void commitCheckpoint(int subtaskIdx, long checkpointID) throws Exception; /** * Checked the resource whether the given checkpoint was committed completely. * - * @param checkpointID + * @param subtaskIdx the index of the subtask responsible for committing the checkpoint. + * @param checkpointID the id of the checkpoint we are interested in. * @return true if the checkpoint was committed completely, false otherwise * @throws Exception */ - public abstract boolean isCheckpointCommitted(long checkpointID) throws Exception; + public abstract boolean isCheckpointCommitted(int subtaskIdx, long checkpointID) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/381bf591/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java index 499fe83..b08b2e9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.runtime.operators; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.memory.DataInputViewStreamWrapper; @@ -27,23 +26,24 @@ import org.apache.flink.runtime.io.disk.InputViewIterator; import 
org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper; -import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; -import java.util.HashSet; +import java.util.Iterator; import java.util.Set; -import java.util.TreeMap; +import java.util.TreeSet; import java.util.UUID; /** - * Generic Sink that emits its input elements into an arbitrary backend. This sink is integrated with the checkpointing + * Generic Sink that emits its input elements into an arbitrary backend. This sink is integrated with Flink's checkpointing * mechanism and can provide exactly-once guarantees; depending on the storage backend and sink/committer implementation. 
* <p/> * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a @@ -57,18 +57,21 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I private static final long serialVersionUID = 1L; protected static final Logger LOG = LoggerFactory.getLogger(GenericWriteAheadSink.class); + + private final String id; private final CheckpointCommitter committer; - private transient CheckpointStreamFactory.CheckpointStateOutputStream out; protected final TypeSerializer<IN> serializer; - private final String id; + + private transient CheckpointStreamFactory.CheckpointStateOutputStream out; private transient CheckpointStreamFactory checkpointStreamFactory; - private ExactlyOnceState state = new ExactlyOnceState(); + private final Set<PendingCheckpoint> pendingCheckpoints = new TreeSet<>(); - public GenericWriteAheadSink(CheckpointCommitter committer, TypeSerializer<IN> serializer, String jobID) throws Exception { - this.committer = committer; - this.serializer = serializer; + public GenericWriteAheadSink(CheckpointCommitter committer, TypeSerializer<IN> serializer, String jobID) throws Exception { + this.committer = Preconditions.checkNotNull(committer); + this.serializer = Preconditions.checkNotNull(serializer); this.id = UUID.randomUUID().toString(); + this.committer.setJobId(jobID); this.committer.createResource(); } @@ -77,11 +80,11 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I public void open() throws Exception { super.open(); committer.setOperatorId(id); - committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask()); committer.open(); - cleanState(); - checkpointStreamFactory = - getContainingTask().createCheckpointStreamFactory(this); + + checkpointStreamFactory = getContainingTask().createCheckpointStreamFactory(this); + + cleanRestoredHandles(); } public void close() throws Exception { @@ -89,52 +92,68 @@ public abstract class 
GenericWriteAheadSink<IN> extends AbstractStreamOperator<I } /** - * Saves a handle in the state. + * Called when a checkpoint barrier arrives. It closes any open streams to the backend + * and marks them as pending for committing to the external, third-party storage system. * - * @param checkpointId - * @throws IOException + * @param checkpointId the id of the latest received checkpoint. + * @throws IOException in case something went wrong when handling the stream to the backend. */ private void saveHandleInState(final long checkpointId, final long timestamp) throws Exception { //only add handle if a new OperatorState was created since the last snapshot if (out != null) { + int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask(); StreamStateHandle handle = out.closeAndGetHandle(); - if (state.pendingHandles.containsKey(checkpointId)) { + + PendingCheckpoint pendingCheckpoint = new PendingCheckpoint(checkpointId, subtaskIdx, timestamp, handle); + + if (pendingCheckpoints.contains(pendingCheckpoint)) { //we already have a checkpoint stored for that ID that may have been partially written, //so we discard this "alternate version" and use the stored checkpoint handle.discardState(); } else { - state.pendingHandles.put(checkpointId, new Tuple2<>(timestamp, handle)); + pendingCheckpoints.add(pendingCheckpoint); } out = null; } } @Override - public void snapshotState(FSDataOutputStream out, - long checkpointId, - long timestamp) throws Exception { + public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception { saveHandleInState(checkpointId, timestamp); - InstantiationUtil.serializeObject(out, state); + DataOutputViewStreamWrapper outStream = new DataOutputViewStreamWrapper(out); + outStream.writeInt(pendingCheckpoints.size()); + for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) { + pendingCheckpoint.serialize(outStream); + } } @Override public void restoreState(FSDataInputStream in) throws Exception { - 
this.state = InstantiationUtil.deserializeObject(in, getUserCodeClassloader()); + final DataInputViewStreamWrapper inStream = new DataInputViewStreamWrapper(in); + int numPendingHandles = inStream.readInt(); + for (int i = 0; i < numPendingHandles; i++) { + pendingCheckpoints.add(PendingCheckpoint.restore(inStream, getUserCodeClassloader())); + } } - private void cleanState() throws Exception { - synchronized (this.state.pendingHandles) { //remove all handles that were already committed - Set<Long> pastCheckpointIds = this.state.pendingHandles.keySet(); - Set<Long> checkpointsToRemove = new HashSet<>(); - for (Long pastCheckpointId : pastCheckpointIds) { - if (committer.isCheckpointCommitted(pastCheckpointId)) { - checkpointsToRemove.add(pastCheckpointId); + /** + * Called at {@link #open()} to clean-up the pending handle list. + * It iterates over all restored pending handles, checks which ones are already + * committed to the outside storage system and removes them from the list. + */ + private void cleanRestoredHandles() throws Exception { + synchronized (pendingCheckpoints) { + + Iterator<PendingCheckpoint> pendingCheckpointIt = pendingCheckpoints.iterator(); + while (pendingCheckpointIt.hasNext()) { + PendingCheckpoint pendingCheckpoint = pendingCheckpointIt.next(); + + if (committer.isCheckpointCommitted(pendingCheckpoint.subtaskId, pendingCheckpoint.checkpointId)) { + pendingCheckpoint.stateHandle.discardState(); + pendingCheckpointIt.remove(); } } - for (Long toRemove : checkpointsToRemove) { - this.state.pendingHandles.remove(toRemove); - } } } @@ -142,15 +161,19 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception { super.notifyOfCompletedCheckpoint(checkpointId); - synchronized (state.pendingHandles) { - Set<Long> pastCheckpointIds = state.pendingHandles.keySet(); - Set<Long> checkpointsToRemove = new HashSet<>(); - for (Long pastCheckpointId : 
pastCheckpointIds) { + synchronized (pendingCheckpoints) { + Iterator<PendingCheckpoint> pendingCheckpointIt = pendingCheckpoints.iterator(); + while (pendingCheckpointIt.hasNext()) { + PendingCheckpoint pendingCheckpoint = pendingCheckpointIt.next(); + long pastCheckpointId = pendingCheckpoint.checkpointId; + int subtaskId = pendingCheckpoint.subtaskId; + long timestamp = pendingCheckpoint.timestamp; + StreamStateHandle streamHandle = pendingCheckpoint.stateHandle; + if (pastCheckpointId <= checkpointId) { try { - if (!committer.isCheckpointCommitted(pastCheckpointId)) { - Tuple2<Long, StreamStateHandle> handle = state.pendingHandles.get(pastCheckpointId); - try (FSDataInputStream in = handle.f1.openInputStream()) { + if (!committer.isCheckpointCommitted(subtaskId, pastCheckpointId)) { + try (FSDataInputStream in = streamHandle.openInputStream()) { boolean success = sendValues( new ReusingMutableToRegularIteratorWrapper<>( new InputViewIterator<>( @@ -158,30 +181,31 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I in), serializer), serializer), - handle.f0); - if (success) { //if the sending has failed we will retry on the next notify - committer.commitCheckpoint(pastCheckpointId); - checkpointsToRemove.add(pastCheckpointId); + timestamp); + if (success) { + // in case the checkpoint was successfully committed, + // discard its state from the backend and mark it for removal + // in case it failed, we retry on the next checkpoint + committer.commitCheckpoint(subtaskId, pastCheckpointId); + streamHandle.discardState(); + pendingCheckpointIt.remove(); } } } else { - checkpointsToRemove.add(pastCheckpointId); + streamHandle.discardState(); + pendingCheckpointIt.remove(); } } catch (Exception e) { + // we have to break here to prevent a new (later) checkpoint + // from being committed before this one LOG.error("Could not commit checkpoint.", e); - break; // we have to break here to prevent a new checkpoint from being committed before 
this one + break; } } } - for (Long toRemove : checkpointsToRemove) { - Tuple2<Long, StreamStateHandle> handle = state.pendingHandles.get(toRemove); - state.pendingHandles.remove(toRemove); - handle.f1.discardState(); - } } } - /** * Write the given element into the backend. * @@ -201,27 +225,65 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I serializer.serialize(value, new DataOutputViewStreamWrapper(out)); } - /** - * This state is used to keep a list of all StateHandles (essentially references to past OperatorStates) that were - * used since the last completed checkpoint. - **/ - public static class ExactlyOnceState implements Serializable { + private static final class PendingCheckpoint implements Comparable<PendingCheckpoint>, Serializable { + + private static final long serialVersionUID = -3571036395734603443L; - private static final long serialVersionUID = -3571063495273460743L; + private final long checkpointId; + private final int subtaskId; + private final long timestamp; + private final StreamStateHandle stateHandle; - protected TreeMap<Long, Tuple2<Long, StreamStateHandle>> pendingHandles; + PendingCheckpoint(long checkpointId, int subtaskId, long timestamp, StreamStateHandle handle) { + this.checkpointId = checkpointId; + this.subtaskId = subtaskId; + this.timestamp = timestamp; + this.stateHandle = handle; + } - public ExactlyOnceState() { - pendingHandles = new TreeMap<>(); + void serialize(DataOutputViewStreamWrapper outputStream) throws IOException { + outputStream.writeLong(checkpointId); + outputStream.writeInt(subtaskId); + outputStream.writeLong(timestamp); + InstantiationUtil.serializeObject(outputStream, stateHandle); } - public TreeMap<Long, Tuple2<Long, StreamStateHandle>> getState(ClassLoader userCodeClassLoader) throws Exception { - return pendingHandles; + static PendingCheckpoint restore( + DataInputViewStreamWrapper inputStream, + ClassLoader classLoader) throws IOException, ClassNotFoundException { 
+ + long checkpointId = inputStream.readLong(); + int subtaskId = inputStream.readInt(); + long timestamp = inputStream.readLong(); + StreamStateHandle handle = InstantiationUtil.deserializeObject(inputStream, classLoader); + + return new PendingCheckpoint(checkpointId, subtaskId, timestamp, handle); + } + + @Override + public int compareTo(PendingCheckpoint o) { + int res = Long.compare(this.checkpointId, o.checkpointId); + return res != 0 ? res : Integer.compare(this.subtaskId, o.subtaskId); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof GenericWriteAheadSink.PendingCheckpoint)) { + return false; + } + PendingCheckpoint other = (PendingCheckpoint) o; + return this.checkpointId == other.checkpointId && + this.subtaskId == other.subtaskId && + this.timestamp == other.timestamp; } @Override - public String toString() { - return this.pendingHandles.toString(); + public int hashCode() { + int hash = 17; + hash = 31 * hash + (int) (checkpointId ^ (checkpointId >>> 32)); + hash = 31 * hash + subtaskId; + hash = 31 * hash + (int) (timestamp ^ (timestamp >>> 32)); + return hash; } } } http://git-wip-us.apache.org/repos/asf/flink/blob/381bf591/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java index e186be0..8d092ed 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java @@ -128,7 +128,7 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int 
testHarness.notifyOfCompletedCheckpoint(0); //isCommitted should have failed, thus sendValues() should never have been called - Assert.assertTrue(sink.values.size() == 0); + Assert.assertEquals(0, sink.values.size()); for (int x = 0; x < 10; x++) { testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 1))); @@ -139,7 +139,7 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int testHarness.notifyOfCompletedCheckpoint(1); //previous CP should be retried, but will fail the CP commit. Second CP should be skipped. - Assert.assertTrue(sink.values.size() == 10); + Assert.assertEquals(10, sink.values.size()); for (int x = 0; x < 10; x++) { testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 2))); @@ -150,7 +150,7 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int testHarness.notifyOfCompletedCheckpoint(2); //all CP's should be retried and succeed; since one CP was written twice we have 2 * 10 + 10 + 10 = 40 values - Assert.assertTrue(sink.values.size() == 40); + Assert.assertEquals(40, sink.values.size()); } /** @@ -193,12 +193,12 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int } @Override - public void commitCheckpoint(long checkpointID) { + public void commitCheckpoint(int subtaskIdx, long checkpointID) { checkpoints.add(checkpointID); } @Override - public boolean isCheckpointCommitted(long checkpointID) { + public boolean isCheckpointCommitted(int subtaskIdx, long checkpointID) { return checkpoints.contains(checkpointID); } } @@ -245,7 +245,7 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int } @Override - public void commitCheckpoint(long checkpointID) { + public void commitCheckpoint(int subtaskIdx, long checkpointID) { if (failCommit) { failCommit = false; throw new RuntimeException("Expected exception"); @@ -255,7 +255,7 @@ public class GenericWriteAheadSinkTest extends 
WriteAheadSinkTestBase<Tuple1<Int } @Override - public boolean isCheckpointCommitted(long checkpointID) { + public boolean isCheckpointCommitted(int subtaskIdx, long checkpointID) { if (failIsCommitted) { failIsCommitted = false; throw new RuntimeException("Expected exception"); http://git-wip-us.apache.org/repos/asf/flink/blob/381bf591/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java index ab84bc1..a9c5792 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java @@ -149,7 +149,7 @@ public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink sink = createSink(); - testHarness =new OneInputStreamOperatorTestHarness<>(sink); + testHarness = new OneInputStreamOperatorTestHarness<>(sink); testHarness.setup(); testHarness.restore(latestSnapshot);
