Repository: flink Updated Branches: refs/heads/master b54f44888 -> 44fb035e0
[FLINK-6534] [checkpoint] Use async IO to dispose state in SharedStateRegistry Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44fb035e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44fb035e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44fb035e Branch: refs/heads/master Commit: 44fb035e021259986f0b1aac4126143510a758e5 Parents: 098e46f Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Fri May 12 16:01:05 2017 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Sun May 14 13:49:50 2017 +0200 ---------------------------------------------------------------------- .../runtime/checkpoint/AbstractCompletedCheckpointStore.java | 6 ++++++ .../runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java | 2 ++ .../org/apache/flink/runtime/state/SharedStateRegistry.java | 6 +++++- 3 files changed, 13 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/44fb035e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java index f42fd06..bf70501 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java @@ -20,6 +20,8 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.state.SharedStateRegistry; +import java.util.concurrent.Executor; + /** * This is the base class that provides implementation of some aspects common for all * {@link CompletedCheckpointStore}s. @@ -34,4 +36,8 @@ public abstract class AbstractCompletedCheckpointStore implements CompletedCheck public AbstractCompletedCheckpointStore() { this.sharedStateRegistry = new SharedStateRegistry(); } + + public AbstractCompletedCheckpointStore(Executor asyncIOExecutor) { + this.sharedStateRegistry = new SharedStateRegistry(asyncIOExecutor); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/44fb035e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index 52a4eea..c8c68bc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -106,6 +106,8 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage, Executor executor) throws Exception { + super(executor); + checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint."); checkNotNull(stateStorage, "State storage"); http://git-wip-us.apache.org/repos/asf/flink/blob/44fb035e/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java index 9cfdec7..f9161b0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java @@ -45,8 +45,12 @@ public class SharedStateRegistry { private final Executor asyncDisposalExecutor; public SharedStateRegistry() { + this(Executors.directExecutor()); + } + + public SharedStateRegistry(Executor asyncDisposalExecutor) { this.registeredStates = new HashMap<>(); - this.asyncDisposalExecutor = Executors.directExecutor(); //TODO: FLINK-6534 + this.asyncDisposalExecutor = Preconditions.checkNotNull(asyncDisposalExecutor); } /**