Repository: flink
Updated Branches:
  refs/heads/master 64e8de97d -> 84c2a928c


[FLINK-7507] [dispatcher] Fence Dispatcher

Let the Dispatcher extend FencedRpcEndpoint and introduce DispatcherId, which
replaces the UUID as the leader id/fencing token.

This closes #4584.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84c2a928
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84c2a928
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84c2a928

Branch: refs/heads/master
Commit: 84c2a928cc4c5ee612bf57a1b944362d9114c92c
Parents: 64e8de9
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu Aug 24 19:16:10 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Mon Sep 4 08:48:24 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    | 44 +++++----------
 .../runtime/dispatcher/DispatcherGateway.java   |  7 +--
 .../flink/runtime/dispatcher/DispatcherId.java  | 57 ++++++++++++++++++++
 .../runtime/dispatcher/DispatcherTest.java      |  2 +-
 4 files changed, 74 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/84c2a928/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index e7e1ec2..00cbb2f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -28,7 +28,6 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
@@ -40,7 +39,7 @@ import 
org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.util.ExceptionUtils;
@@ -60,7 +59,7 @@ import java.util.concurrent.CompletableFuture;
  * the jobs and to recover them in case of a master failure. Furthermore, it 
knows
  * about the state of the Flink session cluster.
  */
-public abstract class Dispatcher extends RpcEndpoint implements 
DispatcherGateway, LeaderContender {
+public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> 
implements DispatcherGateway, LeaderContender {
 
        public static final String DISPATCHER_NAME = "dispatcher";
 
@@ -80,8 +79,6 @@ public abstract class Dispatcher extends RpcEndpoint 
implements DispatcherGatewa
 
        private final LeaderElectionService leaderElectionService;
 
-       private volatile UUID leaderSessionId;
-
        protected Dispatcher(
                        RpcService rpcService,
                        String endpointId,
@@ -91,7 +88,7 @@ public abstract class Dispatcher extends RpcEndpoint 
implements DispatcherGatewa
                        HeartbeatServices heartbeatServices,
                        MetricRegistry metricRegistry,
                        FatalErrorHandler fatalErrorHandler) throws Exception {
-               super(rpcService, endpointId);
+               super(rpcService, endpointId, DispatcherId.generate());
 
                this.configuration = Preconditions.checkNotNull(configuration);
                this.highAvailabilityServices = 
Preconditions.checkNotNull(highAvailabilityServices);
@@ -106,9 +103,6 @@ public abstract class Dispatcher extends RpcEndpoint 
implements DispatcherGatewa
                jobManagerRunners = new HashMap<>(16);
 
                leaderElectionService = 
highAvailabilityServices.getDispatcherLeaderElectionService();
-
-               // we are not the leader when this object is created
-               leaderSessionId = null;
        }
 
        //------------------------------------------------------
@@ -156,13 +150,7 @@ public abstract class Dispatcher extends RpcEndpoint 
implements DispatcherGatewa
        //------------------------------------------------------
 
        @Override
-       public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, UUID 
leaderSessionId, Time timeout) {
-
-               try {
-                       validateLeaderSessionId(leaderSessionId);
-               } catch (LeaderIdMismatchException e) {
-                       return FutureUtils.completedExceptionally(e);
-               }
+       public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time 
timeout) {
 
                final JobID jobId = jobGraph.getJobID();
 
@@ -274,8 +262,6 @@ public abstract class Dispatcher extends RpcEndpoint 
implements DispatcherGatewa
        private void recoverJobs() {
                log.info("Recovering all persisted jobs.");
 
-               final UUID currentLeaderSessionId = leaderSessionId;
-
                getRpcService().execute(
                        () -> {
                                final Collection<JobID> jobIds;
@@ -291,7 +277,7 @@ public abstract class Dispatcher extends RpcEndpoint 
implements DispatcherGatewa
                                        try {
                                                SubmittedJobGraph 
submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId);
 
-                                               runAsync(() -> 
submitJob(submittedJobGraph.getJobGraph(), currentLeaderSessionId, 
RpcUtils.INF_TIMEOUT));
+                                               runAsync(() -> 
submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT));
                                        } catch (Exception e) {
                                                log.error("Could not recover 
the job graph for " + jobId + '.', e);
                                        }
@@ -304,12 +290,6 @@ public abstract class Dispatcher extends RpcEndpoint 
implements DispatcherGatewa
                fatalErrorHandler.onFatalError(throwable);
        }
 
-       private void validateLeaderSessionId(UUID leaderSessionID) throws 
LeaderIdMismatchException {
-               if (this.leaderSessionId == null || 
!this.leaderSessionId.equals(leaderSessionID)) {
-                       throw new 
LeaderIdMismatchException(this.leaderSessionId, leaderSessionID);
-               }
-       }
-
        protected abstract JobManagerRunner createJobManagerRunner(
                ResourceID resourceId,
                JobGraph jobGraph,
@@ -333,16 +313,18 @@ public abstract class Dispatcher extends RpcEndpoint 
implements DispatcherGatewa
         */
        @Override
        public void grantLeadership(final UUID newLeaderSessionID) {
-               runAsync(
+               runAsyncWithoutFencing(
                        () -> {
-                               log.info("Dispatcher {} was granted leadership 
with leader session ID {}", getAddress(), newLeaderSessionID);
+                               final DispatcherId dispatcherId = new 
DispatcherId(newLeaderSessionID);
+
+                               log.info("Dispatcher {} was granted leadership 
with fencing token {}", getAddress(), dispatcherId);
 
                                // clear the state if we've been the leader 
before
-                               if (leaderSessionId != null) {
+                               if (getFencingToken() != null) {
                                        clearState();
                                }
 
-                               leaderSessionId = newLeaderSessionID;
+                               setFencingToken(dispatcherId);
 
                                // confirming the leader session ID might be 
blocking,
                                getRpcService().execute(
@@ -357,10 +339,12 @@ public abstract class Dispatcher extends RpcEndpoint 
implements DispatcherGatewa
         */
        @Override
        public void revokeLeadership() {
-               runAsync(
+               runAsyncWithoutFencing(
                        () -> {
                                log.info("Dispatcher {} was revoked 
leadership.", getAddress());
                                clearState();
+
+                               setFencingToken(DispatcherId.generate());
                        });
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/84c2a928/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
index 669f616..09254c3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
@@ -22,29 +22,26 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 
 import java.util.Collection;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
 /**
  * Gateway for the Dispatcher component.
  */
-public interface DispatcherGateway extends RpcGateway {
+public interface DispatcherGateway extends FencedRpcGateway<DispatcherId> {
 
        /**
         * Submit a job to the dispatcher.
         *
         * @param jobGraph JobGraph to submit
-        * @param leaderSessionId leader session id
         * @param timeout RPC timeout
         * @return A future acknowledge if the submission succeeded
         */
        CompletableFuture<Acknowledge> submitJob(
                JobGraph jobGraph,
-               UUID leaderSessionId,
                @RpcTimeout Time timeout);
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/84c2a928/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java
new file mode 100644
index 0000000..e563090
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.util.AbstractID;
+
+import java.util.UUID;
+
+/**
+ * Fencing token of the {@link Dispatcher}.
+ */
+public class DispatcherId extends AbstractID {
+
+       private static final long serialVersionUID = -1654056277003743966L;
+
+       public DispatcherId(byte[] bytes) {
+               super(bytes);
+       }
+
+       public DispatcherId(long lowerPart, long upperPart) {
+               super(lowerPart, upperPart);
+       }
+
+       public DispatcherId(AbstractID id) {
+               super(id);
+       }
+
+       public DispatcherId() {}
+
+       public DispatcherId(UUID uuid) {
+               this(uuid.getLeastSignificantBits(), 
uuid.getMostSignificantBits());
+       }
+
+       public UUID toUUID() {
+               return new UUID(getUpperPart(), getLowerPart());
+       }
+
+       public static DispatcherId generate() {
+               return new DispatcherId();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/84c2a928/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 091608c..8846686 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -113,7 +113,7 @@ public class DispatcherTest extends TestLogger {
 
                        DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
 
-                       CompletableFuture<Acknowledge> acknowledgeFuture = 
dispatcherGateway.submitJob(jobGraph, 
HighAvailabilityServices.DEFAULT_LEADER_ID, timeout);
+                       CompletableFuture<Acknowledge> acknowledgeFuture = 
dispatcherGateway.submitJob(jobGraph, timeout);
 
                        acknowledgeFuture.get();
 

Reply via email to