Repository: flink
Updated Branches:
  refs/heads/flip-6 930334ef7 -> 0de568963


[FLINK-4882] [flip-6] Remove exceptions from HighAvailabilityServices where not 
necessary

Cleanup of the interface HighAvailabilityServices so that only methods which 
really throw an
exception have an exception clause defined.

This closes #2679.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0de56896
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0de56896
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0de56896

Branch: refs/heads/flip-6
Commit: 0de5689632bd1f8eac6e436959d80d31df9e5ef9
Parents: 930334e
Author: Till Rohrmann <[email protected]>
Authored: Wed Oct 19 14:09:31 2016 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Sun Oct 23 11:07:56 2016 +0200

----------------------------------------------------------------------
 .../highavailability/EmbeddedNonHaServices.java |  4 +--
 .../HighAvailabilityServices.java               | 33 +++++++++++++++-----
 .../runtime/highavailability/NonHaServices.java |  4 +--
 .../highavailability/ZookeeperHaServices.java   | 12 +++----
 .../nonha/AbstractNonHaServices.java            | 10 +++---
 .../flink/runtime/jobmaster/JobMaster.java      | 17 ++--------
 .../TestingHighAvailabilityServices.java        | 14 ++++-----
 7 files changed, 49 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0de56896/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
index 58da287..523218e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
@@ -43,12 +43,12 @@ public class EmbeddedNonHaServices extends 
AbstractNonHaServices implements High
        // 
------------------------------------------------------------------------
 
        @Override
-       public LeaderRetrievalService getResourceManagerLeaderRetriever() 
throws Exception {
+       public LeaderRetrievalService getResourceManagerLeaderRetriever() {
                return 
resourceManagerLeaderService.createLeaderRetrievalService();
        }
 
        @Override
-       public LeaderElectionService getResourceManagerLeaderElectionService() 
throws Exception {
+       public LeaderElectionService getResourceManagerLeaderElectionService() {
                return 
resourceManagerLeaderService.createLeaderElectionService();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0de56896/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index f6db682..360de7b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -43,50 +43,67 @@ public interface HighAvailabilityServices {
        /**
         * Gets the leader retriever for the cluster's resource manager.
         */
-       LeaderRetrievalService getResourceManagerLeaderRetriever() throws 
Exception;
+       LeaderRetrievalService getResourceManagerLeaderRetriever();
 
        /**
         * Gets the leader retriever for the job JobMaster which is responsible 
for the given job
         *
         * @param jobID The identifier of the job.
-        * @return
-        * @throws Exception
+        * @return Leader retrieval service to retrieve the job manager for the 
given job
         */
-       LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws 
Exception;
+       LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);
 
        /**
         * Gets the leader election service for the cluster's resource manager.
+        *
+        * @return Leader election service for the resource manager leader 
election
         */
-       LeaderElectionService getResourceManagerLeaderElectionService() throws 
Exception;
+       LeaderElectionService getResourceManagerLeaderElectionService();
 
        /**
         * Gets the leader election service for the given job.
         *
         * @param jobID The identifier of the job running the election.
+        * @return Leader election service for the job manager leader election
         */
-       LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) 
throws Exception;
+       LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
 
        /**
         * Gets the checkpoint recovery factory for the job manager
+        *
+        * @return Checkpoint recovery factory
         */
-       CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws 
Exception;
+       CheckpointRecoveryFactory getCheckpointRecoveryFactory();
 
        /**
         * Gets the submitted job graph store for the job manager
+        *
+        * @return Submitted job graph store
+        * @throws Exception if the submitted job graph store could not be 
created
         */
        SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;
 
        /**
         * Gets the registry that holds information about whether jobs are 
currently running.
+        *
+        * @return Running job registry to retrieve running jobs
         */
-       RunningJobsRegistry getRunningJobsRegistry() throws Exception;
+       RunningJobsRegistry getRunningJobsRegistry();
 
        /**
         * Creates the BLOB store in which BLOBs are stored in a 
highly-available fashion.
+        *
+        * @return Blob store
+        * @throws IOException if the blob store could not be created
         */
        BlobStore createBlobStore() throws IOException;
 
        // 
------------------------------------------------------------------------
 
+       /**
+        * Shut the high availability service down.
+        *
+        * @throws Exception if the shut down fails
+        */
        void shutdown() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0de56896/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 107cbd0..75f44ed 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -57,12 +57,12 @@ public class NonHaServices extends AbstractNonHaServices 
implements HighAvailabi
        // 
------------------------------------------------------------------------
 
        @Override
-       public LeaderRetrievalService getResourceManagerLeaderRetriever() 
throws Exception {
+       public LeaderRetrievalService getResourceManagerLeaderRetriever() {
                return new 
StandaloneLeaderRetrievalService(resourceManagerAddress, new UUID(0, 0));
        }
 
        @Override
-       public LeaderElectionService getResourceManagerLeaderElectionService() 
throws Exception {
+       public LeaderElectionService getResourceManagerLeaderElectionService() {
                return new StandaloneLeaderElectionService();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0de56896/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index a9d2610..07c5011 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -107,27 +107,27 @@ public class ZookeeperHaServices implements 
HighAvailabilityServices {
        // 
------------------------------------------------------------------------
 
        @Override
-       public LeaderRetrievalService getResourceManagerLeaderRetriever() 
throws Exception {
+       public LeaderRetrievalService getResourceManagerLeaderRetriever() {
                return ZooKeeperUtils.createLeaderRetrievalService(client, 
configuration, RESOURCE_MANAGER_LEADER_PATH);
        }
 
        @Override
-       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) 
throws Exception {
+       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) 
{
                return ZooKeeperUtils.createLeaderRetrievalService(client, 
configuration, getPathForJobManager(jobID));
        }
 
        @Override
-       public LeaderElectionService getResourceManagerLeaderElectionService() 
throws Exception {
+       public LeaderElectionService getResourceManagerLeaderElectionService() {
                return ZooKeeperUtils.createLeaderElectionService(client, 
configuration, RESOURCE_MANAGER_LEADER_PATH);
        }
 
        @Override
-       public LeaderElectionService getJobManagerLeaderElectionService(JobID 
jobID) throws Exception {
+       public LeaderElectionService getJobManagerLeaderElectionService(JobID 
jobID) {
                return ZooKeeperUtils.createLeaderElectionService(client, 
configuration, getPathForJobManager(jobID));
        }
 
        @Override
-       public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws 
Exception {
+       public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
                return new ZooKeeperCheckpointRecoveryFactory(client, 
configuration);
        }
 
@@ -137,7 +137,7 @@ public class ZookeeperHaServices implements 
HighAvailabilityServices {
        }
 
        @Override
-       public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+       public RunningJobsRegistry getRunningJobsRegistry() {
                throw new UnsupportedOperationException("not yet implemented");
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0de56896/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
index 8c15a52..237727f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -72,7 +72,7 @@ public abstract class AbstractNonHaServices implements 
HighAvailabilityServices
        // 
------------------------------------------------------------------------
 
        @Override
-       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) 
throws Exception {
+       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) 
{
                checkNotNull(jobID);
 
                synchronized (lock) {
@@ -83,7 +83,7 @@ public abstract class AbstractNonHaServices implements 
HighAvailabilityServices
        }
 
        @Override
-       public LeaderElectionService getJobManagerLeaderElectionService(JobID 
jobID) throws Exception {
+       public LeaderElectionService getJobManagerLeaderElectionService(JobID 
jobID) {
                checkNotNull(jobID);
 
                synchronized (lock) {
@@ -104,19 +104,19 @@ public abstract class AbstractNonHaServices implements 
HighAvailabilityServices
        }
 
        @Override
-       public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws 
Exception {
+       public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
                checkNotShutdown();
                return new StandaloneCheckpointRecoveryFactory();
        }
 
        @Override
-       public SubmittedJobGraphStore getSubmittedJobGraphStore() throws 
Exception {
+       public SubmittedJobGraphStore getSubmittedJobGraphStore() {
                checkNotShutdown();
                return new StandaloneSubmittedJobGraphStore();
        }
 
        @Override
-       public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+       public RunningJobsRegistry getRunningJobsRegistry() {
                checkNotShutdown();
                return runningJobsRegistry;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0de56896/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 5a7c9a1..a9ac1fe 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -33,7 +33,6 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -229,21 +228,9 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
 
                log.info("Using restart strategy {} for {} ({}).", 
restartStrategy, jobName, jid);
 
-               CheckpointRecoveryFactory checkpointRecoveryFactory;
-               try {
-                       checkpointRecoveryFactory = 
highAvailabilityServices.getCheckpointRecoveryFactory();
-               } catch (Exception e) {
-                       log.error("Could not create the access to 
highly-available checkpoint storage.", e);
-                       throw new Exception("Could not create the access to 
highly-available checkpoint storage.", e);
-               }
+               CheckpointRecoveryFactory checkpointRecoveryFactory = 
highAvailabilityServices.getCheckpointRecoveryFactory();
 
-               try {
-                       resourceManagerLeaderRetriever = 
highAvailabilityServices.getResourceManagerLeaderRetriever();
-               } catch (Exception e) {
-                       log.error("Could not get the resource manager leader 
retriever.", e);
-                       throw new JobSubmissionException(jobGraph.getJobID(),
-                                       "Could not get the resource manager 
leader retriever.", e);
-               }
+               resourceManagerLeaderRetriever = 
highAvailabilityServices.getResourceManagerLeaderRetriever();
 
                this.executionGraph = ExecutionGraphBuilder.buildGraph(
                                null,

http://git-wip-us.apache.org/repos/asf/flink/blob/0de56896/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 877812b..e0f71ee 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -81,7 +81,7 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
        // 
------------------------------------------------------------------------
 
        @Override
-       public LeaderRetrievalService getResourceManagerLeaderRetriever() 
throws Exception {
+       public LeaderRetrievalService getResourceManagerLeaderRetriever() {
                LeaderRetrievalService service = 
this.resourceManagerLeaderRetriever;
                if (service != null) {
                        return service;
@@ -91,7 +91,7 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
        }
 
        @Override
-       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) 
throws Exception {
+       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) 
{
                LeaderRetrievalService service = 
this.jobMasterLeaderRetrievers.get(jobID);
                if (service != null) {
                        return service;
@@ -101,7 +101,7 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
        }
 
        @Override
-       public LeaderElectionService getResourceManagerLeaderElectionService() 
throws Exception {
+       public LeaderElectionService getResourceManagerLeaderElectionService() {
                LeaderElectionService service = 
resourceManagerLeaderElectionService;
 
                if (service != null) {
@@ -112,7 +112,7 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
        }
 
        @Override
-       public LeaderElectionService getJobManagerLeaderElectionService(JobID 
jobID) throws Exception {
+       public LeaderElectionService getJobManagerLeaderElectionService(JobID 
jobID) {
                LeaderElectionService service = 
this.jobManagerLeaderElectionServices.get(jobID);
 
                if (service != null) {
@@ -123,7 +123,7 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
        }
 
        @Override
-       public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws 
Exception {
+       public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
                CheckpointRecoveryFactory factory = checkpointRecoveryFactory;
 
                if (factory != null) {
@@ -134,7 +134,7 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
        }
 
        @Override
-       public SubmittedJobGraphStore getSubmittedJobGraphStore() throws 
Exception {
+       public SubmittedJobGraphStore getSubmittedJobGraphStore() {
                SubmittedJobGraphStore store = submittedJobGraphStore;
 
                if (store != null) {
@@ -146,7 +146,7 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
        }
 
        @Override
-       public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+       public RunningJobsRegistry getRunningJobsRegistry() {
                return new NonHaRegistry();
        }
 

Reply via email to