Repository: flink Updated Branches: refs/heads/flip-6 930334ef7 -> 0de568963
[FLINK-4882] [flip-6] Remove exceptions from HighAvailabilityServices where not necessary Cleanup of the interface HighAvailabilityServices so that only methods which really throw an exception have an exception clause defined. This closes #2679. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0de56896 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0de56896 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0de56896 Branch: refs/heads/flip-6 Commit: 0de5689632bd1f8eac6e436959d80d31df9e5ef9 Parents: 930334e Author: Till Rohrmann <[email protected]> Authored: Wed Oct 19 14:09:31 2016 +0200 Committer: Till Rohrmann <[email protected]> Committed: Sun Oct 23 11:07:56 2016 +0200 ---------------------------------------------------------------------- .../highavailability/EmbeddedNonHaServices.java | 4 +-- .../HighAvailabilityServices.java | 33 +++++++++++++++----- .../runtime/highavailability/NonHaServices.java | 4 +-- .../highavailability/ZookeeperHaServices.java | 12 +++---- .../nonha/AbstractNonHaServices.java | 10 +++--- .../flink/runtime/jobmaster/JobMaster.java | 17 ++-------- .../TestingHighAvailabilityServices.java | 14 ++++----- 7 files changed, 49 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0de56896/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java index 58da287..523218e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java @@ -43,12 +43,12 @@ public class EmbeddedNonHaServices extends AbstractNonHaServices implements High // ------------------------------------------------------------------------ @Override - public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception { + public LeaderRetrievalService getResourceManagerLeaderRetriever() { return resourceManagerLeaderService.createLeaderRetrievalService(); } @Override - public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { + public LeaderElectionService getResourceManagerLeaderElectionService() { return resourceManagerLeaderService.createLeaderElectionService(); } http://git-wip-us.apache.org/repos/asf/flink/blob/0de56896/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java index f6db682..360de7b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java @@ -43,50 +43,67 @@ public interface HighAvailabilityServices { /** * Gets the leader retriever for the cluster's resource manager. */ - LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception; + LeaderRetrievalService getResourceManagerLeaderRetriever(); /** * Gets the leader retriever for the job JobMaster which is responsible for the given job * * @param jobID The identifier of the job. - * @return - * @throws Exception + * @return Leader retrieval service to retrieve the job manager for the given job */ - LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception; + LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID); /** * Gets the leader election service for the cluster's resource manager. + * + * @return Leader election service for the resource manager leader election */ - LeaderElectionService getResourceManagerLeaderElectionService() throws Exception; + LeaderElectionService getResourceManagerLeaderElectionService(); /** * Gets the leader election service for the given job. * * @param jobID The identifier of the job running the election. + * @return Leader election service for the job manager leader election */ - LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception; + LeaderElectionService getJobManagerLeaderElectionService(JobID jobID); /** * Gets the checkpoint recovery factory for the job manager + * + * @return Checkpoint recovery factory */ - CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception; + CheckpointRecoveryFactory getCheckpointRecoveryFactory(); /** * Gets the submitted job graph store for the job manager + * + * @return Submitted job graph store + * @throws Exception if the submitted job graph store could not be created */ SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception; /** * Gets the registry that holds information about whether jobs are currently running. + * + * @return Running job registry to retrieve running jobs */ - RunningJobsRegistry getRunningJobsRegistry() throws Exception; + RunningJobsRegistry getRunningJobsRegistry(); /** * Creates the BLOB store in which BLOBs are stored in a highly-available fashion. + * + * @return Blob store + * @throws IOException if the blob store could not be created */ BlobStore createBlobStore() throws IOException; // ------------------------------------------------------------------------ + /** + * Shut the high availability service down. + * + * @throws Exception if the shut down fails + */ void shutdown() throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/0de56896/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java index 107cbd0..75f44ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java @@ -57,12 +57,12 @@ public class NonHaServices extends AbstractNonHaServices implements HighAvailabi // ------------------------------------------------------------------------ @Override - public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception { + public LeaderRetrievalService getResourceManagerLeaderRetriever() { return new StandaloneLeaderRetrievalService(resourceManagerAddress, new UUID(0, 0)); } @Override - public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { + public LeaderElectionService getResourceManagerLeaderElectionService() { return new StandaloneLeaderElectionService(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/0de56896/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java index a9d2610..07c5011 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java @@ -107,27 +107,27 @@ public class ZookeeperHaServices implements HighAvailabilityServices { // ------------------------------------------------------------------------ @Override - public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception { + public LeaderRetrievalService getResourceManagerLeaderRetriever() { return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH); } @Override - public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception { + public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) { return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID)); } @Override - public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { + public LeaderElectionService getResourceManagerLeaderElectionService() { return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH); } @Override - public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception { + public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) { return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID)); } @Override - public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception { + public CheckpointRecoveryFactory getCheckpointRecoveryFactory() { return new ZooKeeperCheckpointRecoveryFactory(client, configuration); } @@ -137,7 +137,7 @@ public class ZookeeperHaServices implements HighAvailabilityServices { } @Override - public RunningJobsRegistry getRunningJobsRegistry() throws Exception { + public RunningJobsRegistry getRunningJobsRegistry() { throw new UnsupportedOperationException("not yet implemented"); } http://git-wip-us.apache.org/repos/asf/flink/blob/0de56896/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java index 8c15a52..237727f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java @@ -72,7 +72,7 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices // ------------------------------------------------------------------------ @Override - public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception { + public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) { checkNotNull(jobID); synchronized (lock) { @@ -83,7 +83,7 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices } @Override - public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception { + public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) { checkNotNull(jobID); synchronized (lock) { @@ -104,19 +104,19 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices } @Override - public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception { + public CheckpointRecoveryFactory getCheckpointRecoveryFactory() { checkNotShutdown(); return new StandaloneCheckpointRecoveryFactory(); } @Override - public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception { + public SubmittedJobGraphStore getSubmittedJobGraphStore() { checkNotShutdown(); return new StandaloneSubmittedJobGraphStore(); } @Override - public RunningJobsRegistry getRunningJobsRegistry() throws Exception { + public RunningJobsRegistry getRunningJobsRegistry() { checkNotShutdown(); return runningJobsRegistry; } http://git-wip-us.apache.org/repos/asf/flink/blob/0de56896/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 5a7c9a1..a9ac1fe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -33,7 +33,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.client.SerializedJobExecutionResult; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -229,21 +228,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobName, jid); - CheckpointRecoveryFactory checkpointRecoveryFactory; - try { - checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory(); - } catch (Exception e) { - log.error("Could not create the access to highly-available checkpoint storage.", e); - throw new Exception("Could not create the access to highly-available checkpoint storage.", e); - } + CheckpointRecoveryFactory checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory(); - try { - resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever(); - } catch (Exception e) { - log.error("Could not get the resource manager leader retriever.", e); - throw new JobSubmissionException(jobGraph.getJobID(), - "Could not get the resource manager leader retriever.", e); - } + resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever(); this.executionGraph = ExecutionGraphBuilder.buildGraph( null, http://git-wip-us.apache.org/repos/asf/flink/blob/0de56896/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java index 877812b..e0f71ee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java @@ -81,7 +81,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices // ------------------------------------------------------------------------ @Override - public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception { + public LeaderRetrievalService getResourceManagerLeaderRetriever() { LeaderRetrievalService service = this.resourceManagerLeaderRetriever; if (service != null) { return service; @@ -91,7 +91,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices } @Override - public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception { + public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) { LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID); if (service != null) { return service; @@ -101,7 +101,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices } @Override - public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { + public LeaderElectionService getResourceManagerLeaderElectionService() { LeaderElectionService service = resourceManagerLeaderElectionService; if (service != null) { @@ -112,7 +112,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices } @Override - public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception { + public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) { LeaderElectionService service = this.jobManagerLeaderElectionServices.get(jobID); if (service != null) { @@ -123,7 +123,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices } @Override - public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception { + public CheckpointRecoveryFactory getCheckpointRecoveryFactory() { CheckpointRecoveryFactory factory = checkpointRecoveryFactory; if (factory != null) { @@ -134,7 +134,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices } @Override - public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception { + public SubmittedJobGraphStore getSubmittedJobGraphStore() { SubmittedJobGraphStore store = submittedJobGraphStore; if (store != null) { @@ -146,7 +146,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices } @Override - public RunningJobsRegistry getRunningJobsRegistry() throws Exception { + public RunningJobsRegistry getRunningJobsRegistry() { return new NonHaRegistry(); }
