[
https://issues.apache.org/jira/browse/FLINK-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15450685#comment-15450685
]
ASF GitHub Bot commented on FLINK-4516:
---------------------------------------
Github user beyond1920 commented on a diff in the pull request:
https://github.com/apache/flink/pull/2427#discussion_r76909528
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
---
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import akka.dispatch.ExecutionContexts;
+import akka.dispatch.Futures;
+import akka.util.Timeout;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.util.DirectExecutorService;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.BitSet;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * An RPC Service implementation for testing. This RPC service directly
executes all asynchronous calls one by one in the main thread.
+ */
+public class TestingSerialRpcService extends TestingRpcService {
+
+ private final DirectExecutorService executorService;
+ private final ConcurrentHashMap<String, RpcGateway>
registeredConnections;
+
+ public TestingSerialRpcService() {
+ executorService = new DirectExecutorService();
+ this.registeredConnections = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public void scheduleRunnable(final Runnable runnable, final long delay,
final TimeUnit unit) {
+ try {
+ unit.sleep(delay);
+ runnable.run();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public ExecutionContext getExecutionContext() {
+ return ExecutionContexts.fromExecutorService(executorService);
+ }
+
+ @Override
+ public void stopService() {
+ executorService.shutdown();
+ registeredConnections.clear();
+ }
+
+ @Override
+ public void stopServer(RpcGateway selfGateway) {
+
+ }
+
+ @Override
+ public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S
rpcEndpoint) {
+ final String address = UUID.randomUUID().toString();
+
+ InvocationHandler akkaInvocationHandler = new
TestingSerialInvocationHandler(address, rpcEndpoint);
+ ClassLoader classLoader = getClass().getClassLoader();
+
+ @SuppressWarnings("unchecked")
+ C self = (C) Proxy.newProxyInstance(
+ classLoader,
+ new Class<?>[]{
+ rpcEndpoint.getSelfGatewayType(),
+ MainThreadExecutor.class,
+ StartStoppable.class,
+ RpcGateway.class},
+ akkaInvocationHandler);
+
+ return self;
+ }
+
+ private static class TestingSerialInvocationHandler<C extends
RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway,
MainThreadExecutor, StartStoppable {
+
+ private final T rpcEndpoint;
+
+ /** default timeout for asks */
+ private final Timeout timeout;
+
+ private final String address;
+
+ private TestingSerialInvocationHandler(String address, T
rpcEndpoint) {
+ this(address, rpcEndpoint, new Timeout(new
FiniteDuration(10, TimeUnit.SECONDS)));
+ }
+
+ private TestingSerialInvocationHandler(String address, T
rpcEndpoint, Timeout timeout) {
+ this.rpcEndpoint = rpcEndpoint;
+ this.timeout = timeout;
+ this.address = address;
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[]
args) throws Throwable {
+ Class<?> declaringClass = method.getDeclaringClass();
+ if (declaringClass.equals(MainThreadExecutor.class) ||
+ declaringClass.equals(Object.class) ||
declaringClass.equals(StartStoppable.class) ||
+ declaringClass.equals(RpcGateway.class)) {
+ return method.invoke(this, args);
+ } else {
+ final String methodName = method.getName();
+ Class<?>[] parameterTypes =
method.getParameterTypes();
+ Annotation[][] parameterAnnotations =
method.getParameterAnnotations();
+ Timeout futureTimeout =
extractRpcTimeout(parameterAnnotations, args, timeout);
+
+ final Tuple2<Class<?>[], Object[]>
filteredArguments = filterArguments(
+ parameterTypes,
+ parameterAnnotations,
+ args);
+
+ Class<?> returnType = method.getReturnType();
+
+ if (returnType.equals(Future.class)) {
+ try {
+ Object result =
handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1,
futureTimeout);
+ return
Futures.successful(result);
+ } catch (Throwable e) {
+ return Futures.failed(e);
+ }
+ } else {
+ return
handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1,
futureTimeout);
+ }
+ }
+ }
+
+ /**
+ * Handle rpc invocations by looking up the rpc method on the
rpc endpoint and calling this
+ * method with the provided method arguments. If the method has
a return value, it is returned
+ * to the sender of the call.
+ */
+ private Object handleRpcInvocationSync(final String methodName,
+ final Class<?>[] parameterTypes,
+ final Object[] args,
+ final Timeout futureTimeout) throws Exception {
+ final Method rpcMethod = lookupRpcMethod(methodName,
parameterTypes);
+ Object result = rpcMethod.invoke(rpcEndpoint, args);
+
+ if (result != null && result instanceof Future) {
+ Future<?> future = (Future<?>) result;
+ return Await.result(future,
futureTimeout.duration());
+ } else {
+ return result;
+ }
+ }
+
+ @Override
+ public void runAsync(Runnable runnable) {
+ runnable.run();
+ }
+
+ @Override
+ public <V> Future<V> callAsync(Callable<V> callable, Timeout
callTimeout) {
+ try {
+
TimeUnit.MILLISECONDS.sleep(callTimeout.duration().toMillis());
--- End diff --
yes, i confuse the delay with callTimeout parameters, executing immediately
is right.
> ResourceManager leadership election
> -----------------------------------
>
> Key: FLINK-4516
> URL: https://issues.apache.org/jira/browse/FLINK-4516
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: zhangjing
> Assignee: zhangjing
>
> 1. When a resourceManager is started, it starts the leadership election
> service first and take part in contending for leadership
> 2. Every resourceManager contains a ResourceManagerLeaderContender, when it
> is granted leadership, it will start SlotManager and other main components.
> when it is revoked leadership, it will stop all its components and clear
> everything.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)