This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fabd246e5342001d64be55c820ba50b3cf75d2a6
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Mon Aug 20 09:42:28 2018 +0200

    [FLINK-10011] Introduce SubmittedJobGraphStore#releaseJobGraph
    
    SubmittedJobGraphStore#releaseJobGraph removes a potentially existing lock
    from the specified JobGraph. This allows other SubmittedJobGraphStores to
    remove the JobGraph given that it is no longer locked.
---
 .../SingleJobSubmittedJobGraphStore.java           |  9 ++++++--
 .../StandaloneSubmittedJobGraphStore.java          | 13 ++++++++----
 .../runtime/jobmanager/SubmittedJobGraphStore.java | 12 +++++++++++
 .../ZooKeeperSubmittedJobGraphStore.java           | 24 +++++++++++++++++++---
 .../flink/runtime/dispatcher/DispatcherHATest.java |  5 +++++
 .../testutils/InMemorySubmittedJobGraphStore.java  |  5 +++++
 6 files changed, 59 insertions(+), 9 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java
index 26d3abc..fe7f5f1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java
@@ -66,12 +66,17 @@ public class SingleJobSubmittedJobGraphStore implements 
SubmittedJobGraphStore {
        }
 
        @Override
-       public void removeJobGraph(JobID jobId) throws Exception {
+       public void removeJobGraph(JobID jobId) {
                // ignore
        }
 
        @Override
-       public Collection<JobID> getJobIds() throws Exception {
+       public void releaseJobGraph(JobID jobId) {
+               // ignore
+       }
+
+       @Override
+       public Collection<JobID> getJobIds() {
                return Collections.singleton(jobGraph.getJobID());
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
index d1ca1a3..f28621f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
@@ -43,22 +43,27 @@ public class StandaloneSubmittedJobGraphStore implements 
SubmittedJobGraphStore
        }
 
        @Override
-       public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+       public void putJobGraph(SubmittedJobGraph jobGraph) {
                // Nothing to do
        }
 
        @Override
-       public void removeJobGraph(JobID jobId) throws Exception {
+       public void removeJobGraph(JobID jobId) {
                // Nothing to do
        }
 
        @Override
-       public Collection<JobID> getJobIds() throws Exception {
+       public void releaseJobGraph(JobID jobId) {
+               // nothing to do
+       }
+
+       @Override
+       public Collection<JobID> getJobIds() {
                return Collections.emptyList();
        }
 
        @Override
-       public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+       public SubmittedJobGraph recoverJobGraph(JobID jobId) {
                return null;
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
index 7e624ec..b40a4a2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 
 import javax.annotation.Nullable;
 
@@ -59,6 +60,17 @@ public interface SubmittedJobGraphStore {
        void removeJobGraph(JobID jobId) throws Exception;
 
        /**
+        * Releases the locks on the specified {@link JobGraph}.
+        *
+        * Releasing the locks allows that another instance can delete the job 
from
+        * the {@link SubmittedJobGraphStore}.
+        *
+        * @param jobId specifying the job to release the locks for
+        * @throws Exception if the locks cannot be released
+        */
+       void releaseJobGraph(JobID jobId) throws Exception;
+
+       /**
         * Get all job ids of submitted job graphs to the submitted job graph 
store.
         *
         * @return Collection of submitted job ids
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 0510815..2b935af 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -68,13 +68,13 @@ public class ZooKeeperSubmittedJobGraphStore implements 
SubmittedJobGraphStore {
        /** Lock to synchronize with the {@link SubmittedJobGraphListener}. */
        private final Object cacheLock = new Object();
 
-       /** Client (not a namespace facade) */
+       /** Client (not a namespace facade). */
        private final CuratorFramework client;
 
        /** The set of IDs of all added job graphs. */
        private final Set<JobID> addedJobGraphs = new HashSet<>();
 
-       /** Completed checkpoints in ZooKeeper */
+       /** Completed checkpoints in ZooKeeper. */
        private final ZooKeeperStateHandleStore<SubmittedJobGraph> 
jobGraphsInZooKeeper;
 
        /**
@@ -93,7 +93,7 @@ public class ZooKeeperSubmittedJobGraphStore implements 
SubmittedJobGraphStore {
        private boolean isRunning;
 
        /**
-        * Submitted job graph store backed by ZooKeeper
+        * Submitted job graph store backed by ZooKeeper.
         *
         * @param client ZooKeeper client
         * @param currentJobsPath ZooKeeper path for current job graphs
@@ -274,6 +274,24 @@ public class ZooKeeperSubmittedJobGraphStore implements 
SubmittedJobGraphStore {
        }
 
        @Override
+       public void releaseJobGraph(JobID jobId) throws Exception {
+               checkNotNull(jobId, "Job ID");
+               final String path = getPathForJob(jobId);
+
+               LOG.debug("Releasing locks of job graph {} from {}{}.", jobId, 
zooKeeperFullBasePath, path);
+
+               synchronized (cacheLock) {
+                       if (addedJobGraphs.contains(jobId)) {
+                               jobGraphsInZooKeeper.release(path);
+
+                               addedJobGraphs.remove(jobId);
+                       }
+               }
+
+               LOG.info("Released locks of job graph {} from ZooKeeper.", 
jobId);
+       }
+
+       @Override
        public Collection<JobID> getJobIds() throws Exception {
                Collection<String> paths;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index 2c030d2..5876c5f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -244,6 +244,11 @@ public class DispatcherHATest extends TestLogger {
                }
 
                @Override
+               public void releaseJobGraph(JobID jobId) throws Exception {
+                       throw new UnsupportedOperationException("Should not be 
called.");
+               }
+
+               @Override
                public Collection<JobID> getJobIds() throws Exception {
                        enterGetJobIdsLatch.trigger();
                        proceedGetJobIdsLatch.await();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
index ba0dc80..3b9c578 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
@@ -97,6 +97,11 @@ public class InMemorySubmittedJobGraphStore implements 
SubmittedJobGraphStore {
        }
 
        @Override
+       public void releaseJobGraph(JobID jobId) {
+               verifyIsStarted();
+       }
+
+       @Override
        public synchronized Collection<JobID> getJobIds() throws Exception {
                verifyIsStarted();
 

Reply via email to