Repository: flink Updated Branches: refs/heads/master a86b64686 -> 42cc3a2a9
[FLINK-7655] [flip6] Set fencing token to null if not leader This commit changes the fencing behaviour such that a component which is not the leader will set its fencing token to null. This distinction makes it possible to report different errors depending on whether there is a token mismatch or the receiver has no fencing token set (i.e. it is not the leader). This closes #4689. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42cc3a2a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42cc3a2a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42cc3a2a Branch: refs/heads/master Commit: 42cc3a2a9c41dda7cf338db36b45131db9150674 Parents: a86b646 Author: Till Rohrmann <[email protected]> Authored: Wed Sep 20 17:39:35 2017 +0200 Committer: Till Rohrmann <[email protected]> Committed: Fri Sep 22 00:59:07 2017 +0200 ---------------------------------------------------------------------- .../flink/runtime/dispatcher/Dispatcher.java | 5 +- .../flink/runtime/jobmaster/JobMaster.java | 10 +-- .../flink/runtime/jobmaster/JobMasterId.java | 2 - .../resourcemanager/ResourceManager.java | 8 +-- .../flink/runtime/rpc/FencedRpcEndpoint.java | 16 +++-- .../runtime/rpc/akka/FencedAkkaRpcActor.java | 37 ++++++---- .../rpc/exceptions/FencingTokenException.java | 42 +++++++++++ .../FencingTokenMismatchException.java | 42 ----------- .../rpc/messages/LocalFencedMessage.java | 6 +- .../rpc/messages/RemoteFencedMessage.java | 6 +- .../resourcemanager/ResourceManagerHATest.java | 2 +- .../ResourceManagerJobMasterTest.java | 20 +++++- .../ResourceManagerTaskExecutorTest.java | 4 +- .../flink/runtime/rpc/AsyncCallsTest.java | 15 +++- .../runtime/rpc/FencedRpcEndpointTest.java | 75 ++++++++++++-------- 15 files changed, 171 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 6b9999c..153ee53 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -94,7 +94,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler, Optional<String> restAddress) throws Exception { - super(rpcService, endpointId, DispatcherId.generate()); + super(rpcService, endpointId); this.configuration = Preconditions.checkNotNull(configuration); this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices); @@ -399,7 +399,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme log.warn("Could not properly clear the Dispatcher state while revoking leadership.", e); } - setFencingToken(DispatcherId.generate()); + // clear the fencing token indicating that we don't have the leadership right now + setFencingToken(null); }); } http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 2bfe277..343fbf6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -212,7 +212,7 @@ public 
class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast FatalErrorHandler errorHandler, ClassLoader userCodeLoader) throws Exception { - super(rpcService, AkkaRpcServiceUtils.createRandomName(JobMaster.JOB_MANAGER_NAME), JobMasterId.INITIAL_JOB_MASTER_ID); + super(rpcService, AkkaRpcServiceUtils.createRandomName(JobMaster.JOB_MANAGER_NAME)); selfGateway = getSelfGateway(JobMasterGateway.class); @@ -735,7 +735,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast return Acknowledge.get(); } - if (!Objects.equals(getFencingToken(), JobMasterId.INITIAL_JOB_MASTER_ID)) { + if (getFencingToken() != null) { log.info("Restarting old job with JobMasterId {}. The new JobMasterId is {}.", getFencingToken(), newJobMasterId); // first we have to suspend the current execution @@ -791,13 +791,13 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast private Acknowledge suspendExecution(final Throwable cause) { validateRunsInMainThread(); - if (Objects.equals(JobMasterId.INITIAL_JOB_MASTER_ID, getFencingToken())) { + if (getFencingToken() == null) { log.debug("Job has already been suspended or shutdown."); return Acknowledge.get(); } - // not leader anymore --> set the JobMasterId to the initial id - setFencingToken(JobMasterId.INITIAL_JOB_MASTER_ID); + // not leader anymore --> set the JobMasterId to null + setFencingToken(null); try { resourceManagerLeaderRetriever.stop(); http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java index ffd53b3..39f7ded 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java @@ -29,8 +29,6 @@ public class JobMasterId extends AbstractID { private static final long serialVersionUID = -933276753644003754L; - public static final JobMasterId INITIAL_JOB_MASTER_ID = new JobMasterId(0L, 0L); - public JobMasterId(byte[] bytes) { super(bytes); } http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 87cf7d1..f69998c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -139,7 +139,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> JobLeaderIdService jobLeaderIdService, FatalErrorHandler fatalErrorHandler) { - super(rpcService, resourceManagerEndpointId, ResourceManagerId.generate()); + super(rpcService, resourceManagerEndpointId); this.resourceId = checkNotNull(resourceId); this.resourceManagerConfiguration = checkNotNull(resourceManagerConfiguration); @@ -772,13 +772,11 @@ public abstract class ResourceManager<WorkerType extends Serializable> public void revokeLeadership() { runAsyncWithoutFencing( () -> { - final ResourceManagerId newResourceManagerId = ResourceManagerId.generate(); - - log.info("ResourceManager {} was revoked leadership. Setting fencing token to {}.", getAddress(), newResourceManagerId); + log.info("ResourceManager {} was revoked leadership. 
Clearing fencing token.", getAddress()); clearState(); - setFencingToken(newResourceManagerId); + setFencingToken(null); slotManager.suspend(); }); http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java index 81bae29..ff74f47 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java @@ -19,7 +19,8 @@ package org.apache.flink.runtime.rpc; import org.apache.flink.api.common.time.Time; -import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; import java.io.Serializable; import java.util.UUID; @@ -39,25 +40,26 @@ public class FencedRpcEndpoint<F extends Serializable> extends RpcEndpoint { private volatile F fencingToken; private volatile MainThreadExecutor fencedMainThreadExecutor; - protected FencedRpcEndpoint(RpcService rpcService, String endpointId, F initialFencingToken) { + protected FencedRpcEndpoint(RpcService rpcService, String endpointId) { super(rpcService, endpointId); - this.fencingToken = Preconditions.checkNotNull(initialFencingToken); + // no fencing token == no leadership + this.fencingToken = null; this.fencedMainThreadExecutor = new MainThreadExecutor( getRpcService().fenceRpcServer( rpcServer, - initialFencingToken)); + null)); } - protected FencedRpcEndpoint(RpcService rpcService, F initialFencingToken) { - this(rpcService, UUID.randomUUID().toString(), initialFencingToken); + protected FencedRpcEndpoint(RpcService rpcService) { + this(rpcService, UUID.randomUUID().toString()); } public F getFencingToken() { return fencingToken; } - protected void setFencingToken(F newFencingToken) 
{ + protected void setFencingToken(@Nullable F newFencingToken) { // this method should only be called from within the main thread validateRunsInMainThread(); http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java index 1ace3b7..369af6a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.rpc.akka; import org.apache.flink.runtime.rpc.FencedRpcEndpoint; import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException; -import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException; +import org.apache.flink.runtime.rpc.exceptions.FencingTokenException; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.messages.FencedMessage; import org.apache.flink.runtime.rpc.messages.UnfencedMessage; @@ -45,23 +45,36 @@ public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpo @Override protected void handleMessage(Object message) { if (message instanceof FencedMessage) { - @SuppressWarnings("unchecked") - FencedMessage<F, ?> fencedMessage = ((FencedMessage<F, ?>) message); - F fencingToken = fencedMessage.getFencingToken(); + final F expectedFencingToken = rpcEndpoint.getFencingToken(); - if (Objects.equals(rpcEndpoint.getFencingToken(), fencingToken)) { - super.handleMessage(fencedMessage.getPayload()); - } else { + if (expectedFencingToken == null) { if (log.isDebugEnabled()) { - log.debug("Fencing token mismatch: Ignoring message {} because the fencing token {} did " 
+ - "not match the expected fencing token {}.", message, fencingToken, rpcEndpoint.getFencingToken()); + log.debug("Fencing token not set: Ignoring message {} because the fencing token is null.", message); } sendErrorIfSender( - new FencingTokenMismatchException("Fencing token mismatch: Ignoring message " + message + - " because the fencing token " + fencingToken + " did not match the expected fencing token " + - rpcEndpoint.getFencingToken() + '.')); + new FencingTokenException( + "Fencing token not set: Ignoring message " + message + " because the fencing token is null.")); + } else { + @SuppressWarnings("unchecked") + FencedMessage<F, ?> fencedMessage = ((FencedMessage<F, ?>) message); + + F fencingToken = fencedMessage.getFencingToken(); + + if (Objects.equals(expectedFencingToken, fencingToken)) { + super.handleMessage(fencedMessage.getPayload()); + } else { + if (log.isDebugEnabled()) { + log.debug("Fencing token mismatch: Ignoring message {} because the fencing token {} did " + + "not match the expected fencing token {}.", message, fencingToken, expectedFencingToken); + } + + sendErrorIfSender( + new FencingTokenException("Fencing token mismatch: Ignoring message " + message + + " because the fencing token " + fencingToken + " did not match the expected fencing token " + + expectedFencingToken + '.')); + } } } else if (message instanceof UnfencedMessage) { super.handleMessage(((UnfencedMessage<?>) message).getPayload()); http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenException.java new file mode 100644 index 0000000..71520c8 --- /dev/null +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.exceptions; + +import org.apache.flink.runtime.rpc.FencedRpcEndpoint; +import org.apache.flink.runtime.rpc.exceptions.RpcException; + +/** + * Exception which is thrown if the fencing tokens of a {@link FencedRpcEndpoint} do + * not match. 
+ */ +public class FencingTokenException extends RpcException { + private static final long serialVersionUID = -500634972988881467L; + + public FencingTokenException(String message) { + super(message); + } + + public FencingTokenException(String message, Throwable cause) { + super(message, cause); + } + + public FencingTokenException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenMismatchException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenMismatchException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenMismatchException.java deleted file mode 100644 index 9a59101..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenMismatchException.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.rpc.exceptions; - -import org.apache.flink.runtime.rpc.FencedRpcEndpoint; -import org.apache.flink.runtime.rpc.exceptions.RpcException; - -/** - * Exception which is thrown if the fencing tokens of a {@link FencedRpcEndpoint} do - * not match. - */ -public class FencingTokenMismatchException extends RpcException { - private static final long serialVersionUID = -500634972988881467L; - - public FencingTokenMismatchException(String message) { - super(message); - } - - public FencingTokenMismatchException(String message, Throwable cause) { - super(message, cause); - } - - public FencingTokenMismatchException(Throwable cause) { - super(cause); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java index 2481065..0ee4940 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java @@ -20,6 +20,8 @@ package org.apache.flink.runtime.rpc.messages; import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; + import java.io.Serializable; /** @@ -34,8 +36,8 @@ public class LocalFencedMessage<F extends Serializable, P> implements FencedMess private final F fencingToken; private final P payload; - public LocalFencedMessage(F fencingToken, P payload) { - this.fencingToken = Preconditions.checkNotNull(fencingToken); + public LocalFencedMessage(@Nullable F fencingToken, P payload) { + this.fencingToken = fencingToken; this.payload = Preconditions.checkNotNull(payload); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java index 5cf9b98..ad8c349 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java @@ -20,6 +20,8 @@ package org.apache.flink.runtime.rpc.messages; import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; + import java.io.Serializable; /** @@ -35,8 +37,8 @@ public class RemoteFencedMessage<F extends Serializable, P extends Serializable> private final F fencingToken; private final P payload; - public RemoteFencedMessage(F fencingToken, P payload) { - this.fencingToken = Preconditions.checkNotNull(fencingToken); + public RemoteFencedMessage(@Nullable F fencingToken, P payload) { + this.fencingToken = fencingToken; this.payload = Preconditions.checkNotNull(payload); } http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java index 2b8792b..d0dd973 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java @@ -108,7 +108,7 @@ public class ResourceManagerHATest extends TestLogger { 
try { resourceManager.start(); - Assert.assertNotNull(resourceManager.getFencingToken()); + Assert.assertNull(resourceManager.getFencingToken()); final UUID leaderId = UUID.randomUUID(); leaderElectionService.isLeader(leaderId); // after grant leadership, resourceManager's leaderId has value http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java index 156bc73..73c5b5c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java @@ -38,7 +38,7 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.registration.RegistrationResponse; -import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException; +import org.apache.flink.runtime.rpc.exceptions.FencingTokenException; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.ExceptionUtils; @@ -47,6 +47,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -81,10 +82,14 @@ public class ResourceManagerJobMasterTest extends TestLogger { JobMasterId jobMasterId = JobMasterId.generate(); final ResourceID jmResourceId = new 
ResourceID(jobMasterAddress); TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID()); + TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); - final ResourceManager<?> resourceManager = createAndStartResourceManager(mock(LeaderElectionService.class), jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler); + final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler); final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); + // wait until the leader election has been completed + resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get(); + // test response successful CompletableFuture<RegistrationResponse> successfulFuture = rmGateway.registerJobManager( jobMasterId, @@ -127,7 +132,7 @@ public class ResourceManagerJobMasterTest extends TestLogger { unMatchedLeaderFuture.get(5L, TimeUnit.SECONDS); fail("Should fail because we are using the wrong fencing token."); } catch (ExecutionException e) { - assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException); + assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException); } if (testingFatalErrorHandler.hasExceptionOccurred()) { @@ -151,6 +156,9 @@ public class ResourceManagerJobMasterTest extends TestLogger { final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); final ResourceID jmResourceId = new ResourceID(jobMasterAddress); + // wait until the leader election has been completed + resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get(); + // test throw exception when receive a registration from 
job master which takes unmatched leaderSessionId JobMasterId differentJobMasterId = JobMasterId.generate(); CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = rmGateway.registerJobManager( @@ -182,6 +190,9 @@ public class ResourceManagerJobMasterTest extends TestLogger { final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); final ResourceID jmResourceId = new ResourceID(jobMasterAddress); + // wait until the leader election has been completed + resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get(); + // test throw exception when receive a registration from job master which takes invalid address String invalidAddress = "/jobMasterAddress2"; CompletableFuture<RegistrationResponse> invalidAddressFuture = rmGateway.registerJobManager( @@ -219,6 +230,9 @@ public class ResourceManagerJobMasterTest extends TestLogger { JobID unknownJobIDToHAServices = new JobID(); + // wait until the leader election has been completed + resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get(); + // this should fail because we try to register a job leader listener for an unknown job id CompletableFuture<RegistrationResponse> registrationFuture = rmGateway.registerJobManager( new JobMasterId(HighAvailabilityServices.DEFAULT_LEADER_ID), http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java index 8add168..0206ade 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java @@ -30,7 +30,7 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.registration.RegistrationResponse; -import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException; +import org.apache.flink.runtime.rpc.exceptions.FencingTokenException; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; @@ -134,7 +134,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger { unMatchedLeaderFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); fail("Should have failed because we are using a wrongly fenced ResourceManagerGateway."); } catch (ExecutionException e) { - assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException); + assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException); } } finally { if (testingFatalErrorHandler.hasExceptionOccurred()) { http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java index f8eca16..9fe9904 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java @@ -25,7 +25,7 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.AkkaUtils; import 
org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; -import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException; +import org.apache.flink.runtime.rpc.exceptions.FencingTokenException; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; @@ -231,7 +231,7 @@ public class AsyncCallsTest extends TestLogger { fail("The async call operation should fail due to the changed fencing token."); } catch (ExecutionException e) { - assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException); + assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException); } } @@ -346,10 +346,19 @@ public class AsyncCallsTest extends TestLogger { UUID initialFencingToken, OneShotLatch enteringSetNewFencingToken, OneShotLatch triggerSetNewFencingToken) { - super(rpcService, initialFencingToken); + super(rpcService); this.enteringSetNewFencingToken = enteringSetNewFencingToken; this.triggerSetNewFencingToken = triggerSetNewFencingToken; + + // make it look as if we are running in the main thread + currentMainThread.set(Thread.currentThread()); + + try { + setFencingToken(initialFencingToken); + } finally { + currentMainThread.set(null); + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java index 62d5354..6162a2d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.time.Time; import 
org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException; +import org.apache.flink.runtime.rpc.exceptions.FencingTokenException; import org.apache.flink.runtime.rpc.exceptions.RpcException; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; @@ -39,6 +39,7 @@ import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -67,17 +68,15 @@ public class FencedRpcEndpointTest extends TestLogger { */ @Test public void testFencingTokenSetting() throws Exception { - final UUID initialFencingToken = UUID.randomUUID(); final String value = "foobar"; - FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, initialFencingToken, value); - FencedTestingGateway fencedTestingGateway = fencedTestingEndpoint.getSelfGateway(FencedTestingGateway.class); + FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, value); FencedTestingGateway fencedGateway = fencedTestingEndpoint.getSelfGateway(FencedTestingGateway.class); try { fencedTestingEndpoint.start(); - assertEquals(initialFencingToken, fencedGateway.getFencingToken()); - assertEquals(initialFencingToken, fencedTestingEndpoint.getFencingToken()); + assertNull(fencedGateway.getFencingToken()); + assertNull(fencedTestingEndpoint.getFencingToken()); final UUID newFencingToken = UUID.randomUUID(); @@ -88,9 +87,9 @@ public class FencedRpcEndpointTest extends TestLogger { // expected to fail } - assertEquals(initialFencingToken, fencedTestingEndpoint.getFencingToken()); + assertNull(fencedTestingEndpoint.getFencingToken()); - CompletableFuture<Acknowledge> 
setFencingFuture = fencedTestingGateway.rpcSetFencingToken(newFencingToken, timeout); + CompletableFuture<Acknowledge> setFencingFuture = fencedTestingEndpoint.setFencingTokenInMainThread(newFencingToken, timeout); // wait for the completion of the set fencing token operation setFencingFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); @@ -109,15 +108,15 @@ public class FencedRpcEndpointTest extends TestLogger { */ @Test public void testFencing() throws Exception { - final UUID initialFencingToken = UUID.randomUUID(); + final UUID fencingToken = UUID.randomUUID(); final UUID wrongFencingToken = UUID.randomUUID(); final String value = "barfoo"; - FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, initialFencingToken, value); + FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, value, fencingToken); try { fencedTestingEndpoint.start(); - final FencedTestingGateway properFencedGateway = rpcService.connect(fencedTestingEndpoint.getAddress(), initialFencingToken, FencedTestingGateway.class) + final FencedTestingGateway properFencedGateway = rpcService.connect(fencedTestingEndpoint.getAddress(), fencingToken, FencedTestingGateway.class) .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); final FencedTestingGateway wronglyFencedGateway = rpcService.connect(fencedTestingEndpoint.getAddress(), wrongFencingToken, FencedTestingGateway.class) .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); @@ -128,12 +127,12 @@ public class FencedRpcEndpointTest extends TestLogger { wronglyFencedGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); fail("This should fail since we have the wrong fencing token."); } catch (ExecutionException e) { - assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException); + assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException); } final UUID newFencingToken = UUID.randomUUID(); - 
CompletableFuture<Acknowledge> newFencingTokenFuture = properFencedGateway.rpcSetFencingToken(newFencingToken, timeout); + CompletableFuture<Acknowledge> newFencingTokenFuture = fencedTestingEndpoint.setFencingTokenInMainThread(newFencingToken, timeout); // wait for the new fencing token to be set newFencingTokenFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); @@ -144,7 +143,7 @@ public class FencedRpcEndpointTest extends TestLogger { fail("This should fail since we have the wrong fencing token by now."); } catch (ExecutionException e) { - assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException); + assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException); } } finally { @@ -163,7 +162,7 @@ public class FencedRpcEndpointTest extends TestLogger { final UUID newFencingToken = UUID.randomUUID(); final String value = "foobar"; - final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, initialFencingToken, value); + final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, value, initialFencingToken); try { fencedTestingEndpoint.start(); @@ -178,7 +177,7 @@ public class FencedRpcEndpointTest extends TestLogger { assertEquals(value, selfGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS)); assertEquals(value, remoteGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS)); - CompletableFuture<Acknowledge> newFencingTokenFuture = selfGateway.rpcSetFencingToken(newFencingToken, timeout); + CompletableFuture<Acknowledge> newFencingTokenFuture = fencedTestingEndpoint.setFencingTokenInMainThread(newFencingToken, timeout); // wait for the new fencing token to be set newFencingTokenFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); @@ -192,7 +191,7 @@ public class FencedRpcEndpointTest extends TestLogger { remoteGateway.foobar(timeout).get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS); fail("This should have failed because we don't have the right fencing token."); } catch (ExecutionException e) { - assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException); + assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException); } } finally { fencedTestingEndpoint.shutDown(); @@ -208,7 +207,7 @@ public class FencedRpcEndpointTest extends TestLogger { final Time shortTimeout = Time.milliseconds(100L); final UUID initialFencingToken = UUID.randomUUID(); final String value = "foobar"; - final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, initialFencingToken, value); + final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, value, initialFencingToken); try { fencedTestingEndpoint.start(); @@ -221,7 +220,7 @@ public class FencedRpcEndpointTest extends TestLogger { // therefore, we know that the change fencing token call is executed after the trigger MainThreadExecutor // computation final UUID newFencingToken = UUID.randomUUID(); - CompletableFuture<Acknowledge> newFencingTokenFuture = selfGateway.rpcSetFencingToken(newFencingToken, timeout); + CompletableFuture<Acknowledge> newFencingTokenFuture = fencedTestingEndpoint.setFencingTokenInMainThread(newFencingToken, timeout); newFencingTokenFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); @@ -253,7 +252,7 @@ public class FencedRpcEndpointTest extends TestLogger { final UUID initialFencingToken = UUID.randomUUID(); final String value = "foobar"; - final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, initialFencingToken, value); + final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, value, initialFencingToken); try { fencedTestingEndpoint.start(); @@ -283,8 +282,6 @@ public class FencedRpcEndpointTest extends TestLogger { public interface FencedTestingGateway extends 
FencedRpcGateway<UUID> { CompletableFuture<String> foobar(@RpcTimeout Time timeout); - CompletableFuture<Acknowledge> rpcSetFencingToken(UUID fencingToken, @RpcTimeout Time timeout); - CompletableFuture<Acknowledge> triggerMainThreadExecutorComputation(@RpcTimeout Time timeout); CompletableFuture<Acknowledge> triggerComputationLatch(@RpcTimeout Time timeout); @@ -296,12 +293,25 @@ public class FencedRpcEndpointTest extends TestLogger { private final String value; - protected FencedTestingEndpoint(RpcService rpcService, UUID initialFencingToken, String value) { - super(rpcService, initialFencingToken); + protected FencedTestingEndpoint(RpcService rpcService, String value) { + this(rpcService, value, null); + } + + protected FencedTestingEndpoint(RpcService rpcService, String value, UUID initialFencingToken) { + super(rpcService); computationLatch = new OneShotLatch(); this.value = value; + + // make sure that it looks as if we are running in the main thread + currentMainThread.set(Thread.currentThread()); + + try { + setFencingToken(initialFencingToken); + } finally { + currentMainThread.set(null); + } } @Override @@ -310,13 +320,6 @@ public class FencedRpcEndpointTest extends TestLogger { } @Override - public CompletableFuture<Acknowledge> rpcSetFencingToken(UUID fencingToken, Time timeout) { - setFencingToken(fencingToken); - - return CompletableFuture.completedFuture(Acknowledge.get()); - } - - @Override public CompletableFuture<Acknowledge> triggerMainThreadExecutorComputation(Time timeout) { return CompletableFuture.supplyAsync( () -> { @@ -340,5 +343,15 @@ public class FencedRpcEndpointTest extends TestLogger { return CompletableFuture.completedFuture(Acknowledge.get()); } + + public CompletableFuture<Acknowledge> setFencingTokenInMainThread(UUID fencingToken, Time timeout) { + return callAsyncWithoutFencing( + () -> { + setFencingToken(fencingToken); + + return Acknowledge.get(); + }, + timeout); + } } }
