[FLINK-5501] [runtime] Followups and improvements to RunningJobsRegistry

This commit changes the following:
  - Remove the unsafe 'isJobRunning()' method.
  - Extract duplicate code into utility functions
  - Simplify the NonHaRegistry by using a map rather than two sets
  - Improve exception handling / error messages for the ZooKeeper-based registry
  - Slight improvement of error handling in the JobManagerRunner
  - Compare enums with '==' (better null-pointer safety)
  - Correct 'expected' and 'actual' parameters in 'assertEquals'
  - Also port the test changes to the HDFS file-based registry test


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0dede9f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0dede9f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0dede9f

Branch: refs/heads/master
Commit: e0dede9fb0a2ef7560254b6fc40d852ebf16c956
Parents: e7011d7
Author: Stephan Ewen <[email protected]>
Authored: Mon Feb 27 21:56:36 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Tue Feb 28 18:59:10 2017 +0100

----------------------------------------------------------------------
 .../configuration/HighAvailabilityOptions.java  |  4 ++
 .../FsNegativeRunningJobsRegistryTest.java      | 40 +++++++----
 .../FsNegativeRunningJobsRegistry.java          | 68 +++++++-----------
 .../highavailability/RunningJobsRegistry.java   | 39 ++++++-----
 .../highavailability/ZookeeperRegistry.java     | 73 ++++++++++----------
 .../highavailability/nonha/NonHaRegistry.java   | 45 ++++--------
 .../runtime/jobmaster/JobManagerRunner.java     | 20 +++---
 .../minicluster/MiniClusterJobDispatcher.java   |  2 +-
 .../resourcemanager/JobLeaderIdService.java     |  4 +-
 .../FsNegativeRunningJobsRegistryTest.java      | 37 +++++-----
 .../highavailability/ZooKeeperRegistryTest.java | 33 ++++-----
 11 files changed, 166 insertions(+), 199 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index 4792eba..b883bc3 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -124,6 +124,10 @@ public class HighAvailabilityOptions {
                        .defaultValue(3)
                        
.withDeprecatedKeys("recovery.zookeeper.client.max-retry-attempts");
 
+       public static final ConfigOption<String> 
ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH = 
+                       key("high-availability.zookeeper.job.registry")
+                       .defaultValue("/running_job_registry/");
+
        // 
------------------------------------------------------------------------
 
        /** Not intended to be instantiated */

http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
index 40d75e8..bb27b8b 100644
--- 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
+++ 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.highavailability.FsNegativeRunningJobsRegistry;
 
+import 
org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 
@@ -33,9 +34,11 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for the {@link FsNegativeRunningJobsRegistry} on HDFS.
+ */
 public class FsNegativeRunningJobsRegistryTest {
 
        @ClassRule
@@ -83,20 +86,22 @@ public class FsNegativeRunningJobsRegistryTest {
 
                FsNegativeRunningJobsRegistry registry = new 
FsNegativeRunningJobsRegistry(workDir);
 
-               // initially, without any call, the job is considered running
-               assertTrue(registry.isJobRunning(jid));
+               // another registry should pick this up
+               FsNegativeRunningJobsRegistry otherRegistry = new 
FsNegativeRunningJobsRegistry(workDir);
+
+               // initially, without any call, the job is pending
+               assertEquals(JobSchedulingStatus.PENDING, 
registry.getJobSchedulingStatus(jid));
+               assertEquals(JobSchedulingStatus.PENDING, 
otherRegistry.getJobSchedulingStatus(jid));
 
-               // repeated setting should not affect the status
+               // after set running, the job is running
                registry.setJobRunning(jid);
-               assertTrue(registry.isJobRunning(jid));
+               assertEquals(JobSchedulingStatus.RUNNING, 
registry.getJobSchedulingStatus(jid));
+               assertEquals(JobSchedulingStatus.RUNNING, 
otherRegistry.getJobSchedulingStatus(jid));
 
                // set the job to finished and validate
                registry.setJobFinished(jid);
-               assertFalse(registry.isJobRunning(jid));
-
-               // another registry should pick this up
-               FsNegativeRunningJobsRegistry otherRegistry = new 
FsNegativeRunningJobsRegistry(workDir);
-               assertFalse(otherRegistry.isJobRunning(jid));
+               assertEquals(JobSchedulingStatus.DONE, 
registry.getJobSchedulingStatus(jid));
+               assertEquals(JobSchedulingStatus.DONE, 
otherRegistry.getJobSchedulingStatus(jid));
        }
 
        @Test
@@ -108,14 +113,19 @@ public class FsNegativeRunningJobsRegistryTest {
 
                // set the job to finished and validate
                registry.setJobFinished(jid);
-               assertFalse(registry.isJobRunning(jid));
+               assertEquals(JobSchedulingStatus.DONE, 
registry.getJobSchedulingStatus(jid));
 
-               // set the job to back to running and validate
+               // set the job to running does not overwrite the finished status
                registry.setJobRunning(jid);
-               assertTrue(registry.isJobRunning(jid));
+               assertEquals(JobSchedulingStatus.DONE, 
registry.getJobSchedulingStatus(jid));
 
                // another registry should pick this up
                FsNegativeRunningJobsRegistry otherRegistry = new 
FsNegativeRunningJobsRegistry(workDir);
-               assertTrue(otherRegistry.isJobRunning(jid));
+               assertEquals(JobSchedulingStatus.DONE, 
otherRegistry.getJobSchedulingStatus(jid));
+
+               // clear the running and finished marker, it will be pending
+               otherRegistry.clearJob(jid);
+               assertEquals(JobSchedulingStatus.PENDING, 
registry.getJobSchedulingStatus(jid));
+               assertEquals(JobSchedulingStatus.PENDING, 
otherRegistry.getJobSchedulingStatus(jid));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
index 9e92263..cb79a65 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.highavailability;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
 
 import java.io.FileNotFoundException;
@@ -30,19 +31,18 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * This {@link RunningJobsRegistry} tracks the status jobs via marker files,
- * marking running jobs via running marker files,
- * marking finished jobs via finished marker files.
+ * marking running jobs viarunning marker files, marking finished jobs via 
finished marker files.
  * 
  * <p>The general contract is the following:
  * <ul>
  *     <li>Initially, a marker file does not exist (no one created it, yet), 
which means
- *         the specific job is pending</li>
+ *         the specific job is pending.</li>
  *     <li>The first JobManager that granted leadership calls this service to 
create the running marker file,
  *         which marks the job as running.</li>
+ *     <li>If a JobManager gains leadership but sees the running marker file,
+ *         it will realize that the job has been scheduled already and needs 
reconciling.</li>
  *     <li>The JobManager that finishes calls this service to create the 
marker file,
  *         which marks the job as finished.</li>
- *     <li>If a JobManager gains leadership but see the running marker file,
- *         it will realize that the job has been scheduled and need 
reconciling.</li>
  *     <li>If a JobManager gains leadership at some point when shutdown is in 
progress,
  *         it will see the marker file and realize that the job is 
finished.</li>
  *     <li>The application framework is expected to clean the file once the 
application
@@ -50,7 +50,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *         start the job, even if it gains leadership.</li>
  * </ul>
  * 
- * <p>It is especially tailored towards deployment modes like for example
+ * <p>This registry is especially tailored towards deployment modes like for 
example
  * YARN, where HDFS is available as a persistent file system, and the YARN
  * application's working directories on HDFS are automatically cleaned
  * up after the application completed. 
@@ -99,8 +99,8 @@ public class FsNegativeRunningJobsRegistry implements 
RunningJobsRegistry {
                // to be safe, attempt to write to the working directory, to
                // catch problems early
                final Path testFile = new Path(workingDirectory, 
".registry_test");
-               try (FSDataOutputStream out = fileSystem.create(testFile, 
false)) {
-                       out.write(42);
+               try {
+                       createFile(testFile, false);
                }
                catch (IOException e) {
                        throw new IOException("Unable to write to working 
directory: " + workingDirectory, e);
@@ -119,9 +119,7 @@ public class FsNegativeRunningJobsRegistry implements 
RunningJobsRegistry {
 
                // create the file
                // to avoid an exception if the job already exists, set 
overwrite=true
-               try (FSDataOutputStream out = fileSystem.create(filePath, 
true)) {
-                       out.write(42);
-               }
+               createFile(filePath, true);
        }
 
        @Override
@@ -131,25 +129,7 @@ public class FsNegativeRunningJobsRegistry implements 
RunningJobsRegistry {
 
                // create the file
                // to avoid an exception if the job already exists, set 
overwrite=true
-               try (FSDataOutputStream out = fileSystem.create(filePath, 
true)) {
-                       out.write(42);
-               }
-       }
-
-       @Override
-       public boolean isJobRunning(JobID jobID) throws IOException {
-               checkNotNull(jobID, "jobID");
-
-               // check for the existence of the file
-               try {
-                       
fileSystem.getFileStatus(createMarkerFilePath(RUNNING_PREFIX, jobID));
-                       // file was found --> job is running
-                       return true;
-               }
-               catch (FileNotFoundException e) {
-                       // file does not exist, job is not running
-                       return false;
-               }
+               createFile(filePath, true);
        }
 
        @Override
@@ -157,21 +137,16 @@ public class FsNegativeRunningJobsRegistry implements 
RunningJobsRegistry {
                checkNotNull(jobID, "jobID");
 
                // first check for the existence of the complete file
-               try {
-                       
fileSystem.getFileStatus(createMarkerFilePath(DONE_PREFIX, jobID));
+               if (fileSystem.exists(createMarkerFilePath(DONE_PREFIX, 
jobID))) {
                        // complete file was found --> job is terminated
                        return JobSchedulingStatus.DONE;
                }
-               catch (FileNotFoundException e) {
-                       // file does not exist, job is running or pending
-               }
                // check for the existence of the running file
-               try {
-                       
fileSystem.getFileStatus(createMarkerFilePath(RUNNING_PREFIX, jobID));
+               else if (fileSystem.exists(createMarkerFilePath(RUNNING_PREFIX, 
jobID))) {
                        // running file was found --> job is terminated
                        return JobSchedulingStatus.RUNNING;
                }
-               catch (FileNotFoundException e) {
+               else {
                        // file does not exist, job is not scheduled
                        return JobSchedulingStatus.PENDING;
                }
@@ -181,25 +156,30 @@ public class FsNegativeRunningJobsRegistry implements 
RunningJobsRegistry {
        public void clearJob(JobID jobID) throws IOException {
                checkNotNull(jobID, "jobID");
                final Path runningFilePath = 
createMarkerFilePath(RUNNING_PREFIX, jobID);
+               final Path doneFilePath = createMarkerFilePath(DONE_PREFIX, 
jobID);
 
                // delete the running marker file, if it exists
                try {
                        fileSystem.delete(runningFilePath, false);
                }
-               catch (FileNotFoundException e) {
-               }
-
-               final Path doneFilePath = createMarkerFilePath(DONE_PREFIX, 
jobID);
+               catch (FileNotFoundException ignored) {}
 
                // delete the finished marker file, if it exists
                try {
                        fileSystem.delete(doneFilePath, false);
                }
-               catch (FileNotFoundException e) {
-               }
+               catch (FileNotFoundException ignored) {}
        }
 
        private Path createMarkerFilePath(String prefix, JobID jobId) {
                return new Path(basePath, prefix + jobId.toString());
        }
+
+       private void createFile(Path path, boolean overwrite) throws 
IOException {
+               final WriteMode writeMode = overwrite ? WriteMode.OVERWRITE : 
WriteMode.NO_OVERWRITE;
+
+               try (FSDataOutputStream out = fileSystem.create(path, 
writeMode)) {
+                       out.write(42);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
index 020f2ca..43e5ac5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
@@ -23,29 +23,40 @@ import org.apache.flink.api.common.JobID;
 import java.io.IOException;
 
 /**
- * This registry tracks if a certain job is running.
+ * A simple registry that tracks if a certain job is pending execution, 
running, or completed.
  * 
  * <p>This registry is used in highly-available setups with multiple master 
nodes,
  * to determine whether a new leader should attempt to recover a certain job 
(because the 
  * job is still running), or whether the job has already finished successfully 
(in case of a
  * finite job) and the leader has only been granted leadership because the 
previous leader
  * quit cleanly after the job was finished.
+ * 
+ * <p>In addition, the registry can help to determine whether a newly assigned 
leader JobManager
+ * should attempt reconciliation with running TaskManagers, or immediately 
schedule the job from
+ * the latest checkpoint/savepoint. 
  */
 public interface RunningJobsRegistry {
 
-       public enum JobSchedulingStatus {
-               /** Job has not been scheduled */
+       /**
+        * The scheduling status of a job, as maintained by the {@code 
RunningJobsRegistry}.
+        */
+       enum JobSchedulingStatus {
+
+               /** Job has not been scheduled, yet */
                PENDING,
 
-               /** Job has been scheduled */
+               /** Job has been scheduled and is not yet finished */
                RUNNING,
 
-               /** Job has been finished */
+               /** Job has been finished, successfully or unsuccessfully */
                DONE;
        }
 
+       // 
------------------------------------------------------------------------
+
        /**
-        * Marks a job as running.
+        * Marks a job as running. Requesting the job's status via the {@link 
#getJobSchedulingStatus(JobID)}
+        * method will return {@link JobSchedulingStatus#RUNNING}.
         * 
         * @param jobID The id of the job.
         *
@@ -55,7 +66,8 @@ public interface RunningJobsRegistry {
        void setJobRunning(JobID jobID) throws IOException;
 
        /**
-        * Marks a job as running.
+        * Marks a job as completed. Requesting the job's status via the {@link 
#getJobSchedulingStatus(JobID)}
+        * method will return {@link JobSchedulingStatus#DONE}.
         *
         * @param jobID The id of the job.
         * 
@@ -65,18 +77,7 @@ public interface RunningJobsRegistry {
        void setJobFinished(JobID jobID) throws IOException;
 
        /**
-        * Checks whether a job is running.
-        *
-        * @param jobID The id of the job to check.
-        * @return True if the job is still running, false otherwise.
-        * 
-        * @throws IOException Thrown when the communication with the 
highly-available storage or registry
-        *                     failed and could not be retried.
-        */
-       boolean isJobRunning(JobID jobID) throws IOException;
-
-       /**
-        * Get the scheduing status of a job.
+        * Gets the scheduling status of a job.
         *
         * @param jobID The id of the job to check.
         * @return The job scheduling status.

http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
index 31a4535..a8be35a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
@@ -26,6 +26,7 @@ import org.apache.zookeeper.data.Stat;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.util.Arrays;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -33,20 +34,17 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * A zookeeper based registry for running jobs, highly available.
  */
 public class ZookeeperRegistry implements RunningJobsRegistry {
-       
-       private static final String DEFAULT_HA_JOB_REGISTRY_PATH = 
"/running_job_registry/";
+
+       private static final Charset ENCODING = Charset.forName("utf-8");
 
        /** The ZooKeeper client to use */
        private final CuratorFramework client;
 
        private final String runningJobPath;
 
-       private static final String HA_JOB_REGISTRY_PATH = 
"high-availability.zookeeper.job.registry";
-
        public ZookeeperRegistry(final CuratorFramework client, final 
Configuration configuration) {
-               this.client = client;
-               runningJobPath = 
configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT) + 
-                       configuration.getString(HA_JOB_REGISTRY_PATH, 
DEFAULT_HA_JOB_REGISTRY_PATH);
+               this.client = checkNotNull(client, "client");
+               this.runningJobPath = 
configuration.getString(HighAvailabilityOptions.ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH);
        }
 
        @Override
@@ -54,12 +52,10 @@ public class ZookeeperRegistry implements 
RunningJobsRegistry {
                checkNotNull(jobID);
 
                try {
-                       String zkPath = runningJobPath + jobID.toString();
-                       
this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
-                       this.client.setData().forPath(zkPath, 
JobSchedulingStatus.RUNNING.name().getBytes(Charset.forName("utf-8")));
+                       writeEnumToZooKeeper(jobID, 
JobSchedulingStatus.RUNNING);
                }
                catch (Exception e) {
-                       throw new IOException("Set running state to zk fail for 
job " + jobID.toString(), e);
+                       throw new IOException("Failed to set RUNNING state in 
ZooKeeper for job " + jobID, e);
                }
        }
 
@@ -68,44 +64,36 @@ public class ZookeeperRegistry implements 
RunningJobsRegistry {
                checkNotNull(jobID);
 
                try {
-                       String zkPath = runningJobPath + jobID.toString();
-                       
this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
-                       this.client.setData().forPath(zkPath, 
JobSchedulingStatus.DONE.name().getBytes(Charset.forName("utf-8")));
+                       writeEnumToZooKeeper(jobID, JobSchedulingStatus.DONE);
                }
                catch (Exception e) {
-                       throw new IOException("Set finished state to zk fail 
for job " + jobID.toString(), e);
+                       throw new IOException("Failed to set DONE state in 
ZooKeeper for job " + jobID, e);
                }
        }
 
        @Override
-       public boolean isJobRunning(JobID jobID) throws IOException {
+       public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws 
IOException {
                checkNotNull(jobID);
 
                try {
-                       Stat stat = client.checkExists().forPath(runningJobPath 
+ jobID.toString());
+                       final String zkPath = createZkPath(jobID);
+                       final Stat stat = client.checkExists().forPath(zkPath);
                        if (stat != null) {
-                               byte[] data = 
client.getData().forPath(runningJobPath + jobID.toString());
-                               if 
(JobSchedulingStatus.RUNNING.name().equals(new String(data))) {
-                                       return true;
+                               // found some data, try to parse it
+                               final byte[] data = 
client.getData().forPath(zkPath);
+                               if (data != null) {
+                                       try {
+                                               final String name = new 
String(data, ENCODING);
+                                               return 
JobSchedulingStatus.valueOf(name);
+                                       }
+                                       catch (IllegalArgumentException e) {
+                                               throw new IOException("Found 
corrupt data in ZooKeeper: " + 
+                                                               
Arrays.toString(data) + " is no valid job status");
+                                       }
                                }
                        }
-                       return false;
-               }
-               catch (Exception e) {
-                       throw new IOException("Get running state from zk fail 
for job " + jobID.toString(), e);
-               }
-       }
 
-       @Override
-       public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws 
IOException {
-               checkNotNull(jobID);
-
-               try {
-                       Stat stat = client.checkExists().forPath(runningJobPath 
+ jobID.toString());
-                       if (stat != null) {
-                               byte[] data = 
client.getData().forPath(runningJobPath + jobID.toString());
-                               return JobSchedulingStatus.valueOf(new 
String(data));
-                       }
+                       // nothing found, yet, must be in status 'PENDING'
                        return JobSchedulingStatus.PENDING;
                }
                catch (Exception e) {
@@ -118,13 +106,22 @@ public class ZookeeperRegistry implements 
RunningJobsRegistry {
                checkNotNull(jobID);
 
                try {
-                       String zkPath = runningJobPath + jobID.toString();
+                       final String zkPath = createZkPath(jobID);
                        
this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
                        this.client.delete().forPath(zkPath);
                }
                catch (Exception e) {
-                       throw new IOException("Clear job state from zk fail for 
" + jobID.toString(), e);
+                       throw new IOException("Failed to clear job state from 
ZooKeeper for job " + jobID, e);
                }
        }
 
+       private String createZkPath(JobID jobID) {
+               return runningJobPath + jobID.toString();
+       }
+
+       private void writeEnumToZooKeeper(JobID jobID, JobSchedulingStatus 
status) throws Exception {
+               final String zkPath = createZkPath(jobID);
+               
this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
+               this.client.setData().forPath(zkPath, 
status.name().getBytes(ENCODING));
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
index e331212..ab1ce47 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
@@ -20,9 +20,8 @@ package org.apache.flink.runtime.highavailability.nonha;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
-import 
org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
 
-import java.util.HashSet;
+import java.util.HashMap;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -32,18 +31,14 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 public class NonHaRegistry implements RunningJobsRegistry {
 
        /** The currently running jobs */
-       private final HashSet<JobID> running = new HashSet<>();
-
-       /** The currently finished jobs */
-       private final HashSet<JobID> finished = new HashSet<>();
+       private final HashMap<JobID, JobSchedulingStatus> jobStatus = new 
HashMap<>();
 
        @Override
        public void setJobRunning(JobID jobID) {
                checkNotNull(jobID);
 
-               synchronized (running) {
-                       running.add(jobID);
-                       finished.remove(jobID);
+               synchronized (jobStatus) {
+                       jobStatus.put(jobID, JobSchedulingStatus.RUNNING);
                }
        }
 
@@ -51,33 +46,18 @@ public class NonHaRegistry implements RunningJobsRegistry {
        public void setJobFinished(JobID jobID) {
                checkNotNull(jobID);
 
-               synchronized (running) {
-                       running.remove(jobID);
-                       finished.add(jobID);
-               }
-       }
-
-       @Override
-       public boolean isJobRunning(JobID jobID) {
-               checkNotNull(jobID);
-
-               synchronized (running) {
-                       return running.contains(jobID);
+               synchronized (jobStatus) {
+                       jobStatus.put(jobID, JobSchedulingStatus.DONE);
                }
        }
 
        @Override
        public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) {
                checkNotNull(jobID);
-
-               synchronized (running) {
-                       if (finished.contains(jobID)) {
-                               return JobSchedulingStatus.DONE;
-                       } else if (running.contains(jobID)) {
-                               return JobSchedulingStatus.RUNNING;
-                       } else {
-                               return JobSchedulingStatus.PENDING;
-                       }
+               
+               synchronized (jobStatus) {
+                       JobSchedulingStatus status = jobStatus.get(jobID);
+                       return status == null ? JobSchedulingStatus.PENDING : 
status;
                }
        }
 
@@ -85,9 +65,8 @@ public class NonHaRegistry implements RunningJobsRegistry {
        public void clearJob(JobID jobID) {
                checkNotNull(jobID);
 
-               synchronized (running) {
-                       running.remove(jobID);
-                       finished.remove(jobID);
+               synchronized (jobStatus) {
+                       jobStatus.remove(jobID);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 6bebd90..6e02813 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -360,20 +360,22 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
                        // it's okay that job manager wait for the operation complete
                        leaderElectionService.confirmLeaderSessionID(leaderSessionID);
 
-                       JobSchedulingStatus schedulingStatus = JobSchedulingStatus.PENDING;
+                       final JobSchedulingStatus schedulingStatus;
                        try {
                                schedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobGraph.getJobID());
-                               if (schedulingStatus.equals(JobSchedulingStatus.DONE)) {
-                                       log.info("Granted leader ship but job {} has been finished. ", jobGraph.getJobID());
-                                       jobFinishedByOther();
-                                       return;
-                               }
-                       } catch (Throwable t) {
-                               log.error("Could not access status (running/finished) of job {}. ",     jobGraph.getJobID(), t);
+                       }
+                       catch (Throwable t) {
+                               log.error("Could not access status (running/finished) of job {}. ", jobGraph.getJobID(), t);
                                onFatalError(t);
                                return;
                        }
 
+                       if (schedulingStatus == JobSchedulingStatus.DONE) {
+                               log.info("Granted leader ship but job {} has been finished. ", jobGraph.getJobID());
+                               jobFinishedByOther();
+                               return;
+                       }
+
                        // Double check the leadership after we confirm that, there is a small chance that multiple
                        // job managers schedule the same job after if they try to recover at the same time.
                        // This will eventually be noticed, but can not be ruled out from the beginning.
@@ -382,7 +384,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
                                        // Now set the running status is after getting leader ship and 
                                        // set finished status after job in terminated status.
                                        // So if finding the job is running, it means someone has already run the job, need recover.
-                                       if (schedulingStatus.equals(JobSchedulingStatus.PENDING)) {
+                                       if (schedulingStatus == JobSchedulingStatus.PENDING) {
                                                runningJobsRegistry.setJobRunning(jobGraph.getJobID());
                                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index 9178684..dd80ada 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -271,7 +271,7 @@ public class MiniClusterJobDispatcher {
                        haServices.getRunningJobsRegistry().clearJob(jobID);
                }
                catch (Throwable t) {
-                       LOG.warn("Could not clear the job {} at the high-availability services", jobID.toString(), t);
+                       LOG.warn("Could not clear job {} at the status registry of the high-availability services", jobID, t);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
index 6c7e249..7ef39de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.util.ExceptionUtils;
@@ -244,7 +245,8 @@ public class JobLeaderIdService {
                                }
 
                                try {
-                                       if (runningJobsRegistry.isJobRunning(jobId)) {
+                                       final JobSchedulingStatus jobStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
+                                       if (jobStatus == JobSchedulingStatus.PENDING || jobStatus == JobSchedulingStatus.RUNNING) {
                                                if (leaderSessionId == null) {
                                                        // there is no new leader
                                                        if (previousJobLeaderId != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
index bbafcf0..b0c7778 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
@@ -30,9 +30,10 @@ import org.junit.rules.TemporaryFolder;
 import java.io.File;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for the {@link FsNegativeRunningJobsRegistry} on a local file system.
+ */
 public class FsNegativeRunningJobsRegistryTest extends TestLogger {
 
        @Rule
@@ -47,23 +48,22 @@ public class FsNegativeRunningJobsRegistryTest extends TestLogger {
 
                FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(new Path(uri));
 
+               // another registry should pick this up
+               FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(new Path(uri));
+
                // initially, without any call, the job is pending
-               assertFalse(registry.isJobRunning(jid));
-               assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.PENDING);
+               assertEquals(JobSchedulingStatus.PENDING, registry.getJobSchedulingStatus(jid));
+               assertEquals(JobSchedulingStatus.PENDING, otherRegistry.getJobSchedulingStatus(jid));
 
                // after set running, the job is running
                registry.setJobRunning(jid);
-               assertTrue(registry.isJobRunning(jid));
-               assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.RUNNING);
+               assertEquals(JobSchedulingStatus.RUNNING, registry.getJobSchedulingStatus(jid));
+               assertEquals(JobSchedulingStatus.RUNNING, otherRegistry.getJobSchedulingStatus(jid));
 
                // set the job to finished and validate
                registry.setJobFinished(jid);
-               assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE);
-
-               // another registry should pick this up
-               FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(new Path(uri));
-               assertTrue(otherRegistry.isJobRunning(jid));
-               assertEquals(otherRegistry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE);
+               assertEquals(JobSchedulingStatus.DONE, registry.getJobSchedulingStatus(jid));
+               assertEquals(JobSchedulingStatus.DONE, otherRegistry.getJobSchedulingStatus(jid));
        }
 
        @Test
@@ -77,22 +77,19 @@ public class FsNegativeRunningJobsRegistryTest extends TestLogger {
 
                // set the job to finished and validate
                registry.setJobFinished(jid);
-               assertFalse(registry.isJobRunning(jid));
-               assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE);
+               assertEquals(JobSchedulingStatus.DONE, registry.getJobSchedulingStatus(jid));
 
                // set the job to running does not overwrite the finished status
                registry.setJobRunning(jid);
-               assertTrue(registry.isJobRunning(jid));
-               assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE);
+               assertEquals(JobSchedulingStatus.DONE, registry.getJobSchedulingStatus(jid));
 
                // another registry should pick this up
                FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(new Path(uri));
-               assertTrue(otherRegistry.isJobRunning(jid));
-               assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE);
+               assertEquals(JobSchedulingStatus.DONE, otherRegistry.getJobSchedulingStatus(jid));
 
                // clear the running and finished marker, it will be pending
                otherRegistry.clearJob(jid);
-               assertFalse(otherRegistry.isJobRunning(jid));
-               assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.PENDING);
+               assertEquals(JobSchedulingStatus.PENDING, registry.getJobSchedulingStatus(jid));
+               assertEquals(JobSchedulingStatus.PENDING, otherRegistry.getJobSchedulingStatus(jid));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
index 8c91898..b1881e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
@@ -22,22 +22,21 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+
 
 public class ZooKeeperRegistryTest extends TestLogger {
-       private TestingServer testingServer;
 
-       private static Logger LOG = LoggerFactory.getLogger(ZooKeeperRegistryTest.class);
+       private TestingServer testingServer;
 
        @Before
        public void before() throws Exception {
@@ -59,29 +58,25 @@ public class ZooKeeperRegistryTest extends TestLogger {
                configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
                configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 
-               HighAvailabilityServices zkHaService = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration);
-               RunningJobsRegistry zkRegistry = zkHaService.getRunningJobsRegistry();
+               final HighAvailabilityServices zkHaService = new ZookeeperHaServices(
+                               ZooKeeperUtils.startCuratorFramework(configuration), Executors.directExecutor(), configuration);
+
+               final RunningJobsRegistry zkRegistry = zkHaService.getRunningJobsRegistry();
 
                try {
                        JobID jobID = JobID.generate();
-                       assertFalse(zkRegistry.isJobRunning(jobID));
-                       assertEquals(zkRegistry.getJobSchedulingStatus(jobID), JobSchedulingStatus.PENDING);
+                       assertEquals(JobSchedulingStatus.PENDING, zkRegistry.getJobSchedulingStatus(jobID));
 
                        zkRegistry.setJobRunning(jobID);
-                       assertTrue(zkRegistry.isJobRunning(jobID));
-                       assertEquals(zkRegistry.getJobSchedulingStatus(jobID), JobSchedulingStatus.RUNNING);
+                       assertEquals(JobSchedulingStatus.RUNNING, zkRegistry.getJobSchedulingStatus(jobID));
 
                        zkRegistry.setJobFinished(jobID);
-                       assertEquals(zkRegistry.getJobSchedulingStatus(jobID), JobSchedulingStatus.DONE);
-                       assertFalse(zkRegistry.isJobRunning(jobID));
+                       assertEquals(JobSchedulingStatus.DONE, zkRegistry.getJobSchedulingStatus(jobID));
 
                        zkRegistry.clearJob(jobID);
-                       assertFalse(zkRegistry.isJobRunning(jobID));
-                       assertEquals(zkRegistry.getJobSchedulingStatus(jobID), JobSchedulingStatus.PENDING);
+                       assertEquals(JobSchedulingStatus.PENDING, zkRegistry.getJobSchedulingStatus(jobID));
                } finally {
-                       if (zkHaService != null) {
-                               zkHaService.close();
-                       }
+                       zkHaService.close();
                }
        }
 }

Reply via email to