Repository: flink
Updated Branches:
  refs/heads/master 64e8de97d -> 84c2a928c
[FLINK-7507] [dispatcher] Fence Dispatcher Let the Dispatcher extend the FencedRpcEndpoint and introduce DispatcherId which replaces the UUID as leader id/fencing token. This closes #4584. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84c2a928 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84c2a928 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84c2a928 Branch: refs/heads/master Commit: 84c2a928cc4c5ee612bf57a1b944362d9114c92c Parents: 64e8de9 Author: Till Rohrmann <trohrm...@apache.org> Authored: Thu Aug 24 19:16:10 2017 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Mon Sep 4 08:48:24 2017 +0200 ---------------------------------------------------------------------- .../flink/runtime/dispatcher/Dispatcher.java | 44 +++++---------- .../runtime/dispatcher/DispatcherGateway.java | 7 +-- .../flink/runtime/dispatcher/DispatcherId.java | 57 ++++++++++++++++++++ .../runtime/dispatcher/DispatcherTest.java | 2 +- 4 files changed, 74 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/84c2a928/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index e7e1ec2..00cbb2f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -28,7 +28,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.highavailability.LeaderIdMismatchException; import org.apache.flink.runtime.highavailability.RunningJobsRegistry; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; @@ -40,7 +39,7 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.rpc.FatalErrorHandler; -import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.FencedRpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.util.ExceptionUtils; @@ -60,7 +59,7 @@ import java.util.concurrent.CompletableFuture; * the jobs and to recover them in case of a master failure. Furthermore, it knows * about the state of the Flink session cluster. 
*/ -public abstract class Dispatcher extends RpcEndpoint implements DispatcherGateway, LeaderContender { +public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> implements DispatcherGateway, LeaderContender { public static final String DISPATCHER_NAME = "dispatcher"; @@ -80,8 +79,6 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa private final LeaderElectionService leaderElectionService; - private volatile UUID leaderSessionId; - protected Dispatcher( RpcService rpcService, String endpointId, @@ -91,7 +88,7 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler) throws Exception { - super(rpcService, endpointId); + super(rpcService, endpointId, DispatcherId.generate()); this.configuration = Preconditions.checkNotNull(configuration); this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices); @@ -106,9 +103,6 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa jobManagerRunners = new HashMap<>(16); leaderElectionService = highAvailabilityServices.getDispatcherLeaderElectionService(); - - // we are not the leader when this object is created - leaderSessionId = null; } //------------------------------------------------------ @@ -156,13 +150,7 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa //------------------------------------------------------ @Override - public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, UUID leaderSessionId, Time timeout) { - - try { - validateLeaderSessionId(leaderSessionId); - } catch (LeaderIdMismatchException e) { - return FutureUtils.completedExceptionally(e); - } + public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) { final JobID jobId = jobGraph.getJobID(); @@ -274,8 +262,6 @@ public abstract class Dispatcher extends 
RpcEndpoint implements DispatcherGatewa private void recoverJobs() { log.info("Recovering all persisted jobs."); - final UUID currentLeaderSessionId = leaderSessionId; - getRpcService().execute( () -> { final Collection<JobID> jobIds; @@ -291,7 +277,7 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa try { SubmittedJobGraph submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId); - runAsync(() -> submitJob(submittedJobGraph.getJobGraph(), currentLeaderSessionId, RpcUtils.INF_TIMEOUT)); + runAsync(() -> submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT)); } catch (Exception e) { log.error("Could not recover the job graph for " + jobId + '.', e); } @@ -304,12 +290,6 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa fatalErrorHandler.onFatalError(throwable); } - private void validateLeaderSessionId(UUID leaderSessionID) throws LeaderIdMismatchException { - if (this.leaderSessionId == null || !this.leaderSessionId.equals(leaderSessionID)) { - throw new LeaderIdMismatchException(this.leaderSessionId, leaderSessionID); - } - } - protected abstract JobManagerRunner createJobManagerRunner( ResourceID resourceId, JobGraph jobGraph, @@ -333,16 +313,18 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa */ @Override public void grantLeadership(final UUID newLeaderSessionID) { - runAsync( + runAsyncWithoutFencing( () -> { - log.info("Dispatcher {} was granted leadership with leader session ID {}", getAddress(), newLeaderSessionID); + final DispatcherId dispatcherId = new DispatcherId(newLeaderSessionID); + + log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), dispatcherId); // clear the state if we've been the leader before - if (leaderSessionId != null) { + if (getFencingToken() != null) { clearState(); } - leaderSessionId = newLeaderSessionID; + setFencingToken(dispatcherId); // confirming the leader session ID might be 
blocking, getRpcService().execute( @@ -357,10 +339,12 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa */ @Override public void revokeLeadership() { - runAsync( + runAsyncWithoutFencing( () -> { log.info("Dispatcher {} was revoked leadership.", getAddress()); clearState(); + + setFencingToken(DispatcherId.generate()); }); } http://git-wip-us.apache.org/repos/asf/flink/blob/84c2a928/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java index 669f616..09254c3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java @@ -22,29 +22,26 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.FencedRpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import java.util.Collection; -import java.util.UUID; import java.util.concurrent.CompletableFuture; /** * Gateway for the Dispatcher component. */ -public interface DispatcherGateway extends RpcGateway { +public interface DispatcherGateway extends FencedRpcGateway<DispatcherId> { /** * Submit a job to the dispatcher. 
* * @param jobGraph JobGraph to submit - * @param leaderSessionId leader session id * @param timeout RPC timeout * @return A future acknowledge if the submission succeeded */ CompletableFuture<Acknowledge> submitJob( JobGraph jobGraph, - UUID leaderSessionId, @RpcTimeout Time timeout); /** http://git-wip-us.apache.org/repos/asf/flink/blob/84c2a928/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java new file mode 100644 index 0000000..e563090 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.util.AbstractID; + +import java.util.UUID; + +/** + * Fencing token of the {@link Dispatcher}. 
+ */ +public class DispatcherId extends AbstractID { + + private static final long serialVersionUID = -1654056277003743966L; + + public DispatcherId(byte[] bytes) { + super(bytes); + } + + public DispatcherId(long lowerPart, long upperPart) { + super(lowerPart, upperPart); + } + + public DispatcherId(AbstractID id) { + super(id); + } + + public DispatcherId() {} + + public DispatcherId(UUID uuid) { + this(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits()); + } + + public UUID toUUID() { + return new UUID(getUpperPart(), getLowerPart()); + } + + public static DispatcherId generate() { + return new DispatcherId(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/84c2a928/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 091608c..8846686 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -113,7 +113,7 @@ public class DispatcherTest extends TestLogger { DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); - CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, HighAvailabilityServices.DEFAULT_LEADER_ID, timeout); + CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout); acknowledgeFuture.get();