Repository: flink
Updated Branches:
  refs/heads/master a86b64686 -> 42cc3a2a9


[FLINK-7655] [flip6] Set fencing token to null if not leader

This commit changes the fencing behaviour such that a component which is not the
leader sets its fencing token to null. This distinction makes it possible to throw
different exceptions depending on whether there is a token mismatch or whether the
receiver has no fencing token set (i.e. it is not the leader).

This closes #4689.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42cc3a2a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42cc3a2a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42cc3a2a

Branch: refs/heads/master
Commit: 42cc3a2a9c41dda7cf338db36b45131db9150674
Parents: a86b646
Author: Till Rohrmann <[email protected]>
Authored: Wed Sep 20 17:39:35 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Fri Sep 22 00:59:07 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    |  5 +-
 .../flink/runtime/jobmaster/JobMaster.java      | 10 +--
 .../flink/runtime/jobmaster/JobMasterId.java    |  2 -
 .../resourcemanager/ResourceManager.java        |  8 +--
 .../flink/runtime/rpc/FencedRpcEndpoint.java    | 16 +++--
 .../runtime/rpc/akka/FencedAkkaRpcActor.java    | 37 ++++++----
 .../rpc/exceptions/FencingTokenException.java   | 42 +++++++++++
 .../FencingTokenMismatchException.java          | 42 -----------
 .../rpc/messages/LocalFencedMessage.java        |  6 +-
 .../rpc/messages/RemoteFencedMessage.java       |  6 +-
 .../resourcemanager/ResourceManagerHATest.java  |  2 +-
 .../ResourceManagerJobMasterTest.java           | 20 +++++-
 .../ResourceManagerTaskExecutorTest.java        |  4 +-
 .../flink/runtime/rpc/AsyncCallsTest.java       | 15 +++-
 .../runtime/rpc/FencedRpcEndpointTest.java      | 75 ++++++++++++--------
 15 files changed, 171 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 6b9999c..153ee53 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -94,7 +94,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                        MetricRegistry metricRegistry,
                        FatalErrorHandler fatalErrorHandler,
                        Optional<String> restAddress) throws Exception {
-               super(rpcService, endpointId, DispatcherId.generate());
+               super(rpcService, endpointId);
 
                this.configuration = Preconditions.checkNotNull(configuration);
                this.highAvailabilityServices = 
Preconditions.checkNotNull(highAvailabilityServices);
@@ -399,7 +399,8 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                                        log.warn("Could not properly clear the 
Dispatcher state while revoking leadership.", e);
                                }
 
-                               setFencingToken(DispatcherId.generate());
+                               // clear the fencing token indicating that we 
don't have the leadership right now
+                               setFencingToken(null);
                        });
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 2bfe277..343fbf6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -212,7 +212,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                        FatalErrorHandler errorHandler,
                        ClassLoader userCodeLoader) throws Exception {
 
-               super(rpcService, 
AkkaRpcServiceUtils.createRandomName(JobMaster.JOB_MANAGER_NAME), 
JobMasterId.INITIAL_JOB_MASTER_ID);
+               super(rpcService, 
AkkaRpcServiceUtils.createRandomName(JobMaster.JOB_MANAGER_NAME));
 
                selfGateway = getSelfGateway(JobMasterGateway.class);
 
@@ -735,7 +735,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                        return Acknowledge.get();
                }
 
-               if (!Objects.equals(getFencingToken(), 
JobMasterId.INITIAL_JOB_MASTER_ID)) {
+               if (getFencingToken() != null) {
                        log.info("Restarting old job with JobMasterId {}. The 
new JobMasterId is {}.", getFencingToken(), newJobMasterId);
 
                        // first we have to suspend the current execution
@@ -791,13 +791,13 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
        private Acknowledge suspendExecution(final Throwable cause) {
                validateRunsInMainThread();
 
-               if (Objects.equals(JobMasterId.INITIAL_JOB_MASTER_ID, 
getFencingToken())) {
+               if (getFencingToken() == null) {
                        log.debug("Job has already been suspended or 
shutdown.");
                        return Acknowledge.get();
                }
 
-               // not leader anymore --> set the JobMasterId to the initial id
-               setFencingToken(JobMasterId.INITIAL_JOB_MASTER_ID);
+               // not leader anymore --> set the JobMasterId to null
+               setFencingToken(null);
 
                try {
                        resourceManagerLeaderRetriever.stop();

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
index ffd53b3..39f7ded 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
@@ -29,8 +29,6 @@ public class JobMasterId extends AbstractID {
 
        private static final long serialVersionUID = -933276753644003754L;
 
-       public static final JobMasterId INITIAL_JOB_MASTER_ID = new 
JobMasterId(0L, 0L);
-
        public JobMasterId(byte[] bytes) {
                super(bytes);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 87cf7d1..f69998c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -139,7 +139,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                        JobLeaderIdService jobLeaderIdService,
                        FatalErrorHandler fatalErrorHandler) {
 
-               super(rpcService, resourceManagerEndpointId, 
ResourceManagerId.generate());
+               super(rpcService, resourceManagerEndpointId);
 
                this.resourceId = checkNotNull(resourceId);
                this.resourceManagerConfiguration = 
checkNotNull(resourceManagerConfiguration);
@@ -772,13 +772,11 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        public void revokeLeadership() {
                runAsyncWithoutFencing(
                        () -> {
-                               final ResourceManagerId newResourceManagerId = 
ResourceManagerId.generate();
-
-                               log.info("ResourceManager {} was revoked 
leadership. Setting fencing token to {}.", getAddress(), newResourceManagerId);
+                               log.info("ResourceManager {} was revoked 
leadership. Clearing fencing token.", getAddress());
 
                                clearState();
 
-                               setFencingToken(newResourceManagerId);
+                               setFencingToken(null);
 
                                slotManager.suspend();
                        });

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
index 81bae29..ff74f47 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
@@ -19,7 +19,8 @@
 package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
 
 import java.io.Serializable;
 import java.util.UUID;
@@ -39,25 +40,26 @@ public class FencedRpcEndpoint<F extends Serializable> 
extends RpcEndpoint {
        private volatile F fencingToken;
        private volatile MainThreadExecutor fencedMainThreadExecutor;
 
-       protected FencedRpcEndpoint(RpcService rpcService, String endpointId, F 
initialFencingToken) {
+       protected FencedRpcEndpoint(RpcService rpcService, String endpointId) {
                super(rpcService, endpointId);
 
-               this.fencingToken = 
Preconditions.checkNotNull(initialFencingToken);
+               // no fencing token == no leadership
+               this.fencingToken = null;
                this.fencedMainThreadExecutor = new MainThreadExecutor(
                        getRpcService().fenceRpcServer(
                                rpcServer,
-                               initialFencingToken));
+                               null));
        }
 
-       protected FencedRpcEndpoint(RpcService rpcService, F 
initialFencingToken) {
-               this(rpcService, UUID.randomUUID().toString(), 
initialFencingToken);
+       protected FencedRpcEndpoint(RpcService rpcService) {
+               this(rpcService, UUID.randomUUID().toString());
        }
 
        public F getFencingToken() {
                return fencingToken;
        }
 
-       protected void setFencingToken(F newFencingToken) {
+       protected void setFencingToken(@Nullable F newFencingToken) {
                // this method should only be called from within the main thread
                validateRunsInMainThread();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
index 1ace3b7..369af6a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.rpc.akka;
 
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import 
org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
-import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
+import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.messages.FencedMessage;
 import org.apache.flink.runtime.rpc.messages.UnfencedMessage;
@@ -45,23 +45,36 @@ public class FencedAkkaRpcActor<F extends Serializable, T 
extends FencedRpcEndpo
        @Override
        protected void handleMessage(Object message) {
                if (message instanceof FencedMessage) {
-                       @SuppressWarnings("unchecked")
-                       FencedMessage<F, ?> fencedMessage = ((FencedMessage<F, 
?>) message);
 
-                       F fencingToken = fencedMessage.getFencingToken();
+                       final F expectedFencingToken = 
rpcEndpoint.getFencingToken();
 
-                       if (Objects.equals(rpcEndpoint.getFencingToken(), 
fencingToken)) {
-                               super.handleMessage(fencedMessage.getPayload());
-                       } else {
+                       if (expectedFencingToken == null) {
                                if (log.isDebugEnabled()) {
-                                       log.debug("Fencing token mismatch: 
Ignoring message {} because the fencing token {} did " +
-                                               "not match the expected fencing 
token {}.", message, fencingToken, rpcEndpoint.getFencingToken());
+                                       log.debug("Fencing token not set: 
Ignoring message {} because the fencing token is null.", message);
                                }
 
                                sendErrorIfSender(
-                                       new 
FencingTokenMismatchException("Fencing token mismatch: Ignoring message " + 
message +
-                                               " because the fencing token " + 
fencingToken + " did not match the expected fencing token " +
-                                               rpcEndpoint.getFencingToken() + 
'.'));
+                                       new FencingTokenException(
+                                               "Fencing token not set: 
Ignoring message " + message + " because the fencing token is null."));
+                       } else {
+                               @SuppressWarnings("unchecked")
+                               FencedMessage<F, ?> fencedMessage = 
((FencedMessage<F, ?>) message);
+
+                               F fencingToken = 
fencedMessage.getFencingToken();
+
+                               if (Objects.equals(expectedFencingToken, 
fencingToken)) {
+                                       
super.handleMessage(fencedMessage.getPayload());
+                               } else {
+                                       if (log.isDebugEnabled()) {
+                                               log.debug("Fencing token 
mismatch: Ignoring message {} because the fencing token {} did " +
+                                                       "not match the expected 
fencing token {}.", message, fencingToken, expectedFencingToken);
+                                       }
+
+                                       sendErrorIfSender(
+                                               new 
FencingTokenException("Fencing token mismatch: Ignoring message " + message +
+                                                       " because the fencing 
token " + fencingToken + " did not match the expected fencing token " +
+                                                       expectedFencingToken + 
'.'));
+                               }
                        }
                } else if (message instanceof UnfencedMessage) {
                        super.handleMessage(((UnfencedMessage<?>) 
message).getPayload());

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenException.java
new file mode 100644
index 0000000..71520c8
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.exceptions;
+
+import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
+import org.apache.flink.runtime.rpc.exceptions.RpcException;
+
+/**
+ * Exception which is thrown if the fencing tokens of a {@link FencedRpcEndpoint} do
+ * not match or if the endpoint has no fencing token set (i.e. it is not the leader).
+ */
+public class FencingTokenException extends RpcException {
+       private static final long serialVersionUID = -500634972988881467L;
+
+       public FencingTokenException(String message) {
+               super(message);
+       }
+
+       public FencingTokenException(String message, Throwable cause) {
+               super(message, cause);
+       }
+
+       public FencingTokenException(Throwable cause) {
+               super(cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenMismatchException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenMismatchException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenMismatchException.java
deleted file mode 100644
index 9a59101..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenMismatchException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.exceptions;
-
-import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
-import org.apache.flink.runtime.rpc.exceptions.RpcException;
-
-/**
- * Exception which is thrown if the fencing tokens of a {@link 
FencedRpcEndpoint} do
- * not match.
- */
-public class FencingTokenMismatchException extends RpcException {
-       private static final long serialVersionUID = -500634972988881467L;
-
-       public FencingTokenMismatchException(String message) {
-               super(message);
-       }
-
-       public FencingTokenMismatchException(String message, Throwable cause) {
-               super(message, cause);
-       }
-
-       public FencingTokenMismatchException(Throwable cause) {
-               super(cause);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java
index 2481065..0ee4940 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.rpc.messages;
 
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 
 /**
@@ -34,8 +36,8 @@ public class LocalFencedMessage<F extends Serializable, P> 
implements FencedMess
        private final F fencingToken;
        private final P payload;
 
-       public LocalFencedMessage(F fencingToken, P payload) {
-               this.fencingToken = Preconditions.checkNotNull(fencingToken);
+       public LocalFencedMessage(@Nullable F fencingToken, P payload) {
+               this.fencingToken = fencingToken;
                this.payload = Preconditions.checkNotNull(payload);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java
index 5cf9b98..ad8c349 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.rpc.messages;
 
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 
 /**
@@ -35,8 +37,8 @@ public class RemoteFencedMessage<F extends Serializable, P 
extends Serializable>
        private final F fencingToken;
        private final P payload;
 
-       public RemoteFencedMessage(F fencingToken, P payload) {
-               this.fencingToken = Preconditions.checkNotNull(fencingToken);
+       public RemoteFencedMessage(@Nullable F fencingToken, P payload) {
+               this.fencingToken = fencingToken;
                this.payload = Preconditions.checkNotNull(payload);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 2b8792b..d0dd973 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -108,7 +108,7 @@ public class ResourceManagerHATest extends TestLogger {
                try {
                        resourceManager.start();
 
-                       Assert.assertNotNull(resourceManager.getFencingToken());
+                       Assert.assertNull(resourceManager.getFencingToken());
                        final UUID leaderId = UUID.randomUUID();
                        leaderElectionService.isLeader(leaderId);
                        // after grant leadership, resourceManager's leaderId 
has value

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 156bc73..73c5b5c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -38,7 +38,7 @@ import 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
+import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
@@ -47,6 +47,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -81,10 +82,14 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
                JobMasterId jobMasterId = JobMasterId.generate();
                final ResourceID jmResourceId = new 
ResourceID(jobMasterAddress);
                TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID());
+               TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
                TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
-               final ResourceManager<?> resourceManager = 
createAndStartResourceManager(mock(LeaderElectionService.class), jobID, 
jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+               final ResourceManager<?> resourceManager = 
createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, 
jobMasterLeaderRetrievalService, testingFatalErrorHandler);
                final ResourceManagerGateway rmGateway = 
resourceManager.getSelfGateway(ResourceManagerGateway.class);
 
+               // wait until the leader election has been completed
+               
resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
                // test response successful
                CompletableFuture<RegistrationResponse> successfulFuture = 
rmGateway.registerJobManager(
                        jobMasterId,
@@ -127,7 +132,7 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
                        unMatchedLeaderFuture.get(5L, TimeUnit.SECONDS);
                        fail("Should fail because we are using the wrong 
fencing token.");
                } catch (ExecutionException e) {
-                       assertTrue(ExceptionUtils.stripExecutionException(e) 
instanceof FencingTokenMismatchException);
+                       assertTrue(ExceptionUtils.stripExecutionException(e) 
instanceof FencingTokenException);
                }
 
                if (testingFatalErrorHandler.hasExceptionOccurred()) {
@@ -151,6 +156,9 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
                final ResourceManagerGateway rmGateway = 
resourceManager.getSelfGateway(ResourceManagerGateway.class);
                final ResourceID jmResourceId = new 
ResourceID(jobMasterAddress);
 
+               // wait until the leader election has been completed
+               
resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
                // test throw exception when receive a registration from job 
master which takes unmatched leaderSessionId
                JobMasterId differentJobMasterId = JobMasterId.generate();
                CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = 
rmGateway.registerJobManager(
@@ -182,6 +190,9 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
                final ResourceManagerGateway rmGateway = 
resourceManager.getSelfGateway(ResourceManagerGateway.class);
                final ResourceID jmResourceId = new 
ResourceID(jobMasterAddress);
 
+               // wait until the leader election has been completed
+               
resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
                // test throw exception when receive a registration from job 
master which takes invalid address
                String invalidAddress = "/jobMasterAddress2";
                CompletableFuture<RegistrationResponse> invalidAddressFuture = 
rmGateway.registerJobManager(
@@ -219,6 +230,9 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
 
                JobID unknownJobIDToHAServices = new JobID();
 
+               // wait until the leader election has been completed
+               
resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
                // this should fail because we try to register a job leader 
listener for an unknown job id
                CompletableFuture<RegistrationResponse> registrationFuture = 
rmGateway.registerJobManager(
                        new 
JobMasterId(HighAvailabilityServices.DEFAULT_LEADER_ID),

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 8add168..0206ade 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -30,7 +30,7 @@ import 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
+import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
@@ -134,7 +134,7 @@ public class ResourceManagerTaskExecutorTest extends 
TestLogger {
                                
unMatchedLeaderFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                                fail("Should have failed because we are using a 
wrongly fenced ResourceManagerGateway.");
                        } catch (ExecutionException e) {
-                               
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
FencingTokenMismatchException);
+                               
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
FencingTokenException);
                        }
                } finally {
                        if (testingFatalErrorHandler.hasExceptionOccurred()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index f8eca16..9fe9904 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
-import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
+import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -231,7 +231,7 @@ public class AsyncCallsTest extends TestLogger {
 
                        fail("The async call operation should fail due to the 
changed fencing token.");
                } catch (ExecutionException e) {
-                       assertTrue(ExceptionUtils.stripExecutionException(e) 
instanceof FencingTokenMismatchException);
+                       assertTrue(ExceptionUtils.stripExecutionException(e) 
instanceof FencingTokenException);
                }
        }
 
@@ -346,10 +346,19 @@ public class AsyncCallsTest extends TestLogger {
                                UUID initialFencingToken,
                                OneShotLatch enteringSetNewFencingToken,
                                OneShotLatch triggerSetNewFencingToken) {
-                       super(rpcService, initialFencingToken);
+                       super(rpcService);
 
                        this.enteringSetNewFencingToken = 
enteringSetNewFencingToken;
                        this.triggerSetNewFencingToken = 
triggerSetNewFencingToken;
+
+                       // make it look as if we are running in the main thread
+                       currentMainThread.set(Thread.currentThread());
+
+                       try {
+                               setFencingToken(initialFencingToken);
+                       } finally {
+                               currentMainThread.set(null);
+                       }
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/42cc3a2a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
index 62d5354..6162a2d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
+import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
 import org.apache.flink.runtime.rpc.exceptions.RpcException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -67,17 +68,15 @@ public class FencedRpcEndpointTest extends TestLogger {
         */
        @Test
        public void testFencingTokenSetting() throws Exception {
-               final UUID initialFencingToken = UUID.randomUUID();
                final String value = "foobar";
-               FencedTestingEndpoint fencedTestingEndpoint = new 
FencedTestingEndpoint(rpcService, initialFencingToken, value);
-               FencedTestingGateway fencedTestingGateway = 
fencedTestingEndpoint.getSelfGateway(FencedTestingGateway.class);
+               FencedTestingEndpoint fencedTestingEndpoint = new 
FencedTestingEndpoint(rpcService, value);
                FencedTestingGateway fencedGateway = 
fencedTestingEndpoint.getSelfGateway(FencedTestingGateway.class);
 
                try {
                        fencedTestingEndpoint.start();
 
-                       assertEquals(initialFencingToken, 
fencedGateway.getFencingToken());
-                       assertEquals(initialFencingToken, 
fencedTestingEndpoint.getFencingToken());
+                       assertNull(fencedGateway.getFencingToken());
+                       assertNull(fencedTestingEndpoint.getFencingToken());
 
                        final UUID newFencingToken = UUID.randomUUID();
 
@@ -88,9 +87,9 @@ public class FencedRpcEndpointTest extends TestLogger {
                                // expected to fail
                        }
 
-                       assertEquals(initialFencingToken, 
fencedTestingEndpoint.getFencingToken());
+                       assertNull(fencedTestingEndpoint.getFencingToken());
 
-                       CompletableFuture<Acknowledge> setFencingFuture = 
fencedTestingGateway.rpcSetFencingToken(newFencingToken, timeout);
+                       CompletableFuture<Acknowledge> setFencingFuture = 
fencedTestingEndpoint.setFencingTokenInMainThread(newFencingToken, timeout);
 
                        // wait for the completion of the set fencing token 
operation
                        setFencingFuture.get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
@@ -109,15 +108,15 @@ public class FencedRpcEndpointTest extends TestLogger {
         */
        @Test
        public void testFencing() throws Exception {
-               final UUID initialFencingToken = UUID.randomUUID();
+               final UUID fencingToken = UUID.randomUUID();
                final UUID wrongFencingToken = UUID.randomUUID();
                final String value = "barfoo";
-               FencedTestingEndpoint fencedTestingEndpoint = new 
FencedTestingEndpoint(rpcService, initialFencingToken, value);
+               FencedTestingEndpoint fencedTestingEndpoint = new 
FencedTestingEndpoint(rpcService, value, fencingToken);
 
                try {
                        fencedTestingEndpoint.start();
 
-                       final FencedTestingGateway properFencedGateway = 
rpcService.connect(fencedTestingEndpoint.getAddress(), initialFencingToken, 
FencedTestingGateway.class)
+                       final FencedTestingGateway properFencedGateway = 
rpcService.connect(fencedTestingEndpoint.getAddress(), fencingToken, 
FencedTestingGateway.class)
                                .get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
                        final FencedTestingGateway wronglyFencedGateway = 
rpcService.connect(fencedTestingEndpoint.getAddress(), wrongFencingToken, 
FencedTestingGateway.class)
                                .get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
@@ -128,12 +127,12 @@ public class FencedRpcEndpointTest extends TestLogger {
                                
wronglyFencedGateway.foobar(timeout).get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
                                fail("This should fail since we have the wrong 
fencing token.");
                        } catch (ExecutionException e) {
-                               
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
FencingTokenMismatchException);
+                               
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
FencingTokenException);
                        }
 
                        final UUID newFencingToken = UUID.randomUUID();
 
-                       CompletableFuture<Acknowledge> newFencingTokenFuture = 
properFencedGateway.rpcSetFencingToken(newFencingToken, timeout);
+                       CompletableFuture<Acknowledge> newFencingTokenFuture = 
fencedTestingEndpoint.setFencingTokenInMainThread(newFencingToken, timeout);
 
                        // wait for the new fencing token to be set
                        newFencingTokenFuture.get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
@@ -144,7 +143,7 @@ public class FencedRpcEndpointTest extends TestLogger {
 
                                fail("This should fail since we have the wrong 
fencing token by now.");
                        } catch (ExecutionException e) {
-                               
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
FencingTokenMismatchException);
+                               
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
FencingTokenException);
                        }
 
                } finally {
@@ -163,7 +162,7 @@ public class FencedRpcEndpointTest extends TestLogger {
                final UUID newFencingToken = UUID.randomUUID();
                final String value = "foobar";
 
-               final FencedTestingEndpoint fencedTestingEndpoint = new 
FencedTestingEndpoint(rpcService, initialFencingToken, value);
+               final FencedTestingEndpoint fencedTestingEndpoint = new 
FencedTestingEndpoint(rpcService, value, initialFencingToken);
 
                try {
                        fencedTestingEndpoint.start();
@@ -178,7 +177,7 @@ public class FencedRpcEndpointTest extends TestLogger {
                        assertEquals(value, 
selfGateway.foobar(timeout).get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS));
                        assertEquals(value, 
remoteGateway.foobar(timeout).get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS));
 
-                       CompletableFuture<Acknowledge> newFencingTokenFuture = 
selfGateway.rpcSetFencingToken(newFencingToken, timeout);
+                       CompletableFuture<Acknowledge> newFencingTokenFuture = 
fencedTestingEndpoint.setFencingTokenInMainThread(newFencingToken, timeout);
 
                        // wait for the new fencing token to be set
                        newFencingTokenFuture.get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
@@ -192,7 +191,7 @@ public class FencedRpcEndpointTest extends TestLogger {
                                
remoteGateway.foobar(timeout).get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
                                fail("This should have failed because we don't 
have the right fencing token.");
                        } catch (ExecutionException e) {
-                               
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
FencingTokenMismatchException);
+                               
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
FencingTokenException);
                        }
                } finally {
                        fencedTestingEndpoint.shutDown();
@@ -208,7 +207,7 @@ public class FencedRpcEndpointTest extends TestLogger {
                final Time shortTimeout = Time.milliseconds(100L);
                final UUID initialFencingToken = UUID.randomUUID();
                final String value = "foobar";
-               final FencedTestingEndpoint fencedTestingEndpoint = new 
FencedTestingEndpoint(rpcService, initialFencingToken, value);
+               final FencedTestingEndpoint fencedTestingEndpoint = new 
FencedTestingEndpoint(rpcService, value, initialFencingToken);
 
                try {
                        fencedTestingEndpoint.start();
@@ -221,7 +220,7 @@ public class FencedRpcEndpointTest extends TestLogger {
                        // therefore, we know that the change fencing token 
call is executed after the trigger MainThreadExecutor
                        // computation
                        final UUID newFencingToken = UUID.randomUUID();
-                       CompletableFuture<Acknowledge> newFencingTokenFuture = 
selfGateway.rpcSetFencingToken(newFencingToken, timeout);
+                       CompletableFuture<Acknowledge> newFencingTokenFuture = 
fencedTestingEndpoint.setFencingTokenInMainThread(newFencingToken, timeout);
 
                        newFencingTokenFuture.get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
 
@@ -253,7 +252,7 @@ public class FencedRpcEndpointTest extends TestLogger {
                final UUID initialFencingToken = UUID.randomUUID();
                final String value = "foobar";
 
-               final FencedTestingEndpoint fencedTestingEndpoint = new 
FencedTestingEndpoint(rpcService, initialFencingToken, value);
+               final FencedTestingEndpoint fencedTestingEndpoint = new 
FencedTestingEndpoint(rpcService, value, initialFencingToken);
 
                try {
                        fencedTestingEndpoint.start();
@@ -283,8 +282,6 @@ public class FencedRpcEndpointTest extends TestLogger {
        public interface FencedTestingGateway extends FencedRpcGateway<UUID> {
                CompletableFuture<String> foobar(@RpcTimeout Time timeout);
 
-               CompletableFuture<Acknowledge> rpcSetFencingToken(UUID 
fencingToken, @RpcTimeout Time timeout);
-
                CompletableFuture<Acknowledge> 
triggerMainThreadExecutorComputation(@RpcTimeout Time timeout);
 
                CompletableFuture<Acknowledge> 
triggerComputationLatch(@RpcTimeout Time timeout);
@@ -296,12 +293,25 @@ public class FencedRpcEndpointTest extends TestLogger {
 
                private final String value;
 
-               protected FencedTestingEndpoint(RpcService rpcService, UUID 
initialFencingToken, String value) {
-                       super(rpcService, initialFencingToken);
+               protected FencedTestingEndpoint(RpcService rpcService, String 
value) {
+                       this(rpcService, value, null);
+               }
+
+               protected FencedTestingEndpoint(RpcService rpcService, String 
value, UUID initialFencingToken) {
+                       super(rpcService);
 
                        computationLatch = new OneShotLatch();
 
                        this.value = value;
+
+                       // make sure that it looks as if we are running in the 
main thread
+                       currentMainThread.set(Thread.currentThread());
+
+                       try {
+                               setFencingToken(initialFencingToken);
+                       } finally {
+                               currentMainThread.set(null);
+                       }
                }
 
                @Override
@@ -310,13 +320,6 @@ public class FencedRpcEndpointTest extends TestLogger {
                }
 
                @Override
-               public CompletableFuture<Acknowledge> rpcSetFencingToken(UUID 
fencingToken, Time timeout) {
-                       setFencingToken(fencingToken);
-
-                       return 
CompletableFuture.completedFuture(Acknowledge.get());
-               }
-
-               @Override
                public CompletableFuture<Acknowledge> 
triggerMainThreadExecutorComputation(Time timeout) {
                        return CompletableFuture.supplyAsync(
                                () -> {
@@ -340,5 +343,15 @@ public class FencedRpcEndpointTest extends TestLogger {
 
                        return 
CompletableFuture.completedFuture(Acknowledge.get());
                }
+
+               public CompletableFuture<Acknowledge> 
setFencingTokenInMainThread(UUID fencingToken, Time timeout) {
+                       return callAsyncWithoutFencing(
+                               () -> {
+                                       setFencingToken(fencingToken);
+
+                                       return Acknowledge.get();
+                               },
+                               timeout);
+               }
        }
 }

Reply via email to