Repository: flink
Updated Branches:
  refs/heads/master daf0ccda4 -> 3086af534


[FLINK-5501] [runtime] Extend RunningJobRegistry to job status 
created/running/done

This closes #3385


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7011d78
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7011d78
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7011d78

Branch: refs/heads/master
Commit: e7011d78a3019880a4e00ab5f697c3cfd20161bb
Parents: 40743aa
Author: shuai.xus <[email protected]>
Authored: Wed Feb 22 14:15:43 2017 +0800
Committer: Stephan Ewen <[email protected]>
Committed: Tue Feb 28 18:59:09 2017 +0100

----------------------------------------------------------------------
 .../FsNegativeRunningJobsRegistry.java          | 86 ++++++++++++++++----
 .../highavailability/RunningJobsRegistry.java   | 32 ++++++++
 .../highavailability/ZookeeperRegistry.java     | 42 +++++++++-
 .../highavailability/nonha/NonHaRegistry.java   | 31 +++++++
 .../runtime/jobmaster/JobManagerRunner.java     | 35 ++++----
 .../minicluster/MiniClusterJobDispatcher.java   | 30 ++++---
 .../FsNegativeRunningJobsRegistryTest.java      | 25 ++++--
 .../highavailability/ZooKeeperRegistryTest.java | 15 +++-
 .../yarn/YarnFlinkApplicationMasterRunner.java  | 19 ++---
 9 files changed, 249 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
index 9d8b226..9e92263 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
@@ -30,14 +30,19 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * This {@link RunningJobsRegistry} tracks the status jobs via marker files,
- * marking finished jobs via marker files.
+ * marking running jobs via running marker files,
+ * marking finished jobs via finished marker files.
  * 
  * <p>The general contract is the following:
  * <ul>
  *     <li>Initially, a marker file does not exist (no one created it, yet), 
which means
- *         the specific job is assumed to be running</li>
+ *         the specific job is pending</li>
+ *     <li>The first JobManager that is granted leadership calls this service to 
create the running marker file,
+ *         which marks the job as running.</li>
  *     <li>The JobManager that finishes calls this service to create the 
marker file,
  *         which marks the job as finished.</li>
+ *     <li>If a JobManager gains leadership but sees the running marker file,
+ *         it will realize that the job has been scheduled and needs 
reconciling.</li>
  *     <li>If a JobManager gains leadership at some point when shutdown is in 
progress,
  *         it will see the marker file and realize that the job is 
finished.</li>
  *     <li>The application framework is expected to clean the file once the 
application
@@ -52,7 +57,9 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry {
 
-       private static final String PREFIX = ".job_complete_";
+       private static final String DONE_PREFIX = ".job_complete_";
+
+       private static final String RUNNING_PREFIX = ".job_runing_";
 
        private final FileSystem fileSystem;
 
@@ -108,21 +115,19 @@ public class FsNegativeRunningJobsRegistry implements 
RunningJobsRegistry {
        @Override
        public void setJobRunning(JobID jobID) throws IOException {
                checkNotNull(jobID, "jobID");
-               final Path filePath = createMarkerFilePath(jobID);
+               final Path filePath = createMarkerFilePath(RUNNING_PREFIX, 
jobID);
 
-               // delete the marker file, if it exists
-               try {
-                       fileSystem.delete(filePath, false);
-               }
-               catch (FileNotFoundException e) {
-                       // apparently job was already considered running
+               // create the file
+               // to avoid an exception if the job already exists, set 
overwrite=true
+               try (FSDataOutputStream out = fileSystem.create(filePath, 
true)) {
+                       out.write(42);
                }
        }
 
        @Override
        public void setJobFinished(JobID jobID) throws IOException {
                checkNotNull(jobID, "jobID");
-               final Path filePath = createMarkerFilePath(jobID);
+               final Path filePath = createMarkerFilePath(DONE_PREFIX, jobID);
 
                // create the file
                // to avoid an exception if the job already exists, set 
overwrite=true
@@ -137,17 +142,64 @@ public class FsNegativeRunningJobsRegistry implements 
RunningJobsRegistry {
 
                // check for the existence of the file
                try {
-                       fileSystem.getFileStatus(createMarkerFilePath(jobID));
-                       // file was found --> job is terminated
+                       
fileSystem.getFileStatus(createMarkerFilePath(RUNNING_PREFIX, jobID));
+                       // file was found --> job is running
+                       return true;
+               }
+               catch (FileNotFoundException e) {
+                       // file does not exist, job is not running
                        return false;
                }
+       }
+
+       @Override
+       public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws 
IOException {
+               checkNotNull(jobID, "jobID");
+
+               // first check for the existence of the complete file
+               try {
+                       
fileSystem.getFileStatus(createMarkerFilePath(DONE_PREFIX, jobID));
+                       // complete file was found --> job is terminated
+                       return JobSchedulingStatus.DONE;
+               }
+               catch (FileNotFoundException e) {
+                       // file does not exist, job is running or pending
+               }
+               // check for the existence of the running file
+               try {
+                       
fileSystem.getFileStatus(createMarkerFilePath(RUNNING_PREFIX, jobID));
+                       // running file was found --> job is running
+                       return JobSchedulingStatus.RUNNING;
+               }
+               catch (FileNotFoundException e) {
+                       // file does not exist, job is not scheduled
+                       return JobSchedulingStatus.PENDING;
+               }
+       }
+
+       @Override
+       public void clearJob(JobID jobID) throws IOException {
+               checkNotNull(jobID, "jobID");
+               final Path runningFilePath = 
createMarkerFilePath(RUNNING_PREFIX, jobID);
+
+               // delete the running marker file, if it exists
+               try {
+                       fileSystem.delete(runningFilePath, false);
+               }
+               catch (FileNotFoundException e) {
+               }
+
+               final Path doneFilePath = createMarkerFilePath(DONE_PREFIX, 
jobID);
+
+               // delete the finished marker file, if it exists
+               try {
+                       fileSystem.delete(doneFilePath, false);
+               }
                catch (FileNotFoundException e) {
-                       // file does not exist, job is still running
-                       return true;
                }
        }
 
-       private Path createMarkerFilePath(JobID jobId) {
-               return new Path(basePath, PREFIX + jobId.toString());
+       private Path createMarkerFilePath(String prefix, JobID jobId) {
+               return new Path(basePath, prefix + jobId.toString());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
index e7c131c..020f2ca 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
@@ -33,6 +33,17 @@ import java.io.IOException;
  */
 public interface RunningJobsRegistry {
 
+       public enum JobSchedulingStatus {
+               /** Job has not been scheduled */
+               PENDING,
+
+               /** Job has been scheduled */
+               RUNNING,
+
+               /** Job has been finished */
+               DONE;
+       }
+
        /**
         * Marks a job as running.
         * 
@@ -63,4 +74,25 @@ public interface RunningJobsRegistry {
         *                     failed and could not be retried.
         */
        boolean isJobRunning(JobID jobID) throws IOException;
+
+       /**
+        * Get the scheduling status of a job.
+        *
+        * @param jobID The id of the job to check.
+        * @return The job scheduling status.
+        * 
+        * @throws IOException Thrown when the communication with the 
highly-available storage or registry
+        *                     failed and could not be retried.
+        */
+       JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws 
IOException;
+
+       /**
+        * Clear the job state from the registry, usually called after the job finishes.
+        *
+        * @param jobID The id of the job to clear.
+        * 
+        * @throws IOException Thrown when the communication with the 
highly-available storage or registry
+        *                     failed and could not be retried.
+        */
+       void clearJob(JobID jobID) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
index c0621af..31a4535 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.zookeeper.data.Stat;
 
 import java.io.IOException;
+import java.nio.charset.Charset;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -55,7 +56,7 @@ public class ZookeeperRegistry implements RunningJobsRegistry 
{
                try {
                        String zkPath = runningJobPath + jobID.toString();
                        
this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
-                       this.client.setData().forPath(zkPath);
+                       this.client.setData().forPath(zkPath, 
JobSchedulingStatus.RUNNING.name().getBytes(Charset.forName("utf-8")));
                }
                catch (Exception e) {
                        throw new IOException("Set running state to zk fail for 
job " + jobID.toString(), e);
@@ -69,7 +70,7 @@ public class ZookeeperRegistry implements RunningJobsRegistry 
{
                try {
                        String zkPath = runningJobPath + jobID.toString();
                        
this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
-                       this.client.delete().forPath(zkPath);
+                       this.client.setData().forPath(zkPath, 
JobSchedulingStatus.DONE.name().getBytes(Charset.forName("utf-8")));
                }
                catch (Exception e) {
                        throw new IOException("Set finished state to zk fail 
for job " + jobID.toString(), e);
@@ -83,7 +84,10 @@ public class ZookeeperRegistry implements 
RunningJobsRegistry {
                try {
                        Stat stat = client.checkExists().forPath(runningJobPath 
+ jobID.toString());
                        if (stat != null) {
-                               return true;
+                               byte[] data = 
client.getData().forPath(runningJobPath + jobID.toString());
+                               if 
(JobSchedulingStatus.RUNNING.name().equals(new String(data))) {
+                                       return true;
+                               }
                        }
                        return false;
                }
@@ -91,4 +95,36 @@ public class ZookeeperRegistry implements 
RunningJobsRegistry {
                        throw new IOException("Get running state from zk fail 
for job " + jobID.toString(), e);
                }
        }
+
+       @Override
+       public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws 
IOException {
+               checkNotNull(jobID);
+
+               try {
+                       Stat stat = client.checkExists().forPath(runningJobPath 
+ jobID.toString());
+                       if (stat != null) {
+                               byte[] data = 
client.getData().forPath(runningJobPath + jobID.toString());
+                               return JobSchedulingStatus.valueOf(new 
String(data));
+                       }
+                       return JobSchedulingStatus.PENDING;
+               }
+               catch (Exception e) {
+                       throw new IOException("Get finished state from zk fail 
for job " + jobID.toString(), e);
+               }
+       }
+
+       @Override
+       public void clearJob(JobID jobID) throws IOException {
+               checkNotNull(jobID);
+
+               try {
+                       String zkPath = runningJobPath + jobID.toString();
+                       
this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
+                       this.client.delete().forPath(zkPath);
+               }
+               catch (Exception e) {
+                       throw new IOException("Clear job state from zk fail for 
" + jobID.toString(), e);
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
index 85dd711..e331212 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.highavailability.nonha;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import 
org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
 
 import java.util.HashSet;
 
@@ -33,12 +34,16 @@ public class NonHaRegistry implements RunningJobsRegistry {
        /** The currently running jobs */
        private final HashSet<JobID> running = new HashSet<>();
 
+       /** The currently finished jobs */
+       private final HashSet<JobID> finished = new HashSet<>();
+
        @Override
        public void setJobRunning(JobID jobID) {
                checkNotNull(jobID);
 
                synchronized (running) {
                        running.add(jobID);
+                       finished.remove(jobID);
                }
        }
 
@@ -48,6 +53,7 @@ public class NonHaRegistry implements RunningJobsRegistry {
 
                synchronized (running) {
                        running.remove(jobID);
+                       finished.add(jobID);
                }
        }
 
@@ -59,4 +65,29 @@ public class NonHaRegistry implements RunningJobsRegistry {
                        return running.contains(jobID);
                }
        }
+
+       @Override
+       public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) {
+               checkNotNull(jobID);
+
+               synchronized (running) {
+                       if (finished.contains(jobID)) {
+                               return JobSchedulingStatus.DONE;
+                       } else if (running.contains(jobID)) {
+                               return JobSchedulingStatus.RUNNING;
+                       } else {
+                               return JobSchedulingStatus.PENDING;
+                       }
+               }
+       }
+
+       @Override
+       public void clearJob(JobID jobID) {
+               checkNotNull(jobID);
+
+               synchronized (running) {
+                       running.remove(jobID);
+                       finished.remove(jobID);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 9d8e004..6bebd90 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import 
org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
@@ -359,29 +360,35 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                        // it's okay that job manager wait for the operation 
complete
                        
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
 
-                       boolean jobRunning;
+                       JobSchedulingStatus schedulingStatus = 
JobSchedulingStatus.PENDING;
                        try {
-                               jobRunning = 
runningJobsRegistry.isJobRunning(jobGraph.getJobID());
+                               schedulingStatus = 
runningJobsRegistry.getJobSchedulingStatus(jobGraph.getJobID());
+                               if 
(schedulingStatus.equals(JobSchedulingStatus.DONE)) {
+                                       log.info("Granted leader ship but job 
{} has been finished. ", jobGraph.getJobID());
+                                       jobFinishedByOther();
+                                       return;
+                               }
                        } catch (Throwable t) {
-                               log.error("Could not access status 
(running/finished) of job {}. " +
-                                               "Falling back to assumption 
that job is running and attempting recovery...",
-                                               jobGraph.getJobID(), t);
-                               jobRunning = true;
+                               log.error("Could not access status 
(running/finished) of job {}. ",     jobGraph.getJobID(), t);
+                               onFatalError(t);
+                               return;
                        }
 
                        // Double check the leadership after we confirm that, 
there is a small chance that multiple
                        // job managers schedule the same job after if they try 
to recover at the same time.
                        // This will eventually be noticed, but can not be 
ruled out from the beginning.
                        if (leaderElectionService.hasLeadership()) {
-                               if (jobRunning) {
-                                       try {
-                                               
jobManager.start(leaderSessionID);
-                                       } catch (Exception e) {
-                                               onFatalError(new 
Exception("Could not start the job manager.", e));
+                               try {
+                                       // The running status is set only after 
gaining leadership, and the 
+                                       // finished status only after the job 
reaches a terminal state.
+                                       // So if the job is found running, 
another JobManager already started it and it needs recovery.
+                                       if 
(schedulingStatus.equals(JobSchedulingStatus.PENDING)) {
+                                               
runningJobsRegistry.setJobRunning(jobGraph.getJobID());
                                        }
-                               } else {
-                                       log.info("Job {} ({}) already finished 
by others.", jobGraph.getName(), jobGraph.getJobID());
-                                       jobFinishedByOther();
+
+                                       jobManager.start(leaderSessionID);
+                               } catch (Exception e) {
+                                       onFatalError(new Exception("Could not 
start the job manager.", e));
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index 7fffaee..9178684 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -182,7 +182,7 @@ public class MiniClusterJobDispatcher {
                        checkState(!shutdown, "mini cluster is shut down");
                        checkState(runners == null, "mini cluster can only 
execute one job at a time");
 
-                       DetachedFinalizer finalizer = new 
DetachedFinalizer(numJobManagers);
+                       DetachedFinalizer finalizer = new 
DetachedFinalizer(job.getJobID(), numJobManagers);
 
                        this.runners = startJobRunners(job, finalizer, 
finalizer);
                }
@@ -217,6 +217,7 @@ public class MiniClusterJobDispatcher {
                finally {
                        // always clear the status for the next job
                        runners = null;
+                       clearJobRunningState(job.getJobID());
                }
        }
 
@@ -227,16 +228,6 @@ public class MiniClusterJobDispatcher {
 
                LOG.info("Starting {} JobMaster(s) for job {} ({})", 
numJobManagers, job.getName(), job.getJobID());
 
-               // we first need to mark the job as running in the HA services, 
so that the
-               // JobManager leader will recognize that it as work to do
-               try {
-                       
haServices.getRunningJobsRegistry().setJobRunning(job.getJobID());
-               }
-               catch (Throwable t) {
-                       throw new JobExecutionException(job.getJobID(),
-                                       "Could not register the job at the 
high-availability services", t);
-               }
-
                // start all JobManagers
                JobManagerRunner[] runners = new 
JobManagerRunner[numJobManagers];
                for (int i = 0; i < numJobManagers; i++) {
@@ -273,6 +264,17 @@ public class MiniClusterJobDispatcher {
                return runners;
        }
 
+       private void clearJobRunningState(JobID jobID) {
+               // we mark the job as finished in the HA services, so need
+               // to remove the data after job finished
+               try {
+                       haServices.getRunningJobsRegistry().clearJob(jobID);
+               }
+               catch (Throwable t) {
+                       LOG.warn("Could not clear the job {} at the 
high-availability services", jobID.toString(), t);
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  test methods to simulate job master failures
        // 
------------------------------------------------------------------------
@@ -298,9 +300,12 @@ public class MiniClusterJobDispatcher {
         */
        private class DetachedFinalizer implements OnCompletionActions, 
FatalErrorHandler {
 
+               private final JobID jobID;
+
                private final AtomicInteger numJobManagersToWaitFor;
 
-               private DetachedFinalizer(int numJobManagersToWaitFor) {
+               private DetachedFinalizer(JobID jobID, int 
numJobManagersToWaitFor) {
+                       this.jobID = jobID;
                        this.numJobManagersToWaitFor = new 
AtomicInteger(numJobManagersToWaitFor);
                }
 
@@ -327,6 +332,7 @@ public class MiniClusterJobDispatcher {
                private void decrementCheckAndCleanup() {
                        if (numJobManagersToWaitFor.decrementAndGet() == 0) {
                                MiniClusterJobDispatcher.this.runners = null;
+                               
MiniClusterJobDispatcher.this.clearJobRunningState(jobID);
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
index f1ece0e..bbafcf0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Rule;
@@ -28,6 +29,7 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -45,20 +47,23 @@ public class FsNegativeRunningJobsRegistryTest extends 
TestLogger {
 
                FsNegativeRunningJobsRegistry registry = new 
FsNegativeRunningJobsRegistry(new Path(uri));
 
-               // initially, without any call, the job is considered running
-               assertTrue(registry.isJobRunning(jid));
+               // initially, without any call, the job is pending
+               assertFalse(registry.isJobRunning(jid));
+               assertEquals(registry.getJobSchedulingStatus(jid), 
JobSchedulingStatus.PENDING);
 
-               // repeated setting should not affect the status
+               // after set running, the job is running
                registry.setJobRunning(jid);
                assertTrue(registry.isJobRunning(jid));
+               assertEquals(registry.getJobSchedulingStatus(jid), 
JobSchedulingStatus.RUNNING);
 
                // set the job to finished and validate
                registry.setJobFinished(jid);
-               assertFalse(registry.isJobRunning(jid));
+               assertEquals(registry.getJobSchedulingStatus(jid), 
JobSchedulingStatus.DONE);
 
                // another registry should pick this up
                FsNegativeRunningJobsRegistry otherRegistry = new 
FsNegativeRunningJobsRegistry(new Path(uri));
-               assertFalse(otherRegistry.isJobRunning(jid));
+               assertTrue(otherRegistry.isJobRunning(jid));
+               assertEquals(otherRegistry.getJobSchedulingStatus(jid), 
JobSchedulingStatus.DONE);
        }
 
        @Test
@@ -73,13 +78,21 @@ public class FsNegativeRunningJobsRegistryTest extends 
TestLogger {
                // set the job to finished and validate
                registry.setJobFinished(jid);
                assertFalse(registry.isJobRunning(jid));
+               assertEquals(registry.getJobSchedulingStatus(jid), 
JobSchedulingStatus.DONE);
 
-               // set the job to back to running and validate
+               // setting the job to running does not overwrite the finished status
                registry.setJobRunning(jid);
                assertTrue(registry.isJobRunning(jid));
+               assertEquals(registry.getJobSchedulingStatus(jid), 
JobSchedulingStatus.DONE);
 
                // another registry should pick this up
                FsNegativeRunningJobsRegistry otherRegistry = new 
FsNegativeRunningJobsRegistry(new Path(uri));
                assertTrue(otherRegistry.isJobRunning(jid));
+               assertEquals(registry.getJobSchedulingStatus(jid), 
JobSchedulingStatus.DONE);
+
+               // clear the running and finished marker, it will be pending
+               otherRegistry.clearJob(jid);
+               assertFalse(otherRegistry.isJobRunning(jid));
+               assertEquals(registry.getJobSchedulingStatus(jid), 
JobSchedulingStatus.PENDING);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
index 72982c8..8c91898 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
@@ -22,6 +22,7 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import 
org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
 import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.Before;
@@ -29,7 +30,9 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class ZooKeeperRegistryTest extends TestLogger {
        private TestingServer testingServer;
@@ -61,14 +64,20 @@ public class ZooKeeperRegistryTest extends TestLogger {
 
                try {
                        JobID jobID = JobID.generate();
-                       assertTrue(!zkRegistry.isJobRunning(jobID));
+                       assertFalse(zkRegistry.isJobRunning(jobID));
+                       assertEquals(zkRegistry.getJobSchedulingStatus(jobID), 
JobSchedulingStatus.PENDING);
 
                        zkRegistry.setJobRunning(jobID);
                        assertTrue(zkRegistry.isJobRunning(jobID));
+                       assertEquals(zkRegistry.getJobSchedulingStatus(jobID), 
JobSchedulingStatus.RUNNING);
 
                        zkRegistry.setJobFinished(jobID);
-                       assertTrue(!zkRegistry.isJobRunning(jobID));
+                       assertEquals(zkRegistry.getJobSchedulingStatus(jobID), 
JobSchedulingStatus.DONE);
+                       assertFalse(zkRegistry.isJobRunning(jobID));
 
+                       zkRegistry.clearJob(jobID);
+                       assertFalse(zkRegistry.isJobRunning(jobID));
+                       assertEquals(zkRegistry.getJobSchedulingStatus(jobID), 
JobSchedulingStatus.PENDING);
                } finally {
                        if (zkHaService != null) {
                                zkHaService.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/e7011d78/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index 257212b..e2aa6ec 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -206,16 +205,6 @@ public class YarnFlinkApplicationMasterRunner extends 
AbstractYarnFlinkApplicati
                //TODO: generate the job graph from user's jar
                jobGraph = loadJobGraph(config);
 
-               // we first need to mark the job as running in the HA services, 
so that the
-               // JobManager leader will recognize that it as work to do
-               try {
-                       
haServices.getRunningJobsRegistry().setJobRunning(jobGraph.getJobID());
-               }
-               catch (Throwable t) {
-                       throw new JobExecutionException(jobGraph.getJobID(),
-                                       "Could not register the job at the 
high-availability services", t);
-               }
-
                // now the JobManagerRunner
                return new JobManagerRunner(
                                jobGraph, config,
@@ -226,6 +215,14 @@ public class YarnFlinkApplicationMasterRunner extends 
AbstractYarnFlinkApplicati
        }
 
        protected void shutdown(ApplicationStatus status, String msg) {
+               // Need to clear the job state in the HA services before 
shutdown
+               try {
+                       
haServices.getRunningJobsRegistry().clearJob(jobGraph.getJobID());
+               }
+               catch (Throwable t) {
+                       LOG.warn("Could not clear the job at the 
high-availability services", t);
+               }
+
                synchronized (lock) {
                        if (jobManagerRunner != null) {
                                try {

Reply via email to