[FLINK-4443] [rpc] Add support for rpc gateway and rpc endpoint inheritance
This commit extends the RpcCompletenessTest such that it can now check for inherited remote procedure calls. All methods defined at the RpcGateway are considered native. This means that they need no RpcEndpoint counterpart because they are implemented by the RpcGateway implementation. This closes #2401. update comments remove native method annotation add line break Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/762b5d41 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/762b5d41 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/762b5d41 Branch: refs/heads/flip-6 Commit: 762b5d414ddbbc983e5c0954ee2d521146248b31 Parents: 8fd8c99 Author: wenlong.lwl <wenlong....@alibaba-inc.com> Authored: Sun Aug 21 00:46:51 2016 +0800 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Wed Sep 21 11:39:15 2016 +0200 ---------------------------------------------------------------------- .../org/apache/flink/runtime/rpc/RpcMethod.java | 2 ++ .../TestingHighAvailabilityServices.java | 19 +++++++++++ .../flink/runtime/rpc/RpcCompletenessTest.java | 33 ++++++++++++++++++-- 3 files changed, 52 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/762b5d41/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java index 875e557..e4b0e94 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.rpc; import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @@ -29,6 +30,7 @@ import java.lang.annotation.Target; * RpcCompletenessTest makes sure that the set of rpc methods in a rpc server and the set of * gateway methods in the corresponding gateway implementation are identical. */ +@Inherited @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface RpcMethod { http://git-wip-us.apache.org/repos/asf/flink/blob/762b5d41/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java index 3a9f943..4d654a3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.highavailability; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; /** @@ -28,6 +30,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices private volatile LeaderRetrievalService resourceManagerLeaderRetriever; + private volatile LeaderElectionService jobMasterLeaderElectionService; + // ------------------------------------------------------------------------ // Setters for mock / testing implementations @@ -36,6 +40,10 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices public void setResourceManagerLeaderRetriever(LeaderRetrievalService resourceManagerLeaderRetriever) { this.resourceManagerLeaderRetriever = resourceManagerLeaderRetriever; } + + public void setJobMasterLeaderElectionService(LeaderElectionService leaderElectionService) { + this.jobMasterLeaderElectionService = leaderElectionService; + } // ------------------------------------------------------------------------ // HA Services Methods @@ -50,4 +58,15 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices throw new IllegalStateException("ResourceManagerLeaderRetriever has not been set"); } } + + @Override + public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception { + LeaderElectionService service = jobMasterLeaderElectionService; + + if (service != null) { + return service; + } else { + throw new IllegalStateException("JobMasterLeaderElectionService has not been set"); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/762b5d41/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java index b8aad62..b431eb9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java @@ -68,8 +68,8 @@ public class RpcCompletenessTest extends TestLogger { @SuppressWarnings("rawtypes") private void checkCompleteness(Class<? extends RpcEndpoint> rpcEndpoint, Class<? extends RpcGateway> rpcGateway) { - Method[] gatewayMethods = rpcGateway.getDeclaredMethods(); - Method[] serverMethods = rpcEndpoint.getDeclaredMethods(); + Method[] gatewayMethods = getRpcMethodsFromGateway(rpcGateway).toArray(new Method[0]); + Method[] serverMethods = rpcEndpoint.getMethods(); Map<String, Set<Method>> rpcMethods = new HashMap<>(); Set<Method> unmatchedRpcMethods = new HashSet<>(); @@ -340,4 +340,33 @@ public class RpcCompletenessTest extends TestLogger { throw new RuntimeException("Could not retrive basic type information for primitive type " + primitveType + '.'); } } + + /** + * Extract all rpc methods defined by the gateway interface + * + * @param interfaceClass the given rpc gateway interface + * @return all methods defined by the given interface + */ + private List<Method> getRpcMethodsFromGateway(Class<? extends RpcGateway> interfaceClass) { + if(!interfaceClass.isInterface()) { + fail(interfaceClass.getName() + "is not a interface"); + } + + ArrayList<Method> allMethods = new ArrayList<>(); + // Methods defined in RpcGateway are native method + if(interfaceClass.equals(RpcGateway.class)) { + return allMethods; + } + + // Get all methods declared in current interface + for(Method method : interfaceClass.getDeclaredMethods()) { + allMethods.add(method); + } + + // Get all method inherited from super interface + for(Class superClass : interfaceClass.getInterfaces()) { + allMethods.addAll(getRpcMethodsFromGateway(superClass)); + } + return allMethods; + } }