[FLINK-5501] [runtime] Follow-ups and improvements to RunningJobsRegistry This commit changes the following: - Remove the unsafe 'isJobRunning()' method. - Extract duplicate code into utility functions - Simplify the NonHaRegistry by using a map rather than two sets - Improve exception handling / error messages for the ZooKeeper-based registry - Slight improvement of error handling in the JobManagerRunner - Compare enums with '==' (better null-pointer safety) - Correct 'expected' and 'actual' parameters in 'assertEquals' - Port the tests to the HDFS file-based registry test as well
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0dede9f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0dede9f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0dede9f Branch: refs/heads/master Commit: e0dede9fb0a2ef7560254b6fc40d852ebf16c956 Parents: e7011d7 Author: Stephan Ewen <[email protected]> Authored: Mon Feb 27 21:56:36 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Tue Feb 28 18:59:10 2017 +0100 ---------------------------------------------------------------------- .../configuration/HighAvailabilityOptions.java | 4 ++ .../FsNegativeRunningJobsRegistryTest.java | 40 +++++++---- .../FsNegativeRunningJobsRegistry.java | 68 +++++++----------- .../highavailability/RunningJobsRegistry.java | 39 ++++++----- .../highavailability/ZookeeperRegistry.java | 73 ++++++++++---------- .../highavailability/nonha/NonHaRegistry.java | 45 ++++-------- .../runtime/jobmaster/JobManagerRunner.java | 20 +++--- .../minicluster/MiniClusterJobDispatcher.java | 2 +- .../resourcemanager/JobLeaderIdService.java | 4 +- .../FsNegativeRunningJobsRegistryTest.java | 37 +++++----- .../highavailability/ZooKeeperRegistryTest.java | 33 ++++----- 11 files changed, 166 insertions(+), 199 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java index 4792eba..b883bc3 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java @@ -124,6 +124,10 
@@ public class HighAvailabilityOptions { .defaultValue(3) .withDeprecatedKeys("recovery.zookeeper.client.max-retry-attempts"); + public static final ConfigOption<String> ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH = + key("high-availability.zookeeper.job.registry") + .defaultValue("/running_job_registry/"); + // ------------------------------------------------------------------------ /** Not intended to be instantiated */ http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java index 40d75e8..bb27b8b 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.highavailability.FsNegativeRunningJobsRegistry; +import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -33,9 +34,11 @@ import org.junit.rules.TemporaryFolder; import java.io.File; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +/** + * Tests for the {@link FsNegativeRunningJobsRegistry} on HDFS. 
+ */ public class FsNegativeRunningJobsRegistryTest { @ClassRule @@ -83,20 +86,22 @@ public class FsNegativeRunningJobsRegistryTest { FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(workDir); - // initially, without any call, the job is considered running - assertTrue(registry.isJobRunning(jid)); + // another registry should pick this up + FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(workDir); + + // initially, without any call, the job is pending + assertEquals(JobSchedulingStatus.PENDING, registry.getJobSchedulingStatus(jid)); + assertEquals(JobSchedulingStatus.PENDING, otherRegistry.getJobSchedulingStatus(jid)); - // repeated setting should not affect the status + // after set running, the job is running registry.setJobRunning(jid); - assertTrue(registry.isJobRunning(jid)); + assertEquals(JobSchedulingStatus.RUNNING, registry.getJobSchedulingStatus(jid)); + assertEquals(JobSchedulingStatus.RUNNING, otherRegistry.getJobSchedulingStatus(jid)); // set the job to finished and validate registry.setJobFinished(jid); - assertFalse(registry.isJobRunning(jid)); - - // another registry should pick this up - FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(workDir); - assertFalse(otherRegistry.isJobRunning(jid)); + assertEquals(JobSchedulingStatus.DONE, registry.getJobSchedulingStatus(jid)); + assertEquals(JobSchedulingStatus.DONE, otherRegistry.getJobSchedulingStatus(jid)); } @Test @@ -108,14 +113,19 @@ public class FsNegativeRunningJobsRegistryTest { // set the job to finished and validate registry.setJobFinished(jid); - assertFalse(registry.isJobRunning(jid)); + assertEquals(JobSchedulingStatus.DONE, registry.getJobSchedulingStatus(jid)); - // set the job to back to running and validate + // set the job to running does not overwrite the finished status registry.setJobRunning(jid); - assertTrue(registry.isJobRunning(jid)); + assertEquals(JobSchedulingStatus.DONE, 
registry.getJobSchedulingStatus(jid)); // another registry should pick this up FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(workDir); - assertTrue(otherRegistry.isJobRunning(jid)); + assertEquals(JobSchedulingStatus.DONE, otherRegistry.getJobSchedulingStatus(jid)); + + // clear the running and finished marker, it will be pending + otherRegistry.clearJob(jid); + assertEquals(JobSchedulingStatus.PENDING, registry.getJobSchedulingStatus(jid)); + assertEquals(JobSchedulingStatus.PENDING, otherRegistry.getJobSchedulingStatus(jid)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java index 9e92263..cb79a65 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.highavailability; import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.core.fs.Path; import java.io.FileNotFoundException; @@ -30,19 +31,18 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * This {@link RunningJobsRegistry} tracks the status jobs via marker files, - * marking running jobs via running marker files, - * marking finished jobs via finished marker files. + * marking running jobs viarunning marker files, marking finished jobs via finished marker files. 
* * <p>The general contract is the following: * <ul> * <li>Initially, a marker file does not exist (no one created it, yet), which means - * the specific job is pending</li> + * the specific job is pending.</li> * <li>The first JobManager that granted leadership calls this service to create the running marker file, * which marks the job as running.</li> + * <li>If a JobManager gains leadership but sees the running marker file, + * it will realize that the job has been scheduled already and needs reconciling.</li> * <li>The JobManager that finishes calls this service to create the marker file, * which marks the job as finished.</li> - * <li>If a JobManager gains leadership but see the running marker file, - * it will realize that the job has been scheduled and need reconciling.</li> * <li>If a JobManager gains leadership at some point when shutdown is in progress, * it will see the marker file and realize that the job is finished.</li> * <li>The application framework is expected to clean the file once the application @@ -50,7 +50,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * start the job, even if it gains leadership.</li> * </ul> * - * <p>It is especially tailored towards deployment modes like for example + * <p>This registry is especially tailored towards deployment modes like for example * YARN, where HDFS is available as a persistent file system, and the YARN * application's working directories on HDFS are automatically cleaned * up after the application completed. 
@@ -99,8 +99,8 @@ public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry { // to be safe, attempt to write to the working directory, to // catch problems early final Path testFile = new Path(workingDirectory, ".registry_test"); - try (FSDataOutputStream out = fileSystem.create(testFile, false)) { - out.write(42); + try { + createFile(testFile, false); } catch (IOException e) { throw new IOException("Unable to write to working directory: " + workingDirectory, e); @@ -119,9 +119,7 @@ public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry { // create the file // to avoid an exception if the job already exists, set overwrite=true - try (FSDataOutputStream out = fileSystem.create(filePath, true)) { - out.write(42); - } + createFile(filePath, true); } @Override @@ -131,25 +129,7 @@ public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry { // create the file // to avoid an exception if the job already exists, set overwrite=true - try (FSDataOutputStream out = fileSystem.create(filePath, true)) { - out.write(42); - } - } - - @Override - public boolean isJobRunning(JobID jobID) throws IOException { - checkNotNull(jobID, "jobID"); - - // check for the existence of the file - try { - fileSystem.getFileStatus(createMarkerFilePath(RUNNING_PREFIX, jobID)); - // file was found --> job is running - return true; - } - catch (FileNotFoundException e) { - // file does not exist, job is not running - return false; - } + createFile(filePath, true); } @Override @@ -157,21 +137,16 @@ public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry { checkNotNull(jobID, "jobID"); // first check for the existence of the complete file - try { - fileSystem.getFileStatus(createMarkerFilePath(DONE_PREFIX, jobID)); + if (fileSystem.exists(createMarkerFilePath(DONE_PREFIX, jobID))) { // complete file was found --> job is terminated return JobSchedulingStatus.DONE; } - catch (FileNotFoundException e) { - // file does not exist, job 
is running or pending - } // check for the existence of the running file - try { - fileSystem.getFileStatus(createMarkerFilePath(RUNNING_PREFIX, jobID)); + else if (fileSystem.exists(createMarkerFilePath(RUNNING_PREFIX, jobID))) { // running file was found --> job is terminated return JobSchedulingStatus.RUNNING; } - catch (FileNotFoundException e) { + else { // file does not exist, job is not scheduled return JobSchedulingStatus.PENDING; } @@ -181,25 +156,30 @@ public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry { public void clearJob(JobID jobID) throws IOException { checkNotNull(jobID, "jobID"); final Path runningFilePath = createMarkerFilePath(RUNNING_PREFIX, jobID); + final Path doneFilePath = createMarkerFilePath(DONE_PREFIX, jobID); // delete the running marker file, if it exists try { fileSystem.delete(runningFilePath, false); } - catch (FileNotFoundException e) { - } - - final Path doneFilePath = createMarkerFilePath(DONE_PREFIX, jobID); + catch (FileNotFoundException ignored) {} // delete the finished marker file, if it exists try { fileSystem.delete(doneFilePath, false); } - catch (FileNotFoundException e) { - } + catch (FileNotFoundException ignored) {} } private Path createMarkerFilePath(String prefix, JobID jobId) { return new Path(basePath, prefix + jobId.toString()); } + + private void createFile(Path path, boolean overwrite) throws IOException { + final WriteMode writeMode = overwrite ? 
WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE; + + try (FSDataOutputStream out = fileSystem.create(path, writeMode)) { + out.write(42); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java index 020f2ca..43e5ac5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java @@ -23,29 +23,40 @@ import org.apache.flink.api.common.JobID; import java.io.IOException; /** - * This registry tracks if a certain job is running. + * A simple registry that tracks if a certain job is pending execution, running, or completed. * * <p>This registry is used in highly-available setups with multiple master nodes, * to determine whether a new leader should attempt to recover a certain job (because the * job is still running), or whether the job has already finished successfully (in case of a * finite job) and the leader has only been granted leadership because the previous leader * quit cleanly after the job was finished. + * + * <p>In addition, the registry can help to determine whether a newly assigned leader JobManager + * should attempt reconciliation with running TaskManagers, or immediately schedule the job from + * the latest checkpoint/savepoint. */ public interface RunningJobsRegistry { - public enum JobSchedulingStatus { - /** Job has not been scheduled */ + /** + * The scheduling status of a job, as maintained by the {@code RunningJobsRegistry}. 
+ */ + enum JobSchedulingStatus { + + /** Job has not been scheduled, yet */ PENDING, - /** Job has been scheduled */ + /** Job has been scheduled and is not yet finished */ RUNNING, - /** Job has been finished */ + /** Job has been finished, successfully or unsuccessfully */ DONE; } + // ------------------------------------------------------------------------ + /** - * Marks a job as running. + * Marks a job as running. Requesting the job's status via the {@link #getJobSchedulingStatus(JobID)} + * method will return {@link JobSchedulingStatus#RUNNING}. * * @param jobID The id of the job. * @@ -55,7 +66,8 @@ public interface RunningJobsRegistry { void setJobRunning(JobID jobID) throws IOException; /** - * Marks a job as running. + * Marks a job as completed. Requesting the job's status via the {@link #getJobSchedulingStatus(JobID)} + * method will return {@link JobSchedulingStatus#DONE}. * * @param jobID The id of the job. * @@ -65,18 +77,7 @@ public interface RunningJobsRegistry { void setJobFinished(JobID jobID) throws IOException; /** - * Checks whether a job is running. - * - * @param jobID The id of the job to check. - * @return True if the job is still running, false otherwise. - * - * @throws IOException Thrown when the communication with the highly-available storage or registry - * failed and could not be retried. - */ - boolean isJobRunning(JobID jobID) throws IOException; - - /** - * Get the scheduing status of a job. + * Gets the scheduling status of a job. * * @param jobID The id of the job to check. * @return The job scheduling status. 
http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java index 31a4535..a8be35a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java @@ -26,6 +26,7 @@ import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.nio.charset.Charset; +import java.util.Arrays; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -33,20 +34,17 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * A zookeeper based registry for running jobs, highly available. 
*/ public class ZookeeperRegistry implements RunningJobsRegistry { - - private static final String DEFAULT_HA_JOB_REGISTRY_PATH = "/running_job_registry/"; + + private static final Charset ENCODING = Charset.forName("utf-8"); /** The ZooKeeper client to use */ private final CuratorFramework client; private final String runningJobPath; - private static final String HA_JOB_REGISTRY_PATH = "high-availability.zookeeper.job.registry"; - public ZookeeperRegistry(final CuratorFramework client, final Configuration configuration) { - this.client = client; - runningJobPath = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT) + - configuration.getString(HA_JOB_REGISTRY_PATH, DEFAULT_HA_JOB_REGISTRY_PATH); + this.client = checkNotNull(client, "client"); + this.runningJobPath = configuration.getString(HighAvailabilityOptions.ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH); } @Override @@ -54,12 +52,10 @@ public class ZookeeperRegistry implements RunningJobsRegistry { checkNotNull(jobID); try { - String zkPath = runningJobPath + jobID.toString(); - this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient()); - this.client.setData().forPath(zkPath, JobSchedulingStatus.RUNNING.name().getBytes(Charset.forName("utf-8"))); + writeEnumToZooKeeper(jobID, JobSchedulingStatus.RUNNING); } catch (Exception e) { - throw new IOException("Set running state to zk fail for job " + jobID.toString(), e); + throw new IOException("Failed to set RUNNING state in ZooKeeper for job " + jobID, e); } } @@ -68,44 +64,36 @@ public class ZookeeperRegistry implements RunningJobsRegistry { checkNotNull(jobID); try { - String zkPath = runningJobPath + jobID.toString(); - this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient()); - this.client.setData().forPath(zkPath, JobSchedulingStatus.DONE.name().getBytes(Charset.forName("utf-8"))); + writeEnumToZooKeeper(jobID, JobSchedulingStatus.DONE); } catch (Exception e) { - throw new IOException("Set finished 
state to zk fail for job " + jobID.toString(), e); + throw new IOException("Failed to set DONE state in ZooKeeper for job " + jobID, e); } } @Override - public boolean isJobRunning(JobID jobID) throws IOException { + public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException { checkNotNull(jobID); try { - Stat stat = client.checkExists().forPath(runningJobPath + jobID.toString()); + final String zkPath = createZkPath(jobID); + final Stat stat = client.checkExists().forPath(zkPath); if (stat != null) { - byte[] data = client.getData().forPath(runningJobPath + jobID.toString()); - if (JobSchedulingStatus.RUNNING.name().equals(new String(data))) { - return true; + // found some data, try to parse it + final byte[] data = client.getData().forPath(zkPath); + if (data != null) { + try { + final String name = new String(data, ENCODING); + return JobSchedulingStatus.valueOf(name); + } + catch (IllegalArgumentException e) { + throw new IOException("Found corrupt data in ZooKeeper: " + + Arrays.toString(data) + " is no valid job status"); + } } } - return false; - } - catch (Exception e) { - throw new IOException("Get running state from zk fail for job " + jobID.toString(), e); - } - } - @Override - public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException { - checkNotNull(jobID); - - try { - Stat stat = client.checkExists().forPath(runningJobPath + jobID.toString()); - if (stat != null) { - byte[] data = client.getData().forPath(runningJobPath + jobID.toString()); - return JobSchedulingStatus.valueOf(new String(data)); - } + // nothing found, yet, must be in status 'PENDING' return JobSchedulingStatus.PENDING; } catch (Exception e) { @@ -118,13 +106,22 @@ public class ZookeeperRegistry implements RunningJobsRegistry { checkNotNull(jobID); try { - String zkPath = runningJobPath + jobID.toString(); + final String zkPath = createZkPath(jobID); this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient()); 
this.client.delete().forPath(zkPath); } catch (Exception e) { - throw new IOException("Clear job state from zk fail for " + jobID.toString(), e); + throw new IOException("Failed to clear job state from ZooKeeper for job " + jobID, e); } } + private String createZkPath(JobID jobID) { + return runningJobPath + jobID.toString(); + } + + private void writeEnumToZooKeeper(JobID jobID, JobSchedulingStatus status) throws Exception { + final String zkPath = createZkPath(jobID); + this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient()); + this.client.setData().forPath(zkPath, status.name().getBytes(ENCODING)); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java index e331212..ab1ce47 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java @@ -20,9 +20,8 @@ package org.apache.flink.runtime.highavailability.nonha; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.highavailability.RunningJobsRegistry; -import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus; -import java.util.HashSet; +import java.util.HashMap; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -32,18 +31,14 @@ import static org.apache.flink.util.Preconditions.checkNotNull; public class NonHaRegistry implements RunningJobsRegistry { /** The currently running jobs */ - private final HashSet<JobID> running = new HashSet<>(); - - /** The currently finished jobs */ - private final 
HashSet<JobID> finished = new HashSet<>(); + private final HashMap<JobID, JobSchedulingStatus> jobStatus = new HashMap<>(); @Override public void setJobRunning(JobID jobID) { checkNotNull(jobID); - synchronized (running) { - running.add(jobID); - finished.remove(jobID); + synchronized (jobStatus) { + jobStatus.put(jobID, JobSchedulingStatus.RUNNING); } } @@ -51,33 +46,18 @@ public class NonHaRegistry implements RunningJobsRegistry { public void setJobFinished(JobID jobID) { checkNotNull(jobID); - synchronized (running) { - running.remove(jobID); - finished.add(jobID); - } - } - - @Override - public boolean isJobRunning(JobID jobID) { - checkNotNull(jobID); - - synchronized (running) { - return running.contains(jobID); + synchronized (jobStatus) { + jobStatus.put(jobID, JobSchedulingStatus.DONE); } } @Override public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) { checkNotNull(jobID); - - synchronized (running) { - if (finished.contains(jobID)) { - return JobSchedulingStatus.DONE; - } else if (running.contains(jobID)) { - return JobSchedulingStatus.RUNNING; - } else { - return JobSchedulingStatus.PENDING; - } + + synchronized (jobStatus) { + JobSchedulingStatus status = jobStatus.get(jobID); + return status == null ? 
JobSchedulingStatus.PENDING : status; } } @@ -85,9 +65,8 @@ public class NonHaRegistry implements RunningJobsRegistry { public void clearJob(JobID jobID) { checkNotNull(jobID); - synchronized (running) { - running.remove(jobID); - finished.remove(jobID); + synchronized (jobStatus) { + jobStatus.remove(jobID); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index 6bebd90..6e02813 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -360,20 +360,22 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F // it's okay that job manager wait for the operation complete leaderElectionService.confirmLeaderSessionID(leaderSessionID); - JobSchedulingStatus schedulingStatus = JobSchedulingStatus.PENDING; + final JobSchedulingStatus schedulingStatus; try { schedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobGraph.getJobID()); - if (schedulingStatus.equals(JobSchedulingStatus.DONE)) { - log.info("Granted leader ship but job {} has been finished. ", jobGraph.getJobID()); - jobFinishedByOther(); - return; - } - } catch (Throwable t) { - log.error("Could not access status (running/finished) of job {}. ", jobGraph.getJobID(), t); + } + catch (Throwable t) { + log.error("Could not access status (running/finished) of job {}. ", jobGraph.getJobID(), t); onFatalError(t); return; } + if (schedulingStatus == JobSchedulingStatus.DONE) { + log.info("Granted leader ship but job {} has been finished. 
", jobGraph.getJobID()); + jobFinishedByOther(); + return; + } + // Double check the leadership after we confirm that, there is a small chance that multiple // job managers schedule the same job after if they try to recover at the same time. // This will eventually be noticed, but can not be ruled out from the beginning. @@ -382,7 +384,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F // Now set the running status is after getting leader ship and // set finished status after job in terminated status. // So if finding the job is running, it means someone has already run the job, need recover. - if (schedulingStatus.equals(JobSchedulingStatus.PENDING)) { + if (schedulingStatus == JobSchedulingStatus.PENDING) { runningJobsRegistry.setJobRunning(jobGraph.getJobID()); } http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java index 9178684..dd80ada 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java @@ -271,7 +271,7 @@ public class MiniClusterJobDispatcher { haServices.getRunningJobsRegistry().clearJob(jobID); } catch (Throwable t) { - LOG.warn("Could not clear the job {} at the high-availability services", jobID.toString(), t); + LOG.warn("Could not clear job {} at the status registry of the high-availability services", jobID, t); } } http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java index 6c7e249..7ef39de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.RunningJobsRegistry; +import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.util.ExceptionUtils; @@ -244,7 +245,8 @@ public class JobLeaderIdService { } try { - if (runningJobsRegistry.isJobRunning(jobId)) { + final JobSchedulingStatus jobStatus = runningJobsRegistry.getJobSchedulingStatus(jobId); + if (jobStatus == JobSchedulingStatus.PENDING || jobStatus == JobSchedulingStatus.RUNNING) { if (leaderSessionId == null) { // there is no new leader if (previousJobLeaderId != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java index bbafcf0..b0c7778 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java @@ -30,9 +30,10 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +/** + * Tests for the {@link FsNegativeRunningJobsRegistry} on a local file system. + */ public class FsNegativeRunningJobsRegistryTest extends TestLogger { @Rule @@ -47,23 +48,22 @@ public class FsNegativeRunningJobsRegistryTest extends TestLogger { FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(new Path(uri)); + // another registry should pick this up + FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(new Path(uri)); + // initially, without any call, the job is pending - assertFalse(registry.isJobRunning(jid)); - assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.PENDING); + assertEquals(JobSchedulingStatus.PENDING, registry.getJobSchedulingStatus(jid)); + assertEquals(JobSchedulingStatus.PENDING, otherRegistry.getJobSchedulingStatus(jid)); // after set running, the job is running registry.setJobRunning(jid); - assertTrue(registry.isJobRunning(jid)); - assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.RUNNING); + assertEquals(JobSchedulingStatus.RUNNING, registry.getJobSchedulingStatus(jid)); + assertEquals(JobSchedulingStatus.RUNNING, otherRegistry.getJobSchedulingStatus(jid)); // set the job to finished and validate registry.setJobFinished(jid); - assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE); - - // another registry should pick this up - FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(new Path(uri)); - assertTrue(otherRegistry.isJobRunning(jid)); - 
assertEquals(otherRegistry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE); + assertEquals(JobSchedulingStatus.DONE, registry.getJobSchedulingStatus(jid)); + assertEquals(JobSchedulingStatus.DONE, otherRegistry.getJobSchedulingStatus(jid)); } @Test @@ -77,22 +77,19 @@ public class FsNegativeRunningJobsRegistryTest extends TestLogger { // set the job to finished and validate registry.setJobFinished(jid); - assertFalse(registry.isJobRunning(jid)); - assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE); + assertEquals(JobSchedulingStatus.DONE, registry.getJobSchedulingStatus(jid)); // set the job to running does not overwrite the finished status registry.setJobRunning(jid); - assertTrue(registry.isJobRunning(jid)); - assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE); + assertEquals(JobSchedulingStatus.DONE, registry.getJobSchedulingStatus(jid)); // another registry should pick this up FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(new Path(uri)); - assertTrue(otherRegistry.isJobRunning(jid)); - assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.DONE); + assertEquals(JobSchedulingStatus.DONE, otherRegistry.getJobSchedulingStatus(jid)); // clear the running and finished marker, it will be pending otherRegistry.clearJob(jid); - assertFalse(otherRegistry.isJobRunning(jid)); - assertEquals(registry.getJobSchedulingStatus(jid), JobSchedulingStatus.PENDING); + assertEquals(JobSchedulingStatus.PENDING, registry.getJobSchedulingStatus(jid)); + assertEquals(JobSchedulingStatus.PENDING, otherRegistry.getJobSchedulingStatus(jid)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/e0dede9f/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java index 8c91898..b1881e5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java @@ -22,22 +22,21 @@ import org.apache.curator.test.TestingServer; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus; +import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.TestLogger; + import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; + public class ZooKeeperRegistryTest extends TestLogger { - private TestingServer testingServer; - private static Logger LOG = LoggerFactory.getLogger(ZooKeeperRegistryTest.class); + private TestingServer testingServer; @Before public void before() throws Exception { @@ -59,29 +58,25 @@ public class ZooKeeperRegistryTest extends TestLogger { configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString()); configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); - HighAvailabilityServices zkHaService = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration); - RunningJobsRegistry zkRegistry = zkHaService.getRunningJobsRegistry(); + final HighAvailabilityServices zkHaService = new ZookeeperHaServices( + ZooKeeperUtils.startCuratorFramework(configuration), Executors.directExecutor(), configuration); + + final RunningJobsRegistry zkRegistry = 
zkHaService.getRunningJobsRegistry(); try { JobID jobID = JobID.generate(); - assertFalse(zkRegistry.isJobRunning(jobID)); - assertEquals(zkRegistry.getJobSchedulingStatus(jobID), JobSchedulingStatus.PENDING); + assertEquals(JobSchedulingStatus.PENDING, zkRegistry.getJobSchedulingStatus(jobID)); zkRegistry.setJobRunning(jobID); - assertTrue(zkRegistry.isJobRunning(jobID)); - assertEquals(zkRegistry.getJobSchedulingStatus(jobID), JobSchedulingStatus.RUNNING); + assertEquals(JobSchedulingStatus.RUNNING, zkRegistry.getJobSchedulingStatus(jobID)); zkRegistry.setJobFinished(jobID); - assertEquals(zkRegistry.getJobSchedulingStatus(jobID), JobSchedulingStatus.DONE); - assertFalse(zkRegistry.isJobRunning(jobID)); + assertEquals(JobSchedulingStatus.DONE, zkRegistry.getJobSchedulingStatus(jobID)); zkRegistry.clearJob(jobID); - assertFalse(zkRegistry.isJobRunning(jobID)); - assertEquals(zkRegistry.getJobSchedulingStatus(jobID), JobSchedulingStatus.PENDING); + assertEquals(JobSchedulingStatus.PENDING, zkRegistry.getJobSchedulingStatus(jobID)); } finally { - if (zkHaService != null) { - zkHaService.close(); - } + zkHaService.close(); } } }
