[FLINK-6534] [checkpoint] Use async IO to dispose state in SharedStateRegistry


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/471263cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/471263cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/471263cf

Branch: refs/heads/release-1.3
Commit: 471263cfe493dc1bbbd5a5733dbf918cc0872b9b
Parents: 7c6f348
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Fri May 12 16:01:05 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Sun May 14 14:07:26 2017 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/AbstractCompletedCheckpointStore.java   | 6 ++++++
 .../runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java  | 2 ++
 .../org/apache/flink/runtime/state/SharedStateRegistry.java    | 6 +++++-
 3 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/471263cf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
index f42fd06..bf70501 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.state.SharedStateRegistry;
 
+import java.util.concurrent.Executor;
+
 /**
  * This is the base class that provides implementation of some aspects common for all
  * {@link CompletedCheckpointStore}s.
@@ -34,4 +36,8 @@ public abstract class AbstractCompletedCheckpointStore implements CompletedCheck
        public AbstractCompletedCheckpointStore() {
                this.sharedStateRegistry = new SharedStateRegistry();
        }
+
+       public AbstractCompletedCheckpointStore(Executor asyncIOExecutor) {
+               this.sharedStateRegistry = new SharedStateRegistry(asyncIOExecutor);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/471263cf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 52a4eea..c8c68bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -106,6 +106,8 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
                        RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage,
                        Executor executor) throws Exception {
 
+               super(executor);
+
                checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
                checkNotNull(stateStorage, "State storage");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/471263cf/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index 9cfdec7..f9161b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -45,8 +45,12 @@ public class SharedStateRegistry {
        private final Executor asyncDisposalExecutor;
 
        public SharedStateRegistry() {
+               this(Executors.directExecutor());
+       }
+
+       public SharedStateRegistry(Executor asyncDisposalExecutor) {
                this.registeredStates = new HashMap<>();
-               this.asyncDisposalExecutor = Executors.directExecutor(); //TODO: FLINK-6534
+               this.asyncDisposalExecutor = Preconditions.checkNotNull(asyncDisposalExecutor);
        }
 
        /**

Reply via email to