Repository: flink Updated Branches: refs/heads/master daf0ccda4 -> 3086af534
[FLINK-5501] [runtime] Extend RunningJobRegistry to job status created/running/done This closes #3385 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7011d78 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7011d78 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7011d78 Branch: refs/heads/master Commit: e7011d78a3019880a4e00ab5f697c3cfd20161bb Parents: 40743aa Author: shuai.xus <[email protected]> Authored: Wed Feb 22 14:15:43 2017 +0800 Committer: Stephan Ewen <[email protected]> Committed: Tue Feb 28 18:59:09 2017 +0100 ---------------------------------------------------------------------- .../FsNegativeRunningJobsRegistry.java | 86 ++++++++++++++++---- .../highavailability/RunningJobsRegistry.java | 32 ++++++++ .../highavailability/ZookeeperRegistry.java | 42 +++++++++- .../highavailability/nonha/NonHaRegistry.java | 31 +++++++ .../runtime/jobmaster/JobManagerRunner.java | 35 ++++---- .../minicluster/MiniClusterJobDispatcher.java | 30 ++++--- .../FsNegativeRunningJobsRegistryTest.java | 25 ++++-- .../highavailability/ZooKeeperRegistryTest.java | 15 +++- .../yarn/YarnFlinkApplicationMasterRunner.java | 19 ++--- 9 files changed, 249 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java index 9d8b226..9e92263 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java @@ -30,14 +30,19 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * This {@link RunningJobsRegistry} tracks the status jobs via marker files, - * marking finished jobs via marker files. + * marking running jobs via running marker files, + * marking finished jobs via finished marker files. * * <p>The general contract is the following: * <ul> * <li>Initially, a marker file does not exist (no one created it, yet), which means - * the specific job is assumed to be running</li> + * the specific job is pending</li> + * <li>The first JobManager that is granted leadership calls this service to create the running marker file, + * which marks the job as running.</li> * <li>The JobManager that finishes calls this service to create the marker file, * which marks the job as finished.</li> + * <li>If a JobManager gains leadership but sees the running marker file, + * it will realize that the job has already been scheduled and needs reconciling.</li> * <li>If a JobManager gains leadership at some point when shutdown is in progress, * it will see the marker file and realize that the job is finished.</li> * <li>The application framework is expected to clean the file once the application @@ -52,7 +57,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry { - private static final String PREFIX = ".job_complete_"; + private static final String DONE_PREFIX = ".job_complete_"; + + private static final String RUNNING_PREFIX = ".job_runing_"; private final FileSystem fileSystem; @@ -108,21 +115,19 @@ public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry { @Override public void setJobRunning(JobID jobID) throws IOException { checkNotNull(jobID, "jobID"); - final Path filePath = createMarkerFilePath(jobID); + final Path filePath = createMarkerFilePath(RUNNING_PREFIX, 
jobID); - // delete the marker file, if it exists - try { - fileSystem.delete(filePath, false); - } - catch (FileNotFoundException e) { - // apparently job was already considered running + // create the file + // to avoid an exception if the job already exists, set overwrite=true + try (FSDataOutputStream out = fileSystem.create(filePath, true)) { + out.write(42); } } @Override public void setJobFinished(JobID jobID) throws IOException { checkNotNull(jobID, "jobID"); - final Path filePath = createMarkerFilePath(jobID); + final Path filePath = createMarkerFilePath(DONE_PREFIX, jobID); // create the file // to avoid an exception if the job already exists, set overwrite=true @@ -137,17 +142,64 @@ public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry { // check for the existence of the file try { - fileSystem.getFileStatus(createMarkerFilePath(jobID)); - // file was found --> job is terminated + fileSystem.getFileStatus(createMarkerFilePath(RUNNING_PREFIX, jobID)); + // file was found --> job is running + return true; + } + catch (FileNotFoundException e) { + // file does not exist, job is not running return false; } + } + + @Override + public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException { + checkNotNull(jobID, "jobID"); + + // first check for the existence of the complete file + try { + fileSystem.getFileStatus(createMarkerFilePath(DONE_PREFIX, jobID)); + // complete file was found --> job is terminated + return JobSchedulingStatus.DONE; + } + catch (FileNotFoundException e) { + // file does not exist, job is running or pending + } + // check for the existence of the running file + try { + fileSystem.getFileStatus(createMarkerFilePath(RUNNING_PREFIX, jobID)); + // running file was found --> job is running + return JobSchedulingStatus.RUNNING; + } + catch (FileNotFoundException e) { + // file does not exist, job is not scheduled + return JobSchedulingStatus.PENDING; + } + } + + @Override + public void clearJob(JobID 
jobID) throws IOException { + checkNotNull(jobID, "jobID"); + final Path runningFilePath = createMarkerFilePath(RUNNING_PREFIX, jobID); + + // delete the running marker file, if it exists + try { + fileSystem.delete(runningFilePath, false); + } + catch (FileNotFoundException e) { + } + + final Path doneFilePath = createMarkerFilePath(DONE_PREFIX, jobID); + + // delete the finished marker file, if it exists + try { + fileSystem.delete(doneFilePath, false); + } catch (FileNotFoundException e) { - // file does not exist, job is still running - return true; } } - private Path createMarkerFilePath(JobID jobId) { - return new Path(basePath, PREFIX + jobId.toString()); + private Path createMarkerFilePath(String prefix, JobID jobId) { + return new Path(basePath, prefix + jobId.toString()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java index e7c131c..020f2ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java @@ -33,6 +33,17 @@ import java.io.IOException; */ public interface RunningJobsRegistry { + public enum JobSchedulingStatus { + /** Job has not been scheduled */ + PENDING, + + /** Job has been scheduled */ + RUNNING, + + /** Job has been finished */ + DONE; + } + /** * Marks a job as running. * @@ -63,4 +74,25 @@ public interface RunningJobsRegistry { * failed and could not be retried. */ boolean isJobRunning(JobID jobID) throws IOException; + + /** + * Get the scheduling status of a job. 
+ * + * @param jobID The id of the job to check. + * @return The job scheduling status. + * + * @throws IOException Thrown when the communication with the highly-available storage or registry + * failed and could not be retried. + */ + JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException; + + /** + * Clear job state from the registry, usually called after the job finishes. + * + * @param jobID The id of the job to clear. + * + * @throws IOException Thrown when the communication with the highly-available storage or registry + * failed and could not be retried. + */ + void clearJob(JobID jobID) throws IOException; } http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java index c0621af..31a4535 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.zookeeper.data.Stat; import java.io.IOException; +import java.nio.charset.Charset; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -55,7 +56,7 @@ public class ZookeeperRegistry implements RunningJobsRegistry { try { String zkPath = runningJobPath + jobID.toString(); this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient()); - this.client.setData().forPath(zkPath); + this.client.setData().forPath(zkPath, JobSchedulingStatus.RUNNING.name().getBytes(Charset.forName("utf-8"))); } catch (Exception e) { throw new IOException("Set running state to zk fail for job " + 
jobID.toString(), e); @@ -69,7 +70,7 @@ public class ZookeeperRegistry implements RunningJobsRegistry { try { String zkPath = runningJobPath + jobID.toString(); this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient()); - this.client.delete().forPath(zkPath); + this.client.setData().forPath(zkPath, JobSchedulingStatus.DONE.name().getBytes(Charset.forName("utf-8"))); } catch (Exception e) { throw new IOException("Set finished state to zk fail for job " + jobID.toString(), e); @@ -83,7 +84,10 @@ public class ZookeeperRegistry implements RunningJobsRegistry { try { Stat stat = client.checkExists().forPath(runningJobPath + jobID.toString()); if (stat != null) { - return true; + byte[] data = client.getData().forPath(runningJobPath + jobID.toString()); + if (JobSchedulingStatus.RUNNING.name().equals(new String(data))) { + return true; + } } return false; } @@ -91,4 +95,36 @@ public class ZookeeperRegistry implements RunningJobsRegistry { throw new IOException("Get running state from zk fail for job " + jobID.toString(), e); } } + + @Override + public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException { + checkNotNull(jobID); + + try { + Stat stat = client.checkExists().forPath(runningJobPath + jobID.toString()); + if (stat != null) { + byte[] data = client.getData().forPath(runningJobPath + jobID.toString()); + return JobSchedulingStatus.valueOf(new String(data)); + } + return JobSchedulingStatus.PENDING; + } + catch (Exception e) { + throw new IOException("Get finished state from zk fail for job " + jobID.toString(), e); + } + } + + @Override + public void clearJob(JobID jobID) throws IOException { + checkNotNull(jobID); + + try { + String zkPath = runningJobPath + jobID.toString(); + this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient()); + this.client.delete().forPath(zkPath); + } + catch (Exception e) { + throw new IOException("Clear job state from zk fail for " + jobID.toString(), e); + 
} + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java index 85dd711..e331212 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.highavailability.nonha; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.highavailability.RunningJobsRegistry; +import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus; import java.util.HashSet; @@ -33,12 +34,16 @@ public class NonHaRegistry implements RunningJobsRegistry { /** The currently running jobs */ private final HashSet<JobID> running = new HashSet<>(); + /** The currently finished jobs */ + private final HashSet<JobID> finished = new HashSet<>(); + @Override public void setJobRunning(JobID jobID) { checkNotNull(jobID); synchronized (running) { running.add(jobID); + finished.remove(jobID); } } @@ -48,6 +53,7 @@ public class NonHaRegistry implements RunningJobsRegistry { synchronized (running) { running.remove(jobID); + finished.add(jobID); } } @@ -59,4 +65,29 @@ public class NonHaRegistry implements RunningJobsRegistry { return running.contains(jobID); } } + + @Override + public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) { + checkNotNull(jobID); + + synchronized (running) { + if (finished.contains(jobID)) { + return JobSchedulingStatus.DONE; + } else if (running.contains(jobID)) { + return JobSchedulingStatus.RUNNING; + } else { + return JobSchedulingStatus.PENDING; + } + } + } 
+ + @Override + public void clearJob(JobID jobID) { + checkNotNull(jobID); + + synchronized (running) { + running.remove(jobID); + finished.remove(jobID); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index 9d8e004..6bebd90 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.RunningJobsRegistry; +import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.leaderelection.LeaderContender; @@ -359,29 +360,35 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F // it's okay that job manager wait for the operation complete leaderElectionService.confirmLeaderSessionID(leaderSessionID); - boolean jobRunning; + JobSchedulingStatus schedulingStatus = JobSchedulingStatus.PENDING; try { - jobRunning = runningJobsRegistry.isJobRunning(jobGraph.getJobID()); + schedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobGraph.getJobID()); + if (schedulingStatus.equals(JobSchedulingStatus.DONE)) { + log.info("Granted leader ship but job {} has been finished. 
", jobGraph.getJobID()); + jobFinishedByOther(); + return; + } } catch (Throwable t) { - log.error("Could not access status (running/finished) of job {}. " + - "Falling back to assumption that job is running and attempting recovery...", - jobGraph.getJobID(), t); - jobRunning = true; + log.error("Could not access status (running/finished) of job {}. ", jobGraph.getJobID(), t); + onFatalError(t); + return; } // Double check the leadership after we confirm that, there is a small chance that multiple // job managers schedule the same job after if they try to recover at the same time. // This will eventually be noticed, but can not be ruled out from the beginning. if (leaderElectionService.hasLeadership()) { - if (jobRunning) { - try { - jobManager.start(leaderSessionID); - } catch (Exception e) { - onFatalError(new Exception("Could not start the job manager.", e)); + try { + // Now set the running status is after getting leader ship and + // set finished status after job in terminated status. + // So if finding the job is running, it means someone has already run the job, need recover. 
+ if (schedulingStatus.equals(JobSchedulingStatus.PENDING)) { + runningJobsRegistry.setJobRunning(jobGraph.getJobID()); } - } else { - log.info("Job {} ({}) already finished by others.", jobGraph.getName(), jobGraph.getJobID()); - jobFinishedByOther(); + + jobManager.start(leaderSessionID); + } catch (Exception e) { + onFatalError(new Exception("Could not start the job manager.", e)); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java index 7fffaee..9178684 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java @@ -182,7 +182,7 @@ public class MiniClusterJobDispatcher { checkState(!shutdown, "mini cluster is shut down"); checkState(runners == null, "mini cluster can only execute one job at a time"); - DetachedFinalizer finalizer = new DetachedFinalizer(numJobManagers); + DetachedFinalizer finalizer = new DetachedFinalizer(job.getJobID(), numJobManagers); this.runners = startJobRunners(job, finalizer, finalizer); } @@ -217,6 +217,7 @@ public class MiniClusterJobDispatcher { finally { // always clear the status for the next job runners = null; + clearJobRunningState(job.getJobID()); } } @@ -227,16 +228,6 @@ public class MiniClusterJobDispatcher { LOG.info("Starting {} JobMaster(s) for job {} ({})", numJobManagers, job.getName(), job.getJobID()); - // we first need to mark the job as running in the HA services, so that the - // JobManager leader will recognize that it as work to do - try { - 
haServices.getRunningJobsRegistry().setJobRunning(job.getJobID()); - } - catch (Throwable t) { - throw new JobExecutionException(job.getJobID(), - "Could not register the job at the high-availability services", t); - } - // start all JobManagers JobManagerRunner[] runners = new JobManagerRunner[numJobManagers]; for (int i = 0; i < numJobManagers; i++) { @@ -273,6 +264,17 @@ public class MiniClusterJobDispatcher { return runners; } + private void clearJobRunningState(JobID jobID) { + // we mark the job as finished in the HA services, so need + // to remove the data after job finished + try { + haServices.getRunningJobsRegistry().clearJob(jobID); + } + catch (Throwable t) { + LOG.warn("Could not clear the job {} at the high-availability services", jobID.toString(), t); + } + } + // ------------------------------------------------------------------------ // test methods to simulate job master failures // ------------------------------------------------------------------------ @@ -298,9 +300,12 @@ public class MiniClusterJobDispatcher { */ private class DetachedFinalizer implements OnCompletionActions, FatalErrorHandler { + private final JobID jobID; + private final AtomicInteger numJobManagersToWaitFor; - private DetachedFinalizer(int numJobManagersToWaitFor) { + private DetachedFinalizer(JobID jobID, int numJobManagersToWaitFor) { + this.jobID = jobID; this.numJobManagersToWaitFor = new AtomicInteger(numJobManagersToWaitFor); } @@ -327,6 +332,7 @@ public class MiniClusterJobDispatcher { private void decrementCheckAndCleanup() { if (numJobManagersToWaitFor.decrementAndGet() == 0) { MiniClusterJobDispatcher.this.runners = null; + MiniClusterJobDispatcher.this.clearJobRunningState(jobID); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java index f1ece0e..bbafcf0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.highavailability; import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus; import org.apache.flink.util.TestLogger; import org.junit.Rule; @@ -28,6 +29,7 @@ import org.junit.rules.TemporaryFolder; import java.io.File; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -45,20 +47,23 @@ public class FsNegativeRunningJobsRegistryTest extends TestLogger { FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(new Path(uri)); - // initially, without any call, the job is considered running - assertTrue(registry.isJobRunning(jid)); + // initially, without any call, the job is pending + assertFalse(registry.isJobRunning(jid)); + assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.PENDING); - // repeated setting should not affect the status + // after set running, the job is running registry.setJobRunning(jid); assertTrue(registry.isJobRunning(jid)); + assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.RUNNING); // set the job to finished and validate registry.setJobFinished(jid); - assertFalse(registry.isJobRunning(jid)); + assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE); // another registry should pick this up FsNegativeRunningJobsRegistry otherRegistry = new 
FsNegativeRunningJobsRegistry(new Path(uri)); - assertFalse(otherRegistry.isJobRunning(jid)); + assertTrue(otherRegistry.isJobRunning(jid)); + assertEquals(otherRegistry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE); } @Test @@ -73,13 +78,21 @@ public class FsNegativeRunningJobsRegistryTest extends TestLogger { // set the job to finished and validate registry.setJobFinished(jid); assertFalse(registry.isJobRunning(jid)); + assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE); - // set the job to back to running and validate + // set the job to running does not overwrite the finished status registry.setJobRunning(jid); assertTrue(registry.isJobRunning(jid)); + assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE); // another registry should pick this up FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(new Path(uri)); assertTrue(otherRegistry.isJobRunning(jid)); + assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE); + + // clear the running and finished marker, it will be pending + otherRegistry.clearJob(jid); + assertFalse(otherRegistry.isJobRunning(jid)); + assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.PENDING); } } http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java index 72982c8..8c91898 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java @@ -22,6 +22,7 @@ import 
org.apache.curator.test.TestingServer; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus; import org.apache.flink.util.TestLogger; import org.junit.After; import org.junit.Before; @@ -29,7 +30,9 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class ZooKeeperRegistryTest extends TestLogger { private TestingServer testingServer; @@ -61,14 +64,20 @@ public class ZooKeeperRegistryTest extends TestLogger { try { JobID jobID = JobID.generate(); - assertTrue(!zkRegistry.isJobRunning(jobID)); + assertFalse(zkRegistry.isJobRunning(jobID)); + assertEquals(zkRegistry.getJobSchedulingStatus(jobID), JobSchedulingStatus.PENDING); zkRegistry.setJobRunning(jobID); assertTrue(zkRegistry.isJobRunning(jobID)); + assertEquals(zkRegistry.getJobSchedulingStatus(jobID), JobSchedulingStatus.RUNNING); zkRegistry.setJobFinished(jobID); - assertTrue(!zkRegistry.isJobRunning(jobID)); + assertEquals(zkRegistry.getJobSchedulingStatus(jobID), JobSchedulingStatus.DONE); + assertFalse(zkRegistry.isJobRunning(jobID)); + zkRegistry.clearJob(jobID); + assertFalse(zkRegistry.isJobRunning(jobID)); + assertEquals(zkRegistry.getJobSchedulingStatus(jobID), JobSchedulingStatus.PENDING); } finally { if (zkHaService != null) { zkHaService.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java index 257212b..e2aa6ec 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java @@ -24,7 +24,6 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -206,16 +205,6 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati //TODO: generate the job graph from user's jar jobGraph = loadJobGraph(config); - // we first need to mark the job as running in the HA services, so that the - // JobManager leader will recognize that it as work to do - try { - haServices.getRunningJobsRegistry().setJobRunning(jobGraph.getJobID()); - } - catch (Throwable t) { - throw new JobExecutionException(jobGraph.getJobID(), - "Could not register the job at the high-availability services", t); - } - // now the JobManagerRunner return new JobManagerRunner( jobGraph, config, @@ -226,6 +215,14 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati } protected void shutdown(ApplicationStatus status, String msg) { + // Need to clear the job state in the HA services before shutdown + try { + haServices.getRunningJobsRegistry().clearJob(jobGraph.getJobID()); + } + catch (Throwable t) { + LOG.warn("Could not clear the job at the high-availability services", t); + } + synchronized (lock) { if (jobManagerRunner != null) { try {
