This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.6 in repository https://gitbox.apache.org/repos/asf/flink.git
commit fabd246e5342001d64be55c820ba50b3cf75d2a6 Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Mon Aug 20 09:42:28 2018 +0200 [FLINK-10011] Introduce SubmittedJobGraphStore#releaseJobGraph SubmittedJobGraphStore#releaseJobGraph removes a potentially existing lock from the specified JobGraph. This allows other SubmittedJobGraphStores to remove the JobGraph given that it is no longer locked. --- .../SingleJobSubmittedJobGraphStore.java | 9 ++++++-- .../StandaloneSubmittedJobGraphStore.java | 13 ++++++++---- .../runtime/jobmanager/SubmittedJobGraphStore.java | 12 +++++++++++ .../ZooKeeperSubmittedJobGraphStore.java | 24 +++++++++++++++++++--- .../flink/runtime/dispatcher/DispatcherHATest.java | 5 +++++ .../testutils/InMemorySubmittedJobGraphStore.java | 5 +++++ 6 files changed, 59 insertions(+), 9 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java index 26d3abc..fe7f5f1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java @@ -66,12 +66,17 @@ public class SingleJobSubmittedJobGraphStore implements SubmittedJobGraphStore { } @Override - public void removeJobGraph(JobID jobId) throws Exception { + public void removeJobGraph(JobID jobId) { // ignore } @Override - public Collection<JobID> getJobIds() throws Exception { + public void releaseJobGraph(JobID jobId) { + // ignore + } + + @Override + public Collection<JobID> getJobIds() { return Collections.singleton(jobGraph.getJobID()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java index d1ca1a3..f28621f 
100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java @@ -43,22 +43,27 @@ public class StandaloneSubmittedJobGraphStore implements SubmittedJobGraphStore } @Override - public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception { + public void putJobGraph(SubmittedJobGraph jobGraph) { // Nothing to do } @Override - public void removeJobGraph(JobID jobId) throws Exception { + public void removeJobGraph(JobID jobId) { // Nothing to do } @Override - public Collection<JobID> getJobIds() throws Exception { + public void releaseJobGraph(JobID jobId) { + // nothing to do + } + + @Override + public Collection<JobID> getJobIds() { return Collections.emptyList(); } @Override - public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception { + public SubmittedJobGraph recoverJobGraph(JobID jobId) { return null; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java index 7e624ec..b40a4a2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmanager; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.JobGraph; import javax.annotation.Nullable; @@ -59,6 +60,17 @@ public interface SubmittedJobGraphStore { void removeJobGraph(JobID jobId) throws Exception; /** + * Releases the locks on the specified {@link JobGraph}. + * + * Releasing the locks allows that another instance can delete the job from + * the {@link SubmittedJobGraphStore}. 
+ * + * @param jobId specifying the job to release the locks for + * @throws Exception if the locks cannot be released + */ + void releaseJobGraph(JobID jobId) throws Exception; + + /** * Get all job ids of submitted job graphs to the submitted job graph store. * * @return Collection of submitted job ids diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java index 0510815..2b935af 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java @@ -68,13 +68,13 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore { /** Lock to synchronize with the {@link SubmittedJobGraphListener}. */ private final Object cacheLock = new Object(); - /** Client (not a namespace facade) */ + /** Client (not a namespace facade). */ private final CuratorFramework client; /** The set of IDs of all added job graphs. */ private final Set<JobID> addedJobGraphs = new HashSet<>(); - /** Completed checkpoints in ZooKeeper */ + /** Completed checkpoints in ZooKeeper. */ private final ZooKeeperStateHandleStore<SubmittedJobGraph> jobGraphsInZooKeeper; /** @@ -93,7 +93,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore { private boolean isRunning; /** - * Submitted job graph store backed by ZooKeeper + * Submitted job graph store backed by ZooKeeper. 
* * @param client ZooKeeper client * @param currentJobsPath ZooKeeper path for current job graphs @@ -274,6 +274,24 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore { } @Override + public void releaseJobGraph(JobID jobId) throws Exception { + checkNotNull(jobId, "Job ID"); + final String path = getPathForJob(jobId); + + LOG.debug("Releasing locks of job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path); + + synchronized (cacheLock) { + if (addedJobGraphs.contains(jobId)) { + jobGraphsInZooKeeper.release(path); + + addedJobGraphs.remove(jobId); + } + } + + LOG.info("Released locks of job graph {} from ZooKeeper.", jobId); + } + + @Override public Collection<JobID> getJobIds() throws Exception { Collection<String> paths; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java index 2c030d2..5876c5f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java @@ -244,6 +244,11 @@ public class DispatcherHATest extends TestLogger { } @Override + public void releaseJobGraph(JobID jobId) throws Exception { + throw new UnsupportedOperationException("Should not be called."); + } + + @Override public Collection<JobID> getJobIds() throws Exception { enterGetJobIdsLatch.trigger(); proceedGetJobIdsLatch.await(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java index ba0dc80..3b9c578 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java @@ -97,6 +97,11 @@ public 
class InMemorySubmittedJobGraphStore implements SubmittedJobGraphStore { } @Override + public void releaseJobGraph(JobID jobId) { + verifyIsStarted(); + } + + @Override public synchronized Collection<JobID> getJobIds() throws Exception { verifyIsStarted();