This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 99d8276be7 Optimizing the scope of RPC base classes (#15946)
99d8276be7 is described below
commit 99d8276be7ff1f0cb47f6d67d7440a9380614073
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed May 8 11:46:09 2024 +0800
Optimizing the scope of RPC base classes (#15946)
* Optimizing the scope of RPC base classes
* Fix UT
---
.../dolphinscheduler/alert/rpc/AlertRpcServer.java | 16 +--
.../alert/rpc/AlertRpcServerTest.java | 22 +++--
.../api/service/LoggerServiceTest.java | 19 ++--
...voker.java => AbstractClientMethodInvoker.java} | 5 +-
.../base/client/ClientInvocationHandler.java | 5 +-
.../extract/base/client/ClientMethodInvoker.java | 2 +-
.../base/client/IRpcClientProxyFactory.java | 2 +-
.../client/JdkDynamicRpcClientProxyFactory.java | 5 +-
.../base/{ => client}/NettyClientHandler.java | 18 +---
.../base/{ => client}/NettyRemotingClient.java | 110 +++------------------
.../{ => client}/NettyRemotingClientFactory.java | 2 +-
.../SingletonJdkDynamicRpcClientProxyFactory.java | 1 -
.../base/client/SyncClientMethodInvoker.java | 5 +-
.../extract/base/future/ResponseFuture.java | 76 +-------------
.../base/server/JdkDynamicServerHandler.java | 12 +--
.../base/{ => server}/NettyRemotingServer.java | 57 +++++------
.../{ => server}/NettyRemotingServerFactory.java | 6 +-
.../extract/base/server/RpcServer.java | 74 ++++++++++++++
.../extract/base/server/ServerMethodInvoker.java | 4 +-
.../base/server/ServerMethodInvokerImpl.java | 9 +-
...voker.java => ServerMethodInvokerRegistry.java} | 10 +-
.../server/SpringServerMethodInvokerDiscovery.java | 37 ++-----
...ngletonJdkDynamicRpcClientProxyFactoryTest.java | 12 +--
.../server/master/rpc/MasterRpcServer.java | 18 +---
.../MasterRpcServerTest.java} | 28 +++---
.../microbench/rpc/RpcBenchMarkTest.java | 14 ++-
.../server/worker/rpc/WorkerRpcServer.java | 18 +---
.../server/worker/rpc/WorkerRpcServerTest.java | 29 +++---
28 files changed, 233 insertions(+), 383 deletions(-)
diff --git
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServer.java
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServer.java
index 3bd368573a..d73e4755dd 100644
---
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServer.java
+++
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServer.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.alert.rpc;
import org.apache.dolphinscheduler.alert.config.AlertConfig;
-import org.apache.dolphinscheduler.extract.base.NettyRemotingServerFactory;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import
org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
@@ -31,20 +30,7 @@ import org.springframework.stereotype.Service;
public class AlertRpcServer extends SpringServerMethodInvokerDiscovery
implements AutoCloseable {
public AlertRpcServer(AlertConfig alertConfig) {
- super(NettyRemotingServerFactory.buildNettyRemotingServer(
-
NettyServerConfig.builder().serverName("AlertRpcServer").listenPort(alertConfig.getPort()).build()));
+
super(NettyServerConfig.builder().serverName("AlertRpcServer").listenPort(alertConfig.getPort()).build());
}
- public void start() {
- log.info("Starting AlertRpcServer...");
- nettyRemotingServer.start();
- log.info("Started AlertRpcServer...");
- }
-
- @Override
- public void close() {
- log.info("Closing AlertRpcServer...");
- nettyRemotingServer.close();
- log.info("Closed AlertRpcServer...");
- }
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServerFactory.java
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServerTest.java
similarity index 67%
copy from
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServerFactory.java
copy to
dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServerTest.java
index 6bf1b8d31c..75f16848fd 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServerFactory.java
+++
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServerTest.java
@@ -15,16 +15,24 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.extract.base;
+package org.apache.dolphinscheduler.alert.rpc;
-import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
+import org.apache.dolphinscheduler.alert.config.AlertConfig;
-import lombok.experimental.UtilityClass;
+import org.junit.jupiter.api.Test;
-@UtilityClass
-public class NettyRemotingServerFactory {
+class AlertRpcServerTest {
- public NettyRemotingServer buildNettyRemotingServer(NettyServerConfig
nettyServerConfig) {
- return new NettyRemotingServer(nettyServerConfig);
+ private final AlertRpcServer alertRpcServer = new AlertRpcServer(new
AlertConfig());
+
+ @Test
+ void testStart() {
+ alertRpcServer.start();
+ }
+
+ @Test
+ void testClose() {
+ alertRpcServer.close();
}
+
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
index 4861e1004e..972092602f 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
@@ -40,7 +40,6 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
-import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import
org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
import org.apache.dolphinscheduler.extract.common.ILogService;
@@ -91,7 +90,7 @@ public class LoggerServiceTest {
@Mock
private TaskDefinitionMapper taskDefinitionMapper;
- private NettyRemotingServer nettyRemotingServer;
+ private SpringServerMethodInvokerDiscovery
springServerMethodInvokerDiscovery;
private int nettyServerPort = 18080;
@@ -103,11 +102,10 @@ public class LoggerServiceTest {
return;
}
- nettyRemotingServer = new
NettyRemotingServer(NettyServerConfig.builder().listenPort(nettyServerPort).build());
- nettyRemotingServer.start();
- SpringServerMethodInvokerDiscovery springServerMethodInvokerDiscovery =
- new SpringServerMethodInvokerDiscovery(nettyRemotingServer);
- springServerMethodInvokerDiscovery.postProcessAfterInitialization(new
ILogService() {
+ springServerMethodInvokerDiscovery = new
SpringServerMethodInvokerDiscovery(
+
NettyServerConfig.builder().serverName("TestLogServer").listenPort(nettyServerPort).build());
+ springServerMethodInvokerDiscovery.start();
+
springServerMethodInvokerDiscovery.registerServerMethodInvokerProvider(new
ILogService() {
@Override
public TaskInstanceLogFileDownloadResponse
getTaskInstanceWholeLogFileBytes(TaskInstanceLogFileDownloadRequest
taskInstanceLogFileDownloadRequest) {
@@ -142,13 +140,14 @@ public class LoggerServiceTest {
public void removeTaskInstanceLog(String
taskInstanceLogAbsolutePath) {
}
- }, "iLogServiceImpl");
+ });
+ springServerMethodInvokerDiscovery.start();
}
@AfterEach
public void tearDown() {
- if (nettyRemotingServer != null) {
- nettyRemotingServer.close();
+ if (springServerMethodInvokerDiscovery != null) {
+ springServerMethodInvokerDiscovery.close();
}
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/BaseRemoteMethodInvoker.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/AbstractClientMethodInvoker.java
similarity index 83%
rename from
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/BaseRemoteMethodInvoker.java
rename to
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/AbstractClientMethodInvoker.java
index 519dd87199..b753f1efa7 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/BaseRemoteMethodInvoker.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/AbstractClientMethodInvoker.java
@@ -17,12 +17,11 @@
package org.apache.dolphinscheduler.extract.base.client;
-import org.apache.dolphinscheduler.extract.base.NettyRemotingClient;
import org.apache.dolphinscheduler.extract.base.utils.Host;
import java.lang.reflect.Method;
-public abstract class BaseRemoteMethodInvoker implements ClientMethodInvoker {
+abstract class AbstractClientMethodInvoker implements ClientMethodInvoker {
protected final String methodIdentifier;
@@ -32,7 +31,7 @@ public abstract class BaseRemoteMethodInvoker implements
ClientMethodInvoker {
protected final Host serverHost;
- public BaseRemoteMethodInvoker(Host serverHost, Method localMethod,
NettyRemotingClient nettyRemotingClient) {
+ AbstractClientMethodInvoker(Host serverHost, Method localMethod,
NettyRemotingClient nettyRemotingClient) {
this.serverHost = serverHost;
this.localMethod = localMethod;
this.nettyRemotingClient = nettyRemotingClient;
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientInvocationHandler.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientInvocationHandler.java
index d5c9ab73d3..41ec3e056d 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientInvocationHandler.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientInvocationHandler.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.extract.base.client;
import static com.google.common.base.Preconditions.checkNotNull;
-import org.apache.dolphinscheduler.extract.base.NettyRemotingClient;
import org.apache.dolphinscheduler.extract.base.RpcMethod;
import org.apache.dolphinscheduler.extract.base.utils.Host;
@@ -31,7 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
@Slf4j
-public class ClientInvocationHandler implements InvocationHandler {
+class ClientInvocationHandler implements InvocationHandler {
private final NettyRemotingClient nettyRemotingClient;
@@ -39,7 +38,7 @@ public class ClientInvocationHandler implements
InvocationHandler {
private final Host serverHost;
- public ClientInvocationHandler(Host serverHost, NettyRemotingClient
nettyRemotingClient) {
+ ClientInvocationHandler(Host serverHost, NettyRemotingClient
nettyRemotingClient) {
this.serverHost = checkNotNull(serverHost);
this.nettyRemotingClient = checkNotNull(nettyRemotingClient);
this.methodInvokerMap = new ConcurrentHashMap<>();
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientMethodInvoker.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientMethodInvoker.java
index dcf53b0311..a287fd95ce 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientMethodInvoker.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/ClientMethodInvoker.java
@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.extract.base.client;
import java.lang.reflect.Method;
-public interface ClientMethodInvoker {
+interface ClientMethodInvoker {
Object invoke(Object proxy, Method method, Object[] args) throws Throwable;
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/IRpcClientProxyFactory.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/IRpcClientProxyFactory.java
index e60b0f18b0..afd3adf348 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/IRpcClientProxyFactory.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/IRpcClientProxyFactory.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.extract.base.client;
-public interface IRpcClientProxyFactory {
+interface IRpcClientProxyFactory {
/**
* Create the client proxy.
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/JdkDynamicRpcClientProxyFactory.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/JdkDynamicRpcClientProxyFactory.java
index 5635a88f34..bf329ab3fc 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/JdkDynamicRpcClientProxyFactory.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/JdkDynamicRpcClientProxyFactory.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.extract.base.client;
-import org.apache.dolphinscheduler.extract.base.NettyRemotingClient;
import org.apache.dolphinscheduler.extract.base.utils.Host;
import java.lang.reflect.Proxy;
@@ -34,7 +33,7 @@ import com.google.common.cache.LoadingCache;
/**
* This class is used to create a proxy client which will transform local
method invocation to remote invocation.
*/
-public class JdkDynamicRpcClientProxyFactory implements IRpcClientProxyFactory
{
+class JdkDynamicRpcClientProxyFactory implements IRpcClientProxyFactory {
private final NettyRemotingClient nettyRemotingClient;
@@ -49,7 +48,7 @@ public class JdkDynamicRpcClientProxyFactory implements
IRpcClientProxyFactory {
}
});
- public JdkDynamicRpcClientProxyFactory(NettyRemotingClient
nettyRemotingClient) {
+ JdkDynamicRpcClientProxyFactory(NettyRemotingClient nettyRemotingClient) {
this.nettyRemotingClient = nettyRemotingClient;
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyClientHandler.java
similarity index 87%
rename from
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java
rename to
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyClientHandler.java
index b0d998af83..be570eb577 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyClientHandler.java
@@ -15,16 +15,15 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.extract.base;
+package org.apache.dolphinscheduler.extract.base.client;
+import org.apache.dolphinscheduler.extract.base.StandardRpcResponse;
import org.apache.dolphinscheduler.extract.base.future.ResponseFuture;
import org.apache.dolphinscheduler.extract.base.protocal.HeartBeatTransporter;
import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
import org.apache.dolphinscheduler.extract.base.serialize.JsonSerializer;
import org.apache.dolphinscheduler.extract.base.utils.ChannelUtils;
-import java.util.concurrent.ExecutorService;
-
import lombok.extern.slf4j.Slf4j;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
@@ -38,11 +37,8 @@ public class NettyClientHandler extends
ChannelInboundHandlerAdapter {
private final NettyRemotingClient nettyRemotingClient;
- private final ExecutorService callbackExecutor;
-
- public NettyClientHandler(NettyRemotingClient nettyRemotingClient,
ExecutorService callbackExecutor) {
+ public NettyClientHandler(NettyRemotingClient nettyRemotingClient) {
this.nettyRemotingClient = nettyRemotingClient;
- this.callbackExecutor = callbackExecutor;
}
@Override
@@ -64,13 +60,7 @@ public class NettyClientHandler extends
ChannelInboundHandlerAdapter {
}
StandardRpcResponse deserialize =
JsonSerializer.deserialize(transporter.getBody(), StandardRpcResponse.class);
future.setIRpcResponse(deserialize);
- future.release();
- if (future.getInvokeCallback() != null) {
- future.removeFuture();
- this.callbackExecutor.execute(future::executeInvokeCallback);
- } else {
- future.putResponse(deserialize);
- }
+ future.putResponse(deserialize);
}
@Override
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
similarity index 62%
rename from
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java
rename to
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
index e4682f5224..3999f5c9f5 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
@@ -15,33 +15,24 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.extract.base;
+package org.apache.dolphinscheduler.extract.base.client;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.extract.base.IRpcResponse;
import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
import org.apache.dolphinscheduler.extract.base.exception.RemotingException;
import
org.apache.dolphinscheduler.extract.base.exception.RemotingTimeoutException;
-import
org.apache.dolphinscheduler.extract.base.exception.RemotingTooMuchRequestException;
-import org.apache.dolphinscheduler.extract.base.future.InvokeCallback;
-import org.apache.dolphinscheduler.extract.base.future.ReleaseSemaphore;
import org.apache.dolphinscheduler.extract.base.future.ResponseFuture;
import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterEncoder;
-import
org.apache.dolphinscheduler.extract.base.utils.CallerThreadExecutePolicy;
import org.apache.dolphinscheduler.extract.base.utils.Constants;
import org.apache.dolphinscheduler.extract.base.utils.Host;
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -71,14 +62,8 @@ public class NettyRemotingClient implements AutoCloseable {
private final NettyClientConfig clientConfig;
- private final Semaphore asyncSemaphore = new Semaphore(1024, true);
-
- private final ExecutorService callbackExecutor;
-
private final NettyClientHandler clientHandler;
- private final ScheduledExecutorService responseFutureExecutor;
-
public NettyRemotingClient(final NettyClientConfig clientConfig) {
this.clientConfig = clientConfig;
ThreadFactory nettyClientThreadFactory =
ThreadUtils.newDaemonThreadFactory("NettyClientThread-");
@@ -87,18 +72,7 @@ public class NettyRemotingClient implements AutoCloseable {
} else {
this.workerGroup = new
NioEventLoopGroup(clientConfig.getWorkerThreads(), nettyClientThreadFactory);
}
- this.callbackExecutor = new ThreadPoolExecutor(
- Constants.CPUS,
- Constants.CPUS,
- 1,
- TimeUnit.MINUTES,
- new LinkedBlockingQueue<>(1000),
-
ThreadUtils.newDaemonThreadFactory("NettyClientCallbackThread-"),
- new CallerThreadExecutePolicy());
- this.clientHandler = new NettyClientHandler(this, callbackExecutor);
-
- this.responseFutureExecutor =
Executors.newSingleThreadScheduledExecutor(
-
ThreadUtils.newDaemonThreadFactory("NettyClientResponseFutureThread-"));
+ this.clientHandler = new NettyClientHandler(this);
this.start();
}
@@ -127,66 +101,9 @@ public class NettyRemotingClient implements AutoCloseable {
.addLast(new TransporterDecoder(),
clientHandler, new TransporterEncoder());
}
});
-
this.responseFutureExecutor.scheduleWithFixedDelay(ResponseFuture::scanFutureTable,
0, 1, TimeUnit.SECONDS);
isStarted.compareAndSet(false, true);
}
- public void sendAsync(final Host host,
- final Transporter transporter,
- final long timeoutMillis,
- final InvokeCallback invokeCallback) throws
InterruptedException, RemotingException {
- final Channel channel = getChannel(host);
- if (channel == null) {
- throw new RemotingException("network error");
- }
- /*
- * request unique identification
- */
- final long opaque = transporter.getHeader().getOpaque();
- /*
- * control concurrency number
- */
- boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis,
TimeUnit.MILLISECONDS);
- if (acquired) {
- final ReleaseSemaphore releaseSemaphore = new
ReleaseSemaphore(this.asyncSemaphore);
-
- /*
- * response future
- */
- final ResponseFuture responseFuture = new ResponseFuture(opaque,
- timeoutMillis,
- invokeCallback,
- releaseSemaphore);
- try {
- channel.writeAndFlush(transporter).addListener(future -> {
- if (future.isSuccess()) {
- responseFuture.setSendOk(true);
- return;
- } else {
- responseFuture.setSendOk(false);
- }
- responseFuture.setCause(future.cause());
- responseFuture.putResponse(null);
- try {
- responseFuture.executeInvokeCallback();
- } catch (Exception ex) {
- log.error("execute callback error", ex);
- } finally {
- responseFuture.release();
- }
- });
- } catch (Exception ex) {
- responseFuture.release();
- throw new RemotingException(String.format("Send transporter to
host: %s failed", host), ex);
- }
- } else {
- String message = String.format(
- "try to acquire async semaphore timeout: %d, waiting
thread num: %d, total permits: %d",
- timeoutMillis, asyncSemaphore.getQueueLength(),
asyncSemaphore.availablePermits());
- throw new RemotingTooMuchRequestException(message);
- }
- }
-
public IRpcResponse sendSync(final Host host, final Transporter
transporter,
final long timeoutMillis) throws
InterruptedException, RemotingException {
final Channel channel = getChannel(host);
@@ -194,7 +111,7 @@ public class NettyRemotingClient implements AutoCloseable {
throw new RemotingException(String.format("connect to : %s fail",
host));
}
final long opaque = transporter.getHeader().getOpaque();
- final ResponseFuture responseFuture = new ResponseFuture(opaque,
timeoutMillis, null, null);
+ final ResponseFuture responseFuture = new ResponseFuture(opaque,
timeoutMillis);
channel.writeAndFlush(transporter).addListener(future -> {
if (future.isSuccess()) {
responseFuture.setSendOk(true);
@@ -220,7 +137,7 @@ public class NettyRemotingClient implements AutoCloseable {
return iRpcResponse;
}
- public Channel getChannel(Host host) {
+ private Channel getChannel(Host host) {
Channel channel = channels.get(host);
if (channel != null && channel.isActive()) {
return channel;
@@ -235,9 +152,9 @@ public class NettyRemotingClient implements AutoCloseable {
* @param isSync sync flag
* @return channel
*/
- public Channel createChannel(Host host, boolean isSync) {
- ChannelFuture future;
+ private Channel createChannel(Host host, boolean isSync) {
try {
+ ChannelFuture future;
synchronized (bootstrap) {
future = bootstrap.connect(new InetSocketAddress(host.getIp(),
host.getPort()));
}
@@ -249,10 +166,11 @@ public class NettyRemotingClient implements AutoCloseable
{
channels.put(host, channel);
return channel;
}
- } catch (Exception ex) {
- log.warn(String.format("connect to %s error", host), ex);
+ throw new IllegalArgumentException("connect to host: " + host + "
failed");
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Connect to host: " + host + " failed",
e);
}
- return null;
}
@Override
@@ -263,12 +181,6 @@ public class NettyRemotingClient implements AutoCloseable {
if (workerGroup != null) {
this.workerGroup.shutdownGracefully();
}
- if (callbackExecutor != null) {
- this.callbackExecutor.shutdownNow();
- }
- if (this.responseFutureExecutor != null) {
- this.responseFutureExecutor.shutdownNow();
- }
log.info("netty client closed");
} catch (Exception ex) {
log.error("netty client close exception", ex);
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClientFactory.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClientFactory.java
similarity index 95%
rename from
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClientFactory.java
rename to
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClientFactory.java
index 7bbebfbf3d..d14a8aa54e 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClientFactory.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClientFactory.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.extract.base;
+package org.apache.dolphinscheduler.extract.base.client;
import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
index 28d82532be..44d310e70b 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.extract.base.client;
-import org.apache.dolphinscheduler.extract.base.NettyRemotingClientFactory;
import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
public class SingletonJdkDynamicRpcClientProxyFactory {
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
index b5fdf3fb71..4731a22d0a 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.extract.base.client;
import org.apache.dolphinscheduler.extract.base.IRpcResponse;
-import org.apache.dolphinscheduler.extract.base.NettyRemotingClient;
import org.apache.dolphinscheduler.extract.base.RpcMethod;
import org.apache.dolphinscheduler.extract.base.StandardRpcRequest;
import
org.apache.dolphinscheduler.extract.base.exception.MethodInvocationException;
@@ -29,9 +28,9 @@ import org.apache.dolphinscheduler.extract.base.utils.Host;
import java.lang.reflect.Method;
-public class SyncClientMethodInvoker extends BaseRemoteMethodInvoker {
+class SyncClientMethodInvoker extends AbstractClientMethodInvoker {
- public SyncClientMethodInvoker(Host serverHost, Method localMethod,
NettyRemotingClient nettyRemotingClient) {
+ SyncClientMethodInvoker(Host serverHost, Method localMethod,
NettyRemotingClient nettyRemotingClient) {
super(serverHost, localMethod, nettyRemotingClient);
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/future/ResponseFuture.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/future/ResponseFuture.java
index 35405c5578..1fbbd9ed6c 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/future/ResponseFuture.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/future/ResponseFuture.java
@@ -19,8 +19,6 @@ package org.apache.dolphinscheduler.extract.base.future;
import org.apache.dolphinscheduler.extract.base.IRpcResponse;
-import java.util.Iterator;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -34,17 +32,13 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ResponseFuture {
- private static final ConcurrentHashMap<Long, ResponseFuture> FUTURE_TABLE
= new ConcurrentHashMap<>(256);
+ private static final ConcurrentHashMap<Long, ResponseFuture> FUTURE_TABLE
= new ConcurrentHashMap<>();
private final long opaque;
// remove the timeout
private final long timeoutMillis;
- private final InvokeCallback invokeCallback;
-
- private final ReleaseSemaphore releaseSemaphore;
-
private final CountDownLatch latch = new CountDownLatch(1);
private final long beginTimestamp = System.currentTimeMillis();
@@ -57,14 +51,9 @@ public class ResponseFuture {
private Throwable cause;
- public ResponseFuture(long opaque,
- long timeoutMillis,
- InvokeCallback invokeCallback,
- ReleaseSemaphore releaseSemaphore) {
+ public ResponseFuture(long opaque, long timeoutMillis) {
this.opaque = opaque;
this.timeoutMillis = timeoutMillis;
- this.invokeCallback = invokeCallback;
- this.releaseSemaphore = releaseSemaphore;
FUTURE_TABLE.put(opaque, this);
}
@@ -90,10 +79,6 @@ public class ResponseFuture {
return FUTURE_TABLE.get(opaque);
}
- public void removeFuture() {
- FUTURE_TABLE.remove(opaque);
- }
-
/**
* whether timeout
*
@@ -104,15 +89,6 @@ public class ResponseFuture {
return diff > this.timeoutMillis;
}
- /**
- * execute invoke callback
- */
- public void executeInvokeCallback() {
- if (invokeCallback != null) {
- invokeCallback.operationComplete(this);
- }
- }
-
public boolean isSendOK() {
return sendOk;
}
@@ -129,52 +105,4 @@ public class ResponseFuture {
return cause;
}
- public long getOpaque() {
- return opaque;
- }
-
- public long getTimeoutMillis() {
- return timeoutMillis;
- }
-
- public long getBeginTimestamp() {
- return beginTimestamp;
- }
-
- public InvokeCallback getInvokeCallback() {
- return invokeCallback;
- }
-
- /**
- * release
- */
- public void release() {
- if (this.releaseSemaphore != null) {
- this.releaseSemaphore.release();
- }
- }
-
- /**
- * scan future table
- */
- public static void scanFutureTable() {
- Iterator<Map.Entry<Long, ResponseFuture>> it =
FUTURE_TABLE.entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<Long, ResponseFuture> next = it.next();
- ResponseFuture future = next.getValue();
- if ((future.getBeginTimestamp() + future.getTimeoutMillis() +
1000) > System.currentTimeMillis()) {
- continue;
- }
- try {
- // todo: use thread pool to execute the async callback,
otherwise will block the scan thread
- future.release();
- future.executeInvokeCallback();
- } catch (Exception ex) {
- log.error("ScanFutureTable, execute callback error, requestId:
{}", future.getOpaque(), ex);
- }
- it.remove();
- log.debug("Remove timeout request: {}", future);
- }
- }
-
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
index b4978172f1..f57ff0b609 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.extract.base.server;
import static com.google.common.base.Preconditions.checkNotNull;
-import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
import org.apache.dolphinscheduler.extract.base.StandardRpcRequest;
import org.apache.dolphinscheduler.extract.base.StandardRpcResponse;
import org.apache.dolphinscheduler.extract.base.protocal.HeartBeatTransporter;
@@ -30,6 +29,7 @@ import
org.apache.dolphinscheduler.extract.base.utils.ChannelUtils;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import lombok.extern.slf4j.Slf4j;
@@ -42,14 +42,14 @@ import io.netty.handler.timeout.IdleStateEvent;
@Slf4j
@ChannelHandler.Sharable
-public class JdkDynamicServerHandler extends ChannelInboundHandlerAdapter {
+class JdkDynamicServerHandler extends ChannelInboundHandlerAdapter {
- private final NettyRemotingServer nettyRemotingServer;
+ private final ExecutorService methodInvokeExecutor;
private final Map<String, ServerMethodInvoker> methodInvokerMap;
- public JdkDynamicServerHandler(NettyRemotingServer nettyRemotingServer) {
- this.nettyRemotingServer = nettyRemotingServer;
+ JdkDynamicServerHandler(ExecutorService methodInvokeExecutor) {
+ this.methodInvokeExecutor = methodInvokeExecutor;
this.methodInvokerMap = new ConcurrentHashMap<>();
}
@@ -90,7 +90,7 @@ public class JdkDynamicServerHandler extends
ChannelInboundHandlerAdapter {
channel.writeAndFlush(response);
return;
}
- nettyRemotingServer.getDefaultExecutor().execute(() -> {
+ methodInvokeExecutor.execute(() -> {
StandardRpcResponse iRpcResponse;
try {
StandardRpcRequest standardRpcRequest =
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java
similarity index 75%
rename from
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java
rename to
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java
index 365a17dd03..9beeaced3d 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java
@@ -15,15 +15,13 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.extract.base;
+package org.apache.dolphinscheduler.extract.base.server;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.exception.RemoteException;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterEncoder;
-import org.apache.dolphinscheduler.extract.base.server.JdkDynamicServerHandler;
-import org.apache.dolphinscheduler.extract.base.server.ServerMethodInvoker;
import org.apache.dolphinscheduler.extract.base.utils.Constants;
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
@@ -32,6 +30,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
@@ -48,12 +47,15 @@ import io.netty.handler.timeout.IdleStateHandler;
* remoting netty server
*/
@Slf4j
-public class NettyRemotingServer {
+class NettyRemotingServer {
private final ServerBootstrap serverBootstrap = new ServerBootstrap();
- private final ExecutorService defaultExecutor = ThreadUtils
- .newDaemonFixedThreadExecutor("NettyRemotingServerThread",
Runtime.getRuntime().availableProcessors() * 2);
+ @Getter
+ private final String serverName;
+
+ @Getter
+ private final ExecutorService methodInvokerExecutor;
private final EventLoopGroup bossGroup;
@@ -61,16 +63,20 @@ public class NettyRemotingServer {
private final NettyServerConfig serverConfig;
- private final JdkDynamicServerHandler serverHandler = new
JdkDynamicServerHandler(this);
+ private final JdkDynamicServerHandler channelHandler;
private final AtomicBoolean isStarted = new AtomicBoolean(false);
- public NettyRemotingServer(final NettyServerConfig serverConfig) {
+ NettyRemotingServer(final NettyServerConfig serverConfig) {
this.serverConfig = serverConfig;
+ this.serverName = serverConfig.getServerName();
+ this.methodInvokerExecutor = ThreadUtils.newDaemonFixedThreadExecutor(
+ serverName + "MethodInvoker-%d",
Runtime.getRuntime().availableProcessors() * 2 + 1);
+ this.channelHandler = new
JdkDynamicServerHandler(methodInvokerExecutor);
ThreadFactory bossThreadFactory =
-
ThreadUtils.newDaemonThreadFactory(serverConfig.getServerName() +
"BossThread_%s");
+ ThreadUtils.newDaemonThreadFactory(serverName +
"BossThread-%d");
ThreadFactory workerThreadFactory =
-
ThreadUtils.newDaemonThreadFactory(serverConfig.getServerName() +
"WorkerThread_%s");
+ ThreadUtils.newDaemonThreadFactory(serverName +
"WorkerThread-%d");
if (Epoll.isAvailable()) {
this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory);
this.workGroup = new
EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);
@@ -80,7 +86,7 @@ public class NettyRemotingServer {
}
}
- public void start() {
+ void start() {
if (isStarted.compareAndSet(false, true)) {
this.serverBootstrap
.group(this.bossGroup, this.workGroup)
@@ -103,9 +109,9 @@ public class NettyRemotingServer {
try {
future =
serverBootstrap.bind(serverConfig.getListenPort()).sync();
} catch (Exception e) {
- log.error("{} bind fail {}, exit",
serverConfig.getServerName(), e.getMessage(), e);
throw new RemoteException(
- String.format("%s bind %s fail",
serverConfig.getServerName(), serverConfig.getListenPort()));
+ String.format("%s bind %s fail",
serverConfig.getServerName(), serverConfig.getListenPort()),
+ e);
}
if (future.isSuccess()) {
@@ -113,14 +119,9 @@ public class NettyRemotingServer {
return;
}
- if (future.cause() != null) {
- throw new RemoteException(
- String.format("%s bind %s fail",
serverConfig.getServerName(), serverConfig.getListenPort()),
- future.cause());
- } else {
- throw new RemoteException(
- String.format("%s bind %s fail",
serverConfig.getServerName(), serverConfig.getListenPort()));
- }
+ throw new RemoteException(
+ String.format("%s bind %s fail",
serverConfig.getServerName(), serverConfig.getListenPort()),
+ future.cause());
}
}
@@ -135,18 +136,14 @@ public class NettyRemotingServer {
.addLast("decoder", new TransporterDecoder())
.addLast("server-idle-handle",
new IdleStateHandler(0, 0,
Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS))
- .addLast("handler", serverHandler);
- }
-
- public ExecutorService getDefaultExecutor() {
- return defaultExecutor;
+ .addLast("handler", channelHandler);
}
- public void registerMethodInvoker(ServerMethodInvoker methodInvoker) {
- serverHandler.registerMethodInvoker(methodInvoker);
+ void registerMethodInvoker(ServerMethodInvoker methodInvoker) {
+ channelHandler.registerMethodInvoker(methodInvoker);
}
- public void close() {
+ void close() {
if (isStarted.compareAndSet(true, false)) {
try {
if (bossGroup != null) {
@@ -155,7 +152,7 @@ public class NettyRemotingServer {
if (workGroup != null) {
this.workGroup.shutdownGracefully();
}
- defaultExecutor.shutdown();
+ methodInvokerExecutor.shutdown();
} catch (Exception ex) {
log.error("netty server close exception", ex);
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServerFactory.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServerFactory.java
similarity index 84%
rename from
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServerFactory.java
rename to
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServerFactory.java
index 6bf1b8d31c..70ed0529e8 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServerFactory.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServerFactory.java
@@ -15,16 +15,16 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.extract.base;
+package org.apache.dolphinscheduler.extract.base.server;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import lombok.experimental.UtilityClass;
@UtilityClass
-public class NettyRemotingServerFactory {
+class NettyRemotingServerFactory {
- public NettyRemotingServer buildNettyRemotingServer(NettyServerConfig
nettyServerConfig) {
+ NettyRemotingServer buildNettyRemotingServer(NettyServerConfig
nettyServerConfig) {
return new NettyRemotingServer(nettyServerConfig);
}
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/RpcServer.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/RpcServer.java
new file mode 100644
index 0000000000..213868ba46
--- /dev/null
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/RpcServer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.extract.base.server;
+
+import org.apache.dolphinscheduler.extract.base.RpcMethod;
+import org.apache.dolphinscheduler.extract.base.RpcService;
+import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
+
+import java.lang.reflect.Method;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * The RpcServer based on Netty. The server will register the method invoker
and provide the service to the client.
+ * Once the server is started, it will listen on the port and wait for the
client to connect.
+ * <pre>
+ * RpcServer rpcServer = new RpcServer(new NettyServerConfig());
+ * rpcServer.registerServerMethodInvokerProvider(new
ServerMethodInvokerProviderImpl());
+ * rpcServer.start();
+ * </pre>
+ */
+@Slf4j
+public class RpcServer implements ServerMethodInvokerRegistry, AutoCloseable {
+
+ private final NettyRemotingServer nettyRemotingServer;
+
+ public RpcServer(NettyServerConfig nettyServerConfig) {
+ this.nettyRemotingServer =
NettyRemotingServerFactory.buildNettyRemotingServer(nettyServerConfig);
+ }
+
+ public void start() {
+ nettyRemotingServer.start();
+ }
+
+ @Override
+ public void registerServerMethodInvokerProvider(Object
serverMethodInvokerProviderBean) {
+ for (Class<?> anInterface :
serverMethodInvokerProviderBean.getClass().getInterfaces()) {
+ if (anInterface.getAnnotation(RpcService.class) == null) {
+ continue;
+ }
+ for (Method method : anInterface.getDeclaredMethods()) {
+ RpcMethod rpcMethod = method.getAnnotation(RpcMethod.class);
+ if (rpcMethod == null) {
+ continue;
+ }
+ ServerMethodInvoker serverMethodInvoker =
+ new
ServerMethodInvokerImpl(serverMethodInvokerProviderBean, method);
+ nettyRemotingServer.registerMethodInvoker(serverMethodInvoker);
+ log.debug("Register ServerMethodInvoker: {} to bean: {}",
+ serverMethodInvoker.getMethodIdentify(),
serverMethodInvoker.getMethodProviderIdentify());
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ nettyRemotingServer.close();
+ }
+}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvoker.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvoker.java
index ee633217b2..151b54bb97 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvoker.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvoker.java
@@ -17,10 +17,12 @@
package org.apache.dolphinscheduler.extract.base.server;
-public interface ServerMethodInvoker {
+interface ServerMethodInvoker {
String getMethodIdentify();
+ String getMethodProviderIdentify();
+
Object invoke(final Object... arg) throws Throwable;
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerImpl.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerImpl.java
index eea9da5e14..4c29650aa0 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerImpl.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerImpl.java
@@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.extract.base.server;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-public class ServerMethodInvokerImpl implements ServerMethodInvoker {
+class ServerMethodInvokerImpl implements ServerMethodInvoker {
private final Object serviceBean;
@@ -28,7 +28,7 @@ public class ServerMethodInvokerImpl implements
ServerMethodInvoker {
private final String methodIdentify;
- public ServerMethodInvokerImpl(Object serviceBean, Method method) {
+ ServerMethodInvokerImpl(Object serviceBean, Method method) {
this.serviceBean = serviceBean;
this.method = method;
this.methodIdentify = method.toGenericString();
@@ -48,4 +48,9 @@ public class ServerMethodInvokerImpl implements
ServerMethodInvoker {
public String getMethodIdentify() {
return methodIdentify;
}
+
+ @Override
+ public String getMethodProviderIdentify() {
+ return serviceBean.getClass().getName();
+ }
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvoker.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerRegistry.java
similarity index 68%
copy from
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvoker.java
copy to
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerRegistry.java
index ee633217b2..4e56be2617 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvoker.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerRegistry.java
@@ -17,10 +17,12 @@
package org.apache.dolphinscheduler.extract.base.server;
-public interface ServerMethodInvoker {
+interface ServerMethodInvokerRegistry {
- String getMethodIdentify();
-
- Object invoke(final Object... arg) throws Throwable;
+ /**
+ * Register service object, which will be used to invoke the {@link
ServerMethodInvoker}.
+ * The serverMethodInvokerProviderObject should implement with interface
which contains {@link org.apache.dolphinscheduler.extract.base.RpcService}
annotation.
+ */
+ void registerServerMethodInvokerProvider(Object
serverMethodInvokerProviderObject);
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/SpringServerMethodInvokerDiscovery.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/SpringServerMethodInvokerDiscovery.java
index 2b87a70080..de4943990c 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/SpringServerMethodInvokerDiscovery.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/SpringServerMethodInvokerDiscovery.java
@@ -17,11 +17,7 @@
package org.apache.dolphinscheduler.extract.base.server;
-import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
-import org.apache.dolphinscheduler.extract.base.RpcMethod;
-import org.apache.dolphinscheduler.extract.base.RpcService;
-
-import java.lang.reflect.Method;
+import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import lombok.extern.slf4j.Slf4j;
@@ -29,38 +25,21 @@ import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.lang.Nullable;
+/**
+ * The RpcServer which will auto discovery the {@link ServerMethodInvoker}
from Spring container.
+ */
@Slf4j
-public class SpringServerMethodInvokerDiscovery implements BeanPostProcessor {
+public class SpringServerMethodInvokerDiscovery extends RpcServer implements
BeanPostProcessor {
- protected final NettyRemotingServer nettyRemotingServer;
-
- public SpringServerMethodInvokerDiscovery(NettyRemotingServer
nettyRemotingServer) {
- this.nettyRemotingServer = nettyRemotingServer;
+ public SpringServerMethodInvokerDiscovery(NettyServerConfig
nettyServerConfig) {
+ super(nettyServerConfig);
}
@Nullable
@Override
public Object postProcessAfterInitialization(Object bean, String beanName)
throws BeansException {
- Class<?>[] interfaces = bean.getClass().getInterfaces();
- for (Class<?> anInterface : interfaces) {
- if (anInterface.getAnnotation(RpcService.class) == null) {
- continue;
- }
- registerRpcMethodInvoker(anInterface, bean, beanName);
- }
+ registerServerMethodInvokerProvider(bean);
return bean;
}
- private void registerRpcMethodInvoker(Class<?> anInterface, Object bean,
String beanName) {
- Method[] declaredMethods = anInterface.getDeclaredMethods();
- for (Method method : declaredMethods) {
- RpcMethod rpcMethod = method.getAnnotation(RpcMethod.class);
- if (rpcMethod == null) {
- continue;
- }
- ServerMethodInvoker methodInvoker = new
ServerMethodInvokerImpl(bean, method);
- nettyRemotingServer.registerMethodInvoker(methodInvoker);
- log.debug("Register ServerMethodInvoker: {} to bean: {}",
methodInvoker.getMethodIdentify(), beanName);
- }
- }
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java
index 521cf7c75a..92ed49934c 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java
@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.extract.base.client;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
import org.apache.dolphinscheduler.extract.base.RpcMethod;
import org.apache.dolphinscheduler.extract.base.RpcService;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
@@ -37,7 +36,7 @@ import org.junit.jupiter.api.Test;
public class SingletonJdkDynamicRpcClientProxyFactoryTest {
- private NettyRemotingServer nettyRemotingServer;
+ private SpringServerMethodInvokerDiscovery
springServerMethodInvokerDiscovery;
private String serverAddress;
@@ -48,11 +47,10 @@ public class SingletonJdkDynamicRpcClientProxyFactoryTest {
.serverName("ApiServer")
.listenPort(listenPort)
.build();
- nettyRemotingServer = new NettyRemotingServer(nettyServerConfig);
- nettyRemotingServer.start();
serverAddress = "localhost:" + listenPort;
- new SpringServerMethodInvokerDiscovery(nettyRemotingServer)
- .postProcessAfterInitialization(new IServiceImpl(),
"iServiceImpl");
+ springServerMethodInvokerDiscovery = new
SpringServerMethodInvokerDiscovery(nettyServerConfig);
+
springServerMethodInvokerDiscovery.registerServerMethodInvokerProvider(new
IServiceImpl());
+ springServerMethodInvokerDiscovery.start();
}
@Test
@@ -82,7 +80,7 @@ public class SingletonJdkDynamicRpcClientProxyFactoryTest {
@AfterEach
public void tearDown() {
- nettyRemotingServer.close();
+ springServerMethodInvokerDiscovery.close();
}
@RpcService
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServer.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServer.java
index 0eaf885d11..ab89b021d6 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServer.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServer.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.master.rpc;
-import org.apache.dolphinscheduler.extract.base.NettyRemotingServerFactory;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import
org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@@ -31,21 +30,8 @@ import org.springframework.stereotype.Component;
public class MasterRpcServer extends SpringServerMethodInvokerDiscovery
implements AutoCloseable {
public MasterRpcServer(MasterConfig masterConfig) {
-
super(NettyRemotingServerFactory.buildNettyRemotingServer(NettyServerConfig.builder()
-
.serverName("MasterRpcServer").listenPort(masterConfig.getListenPort()).build()));
- }
-
- public void start() {
- log.info("Starting MasterRPCServer...");
- nettyRemotingServer.start();
- log.info("Started MasterRPCServer...");
- }
-
- @Override
- public void close() {
- log.info("Closing MasterRPCServer...");
- nettyRemotingServer.close();
- log.info("Closed MasterRPCServer...");
+
super(NettyServerConfig.builder().serverName("MasterRpcServer").listenPort(masterConfig.getListenPort())
+ .build());
}
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessorTestConfig.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServerTest.java
similarity index 60%
copy from
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessorTestConfig.java
copy to
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServerTest.java
index df88de34e3..1e5a77edb3 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessorTestConfig.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServerTest.java
@@ -15,22 +15,24 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.processor;
+package org.apache.dolphinscheduler.server.master.rpc;
-import
org.apache.dolphinscheduler.server.master.utils.DataQualityResultOperator;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.mockito.Mockito;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
-/**
- * dependency config
- */
-@Configuration
-public class TaskResponseProcessorTestConfig {
+class MasterRpcServerTest {
+
+ private final MasterRpcServer masterRpcServer = new MasterRpcServer(new
MasterConfig());
+
+ @Test
+ void testStart() {
+ Assertions.assertDoesNotThrow(masterRpcServer::start);
+ }
- @Bean
- public DataQualityResultOperator dataQualityResultOperator() {
- return Mockito.mock(DataQualityResultOperator.class);
+ @Test
+ void testClose() {
+ Assertions.assertDoesNotThrow(masterRpcServer::close);
}
}
diff --git
a/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java
b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java
index 1a3e4ab1e2..496983118f 100644
---
a/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java
+++
b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.microbench.rpc;
-import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
import
org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import
org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
@@ -46,18 +45,17 @@ import org.openjdk.jmh.infra.Blackhole;
@BenchmarkMode({Mode.Throughput, Mode.AverageTime, Mode.SampleTime})
public class RpcBenchMarkTest extends AbstractBaseBenchmark {
- private NettyRemotingServer nettyRemotingServer;
+ private SpringServerMethodInvokerDiscovery
springServerMethodInvokerDiscovery;
private IService iService;
@Setup
public void before() {
- nettyRemotingServer = new NettyRemotingServer(
-
NettyServerConfig.builder().serverName("NettyRemotingServer").listenPort(12345).build());
- nettyRemotingServer.start();
- SpringServerMethodInvokerDiscovery springServerMethodInvokerDiscovery =
- new SpringServerMethodInvokerDiscovery(nettyRemotingServer);
+ NettyServerConfig nettyServerConfig =
+
NettyServerConfig.builder().serverName("NettyRemotingServer").listenPort(12345).build();
+ springServerMethodInvokerDiscovery = new
SpringServerMethodInvokerDiscovery(nettyServerConfig);
springServerMethodInvokerDiscovery.postProcessAfterInitialization(new
IServiceImpl(), "iServiceImpl");
+ springServerMethodInvokerDiscovery.start();
iService =
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345",
IService.class);
}
@@ -72,6 +70,6 @@ public class RpcBenchMarkTest extends AbstractBaseBenchmark {
@TearDown
public void after() {
- nettyRemotingServer.close();
+ springServerMethodInvokerDiscovery.close();
}
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
index 7733fbba4f..b9f3855cf9 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.rpc;
-import org.apache.dolphinscheduler.extract.base.NettyRemotingServerFactory;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import
org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@@ -33,21 +32,8 @@ import org.springframework.stereotype.Service;
public class WorkerRpcServer extends SpringServerMethodInvokerDiscovery
implements Closeable {
public WorkerRpcServer(WorkerConfig workerConfig) {
-
super(NettyRemotingServerFactory.buildNettyRemotingServer(NettyServerConfig.builder()
-
.serverName("WorkerRpcServer").listenPort(workerConfig.getListenPort()).build()));
- }
-
- public void start() {
- log.info("WorkerRpcServer starting...");
- nettyRemotingServer.start();
- log.info("WorkerRpcServer started...");
- }
-
- @Override
- public void close() {
- log.info("WorkerRpcServer closing");
- nettyRemotingServer.close();
- log.info("WorkerRpcServer closed");
+
super(NettyServerConfig.builder().serverName("WorkerRpcServer").listenPort(workerConfig.getListenPort())
+ .build());
}
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessorTestConfig.java
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServerTest.java
similarity index 60%
rename from
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessorTestConfig.java
rename to
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServerTest.java
index df88de34e3..d27eaeeadf 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessorTestConfig.java
+++
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServerTest.java
@@ -15,22 +15,25 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.processor;
+package org.apache.dolphinscheduler.server.worker.rpc;
-import
org.apache.dolphinscheduler.server.master.utils.DataQualityResultOperator;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.mockito.Mockito;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
-/**
- * dependency config
- */
-@Configuration
-public class TaskResponseProcessorTestConfig {
+class WorkerRpcServerTest {
+
+ private final WorkerRpcServer workerRpcServer = new WorkerRpcServer(new
WorkerConfig());
- @Bean
- public DataQualityResultOperator dataQualityResultOperator() {
- return Mockito.mock(DataQualityResultOperator.class);
+ @Test
+ void testStart() {
+ Assertions.assertDoesNotThrow(workerRpcServer::start);
}
+
+ @Test
+ void testClose() {
+ Assertions.assertDoesNotThrow(workerRpcServer::close);
+ }
+
}