This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 12626a5b381dfebb40515d03992bdf243f7e603d Author: Igal Shilman <[email protected]> AuthorDate: Tue May 5 22:32:10 2020 +0200 [FLINK-17533] Add support for multiple concurrent checkpoints --- .../statefun/flink/core/feedback/Checkpoints.java | 61 +++++++++ .../flink/core/feedback/FeedbackUnionOperator.java | 34 ++--- .../statefun/flink/core/logger/FeedbackLogger.java | 33 +++++ .../flink/core/logger/UnboundedFeedbackLogger.java | 5 +- .../flink/core/feedback/CheckpointsTest.java | 143 +++++++++++++++++++++ 5 files changed, 260 insertions(+), 16 deletions(-) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/Checkpoints.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/Checkpoints.java new file mode 100644 index 0000000..8fd0322 --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/Checkpoints.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.flink.core.feedback; + +import java.io.OutputStream; +import java.util.Objects; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.function.Supplier; +import org.apache.flink.statefun.flink.core.logger.FeedbackLogger; +import org.apache.flink.util.IOUtils; + +final class Checkpoints<T> implements AutoCloseable { + private final Supplier<? extends FeedbackLogger<T>> feedbackLoggerFactory; + private final TreeMap<Long, FeedbackLogger<T>> uncompletedCheckpoints = new TreeMap<>(); + + Checkpoints(Supplier<? extends FeedbackLogger<T>> feedbackLoggerFactory) { + this.feedbackLoggerFactory = Objects.requireNonNull(feedbackLoggerFactory); + } + + public void startLogging(long checkpointId, OutputStream outputStream) { + FeedbackLogger<T> logger = feedbackLoggerFactory.get(); + logger.startLogging(outputStream); + uncompletedCheckpoints.put(checkpointId, logger); + } + + public void append(T element) { + for (FeedbackLogger<T> logger : uncompletedCheckpoints.values()) { + logger.append(element); + } + } + + public void commitCheckpointsUntil(long checkpointId) { + SortedMap<Long, FeedbackLogger<T>> completedCheckpoints = + uncompletedCheckpoints.headMap(checkpointId, true); + completedCheckpoints.values().forEach(FeedbackLogger::commit); + completedCheckpoints.clear(); + } + + @Override + public void close() { + IOUtils.closeAllQuietly(uncompletedCheckpoints.values()); + uncompletedCheckpoints.clear(); + } +} diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java index 402ded8..78fe10d 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java @@ -18,6 +18,7 @@ package org.apache.flink.statefun.flink.core.feedback; import java.util.Objects; +import java.util.OptionalLong; import java.util.concurrent.Executor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -26,9 +27,9 @@ import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade; import org.apache.flink.statefun.flink.core.common.SerializableFunction; -import org.apache.flink.statefun.flink.core.common.SerializablePredicate; import org.apache.flink.statefun.flink.core.logger.Loggers; import org.apache.flink.statefun.flink.core.logger.UnboundedFeedbackLogger; +import org.apache.flink.statefun.flink.core.logger.UnboundedFeedbackLoggerFactory; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.MailboxExecutor; @@ -43,20 +44,20 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T> // -- configuration private final FeedbackKey<T> feedbackKey; - private final SerializablePredicate<T> isBarrierMessage; + private final SerializableFunction<T, OptionalLong> isBarrierMessage; private final SerializableFunction<T, ?> keySelector; private final long totalMemoryUsedForFeedbackCheckpointing; private final TypeSerializer<T> elementSerializer; // -- runtime - private transient UnboundedFeedbackLogger<T> feedbackLogger; + private transient Checkpoints<T> checkpoints; private transient boolean closedOrDisposed; private transient MailboxExecutor mailboxExecutor; private transient StreamRecord<T> reusable; FeedbackUnionOperator( FeedbackKey<T> feedbackKey, - SerializablePredicate<T> isBarrierMessage, + SerializableFunction<T, OptionalLong> isBarrierMessage, SerializableFunction<T, ?> keySelector, long totalMemoryUsedForFeedbackCheckpointing, TypeSerializer<T> elementSerializer, @@ -84,11 +85,12 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T> if (closedOrDisposed) { return; } - if (isBarrierMessage.test(element)) { - feedbackLogger.commit(); + OptionalLong maybeCheckpoint = isBarrierMessage.apply(element); + if (maybeCheckpoint.isPresent()) { + checkpoints.commitCheckpointsUntil(maybeCheckpoint.getAsLong()); } else { sendDownstream(element); - feedbackLogger.append(element); + checkpoints.append(element); } } @@ -105,22 +107,24 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T> // Initialize the unbounded feedback logger // @SuppressWarnings("unchecked") - UnboundedFeedbackLogger<T> feedbackLogger = - (UnboundedFeedbackLogger<T>) - Loggers.unboundedSpillableLogger( + UnboundedFeedbackLoggerFactory<T> feedbackLoggerFactory = + (UnboundedFeedbackLoggerFactory<T>) + Loggers.unboundedSpillableLoggerFactory( ioManager, maxParallelism, totalMemoryUsedForFeedbackCheckpointing, elementSerializer, keySelector); - this.feedbackLogger = feedbackLogger; + this.checkpoints = new Checkpoints<>(feedbackLoggerFactory::create); + // // we first must reply previously check-pointed envelopes before we start // processing any new envelopes. // + UnboundedFeedbackLogger<T> logger = feedbackLoggerFactory.create(); for (KeyGroupStatePartitionStreamProvider keyedStateInput : context.getRawKeyedStateInputs()) { - this.feedbackLogger.replyLoggedEnvelops(keyedStateInput.getStream(), this); + logger.replyLoggedEnvelops(keyedStateInput.getStream(), this); } // // now we can start processing new messages. We do so by registering ourselves as a @@ -132,7 +136,7 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T> @Override public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); - this.feedbackLogger.startLogging(context.getRawKeyedOperatorStateOutput()); + checkpoints.startLogging(context.getCheckpointId(), context.getRawKeyedOperatorStateOutput()); } @Override @@ -152,8 +156,8 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T> // ------------------------------------------------------------------------------------------------------------------ private void closeInternally() { - IOUtils.closeQuietly(feedbackLogger); - feedbackLogger = null; + IOUtils.closeQuietly(checkpoints); + checkpoints = null; closedOrDisposed = true; } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/FeedbackLogger.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/FeedbackLogger.java new file mode 100644 index 0000000..465a717 --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/FeedbackLogger.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.flink.core.logger; + +import java.io.OutputStream; + +public interface FeedbackLogger<T> extends AutoCloseable { + + /** Start logging messages into the supplied output stream. */ + void startLogging(OutputStream keyedStateCheckpointOutputStream); + + /** Append a message to the currently logging logger. */ + void append(T message); + + /** Commit the currently logging logger. */ + void commit(); +} diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java index ef0360a..409f714 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java @@ -38,7 +38,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.statefun.flink.core.feedback.FeedbackConsumer; import org.apache.flink.util.IOUtils; -public final class UnboundedFeedbackLogger<T> implements Closeable { +public final class UnboundedFeedbackLogger<T> implements FeedbackLogger<T> { private final Supplier<KeyGroupStream<T>> supplier; private final ToIntFunction<T> keyGroupAssigner; private final Map<Integer, KeyGroupStream<T>> keyGroupStreams; @@ -60,6 +60,7 @@ public final class UnboundedFeedbackLogger<T> implements Closeable { this.checkpointedStreamOperations = Objects.requireNonNull(ops); } + @Override public void startLogging(OutputStream keyedStateCheckpointOutputStream) { this.checkpointedStreamOperations.requireKeyedStateCheckpointed( keyedStateCheckpointOutputStream); @@ -68,6 +69,7 @@ public final class UnboundedFeedbackLogger<T> implements Closeable { checkpointedStreamOperations.acquireLease(keyedStateCheckpointOutputStream); } + @Override public void append(T message) { if (keyedStateOutputStream == null) { // @@ -79,6 +81,7 @@ public final class UnboundedFeedbackLogger<T> implements Closeable { keyGroup.append(message); } + @Override public void commit() { try { flushToKeyedStateOutputStream(); diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/CheckpointsTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/CheckpointsTest.java new file mode 100644 index 0000000..7b8dbee --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/CheckpointsTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.statefun.flink.core.feedback; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import java.io.ByteArrayOutputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Supplier; +import org.apache.flink.statefun.flink.core.logger.FeedbackLogger; +import org.apache.flink.util.Preconditions; +import org.junit.Test; + +public class CheckpointsTest { + + @Test + public void usageExample() { + Loggers loggers = new Loggers(); + + Checkpoints<String> checkpoints = new Checkpoints<>(loggers); + checkpoints.startLogging(1, new ByteArrayOutputStream()); + checkpoints.append("hello"); + checkpoints.append("world"); + checkpoints.commitCheckpointsUntil(1); + + assertThat(loggers.items(0), contains("hello", "world")); + assertThat(loggers.state(0), is(LoggerState.COMMITTED)); + } + + @Test + public void dataIsAppendedToMultipleLoggers() { + Loggers loggers = new Loggers(); + + Checkpoints<String> checkpoints = new Checkpoints<>(loggers); + + checkpoints.startLogging(1, new ByteArrayOutputStream()); + checkpoints.append("a"); + + checkpoints.startLogging(2, new ByteArrayOutputStream()); + checkpoints.append("b"); + + checkpoints.commitCheckpointsUntil(1); + checkpoints.append("c"); + + checkpoints.commitCheckpointsUntil(2); + + assertThat(loggers.items(0), contains("a", "b")); + assertThat(loggers.items(1), contains("b", "c")); + } + + @Test + public void committingALaterCheckpointCommitsPreviousCheckpoints() { + Loggers loggers = new Loggers(); + + Checkpoints<String> checkpoints = new Checkpoints<>(loggers); + + checkpoints.startLogging(1, new ByteArrayOutputStream()); + checkpoints.startLogging(2, new ByteArrayOutputStream()); + checkpoints.commitCheckpointsUntil(2); + + assertThat(loggers.state(0), is(LoggerState.COMMITTED)); + assertThat(loggers.state(1), is(LoggerState.COMMITTED)); + } + + private enum LoggerState { + IDLE, + LOGGING, + COMMITTED, + CLOSED + } + + private static final class Loggers implements Supplier<FeedbackLogger<String>> { + private final List<FakeLogger> loggers = new ArrayList<>(); + + @Override + public FeedbackLogger<String> get() { + FakeLogger logger = new FakeLogger(); + loggers.add(logger); + return logger; + } + + List<String> items(int loggerIndex) { + Preconditions.checkElementIndex(loggerIndex, loggers.size()); + FakeLogger logger = loggers.get(loggerIndex); + return logger.items; + } + + LoggerState state(int loggerIndex) { + Preconditions.checkElementIndex(loggerIndex, loggers.size()); + FakeLogger logger = loggers.get(loggerIndex); + return logger.state; + } + } + + private static final class FakeLogger implements FeedbackLogger<String> { + + List<String> items = new ArrayList<>(); + LoggerState state = LoggerState.IDLE; + + @Override + public void startLogging(OutputStream keyedStateCheckpointOutputStream) { + Preconditions.checkState(state == LoggerState.IDLE); + state = LoggerState.LOGGING; + } + + @Override + public void append(String message) { + Preconditions.checkState(state != LoggerState.COMMITTED); + Preconditions.checkState(state != LoggerState.CLOSED); + items.add(message); + } + + @Override + public void commit() { + Preconditions.checkState(state == LoggerState.LOGGING); + state = LoggerState.COMMITTED; + } + + @Override + public void close() { + state = LoggerState.CLOSED; + } + } +}
