This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new d223d654cc Add rpc benchmark test (#14797)
d223d654cc is described below
commit d223d654ccfbf5621c4faeed1b7fe45715bf5a6b
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sun Aug 27 20:59:55 2023 +0800
Add rpc benchmark test (#14797)
---
.github/CODEOWNERS | 2 +-
docs/docs/en/about/glossary.md | 2 +-
docs/docs/zh/about/glossary.md | 2 +-
.../api/aspect/CacheEvictAspect.java | 2 +-
.../instance/pause/pause/PauseExecuteFunction.java | 2 +-
.../instance/stop/StopExecuteFunction.java | 4 +-
.../api/service/impl/ExecutorServiceImpl.java | 8 +-
.../api/service/impl/LoggerServiceImpl.java | 8 +-
.../service/impl/MetricsCleanUpServiceImpl.java | 2 +-
.../api/service/impl/TaskInstanceServiceImpl.java | 8 +-
.../client/JdkDynamicRpcClientProxyFactory.java | 36 +++++++--
.../SingletonJdkDynamicRpcClientProxyFactory.java | 14 ++--
.../base/server/JdkDynamicServerHandler.java | 2 +-
...ngletonJdkDynamicRpcClientProxyFactoryTest.java | 94 ++++++++++++++++++++++
.../server/master/event/TaskDelayEventHandler.java | 2 +-
.../master/event/TaskResultEventHandler.java | 2 +-
.../master/event/TaskRunningEventHandler.java | 2 +-
.../master/event/TaskUpdatePidEventHandler.java | 2 +-
.../master/runner/StreamTaskExecuteRunnable.java | 2 +-
.../master/runner/WorkflowExecuteRunnable.java | 10 +--
.../master/runner/WorkflowExecuteThreadPool.java | 2 +-
.../runner/dispatcher/MasterTaskDispatcher.java | 2 +-
.../runner/dispatcher/WorkerTaskDispatcher.java | 2 +-
...LogicTaskInstanceExecuteRunningEventSender.java | 2 +-
...ogicTaskInstanceExecutionFinishEventSender.java | 2 +-
.../LogicTaskExecuteRunnableKillOperator.java | 2 +-
.../LogicTaskExecuteRunnablePauseOperator.java | 2 +-
.../LogicTaskExecuteRunnableTimeoutOperator.java | 2 +-
.../operator/TaskExecuteRunnableKillOperator.java | 2 +-
.../operator/TaskExecuteRunnablePauseOperator.java | 2 +-
.../TaskExecuteRunnableTimeoutOperator.java | 2 +-
.../runner/task/dynamic/DynamicLogicTask.java | 2 +-
.../task/subworkflow/SubWorkflowLogicTask.java | 2 +-
.../master/service/WorkerFailoverService.java | 2 +-
dolphinscheduler-microbench/pom.xml | 5 ++
.../dolphinscheduler/microbench/rpc/IService.java | 21 ++---
.../microbench/rpc/IServiceImpl.java | 19 ++---
.../microbench/rpc/RpcBenchMarkTest.java | 76 +++++++++++++++++
.../service/process/ProcessServiceImpl.java | 8 +-
.../TaskInstanceExecutionFinishEventSender.java | 2 +-
...TaskInstanceExecutionInfoUpdateEventSender.java | 2 +-
.../TaskInstanceExecutionRunningEventSender.java | 2 +-
.../worker/runner/WorkerTaskExecuteRunnable.java | 2 +-
43 files changed, 273 insertions(+), 98 deletions(-)
diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index 3462cff0fa..b82253d7aa 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -33,7 +33,7 @@
/dolphinscheduler-master/ @caishunfeng @SbloodyS @ruanwenjun
/dolphinscheduler-worker/ @caishunfeng @SbloodyS @ruanwenjun
/dolphinscheduler-service/ @caishunfeng
-/dolphinscheduler-remote/ @caishunfeng
+/dolphinscheduler-extract/ @caishunfeng @ruanwenjun
/dolphinscheduler-spi/ @caishunfeng
/dolphinscheduler-task-plugin/ @caishunfeng @SbloodyS @zhuangchong
/dolphinscheduler-tools/ @caishunfeng @SbloodyS @zhongjiajie @EricGao888
diff --git a/docs/docs/en/about/glossary.md b/docs/docs/en/about/glossary.md
index 29f0da5a5a..e3cee76f14 100644
--- a/docs/docs/en/about/glossary.md
+++ b/docs/docs/en/about/glossary.md
@@ -61,7 +61,7 @@ process fails and ends
- dolphinscheduler-dao provides operations such as database access.
-- dolphinscheduler-remote client and server based on netty
+- dolphinscheduler-extract dolphinscheduler extract module, providing
master/worker/alert sdk.
- dolphinscheduler-service service module, including Quartz, Zookeeper, log
client access service, easy to call server
module and api module
diff --git a/docs/docs/zh/about/glossary.md b/docs/docs/zh/about/glossary.md
index 2358b2c5d1..2b5876669e 100644
--- a/docs/docs/zh/about/glossary.md
+++ b/docs/docs/zh/about/glossary.md
@@ -45,7 +45,7 @@
- dolphinscheduler-dao 提供数据库访问等操作。
-- dolphinscheduler-remote 基于 netty 的客户端、服务端
+- dolphinscheduler-extract extract模块,包含master/worker/alert的sdk
- dolphinscheduler-service
service模块,包含Quartz、Zookeeper、日志客户端访问服务,便于server模块和api模块调用
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
index cf6df2823a..53dff28aff 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
@@ -147,7 +147,7 @@ public class CacheEvictAspect {
return;
}
for (Server server : serverList) {
- IMasterCacheService masterCacheService =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ IMasterCacheService masterCacheService =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(server.getHost() + ":" +
server.getPort(), IMasterCacheService.class);
masterCacheService.cacheExpire(new
CacheExpireRequest(cacheType, cacheKey));
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java
index 5df321628a..e8d6de590d 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java
@@ -58,7 +58,7 @@ public class PauseExecuteFunction implements
ExecuteFunction<PauseExecuteRequest
try {
// todo: direct call the workflow instance pause method
ITaskInstanceExecutionEventListener
iTaskInstanceExecutionEventListener =
- SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(workflowInstance.getHost(),
ITaskInstanceExecutionEventListener.class);
iTaskInstanceExecutionEventListener.onWorkflowInstanceInstanceStateChange(
new WorkflowInstanceStateChangeEvent(
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunction.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunction.java
index 8afb18a278..1ad0f9ae7a 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunction.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunction.java
@@ -59,8 +59,8 @@ public class StopExecuteFunction implements
ExecuteFunction<StopRequest, StopRes
try {
// todo: direct call the workflow instance stop method
ITaskInstanceExecutionEventListener
iTaskInstanceExecutionEventListener =
- SingletonJdkDynamicRpcClientProxyFactory.getInstance()
- .getProxyClient(workflowInstance.getHost(),
ITaskInstanceExecutionEventListener.class);
+
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(workflowInstance.getHost(),
+ ITaskInstanceExecutionEventListener.class);
iTaskInstanceExecutionEventListener.onWorkflowInstanceInstanceStateChange(
new WorkflowInstanceStateChangeEvent(
workflowInstance.getId(), 0,
workflowInstance.getState(), workflowInstance.getId(), 0));
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index be3a007d26..dc5e761190 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -656,7 +656,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
WorkflowInstanceStateChangeEvent workflowStateEventChangeRequest =
new WorkflowInstanceStateChangeEvent(
processInstance.getId(), 0, processInstance.getState(),
processInstance.getId(), 0);
ITaskInstanceExecutionEventListener
iTaskInstanceExecutionEventListener =
- SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(processInstance.getHost(),
ITaskInstanceExecutionEventListener.class);
iTaskInstanceExecutionEventListener.onWorkflowInstanceInstanceStateChange(workflowStateEventChangeRequest);
putMsg(result, Status.SUCCESS);
@@ -684,7 +684,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
taskGroupQueue.setForceStart(Flag.YES.getCode());
processService.updateTaskGroupQueue(taskGroupQueue);
log.info("Sending force start command to master: {}.",
processInstance.getHost());
- ILogicTaskInstanceOperator iLogicTaskInstanceOperator =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ ILogicTaskInstanceOperator iLogicTaskInstanceOperator =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(processInstance.getHost(),
ILogicTaskInstanceOperator.class);
iLogicTaskInstanceOperator.forceStartTaskInstance(
new TaskInstanceForceStartRequest(processInstance.getId(),
taskGroupQueue.getTaskId()));
@@ -1152,7 +1152,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
log.error("Process instance does not exist,
processInstanceId:{}.", processInstanceId);
return null;
}
- IWorkflowInstanceService iWorkflowInstanceService =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ IWorkflowInstanceService iWorkflowInstanceService =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(processInstance.getHost(),
IWorkflowInstanceService.class);
return
iWorkflowInstanceService.getWorkflowExecutingData(processInstanceId);
}
@@ -1189,7 +1189,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
taskExecuteStartMessage.setStartParams(startParams);
taskExecuteStartMessage.setDryRun(dryRun);
- IStreamingTaskOperator streamingTaskOperator =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ IStreamingTaskOperator streamingTaskOperator =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(server.getHost() + ":" + server.getPort(),
IStreamingTaskOperator.class);
StreamingTaskTriggerResponse streamingTaskTriggerResponse =
streamingTaskOperator.triggerStreamingTask(taskExecuteStartMessage);
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
index 99216e917f..69c7ba1a01 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
@@ -225,7 +225,7 @@ public class LoggerServiceImpl extends BaseServiceImpl
implements LoggerService
String logContent = null;
if (TaskUtils.isLogicTask(taskInstance.getTaskType())) {
- IMasterLogService masterLogService =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ IMasterLogService masterLogService =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(),
IMasterLogService.class);
try {
LogicTaskInstanceLogPageQueryRequest
logicTaskInstanceLogPageQueryRequest =
@@ -237,7 +237,7 @@ public class LoggerServiceImpl extends BaseServiceImpl
implements LoggerService
log.error("Query LogicTaskInstance log error", ex);
}
} else {
- IWorkerLogService iWorkerLogService =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ IWorkerLogService iWorkerLogService =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(host, IWorkerLogService.class);
try {
TaskInstanceLogPageQueryRequest
taskInstanceLogPageQueryRequest =
@@ -282,7 +282,7 @@ public class LoggerServiceImpl extends BaseServiceImpl
implements LoggerService
byte[] logBytes = new byte[0];
if (TaskUtils.isLogicTask(taskInstance.getTaskType())) {
- IMasterLogService masterLogService =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ IMasterLogService masterLogService =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(),
IMasterLogService.class);
try {
LogicTaskInstanceLogFileDownloadRequest
logicTaskInstanceLogFileDownloadRequest =
@@ -294,7 +294,7 @@ public class LoggerServiceImpl extends BaseServiceImpl
implements LoggerService
log.error("Query LogicTaskInstance log error", ex);
}
} else {
- IWorkerLogService iWorkerLogService =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ IWorkerLogService iWorkerLogService =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(host, IWorkerLogService.class);
try {
TaskInstanceLogFileDownloadRequest
taskInstanceLogFileDownloadRequest =
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MetricsCleanUpServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MetricsCleanUpServiceImpl.java
index a7ad669e3b..f1ac0579ff 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MetricsCleanUpServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MetricsCleanUpServiceImpl.java
@@ -50,7 +50,7 @@ public class MetricsCleanUpServiceImpl implements
MetricsCleanUpService {
private void cleanUpWorkflowMetrics(Server server, Long
workflowDefinitionCode) {
try {
IWorkflowInstanceService iWorkflowInstanceService =
-
SingletonJdkDynamicRpcClientProxyFactory.getInstance().getProxyClient(
+ SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(
String.format("%s:%s", server.getHost(),
server.getPort()), IWorkflowInstanceService.class);
iWorkflowInstanceService.clearWorkflowMetrics(workflowDefinitionCode);
} catch (Exception e) {
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
index 91156a6890..a381a07d49 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
@@ -290,7 +290,7 @@ public class TaskInstanceServiceImpl extends
BaseServiceImpl implements TaskInst
return result;
}
IStreamingTaskInstanceOperator streamingTaskInstanceOperator =
- SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(),
IStreamingTaskInstanceOperator.class);
TaskInstanceTriggerSavepointResponse
taskInstanceTriggerSavepointResponse =
streamingTaskInstanceOperator.triggerSavepoint(new
TaskInstanceTriggerSavepointRequest(taskInstanceId));
@@ -322,7 +322,7 @@ public class TaskInstanceServiceImpl extends
BaseServiceImpl implements TaskInst
}
// todo: we only support streaming task for now
- ITaskInstanceOperator iTaskInstanceOperator =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ ITaskInstanceOperator iTaskInstanceOperator =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(),
ITaskInstanceOperator.class);
TaskInstanceKillResponse taskInstanceKillResponse =
iTaskInstanceOperator.killTask(new
TaskInstanceKillRequest(taskInstanceId));
@@ -381,11 +381,11 @@ public class TaskInstanceServiceImpl extends
BaseServiceImpl implements TaskInst
// delete log
if (StringUtils.isNotEmpty(taskInstance.getLogPath())) {
if (TaskUtils.isLogicTask(taskInstance.getTaskType())) {
- IMasterLogService masterLogService =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ IMasterLogService masterLogService =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(),
IMasterLogService.class);
masterLogService.removeLogicTaskInstanceLog(taskInstance.getLogPath());
} else {
- IWorkerLogService workerLogService =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ IWorkerLogService workerLogService =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(),
IWorkerLogService.class);
workerLogService.removeTaskInstanceLog(taskInstance.getLogPath());
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/JdkDynamicRpcClientProxyFactory.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/JdkDynamicRpcClientProxyFactory.java
index 25e6e32ee4..5635a88f34 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/JdkDynamicRpcClientProxyFactory.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/JdkDynamicRpcClientProxyFactory.java
@@ -21,9 +21,16 @@ import
org.apache.dolphinscheduler.extract.base.NettyRemotingClient;
import org.apache.dolphinscheduler.extract.base.utils.Host;
import java.lang.reflect.Proxy;
+import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import lombok.SneakyThrows;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
/**
* This class is used to create a proxy client which will transform local
method invocation to remote invocation.
*/
@@ -31,21 +38,34 @@ public class JdkDynamicRpcClientProxyFactory implements
IRpcClientProxyFactory {
private final NettyRemotingClient nettyRemotingClient;
- // todo: use guava cache to avoid memory leak
- private final Map<String, Map<String, Object>> proxyClientCache = new
ConcurrentHashMap<>();
+ private static final LoadingCache<String, Map<String, Object>>
proxyClientCache = CacheBuilder.newBuilder()
+ // expire here to remove dead host
+ .expireAfterAccess(Duration.ofHours(1))
+ .build(new CacheLoader<String, Map<String, Object>>() {
+
+ @Override
+ public Map<String, Object> load(String key) {
+ return new ConcurrentHashMap<>();
+ }
+ });
public JdkDynamicRpcClientProxyFactory(NettyRemotingClient
nettyRemotingClient) {
this.nettyRemotingClient = nettyRemotingClient;
}
+ @SneakyThrows
@SuppressWarnings("unchecked")
@Override
public <T> T getProxyClient(String serverHost, Class<T> clientInterface) {
- return (T) proxyClientCache
- .computeIfAbsent(serverHost, key -> new ConcurrentHashMap<>())
- .computeIfAbsent(clientInterface.getName(),
- key -> Proxy.newProxyInstance(
- clientInterface.getClassLoader(), new
Class[]{clientInterface},
- new
ClientInvocationHandler(Host.of(serverHost), nettyRemotingClient)));
+ return (T) proxyClientCache.get(serverHost)
+ .computeIfAbsent(clientInterface.getName(), key ->
newProxyClient(serverHost, clientInterface));
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> T newProxyClient(String serverHost, Class<T> clientInterface) {
+ return (T) Proxy.newProxyInstance(
+ clientInterface.getClassLoader(),
+ new Class[]{clientInterface},
+ new ClientInvocationHandler(Host.of(serverHost),
nettyRemotingClient));
}
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
index 44e0420272..28d82532be 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
@@ -20,17 +20,13 @@ package org.apache.dolphinscheduler.extract.base.client;
import org.apache.dolphinscheduler.extract.base.NettyRemotingClientFactory;
import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
-public class SingletonJdkDynamicRpcClientProxyFactory extends
JdkDynamicRpcClientProxyFactory {
+public class SingletonJdkDynamicRpcClientProxyFactory {
- private static final SingletonJdkDynamicRpcClientProxyFactory INSTANCE =
- new SingletonJdkDynamicRpcClientProxyFactory();
+ private static final JdkDynamicRpcClientProxyFactory INSTANCE = new
JdkDynamicRpcClientProxyFactory(
+ NettyRemotingClientFactory.buildNettyRemotingClient(new
NettyClientConfig()));
- private SingletonJdkDynamicRpcClientProxyFactory() {
- super(NettyRemotingClientFactory.buildNettyRemotingClient(new
NettyClientConfig()));
- }
-
- public static SingletonJdkDynamicRpcClientProxyFactory getInstance() {
- return INSTANCE;
+ public static <T> T getProxyClient(String serverAddress, Class<T> clazz) {
+ return INSTANCE.getProxyClient(serverAddress, clazz);
}
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
index 7e5feab05f..a98362209d 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
@@ -96,7 +96,7 @@ public class JdkDynamicServerHandler extends
ChannelInboundHandlerAdapter {
StandardRpcRequest standardRpcRequest =
JsonSerializer.deserialize(transporter.getBody(),
StandardRpcRequest.class);
Object[] args;
- if (standardRpcRequest.getArgs().length == 0) {
+ if (standardRpcRequest.getArgs() == null ||
standardRpcRequest.getArgs().length == 0) {
args = null;
} else {
args = new Object[standardRpcRequest.getArgs().length];
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java
new file mode 100644
index 0000000000..3cf9ff1c89
--- /dev/null
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.extract.base.client;
+
+import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
+import org.apache.dolphinscheduler.extract.base.RpcMethod;
+import org.apache.dolphinscheduler.extract.base.RpcService;
+import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
+import
org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class SingletonJdkDynamicRpcClientProxyFactoryTest {
+
+ private NettyRemotingServer nettyRemotingServer;
+
+ @BeforeEach
+ public void setUp() {
+ nettyRemotingServer = new NettyRemotingServer(new
NettyServerConfig(12345));
+ nettyRemotingServer.start();
+
+ new SpringServerMethodInvokerDiscovery(nettyRemotingServer)
+ .postProcessAfterInitialization(new IServiceImpl(),
"iServiceImpl");
+ }
+
+ @Test
+ public void getProxyClient() {
+ IService proxyClient =
+
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345",
IService.class);
+ Assertions.assertNotNull(proxyClient);
+ }
+
+ @Test
+ public void testPing() {
+ IService proxyClient =
+
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345",
IService.class);
+ String ping = proxyClient.ping("ping");
+ Assertions.assertEquals("pong", ping);
+ }
+
+ @Test
+ public void testVoid() {
+ IService proxyClient =
+
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345",
IService.class);
+ Assertions.assertDoesNotThrow(proxyClient::voidMethod);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ nettyRemotingServer.close();
+ }
+
+ @RpcService
+ public interface IService {
+
+ @RpcMethod
+ String ping(String ping);
+
+ @RpcMethod
+ void voidMethod();
+ }
+
+ public static class IServiceImpl implements IService {
+
+ @Override
+ public String ping(String ping) {
+ return "pong";
+ }
+
+ @Override
+ public void voidMethod() {
+ System.out.println("void method");
+ }
+ }
+
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
index 25ed3283ed..f749ac46cd 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
@@ -114,7 +114,7 @@ public class TaskDelayEventHandler implements
TaskEventHandler {
private void sendAckToWorker(TaskEvent taskEvent) {
// If the event is handled successfully, send an ack to the worker; otherwise the worker
will retry this event
ITaskInstanceExecutionEventAckListener
instanceExecutionEventAckListener =
- SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskEvent.getWorkerAddress(),
ITaskInstanceExecutionEventAckListener.class);
instanceExecutionEventAckListener.handleTaskInstanceExecutionRunningEventAck(
TaskInstanceExecutionRunningEventAck.success(taskEvent.getTaskInstanceId()));
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
index f331dfd360..854812264f 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
@@ -116,7 +116,7 @@ public class TaskResultEventHandler implements
TaskEventHandler {
public void sendAckToWorker(TaskEvent taskEvent) {
ITaskInstanceExecutionEventAckListener
instanceExecutionEventAckListener =
- SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskEvent.getWorkerAddress(),
ITaskInstanceExecutionEventAckListener.class);
instanceExecutionEventAckListener.handleTaskInstanceExecutionFinishEventAck(
TaskInstanceExecutionFinishEventAck.success(taskEvent.getTaskInstanceId()));
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
index 03300f2ddb..14cb8571d9 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
@@ -110,7 +110,7 @@ public class TaskRunningEventHandler implements
TaskEventHandler {
private void sendAckToWorker(TaskEvent taskEvent) {
// If the event is handled successfully, send an ack to the worker; otherwise the worker
will retry this event
ITaskInstanceExecutionEventAckListener
instanceExecutionEventAckListener =
- SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskEvent.getWorkerAddress(),
ITaskInstanceExecutionEventAckListener.class);
instanceExecutionEventAckListener.handleTaskInstanceExecutionRunningEventAck(
TaskInstanceExecutionRunningEventAck.success(taskEvent.getTaskInstanceId()));
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java
index f684322875..5be19ddfdb 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java
@@ -94,7 +94,7 @@ public class TaskUpdatePidEventHandler implements
TaskEventHandler {
private void sendAckToWorker(TaskEvent taskEvent) {
// If the event is handled successfully, send an ack to the worker; otherwise the worker
will retry this event
ITaskInstanceExecutionEventAckListener
instanceExecutionEventAckListener =
- SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskEvent.getWorkerAddress(),
ITaskInstanceExecutionEventAckListener.class);
instanceExecutionEventAckListener.handleTaskInstanceExecutionInfoEventAck(
TaskInstanceExecutionInfoEventAck.success(taskEvent.getTaskInstanceId()));
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
index a4b4f083d9..24e524d171 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
@@ -473,7 +473,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
private void sendAckToWorker(TaskEvent taskEvent) {
// If the event is handled successfully, send an ack to the worker; otherwise the worker
will retry this event
ITaskInstanceExecutionEventAckListener
instanceExecutionEventAckListener =
- SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskEvent.getWorkerAddress(),
ITaskInstanceExecutionEventAckListener.class);
if (taskEvent.getEvent() == TaskEventType.RUNNING) {
log.error("taskEvent.getChannel() is null, taskEvent:{}",
taskEvent);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index b3fc7c4960..9c5e65160f 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -60,6 +60,7 @@ import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
import
org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.ILogicTaskInstanceOperator;
import
org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupRequest;
+import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
import
org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostRequest;
import
org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostResponse;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
@@ -466,7 +467,7 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
} else {
ProcessInstance processInstance =
processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
- ILogicTaskInstanceOperator taskInstanceOperator =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ ILogicTaskInstanceOperator taskInstanceOperator =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(processInstance.getHost(),
ILogicTaskInstanceOperator.class);
taskInstanceOperator.wakeupTaskInstance(
new TaskInstanceWakeupRequest(processInstance.getId(),
nextTaskInstance.getId()));
@@ -1385,10 +1386,9 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
return false;
}
try {
- org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator
iTaskInstanceOperator =
- SingletonJdkDynamicRpcClientProxyFactory.getInstance()
- .getProxyClient(taskInstance.getHost(),
-
org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator.class);
+ ITaskInstanceOperator iTaskInstanceOperator =
+ SingletonJdkDynamicRpcClientProxyFactory
+ .getProxyClient(taskInstance.getHost(),
ITaskInstanceOperator.class);
UpdateWorkflowHostResponse updateWorkflowHostResponse =
iTaskInstanceOperator.updateWorkflowInstanceHost(
new UpdateWorkflowHostRequest(taskInstance.getId(),
masterConfig.getMasterAddress()));
if (!updateWorkflowHostResponse.isSuccess()) {
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index 8fe1a9a683..c0743b5a2c 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -226,7 +226,7 @@ public class WorkflowExecuteThreadPool extends
ThreadPoolTaskExecutor {
return;
}
ITaskInstanceExecutionEventListener
iTaskInstanceExecutionEventListener =
- SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(processInstanceHost,
ITaskInstanceExecutionEventListener.class);
WorkflowInstanceStateChangeEvent workflowInstanceStateChangeEvent =
new WorkflowInstanceStateChangeEvent(
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcher.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcher.java
index 4a4ca42c58..d30a1e554d 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcher.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcher.java
@@ -51,7 +51,7 @@ public class MasterTaskDispatcher extends BaseTaskDispatcher {
protected void doDispatch(TaskExecuteRunnable taskExecuteRunnable) throws
TaskDispatchException {
TaskExecutionContext taskExecutionContext =
taskExecuteRunnable.getTaskExecutionContext();
try {
- ILogicTaskInstanceOperator taskInstanceOperator =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ ILogicTaskInstanceOperator taskInstanceOperator =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskExecutionContext.getHost(),
ILogicTaskInstanceOperator.class);
LogicTaskDispatchResponse logicTaskDispatchResponse =
taskInstanceOperator
.dispatchLogicTask(new
LogicTaskDispatchRequest(taskExecuteRunnable.getTaskExecutionContext()));
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcher.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcher.java
index 6760633b9b..36739a1163 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcher.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcher.java
@@ -56,7 +56,7 @@ public class WorkerTaskDispatcher extends BaseTaskDispatcher {
protected void doDispatch(TaskExecuteRunnable taskExecuteRunnable) throws
TaskDispatchException {
TaskExecutionContext taskExecutionContext =
taskExecuteRunnable.getTaskExecutionContext();
try {
- ITaskInstanceOperator taskInstanceOperator =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ ITaskInstanceOperator taskInstanceOperator =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskExecutionContext.getHost(),
ITaskInstanceOperator.class);
TaskInstanceDispatchResponse taskInstanceDispatchResponse =
taskInstanceOperator
.dispatchTask(new
TaskInstanceDispatchRequest(taskExecuteRunnable.getTaskExecutionContext()));
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecuteRunningEventSender.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecuteRunningEventSender.java
index 1dcca72878..6ea085b31b 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecuteRunningEventSender.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecuteRunningEventSender.java
@@ -34,7 +34,7 @@ public class LogicTaskInstanceExecuteRunningEventSender
@Override
public void sendMessage(TaskInstanceExecutionRunningEvent
taskInstanceExecutionRunningEvent) {
ITaskInstanceExecutionEventListener
iTaskInstanceExecutionEventListener =
- SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstanceExecutionRunningEvent.getHost(),
ITaskInstanceExecutionEventListener.class);
iTaskInstanceExecutionEventListener.onTaskInstanceExecutionRunning(taskInstanceExecutionRunningEvent);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecutionFinishEventSender.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecutionFinishEventSender.java
index ad5ea37ec6..1949145e86 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecutionFinishEventSender.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecutionFinishEventSender.java
@@ -32,7 +32,7 @@ public class LogicTaskInstanceExecutionFinishEventSender
@Override
public void sendMessage(TaskInstanceExecutionFinishEvent message) {
ITaskInstanceExecutionEventListener
iTaskInstanceExecutionEventListener =
- SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(message.getHost(),
ITaskInstanceExecutionEventListener.class);
iTaskInstanceExecutionEventListener.onTaskInstanceExecutionFinish(message);
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableKillOperator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableKillOperator.java
index 29ef03906d..e7c7b31da1 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableKillOperator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableKillOperator.java
@@ -45,7 +45,7 @@ public class LogicTaskExecuteRunnableKillOperator extends
BaseTaskExecuteRunnabl
taskInstance.getName());
return;
}
- final ILogicTaskInstanceOperator taskInstanceOperator =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ final ILogicTaskInstanceOperator taskInstanceOperator =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(),
ILogicTaskInstanceOperator.class);
final LogicTaskKillRequest logicTaskKillRequest = new
LogicTaskKillRequest(taskInstance.getId());
final LogicTaskKillResponse logicTaskKillResponse =
taskInstanceOperator.killLogicTask(logicTaskKillRequest);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnablePauseOperator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnablePauseOperator.java
index af865f05f6..4c1c77f24a 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnablePauseOperator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnablePauseOperator.java
@@ -40,7 +40,7 @@ public class LogicTaskExecuteRunnablePauseOperator extends
BaseTaskExecuteRunnab
taskInstance.getName());
return;
}
- final ILogicTaskInstanceOperator taskInstanceOperator =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ final ILogicTaskInstanceOperator taskInstanceOperator =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(),
ILogicTaskInstanceOperator.class);
final LogicTaskPauseRequest logicTaskPauseRequest = new
LogicTaskPauseRequest(taskInstance.getId());
final LogicTaskPauseResponse logicTaskPauseResponse =
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableTimeoutOperator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableTimeoutOperator.java
index 4cf16bb2c7..949bc940f5 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableTimeoutOperator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableTimeoutOperator.java
@@ -47,7 +47,7 @@ public class LogicTaskExecuteRunnableTimeoutOperator extends
BaseTaskExecuteRunn
}
final ILogicTaskInstanceOperator iLogicTaskInstanceOperator =
- SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(),
ILogicTaskInstanceOperator.class);
final LogicTaskKillRequest taskInstanceKillRequest = new
LogicTaskKillRequest(taskInstance.getId());
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableKillOperator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableKillOperator.java
index b80cb79561..dc9915f901 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableKillOperator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableKillOperator.java
@@ -43,7 +43,7 @@ public class TaskExecuteRunnableKillOperator extends
BaseTaskExecuteRunnableKill
log.info("TaskInstance {} host is empty, no need to
killRemoteTask", taskInstance.getName());
return;
}
- ITaskInstanceOperator taskInstanceOperator =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ ITaskInstanceOperator taskInstanceOperator =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(),
ITaskInstanceOperator.class);
TaskInstanceKillRequest taskInstanceKillRequest = new
TaskInstanceKillRequest(taskInstance.getId());
TaskInstanceKillResponse taskInstanceKillResponse =
taskInstanceOperator.killTask(taskInstanceKillRequest);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnablePauseOperator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnablePauseOperator.java
index a0636b8504..39896bb0cb 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnablePauseOperator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnablePauseOperator.java
@@ -48,7 +48,7 @@ public class TaskExecuteRunnablePauseOperator implements
TaskExecuteRunnableOper
log.info("The TaskInstance: {} host is null, no need to
pauseRemoteTaskInstance", taskInstance.getName());
return;
}
- final ITaskInstanceOperator taskInstanceOperator =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ final ITaskInstanceOperator taskInstanceOperator =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(),
ITaskInstanceOperator.class);
final TaskInstancePauseRequest taskInstancePauseRequest = new
TaskInstancePauseRequest(taskInstance.getId());
final TaskInstancePauseResponse taskInstancePauseResponse =
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableTimeoutOperator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableTimeoutOperator.java
index 7154f5fa62..9356b24370 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableTimeoutOperator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableTimeoutOperator.java
@@ -45,7 +45,7 @@ public class TaskExecuteRunnableTimeoutOperator extends
BaseTaskExecuteRunnableT
return;
}
- final ITaskInstanceOperator iTaskInstanceOperator =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ final ITaskInstanceOperator iTaskInstanceOperator =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(),
ITaskInstanceOperator.class);
final TaskInstanceKillRequest taskInstanceKillRequest = new
TaskInstanceKillRequest(taskInstance.getId());
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java
index 25b53c8d6d..3baa10b343 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java
@@ -286,7 +286,7 @@ public class DynamicLogicTask extends
BaseAsyncLogicTask<DynamicParameters> {
private void sendToSubProcess(TaskExecutionContext taskExecutionContext,
ProcessInstance subProcessInstance) {
final ITaskInstanceExecutionEventListener
iTaskInstanceExecutionEventListener =
- SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(subProcessInstance.getHost(),
ITaskInstanceExecutionEventListener.class);
final WorkflowInstanceStateChangeEvent
workflowInstanceStateChangeEvent = new WorkflowInstanceStateChangeEvent(
taskExecutionContext.getProcessInstanceId(),
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java
index 8e55e6fc82..1883a27d8b 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java
@@ -136,7 +136,7 @@ public class SubWorkflowLogicTask extends
BaseAsyncLogicTask<SubProcessParameter
private void sendToSubProcess(TaskExecutionContext taskExecutionContext,
ProcessInstance subProcessInstance) {
final ITaskInstanceExecutionEventListener
iTaskInstanceExecutionEventListener =
- SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(subProcessInstance.getHost(),
ITaskInstanceExecutionEventListener.class);
final WorkflowInstanceStateChangeEvent
workflowInstanceStateChangeEvent = new WorkflowInstanceStateChangeEvent(
taskExecutionContext.getProcessInstanceId(),
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
index 4d5ddeaa4e..0dc1b16679 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
@@ -274,7 +274,7 @@ public class WorkerFailoverService {
.create();
// only kill yarn/k8s job if exists , the local thread has exited
log.info("TaskInstance failover begin kill the task related yarn
or k8s job");
- IWorkerLogService iWorkerLogService =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ IWorkerLogService iWorkerLogService =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(),
IWorkerLogService.class);
GetAppIdResponse getAppIdResponse =
iWorkerLogService.getAppId(new
GetAppIdRequest(taskInstance.getId(), taskInstance.getLogPath()));
diff --git a/dolphinscheduler-microbench/pom.xml
b/dolphinscheduler-microbench/pom.xml
index aaf7065323..ae5e62f610 100644
--- a/dolphinscheduler-microbench/pom.xml
+++ b/dolphinscheduler-microbench/pom.xml
@@ -48,6 +48,11 @@
<dependencies>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-extract-base</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/IService.java
similarity index 53%
copy from
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
copy to
dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/IService.java
index 44e0420272..0a1122aa18 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
+++
b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/IService.java
@@ -15,22 +15,15 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.extract.base.client;
+package org.apache.dolphinscheduler.microbench.rpc;
-import org.apache.dolphinscheduler.extract.base.NettyRemotingClientFactory;
-import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
+import org.apache.dolphinscheduler.extract.base.RpcMethod;
+import org.apache.dolphinscheduler.extract.base.RpcService;
-public class SingletonJdkDynamicRpcClientProxyFactory extends
JdkDynamicRpcClientProxyFactory {
+/**
+ * RPC contract used by {@code RpcBenchMarkTest} in this package. It declares a
+ * single {@code ping} operation and is marked with {@code @RpcService} /
+ * {@code @RpcMethod}.
+ */
+@RpcService
+public interface IService {
- private static final SingletonJdkDynamicRpcClientProxyFactory INSTANCE =
- new SingletonJdkDynamicRpcClientProxyFactory();
-
- private SingletonJdkDynamicRpcClientProxyFactory() {
- super(NettyRemotingClientFactory.buildNettyRemotingClient(new
NettyClientConfig()));
- }
-
- public static SingletonJdkDynamicRpcClientProxyFactory getInstance() {
- return INSTANCE;
- }
+ /**
+ * Takes a request string and returns a reply string; the reply format is
+ * left to the implementation.
+ *
+ * @param pingRequest the request text
+ * @return the reply text
+ */
+ @RpcMethod
+ String ping(String pingRequest);
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/IServiceImpl.java
similarity index 53%
copy from
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
copy to
dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/IServiceImpl.java
index 44e0420272..0bd83ecc29 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
+++
b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/IServiceImpl.java
@@ -15,22 +15,13 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.extract.base.client;
+package org.apache.dolphinscheduler.microbench.rpc;
-import org.apache.dolphinscheduler.extract.base.NettyRemotingClientFactory;
-import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
+/**
+ * Minimal {@link IService} implementation that embeds the request text in a
+ * fixed reply.
+ */
+public class IServiceImpl implements IService {
-public class SingletonJdkDynamicRpcClientProxyFactory extends
JdkDynamicRpcClientProxyFactory {
-
- private static final SingletonJdkDynamicRpcClientProxyFactory INSTANCE =
- new SingletonJdkDynamicRpcClientProxyFactory();
-
- private SingletonJdkDynamicRpcClientProxyFactory() {
- super(NettyRemotingClientFactory.buildNettyRemotingClient(new
NettyClientConfig()));
- }
-
- public static SingletonJdkDynamicRpcClientProxyFactory getInstance() {
- return INSTANCE;
+ /**
+ * Builds the reply by placing {@code pingRequest} between the fixed prefix
+ * and suffix used in the return statement below.
+ *
+ * @param pingRequest the request text; concatenated as-is
+ * @return the reply string containing {@code pingRequest}
+ */
+ @Override
+ public String ping(String pingRequest) {
+ return "I get " + pingRequest + ", I am Pong!";
}
}
diff --git
a/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java
b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java
new file mode 100644
index 0000000000..fd423bcda0
--- /dev/null
+++
b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.microbench.rpc;
+
+import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
+import
org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
+import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
+import
org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
+import org.apache.dolphinscheduler.microbench.base.AbstractBaseBenchmark;
+
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * JMH benchmark for one RPC round trip. A {@link NettyRemotingServer} is
+ * created and started on port 12345 by the benchmark itself, an
+ * {@link IServiceImpl} is handed to it, and {@link IService#ping(String)} is
+ * then called through the proxy returned by
+ * {@link SingletonJdkDynamicRpcClientProxyFactory}.
+ *
+ * <p>Configured for 5 warmup and 10 measurement iterations
+ * ({@code time = 1} each); reports throughput, average time and sample time.
+ */
+@Slf4j
+@Warmup(iterations = 5, time = 1)
+@Measurement(iterations = 10, time = 1)
+@State(Scope.Benchmark)
+@BenchmarkMode({Mode.Throughput, Mode.AverageTime, Mode.SampleTime})
+public class RpcBenchMarkTest extends AbstractBaseBenchmark {
+
+ /** Server under test; created in {@link #before()}, closed in {@link #after()}. */
+ private NettyRemotingServer nettyRemotingServer;
+
+ /** Client-side proxy for {@link IService}; assigned in {@link #before()}. */
+ private IService iService;
+
+ /**
+ * Starts the server, passes an {@link IServiceImpl} to
+ * {@link SpringServerMethodInvokerDiscovery} and obtains the client proxy.
+ *
+ * <p>NOTE(review): port 12345 is hard-coded twice, once in the server
+ * config and once in the client address; the two values must stay in sync.
+ */
+ @Setup
+ public void before() {
+ nettyRemotingServer = new NettyRemotingServer(new
NettyServerConfig(12345));
+ nettyRemotingServer.start();
+ // Presumably this registers the @RpcMethod methods of IServiceImpl on the
+ // server -- confirm against SpringServerMethodInvokerDiscovery.
+ SpringServerMethodInvokerDiscovery springServerMethodInvokerDiscovery =
+ new SpringServerMethodInvokerDiscovery(nettyRemotingServer);
+ springServerMethodInvokerDiscovery.postProcessAfterInitialization(new
IServiceImpl(), "iServiceImpl");
+ iService =
+
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345",
IService.class);
+ }
+
+ /**
+ * Measured operation: calls {@code ping("ping")} through the proxy and
+ * passes the reply to the blackhole so the result is consumed.
+ *
+ * <p>NOTE(review): the {@code @BenchmarkMode} below lists the same modes as
+ * the class-level annotation.
+ *
+ * @param bh JMH blackhole that consumes the reply
+ */
+ @Benchmark
+ @BenchmarkMode({Mode.Throughput, Mode.AverageTime, Mode.SampleTime})
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ public void sendTest(Blackhole bh) {
+ String pong = iService.ping("ping");
+ bh.consume(pong);
+ }
+
+ /**
+ * Closes the server started in {@link #before()}.
+ *
+ * <p>NOTE(review): nothing here releases the client side obtained from
+ * {@link SingletonJdkDynamicRpcClientProxyFactory} -- confirm that is intended.
+ */
+ @TearDown
+ public void after() {
+ nettyRemotingServer.close();
+ }
+}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 82bbef30de..a47fdac8a5 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -389,8 +389,8 @@ public class ProcessServiceImpl implements ProcessService {
if (update) {
try {
final ITaskInstanceExecutionEventListener
iTaskInstanceExecutionEventListener =
-
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
- .getProxyClient(info.getHost(),
ITaskInstanceExecutionEventListener.class);
+
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(info.getHost(),
+
ITaskInstanceExecutionEventListener.class);
final WorkflowInstanceStateChangeEvent
workflowInstanceStateChangeEvent =
new
WorkflowInstanceStateChangeEvent(info.getId(), 0, info.getState(),
info.getId(), 0);
iTaskInstanceExecutionEventListener
@@ -516,11 +516,11 @@ public class ProcessServiceImpl implements ProcessService
{
continue;
}
if (TaskUtils.isLogicTask(taskInstance.getTaskType())) {
- IMasterLogService masterLogService =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ IMasterLogService masterLogService =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(),
IMasterLogService.class);
masterLogService.removeLogicTaskInstanceLog(taskLogPath);
} else {
- IWorkerLogService iWorkerLogService =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ IWorkerLogService iWorkerLogService =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(),
IWorkerLogService.class);
iWorkerLogService.removeTaskInstanceLog(taskLogPath);
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionFinishEventSender.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionFinishEventSender.java
index 27a9de73ae..9469ba1a69 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionFinishEventSender.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionFinishEventSender.java
@@ -33,7 +33,7 @@ public class TaskInstanceExecutionFinishEventSender
@Override
public void sendEvent(TaskInstanceExecutionFinishEvent
taskInstanceExecutionFinishEvent) {
ITaskInstanceExecutionEventListener
iTaskInstanceExecutionEventListener =
- SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstanceExecutionFinishEvent.getHost(),
ITaskInstanceExecutionEventListener.class);
iTaskInstanceExecutionEventListener.onTaskInstanceExecutionFinish(taskInstanceExecutionFinishEvent);
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionInfoUpdateEventSender.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionInfoUpdateEventSender.java
index e5350a7c0c..4b9e7e76b0 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionInfoUpdateEventSender.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionInfoUpdateEventSender.java
@@ -35,7 +35,7 @@ public class TaskInstanceExecutionInfoUpdateEventSender
@Override
public void sendEvent(TaskInstanceExecutionInfoEvent
taskInstanceExecutionInfoEvent) {
ITaskInstanceExecutionEventListener
iTaskInstanceExecutionEventListener =
- SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstanceExecutionInfoEvent.getHost(),
ITaskInstanceExecutionEventListener.class);
iTaskInstanceExecutionEventListener.onTaskInstanceExecutionInfoUpdate(taskInstanceExecutionInfoEvent);
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionRunningEventSender.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionRunningEventSender.java
index 4f40940030..4f64a94002 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionRunningEventSender.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionRunningEventSender.java
@@ -35,7 +35,7 @@ public class TaskInstanceExecutionRunningEventSender
@Override
public void sendEvent(TaskInstanceExecutionRunningEvent
taskInstanceExecutionRunningEvent) {
ITaskInstanceExecutionEventListener
iTaskInstanceExecutionEventListener =
- SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstanceExecutionRunningEvent.getHost(),
ITaskInstanceExecutionEventListener.class);
iTaskInstanceExecutionEventListener.onTaskInstanceExecutionRunning(taskInstanceExecutionRunningEvent);
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
index 5fadf27ef0..15141ed747 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
@@ -257,7 +257,7 @@ public abstract class WorkerTaskExecuteRunnable implements
Runnable {
task.getExitStatus() == TaskExecutionStatus.SUCCESS ?
WarningType.SUCCESS.getCode()
: WarningType.FAILURE.getCode());
try {
- IAlertOperator alertOperator =
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+ IAlertOperator alertOperator =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(alertServerAddress.getAddress(),
IAlertOperator.class);
AlertSendResponse alertSendResponse =
alertOperator.sendAlert(alertSendRequest);
log.info("Send alert to: {} successfully, response: {}",
alertServerAddress, alertSendResponse);