This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new d223d654cc Add rpc benchmark test (#14797)
d223d654cc is described below

commit d223d654ccfbf5621c4faeed1b7fe45715bf5a6b
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sun Aug 27 20:59:55 2023 +0800

    Add rpc benchmark test (#14797)
---
 .github/CODEOWNERS                                 |  2 +-
 docs/docs/en/about/glossary.md                     |  2 +-
 docs/docs/zh/about/glossary.md                     |  2 +-
 .../api/aspect/CacheEvictAspect.java               |  2 +-
 .../instance/pause/pause/PauseExecuteFunction.java |  2 +-
 .../instance/stop/StopExecuteFunction.java         |  4 +-
 .../api/service/impl/ExecutorServiceImpl.java      |  8 +-
 .../api/service/impl/LoggerServiceImpl.java        |  8 +-
 .../service/impl/MetricsCleanUpServiceImpl.java    |  2 +-
 .../api/service/impl/TaskInstanceServiceImpl.java  |  8 +-
 .../client/JdkDynamicRpcClientProxyFactory.java    | 36 +++++++--
 .../SingletonJdkDynamicRpcClientProxyFactory.java  | 14 ++--
 .../base/server/JdkDynamicServerHandler.java       |  2 +-
 ...ngletonJdkDynamicRpcClientProxyFactoryTest.java | 94 ++++++++++++++++++++++
 .../server/master/event/TaskDelayEventHandler.java |  2 +-
 .../master/event/TaskResultEventHandler.java       |  2 +-
 .../master/event/TaskRunningEventHandler.java      |  2 +-
 .../master/event/TaskUpdatePidEventHandler.java    |  2 +-
 .../master/runner/StreamTaskExecuteRunnable.java   |  2 +-
 .../master/runner/WorkflowExecuteRunnable.java     | 10 +--
 .../master/runner/WorkflowExecuteThreadPool.java   |  2 +-
 .../runner/dispatcher/MasterTaskDispatcher.java    |  2 +-
 .../runner/dispatcher/WorkerTaskDispatcher.java    |  2 +-
 ...LogicTaskInstanceExecuteRunningEventSender.java |  2 +-
 ...ogicTaskInstanceExecutionFinishEventSender.java |  2 +-
 .../LogicTaskExecuteRunnableKillOperator.java      |  2 +-
 .../LogicTaskExecuteRunnablePauseOperator.java     |  2 +-
 .../LogicTaskExecuteRunnableTimeoutOperator.java   |  2 +-
 .../operator/TaskExecuteRunnableKillOperator.java  |  2 +-
 .../operator/TaskExecuteRunnablePauseOperator.java |  2 +-
 .../TaskExecuteRunnableTimeoutOperator.java        |  2 +-
 .../runner/task/dynamic/DynamicLogicTask.java      |  2 +-
 .../task/subworkflow/SubWorkflowLogicTask.java     |  2 +-
 .../master/service/WorkerFailoverService.java      |  2 +-
 dolphinscheduler-microbench/pom.xml                |  5 ++
 .../dolphinscheduler/microbench/rpc/IService.java  | 21 ++---
 .../microbench/rpc/IServiceImpl.java               | 19 ++---
 .../microbench/rpc/RpcBenchMarkTest.java           | 76 +++++++++++++++++
 .../service/process/ProcessServiceImpl.java        |  8 +-
 .../TaskInstanceExecutionFinishEventSender.java    |  2 +-
 ...TaskInstanceExecutionInfoUpdateEventSender.java |  2 +-
 .../TaskInstanceExecutionRunningEventSender.java   |  2 +-
 .../worker/runner/WorkerTaskExecuteRunnable.java   |  2 +-
 43 files changed, 273 insertions(+), 98 deletions(-)

diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index 3462cff0fa..b82253d7aa 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -33,7 +33,7 @@
 /dolphinscheduler-master/ @caishunfeng @SbloodyS @ruanwenjun
 /dolphinscheduler-worker/ @caishunfeng @SbloodyS @ruanwenjun
 /dolphinscheduler-service/ @caishunfeng
-/dolphinscheduler-remote/ @caishunfeng
+/dolphinscheduler-extract/ @caishunfeng @ruanwenjun
 /dolphinscheduler-spi/ @caishunfeng
 /dolphinscheduler-task-plugin/ @caishunfeng @SbloodyS @zhuangchong
 /dolphinscheduler-tools/ @caishunfeng @SbloodyS @zhongjiajie @EricGao888
diff --git a/docs/docs/en/about/glossary.md b/docs/docs/en/about/glossary.md
index 29f0da5a5a..e3cee76f14 100644
--- a/docs/docs/en/about/glossary.md
+++ b/docs/docs/en/about/glossary.md
@@ -61,7 +61,7 @@ process fails and ends
 
 - dolphinscheduler-dao provides operations such as database access.
 
-- dolphinscheduler-remote client and server based on netty
+- dolphinscheduler-extract extract module, providing the 
master/worker/alert SDKs.
 
 - dolphinscheduler-service service module, including Quartz, Zookeeper, log 
client access service, easy to call server
   module and api module
diff --git a/docs/docs/zh/about/glossary.md b/docs/docs/zh/about/glossary.md
index 2358b2c5d1..2b5876669e 100644
--- a/docs/docs/zh/about/glossary.md
+++ b/docs/docs/zh/about/glossary.md
@@ -45,7 +45,7 @@
 
 - dolphinscheduler-dao 提供数据库访问等操作。
 
-- dolphinscheduler-remote 基于 netty 的客户端、服务端
+- dolphinscheduler-extract extract模块,包含master/worker/alert的sdk
 
 - dolphinscheduler-service 
service模块,包含Quartz、Zookeeper、日志客户端访问服务,便于server模块和api模块调用
 
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
index cf6df2823a..53dff28aff 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
@@ -147,7 +147,7 @@ public class CacheEvictAspect {
                 return;
             }
             for (Server server : serverList) {
-                IMasterCacheService masterCacheService = 
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+                IMasterCacheService masterCacheService = 
SingletonJdkDynamicRpcClientProxyFactory
                         .getProxyClient(server.getHost() + ":" + 
server.getPort(), IMasterCacheService.class);
                 masterCacheService.cacheExpire(new 
CacheExpireRequest(cacheType, cacheKey));
             }
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java
index 5df321628a..e8d6de590d 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java
@@ -58,7 +58,7 @@ public class PauseExecuteFunction implements 
ExecuteFunction<PauseExecuteRequest
         try {
             // todo: direct call the workflow instance pause method
             ITaskInstanceExecutionEventListener 
iTaskInstanceExecutionEventListener =
-                    SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+                    SingletonJdkDynamicRpcClientProxyFactory
                             .getProxyClient(workflowInstance.getHost(), 
ITaskInstanceExecutionEventListener.class);
             
iTaskInstanceExecutionEventListener.onWorkflowInstanceInstanceStateChange(
                     new WorkflowInstanceStateChangeEvent(
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunction.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunction.java
index 8afb18a278..1ad0f9ae7a 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunction.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunction.java
@@ -59,8 +59,8 @@ public class StopExecuteFunction implements 
ExecuteFunction<StopRequest, StopRes
             try {
                 // todo: direct call the workflow instance stop method
                 ITaskInstanceExecutionEventListener 
iTaskInstanceExecutionEventListener =
-                        SingletonJdkDynamicRpcClientProxyFactory.getInstance()
-                                .getProxyClient(workflowInstance.getHost(), 
ITaskInstanceExecutionEventListener.class);
+                        
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(workflowInstance.getHost(),
+                                ITaskInstanceExecutionEventListener.class);
                 
iTaskInstanceExecutionEventListener.onWorkflowInstanceInstanceStateChange(
                         new WorkflowInstanceStateChangeEvent(
                                 workflowInstance.getId(), 0, 
workflowInstance.getState(), workflowInstance.getId(), 0));
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index be3a007d26..dc5e761190 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -656,7 +656,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
             WorkflowInstanceStateChangeEvent workflowStateEventChangeRequest = 
new WorkflowInstanceStateChangeEvent(
                     processInstance.getId(), 0, processInstance.getState(), 
processInstance.getId(), 0);
             ITaskInstanceExecutionEventListener 
iTaskInstanceExecutionEventListener =
-                    SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+                    SingletonJdkDynamicRpcClientProxyFactory
                             .getProxyClient(processInstance.getHost(), 
ITaskInstanceExecutionEventListener.class);
             
iTaskInstanceExecutionEventListener.onWorkflowInstanceInstanceStateChange(workflowStateEventChangeRequest);
             putMsg(result, Status.SUCCESS);
@@ -684,7 +684,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
         taskGroupQueue.setForceStart(Flag.YES.getCode());
         processService.updateTaskGroupQueue(taskGroupQueue);
         log.info("Sending force start command to master: {}.", 
processInstance.getHost());
-        ILogicTaskInstanceOperator iLogicTaskInstanceOperator = 
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+        ILogicTaskInstanceOperator iLogicTaskInstanceOperator = 
SingletonJdkDynamicRpcClientProxyFactory
                 .getProxyClient(processInstance.getHost(), 
ILogicTaskInstanceOperator.class);
         iLogicTaskInstanceOperator.forceStartTaskInstance(
                 new TaskInstanceForceStartRequest(processInstance.getId(), 
taskGroupQueue.getTaskId()));
@@ -1152,7 +1152,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
             log.error("Process instance does not exist, 
processInstanceId:{}.", processInstanceId);
             return null;
         }
-        IWorkflowInstanceService iWorkflowInstanceService = 
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+        IWorkflowInstanceService iWorkflowInstanceService = 
SingletonJdkDynamicRpcClientProxyFactory
                 .getProxyClient(processInstance.getHost(), 
IWorkflowInstanceService.class);
         return 
iWorkflowInstanceService.getWorkflowExecutingData(processInstanceId);
     }
@@ -1189,7 +1189,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
         taskExecuteStartMessage.setStartParams(startParams);
         taskExecuteStartMessage.setDryRun(dryRun);
 
-        IStreamingTaskOperator streamingTaskOperator = 
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+        IStreamingTaskOperator streamingTaskOperator = 
SingletonJdkDynamicRpcClientProxyFactory
                 .getProxyClient(server.getHost() + ":" + server.getPort(), 
IStreamingTaskOperator.class);
         StreamingTaskTriggerResponse streamingTaskTriggerResponse =
                 
streamingTaskOperator.triggerStreamingTask(taskExecuteStartMessage);
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
index 99216e917f..69c7ba1a01 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
@@ -225,7 +225,7 @@ public class LoggerServiceImpl extends BaseServiceImpl 
implements LoggerService
 
         String logContent = null;
         if (TaskUtils.isLogicTask(taskInstance.getTaskType())) {
-            IMasterLogService masterLogService = 
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+            IMasterLogService masterLogService = 
SingletonJdkDynamicRpcClientProxyFactory
                     .getProxyClient(taskInstance.getHost(), 
IMasterLogService.class);
             try {
                 LogicTaskInstanceLogPageQueryRequest 
logicTaskInstanceLogPageQueryRequest =
@@ -237,7 +237,7 @@ public class LoggerServiceImpl extends BaseServiceImpl 
implements LoggerService
                 log.error("Query LogicTaskInstance log error", ex);
             }
         } else {
-            IWorkerLogService iWorkerLogService = 
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+            IWorkerLogService iWorkerLogService = 
SingletonJdkDynamicRpcClientProxyFactory
                     .getProxyClient(host, IWorkerLogService.class);
             try {
                 TaskInstanceLogPageQueryRequest 
taskInstanceLogPageQueryRequest =
@@ -282,7 +282,7 @@ public class LoggerServiceImpl extends BaseServiceImpl 
implements LoggerService
 
         byte[] logBytes = new byte[0];
         if (TaskUtils.isLogicTask(taskInstance.getTaskType())) {
-            IMasterLogService masterLogService = 
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+            IMasterLogService masterLogService = 
SingletonJdkDynamicRpcClientProxyFactory
                     .getProxyClient(taskInstance.getHost(), 
IMasterLogService.class);
             try {
                 LogicTaskInstanceLogFileDownloadRequest 
logicTaskInstanceLogFileDownloadRequest =
@@ -294,7 +294,7 @@ public class LoggerServiceImpl extends BaseServiceImpl 
implements LoggerService
                 log.error("Query LogicTaskInstance log error", ex);
             }
         } else {
-            IWorkerLogService iWorkerLogService = 
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+            IWorkerLogService iWorkerLogService = 
SingletonJdkDynamicRpcClientProxyFactory
                     .getProxyClient(host, IWorkerLogService.class);
             try {
                 TaskInstanceLogFileDownloadRequest 
taskInstanceLogFileDownloadRequest =
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MetricsCleanUpServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MetricsCleanUpServiceImpl.java
index a7ad669e3b..f1ac0579ff 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MetricsCleanUpServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MetricsCleanUpServiceImpl.java
@@ -50,7 +50,7 @@ public class MetricsCleanUpServiceImpl implements 
MetricsCleanUpService {
     private void cleanUpWorkflowMetrics(Server server, Long 
workflowDefinitionCode) {
         try {
             IWorkflowInstanceService iWorkflowInstanceService =
-                    
SingletonJdkDynamicRpcClientProxyFactory.getInstance().getProxyClient(
+                    SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(
                             String.format("%s:%s", server.getHost(), 
server.getPort()), IWorkflowInstanceService.class);
             
iWorkflowInstanceService.clearWorkflowMetrics(workflowDefinitionCode);
         } catch (Exception e) {
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
index 91156a6890..a381a07d49 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
@@ -290,7 +290,7 @@ public class TaskInstanceServiceImpl extends 
BaseServiceImpl implements TaskInst
             return result;
         }
         IStreamingTaskInstanceOperator streamingTaskInstanceOperator =
-                SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+                SingletonJdkDynamicRpcClientProxyFactory
                         .getProxyClient(taskInstance.getHost(), 
IStreamingTaskInstanceOperator.class);
         TaskInstanceTriggerSavepointResponse 
taskInstanceTriggerSavepointResponse =
                 streamingTaskInstanceOperator.triggerSavepoint(new 
TaskInstanceTriggerSavepointRequest(taskInstanceId));
@@ -322,7 +322,7 @@ public class TaskInstanceServiceImpl extends 
BaseServiceImpl implements TaskInst
         }
 
         // todo: we only support streaming task for now
-        ITaskInstanceOperator iTaskInstanceOperator = 
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+        ITaskInstanceOperator iTaskInstanceOperator = 
SingletonJdkDynamicRpcClientProxyFactory
                 .getProxyClient(taskInstance.getHost(), 
ITaskInstanceOperator.class);
         TaskInstanceKillResponse taskInstanceKillResponse =
                 iTaskInstanceOperator.killTask(new 
TaskInstanceKillRequest(taskInstanceId));
@@ -381,11 +381,11 @@ public class TaskInstanceServiceImpl extends 
BaseServiceImpl implements TaskInst
             // delete log
             if (StringUtils.isNotEmpty(taskInstance.getLogPath())) {
                 if (TaskUtils.isLogicTask(taskInstance.getTaskType())) {
-                    IMasterLogService masterLogService = 
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+                    IMasterLogService masterLogService = 
SingletonJdkDynamicRpcClientProxyFactory
                             .getProxyClient(taskInstance.getHost(), 
IMasterLogService.class);
                     
masterLogService.removeLogicTaskInstanceLog(taskInstance.getLogPath());
                 } else {
-                    IWorkerLogService workerLogService = 
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+                    IWorkerLogService workerLogService = 
SingletonJdkDynamicRpcClientProxyFactory
                             .getProxyClient(taskInstance.getHost(), 
IWorkerLogService.class);
                     
workerLogService.removeTaskInstanceLog(taskInstance.getLogPath());
                 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/JdkDynamicRpcClientProxyFactory.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/JdkDynamicRpcClientProxyFactory.java
index 25e6e32ee4..5635a88f34 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/JdkDynamicRpcClientProxyFactory.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/JdkDynamicRpcClientProxyFactory.java
@@ -21,9 +21,16 @@ import 
org.apache.dolphinscheduler.extract.base.NettyRemotingClient;
 import org.apache.dolphinscheduler.extract.base.utils.Host;
 
 import java.lang.reflect.Proxy;
+import java.time.Duration;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import lombok.SneakyThrows;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
 /**
  * This class is used to create a proxy client which will transform local 
method invocation to remote invocation.
  */
@@ -31,21 +38,34 @@ public class JdkDynamicRpcClientProxyFactory implements 
IRpcClientProxyFactory {
 
     private final NettyRemotingClient nettyRemotingClient;
 
-    // todo: use guava cache to avoid memory leak
-    private final Map<String, Map<String, Object>> proxyClientCache = new 
ConcurrentHashMap<>();
+    private static final LoadingCache<String, Map<String, Object>> 
proxyClientCache = CacheBuilder.newBuilder()
+            // expire here to remove dead host
+            .expireAfterAccess(Duration.ofHours(1))
+            .build(new CacheLoader<String, Map<String, Object>>() {
+
+                @Override
+                public Map<String, Object> load(String key) {
+                    return new ConcurrentHashMap<>();
+                }
+            });
 
     public JdkDynamicRpcClientProxyFactory(NettyRemotingClient 
nettyRemotingClient) {
         this.nettyRemotingClient = nettyRemotingClient;
     }
 
+    @SneakyThrows
     @SuppressWarnings("unchecked")
     @Override
     public <T> T getProxyClient(String serverHost, Class<T> clientInterface) {
-        return (T) proxyClientCache
-                .computeIfAbsent(serverHost, key -> new ConcurrentHashMap<>())
-                .computeIfAbsent(clientInterface.getName(),
-                        key -> Proxy.newProxyInstance(
-                                clientInterface.getClassLoader(), new 
Class[]{clientInterface},
-                                new 
ClientInvocationHandler(Host.of(serverHost), nettyRemotingClient)));
+        return (T) proxyClientCache.get(serverHost)
+                .computeIfAbsent(clientInterface.getName(), key -> 
newProxyClient(serverHost, clientInterface));
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T> T newProxyClient(String serverHost, Class<T> clientInterface) {
+        return (T) Proxy.newProxyInstance(
+                clientInterface.getClassLoader(),
+                new Class[]{clientInterface},
+                new ClientInvocationHandler(Host.of(serverHost), 
nettyRemotingClient));
     }
 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
index 44e0420272..28d82532be 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
@@ -20,17 +20,13 @@ package org.apache.dolphinscheduler.extract.base.client;
 import org.apache.dolphinscheduler.extract.base.NettyRemotingClientFactory;
 import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
 
-public class SingletonJdkDynamicRpcClientProxyFactory extends 
JdkDynamicRpcClientProxyFactory {
+public class SingletonJdkDynamicRpcClientProxyFactory {
 
-    private static final SingletonJdkDynamicRpcClientProxyFactory INSTANCE =
-            new SingletonJdkDynamicRpcClientProxyFactory();
+    private static final JdkDynamicRpcClientProxyFactory INSTANCE = new 
JdkDynamicRpcClientProxyFactory(
+            NettyRemotingClientFactory.buildNettyRemotingClient(new 
NettyClientConfig()));
 
-    private SingletonJdkDynamicRpcClientProxyFactory() {
-        super(NettyRemotingClientFactory.buildNettyRemotingClient(new 
NettyClientConfig()));
-    }
-
-    public static SingletonJdkDynamicRpcClientProxyFactory getInstance() {
-        return INSTANCE;
+    public static <T> T getProxyClient(String serverAddress, Class<T> clazz) {
+        return INSTANCE.getProxyClient(serverAddress, clazz);
     }
 
 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
index 7e5feab05f..a98362209d 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
@@ -96,7 +96,7 @@ public class JdkDynamicServerHandler extends 
ChannelInboundHandlerAdapter {
                     StandardRpcRequest standardRpcRequest =
                             JsonSerializer.deserialize(transporter.getBody(), 
StandardRpcRequest.class);
                     Object[] args;
-                    if (standardRpcRequest.getArgs().length == 0) {
+                    if (standardRpcRequest.getArgs() == null || 
standardRpcRequest.getArgs().length == 0) {
                         args = null;
                     } else {
                         args = new Object[standardRpcRequest.getArgs().length];
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java
new file mode 100644
index 0000000000..3cf9ff1c89
--- /dev/null
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.extract.base.client;
+
+import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
+import org.apache.dolphinscheduler.extract.base.RpcMethod;
+import org.apache.dolphinscheduler.extract.base.RpcService;
+import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
+import 
org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class SingletonJdkDynamicRpcClientProxyFactoryTest {
+
+    private NettyRemotingServer nettyRemotingServer;
+
+    @BeforeEach
+    public void setUp() {
+        nettyRemotingServer = new NettyRemotingServer(new 
NettyServerConfig(12345));
+        nettyRemotingServer.start();
+
+        new SpringServerMethodInvokerDiscovery(nettyRemotingServer)
+                .postProcessAfterInitialization(new IServiceImpl(), 
"iServiceImpl");
+    }
+
+    @Test
+    public void getProxyClient() {
+        IService proxyClient =
+                
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345", 
IService.class);
+        Assertions.assertNotNull(proxyClient);
+    }
+
+    @Test
+    public void testPing() {
+        IService proxyClient =
+                
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345", 
IService.class);
+        String ping = proxyClient.ping("ping");
+        Assertions.assertEquals("pong", ping);
+    }
+
+    @Test
+    public void testVoid() {
+        IService proxyClient =
+                
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345", 
IService.class);
+        Assertions.assertDoesNotThrow(proxyClient::voidMethod);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        nettyRemotingServer.close();
+    }
+
+    @RpcService
+    public interface IService {
+
+        @RpcMethod
+        String ping(String ping);
+
+        @RpcMethod
+        void voidMethod();
+    }
+
+    public static class IServiceImpl implements IService {
+
+        @Override
+        public String ping(String ping) {
+            return "pong";
+        }
+
+        @Override
+        public void voidMethod() {
+            System.out.println("void method");
+        }
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
index 25ed3283ed..f749ac46cd 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
@@ -114,7 +114,7 @@ public class TaskDelayEventHandler implements 
TaskEventHandler {
     private void sendAckToWorker(TaskEvent taskEvent) {
         // If the event is handled successfully, send an ack to the worker; otherwise the worker 
will retry this event
         ITaskInstanceExecutionEventAckListener 
instanceExecutionEventAckListener =
-                SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+                SingletonJdkDynamicRpcClientProxyFactory
                         .getProxyClient(taskEvent.getWorkerAddress(), 
ITaskInstanceExecutionEventAckListener.class);
         
instanceExecutionEventAckListener.handleTaskInstanceExecutionRunningEventAck(
                 
TaskInstanceExecutionRunningEventAck.success(taskEvent.getTaskInstanceId()));
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
index f331dfd360..854812264f 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
@@ -116,7 +116,7 @@ public class TaskResultEventHandler implements 
TaskEventHandler {
 
     public void sendAckToWorker(TaskEvent taskEvent) {
         ITaskInstanceExecutionEventAckListener 
instanceExecutionEventAckListener =
-                SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+                SingletonJdkDynamicRpcClientProxyFactory
                         .getProxyClient(taskEvent.getWorkerAddress(), 
ITaskInstanceExecutionEventAckListener.class);
         
instanceExecutionEventAckListener.handleTaskInstanceExecutionFinishEventAck(
                 
TaskInstanceExecutionFinishEventAck.success(taskEvent.getTaskInstanceId()));
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
index 03300f2ddb..14cb8571d9 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
@@ -110,7 +110,7 @@ public class TaskRunningEventHandler implements 
TaskEventHandler {
     private void sendAckToWorker(TaskEvent taskEvent) {
         // If event handle success, send ack to worker to otherwise the worker 
will retry this event
         ITaskInstanceExecutionEventAckListener 
instanceExecutionEventAckListener =
-                SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+                SingletonJdkDynamicRpcClientProxyFactory
                         .getProxyClient(taskEvent.getWorkerAddress(), 
ITaskInstanceExecutionEventAckListener.class);
         
instanceExecutionEventAckListener.handleTaskInstanceExecutionRunningEventAck(
                 
TaskInstanceExecutionRunningEventAck.success(taskEvent.getTaskInstanceId()));
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java
index f684322875..5be19ddfdb 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java
@@ -94,7 +94,7 @@ public class TaskUpdatePidEventHandler implements 
TaskEventHandler {
     private void sendAckToWorker(TaskEvent taskEvent) {
         // If event handle success, send ack to worker to otherwise the worker 
will retry this event
         ITaskInstanceExecutionEventAckListener 
instanceExecutionEventAckListener =
-                SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+                SingletonJdkDynamicRpcClientProxyFactory
                         .getProxyClient(taskEvent.getWorkerAddress(), 
ITaskInstanceExecutionEventAckListener.class);
         
instanceExecutionEventAckListener.handleTaskInstanceExecutionInfoEventAck(
                 
TaskInstanceExecutionInfoEventAck.success(taskEvent.getTaskInstanceId()));
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
index a4b4f083d9..24e524d171 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
@@ -473,7 +473,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
     private void sendAckToWorker(TaskEvent taskEvent) {
         // If event handle success, send ack to worker to otherwise the worker 
will retry this event
         ITaskInstanceExecutionEventAckListener 
instanceExecutionEventAckListener =
-                SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+                SingletonJdkDynamicRpcClientProxyFactory
                         .getProxyClient(taskEvent.getWorkerAddress(), 
ITaskInstanceExecutionEventAckListener.class);
         if (taskEvent.getEvent() == TaskEventType.RUNNING) {
             log.error("taskEvent.getChannel() is null, taskEvent:{}", 
taskEvent);
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index b3fc7c4960..9c5e65160f 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -60,6 +60,7 @@ import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
 import 
org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
 import org.apache.dolphinscheduler.extract.master.ILogicTaskInstanceOperator;
 import 
org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupRequest;
+import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
 import 
org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostRequest;
 import 
org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostResponse;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
@@ -466,7 +467,7 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
         } else {
             ProcessInstance processInstance =
                     
processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
-            ILogicTaskInstanceOperator taskInstanceOperator = 
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+            ILogicTaskInstanceOperator taskInstanceOperator = 
SingletonJdkDynamicRpcClientProxyFactory
                     .getProxyClient(processInstance.getHost(), 
ILogicTaskInstanceOperator.class);
             taskInstanceOperator.wakeupTaskInstance(
                     new TaskInstanceWakeupRequest(processInstance.getId(), 
nextTaskInstance.getId()));
@@ -1385,10 +1386,9 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
             return false;
         }
         try {
-            org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator 
iTaskInstanceOperator =
-                    SingletonJdkDynamicRpcClientProxyFactory.getInstance()
-                            .getProxyClient(taskInstance.getHost(),
-                                    
org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator.class);
+            ITaskInstanceOperator iTaskInstanceOperator =
+                    SingletonJdkDynamicRpcClientProxyFactory
+                            .getProxyClient(taskInstance.getHost(), 
ITaskInstanceOperator.class);
             UpdateWorkflowHostResponse updateWorkflowHostResponse = 
iTaskInstanceOperator.updateWorkflowInstanceHost(
                     new UpdateWorkflowHostRequest(taskInstance.getId(), 
masterConfig.getMasterAddress()));
             if (!updateWorkflowHostResponse.isSuccess()) {
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index 8fe1a9a683..c0743b5a2c 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -226,7 +226,7 @@ public class WorkflowExecuteThreadPool extends 
ThreadPoolTaskExecutor {
             return;
         }
         ITaskInstanceExecutionEventListener 
iTaskInstanceExecutionEventListener =
-                SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+                SingletonJdkDynamicRpcClientProxyFactory
                         .getProxyClient(processInstanceHost, 
ITaskInstanceExecutionEventListener.class);
 
         WorkflowInstanceStateChangeEvent workflowInstanceStateChangeEvent = 
new WorkflowInstanceStateChangeEvent(
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcher.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcher.java
index 4a4ca42c58..d30a1e554d 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcher.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcher.java
@@ -51,7 +51,7 @@ public class MasterTaskDispatcher extends BaseTaskDispatcher {
     protected void doDispatch(TaskExecuteRunnable taskExecuteRunnable) throws 
TaskDispatchException {
         TaskExecutionContext taskExecutionContext = 
taskExecuteRunnable.getTaskExecutionContext();
         try {
-            ILogicTaskInstanceOperator taskInstanceOperator = 
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+            ILogicTaskInstanceOperator taskInstanceOperator = 
SingletonJdkDynamicRpcClientProxyFactory
                     .getProxyClient(taskExecutionContext.getHost(), 
ILogicTaskInstanceOperator.class);
             LogicTaskDispatchResponse logicTaskDispatchResponse = 
taskInstanceOperator
                     .dispatchLogicTask(new 
LogicTaskDispatchRequest(taskExecuteRunnable.getTaskExecutionContext()));
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcher.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcher.java
index 6760633b9b..36739a1163 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcher.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcher.java
@@ -56,7 +56,7 @@ public class WorkerTaskDispatcher extends BaseTaskDispatcher {
     protected void doDispatch(TaskExecuteRunnable taskExecuteRunnable) throws 
TaskDispatchException {
         TaskExecutionContext taskExecutionContext = 
taskExecuteRunnable.getTaskExecutionContext();
         try {
-            ITaskInstanceOperator taskInstanceOperator = 
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+            ITaskInstanceOperator taskInstanceOperator = 
SingletonJdkDynamicRpcClientProxyFactory
                     .getProxyClient(taskExecutionContext.getHost(), 
ITaskInstanceOperator.class);
             TaskInstanceDispatchResponse taskInstanceDispatchResponse = 
taskInstanceOperator
                     .dispatchTask(new 
TaskInstanceDispatchRequest(taskExecuteRunnable.getTaskExecutionContext()));
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecuteRunningEventSender.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecuteRunningEventSender.java
index 1dcca72878..6ea085b31b 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecuteRunningEventSender.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecuteRunningEventSender.java
@@ -34,7 +34,7 @@ public class LogicTaskInstanceExecuteRunningEventSender
     @Override
     public void sendMessage(TaskInstanceExecutionRunningEvent 
taskInstanceExecutionRunningEvent) {
         ITaskInstanceExecutionEventListener 
iTaskInstanceExecutionEventListener =
-                SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+                SingletonJdkDynamicRpcClientProxyFactory
                         
.getProxyClient(taskInstanceExecutionRunningEvent.getHost(),
                                 ITaskInstanceExecutionEventListener.class);
         
iTaskInstanceExecutionEventListener.onTaskInstanceExecutionRunning(taskInstanceExecutionRunningEvent);
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecutionFinishEventSender.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecutionFinishEventSender.java
index ad5ea37ec6..1949145e86 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecutionFinishEventSender.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecutionFinishEventSender.java
@@ -32,7 +32,7 @@ public class LogicTaskInstanceExecutionFinishEventSender
     @Override
     public void sendMessage(TaskInstanceExecutionFinishEvent message) {
         ITaskInstanceExecutionEventListener 
iTaskInstanceExecutionEventListener =
-                SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+                SingletonJdkDynamicRpcClientProxyFactory
                         .getProxyClient(message.getHost(), 
ITaskInstanceExecutionEventListener.class);
         
iTaskInstanceExecutionEventListener.onTaskInstanceExecutionFinish(message);
     }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableKillOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableKillOperator.java
index 29ef03906d..e7c7b31da1 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableKillOperator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableKillOperator.java
@@ -45,7 +45,7 @@ public class LogicTaskExecuteRunnableKillOperator extends 
BaseTaskExecuteRunnabl
                     taskInstance.getName());
             return;
         }
-        final ILogicTaskInstanceOperator taskInstanceOperator = 
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+        final ILogicTaskInstanceOperator taskInstanceOperator = 
SingletonJdkDynamicRpcClientProxyFactory
                 .getProxyClient(taskInstance.getHost(), 
ILogicTaskInstanceOperator.class);
         final LogicTaskKillRequest logicTaskKillRequest = new 
LogicTaskKillRequest(taskInstance.getId());
         final LogicTaskKillResponse logicTaskKillResponse = 
taskInstanceOperator.killLogicTask(logicTaskKillRequest);
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnablePauseOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnablePauseOperator.java
index af865f05f6..4c1c77f24a 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnablePauseOperator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnablePauseOperator.java
@@ -40,7 +40,7 @@ public class LogicTaskExecuteRunnablePauseOperator extends 
BaseTaskExecuteRunnab
                     taskInstance.getName());
             return;
         }
-        final ILogicTaskInstanceOperator taskInstanceOperator = 
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+        final ILogicTaskInstanceOperator taskInstanceOperator = 
SingletonJdkDynamicRpcClientProxyFactory
                 .getProxyClient(taskInstance.getHost(), 
ILogicTaskInstanceOperator.class);
         final LogicTaskPauseRequest logicTaskPauseRequest = new 
LogicTaskPauseRequest(taskInstance.getId());
         final LogicTaskPauseResponse logicTaskPauseResponse =
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableTimeoutOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableTimeoutOperator.java
index 4cf16bb2c7..949bc940f5 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableTimeoutOperator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableTimeoutOperator.java
@@ -47,7 +47,7 @@ public class LogicTaskExecuteRunnableTimeoutOperator extends 
BaseTaskExecuteRunn
         }
 
         final ILogicTaskInstanceOperator iLogicTaskInstanceOperator =
-                SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+                SingletonJdkDynamicRpcClientProxyFactory
                         .getProxyClient(taskInstance.getHost(), 
ILogicTaskInstanceOperator.class);
 
         final LogicTaskKillRequest taskInstanceKillRequest = new 
LogicTaskKillRequest(taskInstance.getId());
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableKillOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableKillOperator.java
index b80cb79561..dc9915f901 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableKillOperator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableKillOperator.java
@@ -43,7 +43,7 @@ public class TaskExecuteRunnableKillOperator extends 
BaseTaskExecuteRunnableKill
             log.info("TaskInstance {} host is empty, no need to 
killRemoteTask", taskInstance.getName());
             return;
         }
-        ITaskInstanceOperator taskInstanceOperator = 
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+        ITaskInstanceOperator taskInstanceOperator = 
SingletonJdkDynamicRpcClientProxyFactory
                 .getProxyClient(taskInstance.getHost(), 
ITaskInstanceOperator.class);
         TaskInstanceKillRequest taskInstanceKillRequest = new 
TaskInstanceKillRequest(taskInstance.getId());
         TaskInstanceKillResponse taskInstanceKillResponse = 
taskInstanceOperator.killTask(taskInstanceKillRequest);
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnablePauseOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnablePauseOperator.java
index a0636b8504..39896bb0cb 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnablePauseOperator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnablePauseOperator.java
@@ -48,7 +48,7 @@ public class TaskExecuteRunnablePauseOperator implements 
TaskExecuteRunnableOper
             log.info("The TaskInstance: {} host is null, no need to 
pauseRemoteTaskInstance", taskInstance.getName());
             return;
         }
-        final ITaskInstanceOperator taskInstanceOperator = 
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+        final ITaskInstanceOperator taskInstanceOperator = 
SingletonJdkDynamicRpcClientProxyFactory
                 .getProxyClient(taskInstance.getHost(), 
ITaskInstanceOperator.class);
         final TaskInstancePauseRequest taskInstancePauseRequest = new 
TaskInstancePauseRequest(taskInstance.getId());
         final TaskInstancePauseResponse taskInstancePauseResponse =
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableTimeoutOperator.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableTimeoutOperator.java
index 7154f5fa62..9356b24370 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableTimeoutOperator.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableTimeoutOperator.java
@@ -45,7 +45,7 @@ public class TaskExecuteRunnableTimeoutOperator extends 
BaseTaskExecuteRunnableT
             return;
         }
 
-        final ITaskInstanceOperator iTaskInstanceOperator = 
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+        final ITaskInstanceOperator iTaskInstanceOperator = 
SingletonJdkDynamicRpcClientProxyFactory
                 .getProxyClient(taskInstance.getHost(), 
ITaskInstanceOperator.class);
 
         final TaskInstanceKillRequest taskInstanceKillRequest = new 
TaskInstanceKillRequest(taskInstance.getId());
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java
index 25b53c8d6d..3baa10b343 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java
@@ -286,7 +286,7 @@ public class DynamicLogicTask extends 
BaseAsyncLogicTask<DynamicParameters> {
 
     private void sendToSubProcess(TaskExecutionContext taskExecutionContext, 
ProcessInstance subProcessInstance) {
         final ITaskInstanceExecutionEventListener 
iTaskInstanceExecutionEventListener =
-                SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+                SingletonJdkDynamicRpcClientProxyFactory
                         .getProxyClient(subProcessInstance.getHost(), 
ITaskInstanceExecutionEventListener.class);
         final WorkflowInstanceStateChangeEvent 
workflowInstanceStateChangeEvent = new WorkflowInstanceStateChangeEvent(
                 taskExecutionContext.getProcessInstanceId(),
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java
index 8e55e6fc82..1883a27d8b 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java
@@ -136,7 +136,7 @@ public class SubWorkflowLogicTask extends 
BaseAsyncLogicTask<SubProcessParameter
     private void sendToSubProcess(TaskExecutionContext taskExecutionContext,
                                   ProcessInstance subProcessInstance) {
         final ITaskInstanceExecutionEventListener 
iTaskInstanceExecutionEventListener =
-                SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+                SingletonJdkDynamicRpcClientProxyFactory
                         .getProxyClient(subProcessInstance.getHost(), 
ITaskInstanceExecutionEventListener.class);
         final WorkflowInstanceStateChangeEvent 
workflowInstanceStateChangeEvent = new WorkflowInstanceStateChangeEvent(
                 taskExecutionContext.getProcessInstanceId(),
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
index 4d5ddeaa4e..0dc1b16679 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
@@ -274,7 +274,7 @@ public class WorkerFailoverService {
                     .create();
             // only kill yarn/k8s job if exists , the local thread has exited
             log.info("TaskInstance failover begin kill the task related yarn 
or k8s job");
-            IWorkerLogService iWorkerLogService = 
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+            IWorkerLogService iWorkerLogService = 
SingletonJdkDynamicRpcClientProxyFactory
                     .getProxyClient(taskInstance.getHost(), 
IWorkerLogService.class);
             GetAppIdResponse getAppIdResponse =
                     iWorkerLogService.getAppId(new 
GetAppIdRequest(taskInstance.getId(), taskInstance.getLogPath()));
diff --git a/dolphinscheduler-microbench/pom.xml 
b/dolphinscheduler-microbench/pom.xml
index aaf7065323..ae5e62f610 100644
--- a/dolphinscheduler-microbench/pom.xml
+++ b/dolphinscheduler-microbench/pom.xml
@@ -48,6 +48,11 @@
 
     <dependencies>
 
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-extract-base</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.openjdk.jmh</groupId>
             <artifactId>jmh-core</artifactId>
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
 
b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/IService.java
similarity index 53%
copy from 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
copy to 
dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/IService.java
index 44e0420272..0a1122aa18 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
+++ 
b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/IService.java
@@ -15,22 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.extract.base.client;
+package org.apache.dolphinscheduler.microbench.rpc;
 
-import org.apache.dolphinscheduler.extract.base.NettyRemotingClientFactory;
-import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
+import org.apache.dolphinscheduler.extract.base.RpcMethod;
+import org.apache.dolphinscheduler.extract.base.RpcService;
 
-public class SingletonJdkDynamicRpcClientProxyFactory extends 
JdkDynamicRpcClientProxyFactory {
+@RpcService
+public interface IService {
 
-    private static final SingletonJdkDynamicRpcClientProxyFactory INSTANCE =
-            new SingletonJdkDynamicRpcClientProxyFactory();
-
-    private SingletonJdkDynamicRpcClientProxyFactory() {
-        super(NettyRemotingClientFactory.buildNettyRemotingClient(new 
NettyClientConfig()));
-    }
-
-    public static SingletonJdkDynamicRpcClientProxyFactory getInstance() {
-        return INSTANCE;
-    }
+    @RpcMethod
+    String ping(String pingRequest);
 
 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
 
b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/IServiceImpl.java
similarity index 53%
copy from 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
copy to 
dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/IServiceImpl.java
index 44e0420272..0bd83ecc29 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java
+++ 
b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/IServiceImpl.java
@@ -15,22 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.extract.base.client;
+package org.apache.dolphinscheduler.microbench.rpc;
 
-import org.apache.dolphinscheduler.extract.base.NettyRemotingClientFactory;
-import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
+public class IServiceImpl implements IService {
 
-public class SingletonJdkDynamicRpcClientProxyFactory extends 
JdkDynamicRpcClientProxyFactory {
-
-    private static final SingletonJdkDynamicRpcClientProxyFactory INSTANCE =
-            new SingletonJdkDynamicRpcClientProxyFactory();
-
-    private SingletonJdkDynamicRpcClientProxyFactory() {
-        super(NettyRemotingClientFactory.buildNettyRemotingClient(new 
NettyClientConfig()));
-    }
-
-    public static SingletonJdkDynamicRpcClientProxyFactory getInstance() {
-        return INSTANCE;
+    @Override
+    public String ping(String pingRequest) {
+        return "I get " + pingRequest + ", I am Pong!";
     }
 
 }
diff --git 
a/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java
 
b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java
new file mode 100644
index 0000000000..fd423bcda0
--- /dev/null
+++ 
b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.microbench.rpc;
+
+import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
+import 
org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
+import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
+import 
org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
+import org.apache.dolphinscheduler.microbench.base.AbstractBaseBenchmark;
+
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+@Slf4j
+@Warmup(iterations = 5, time = 1)
+@Measurement(iterations = 10, time = 1)
+@State(Scope.Benchmark)
+@BenchmarkMode({Mode.Throughput, Mode.AverageTime, Mode.SampleTime})
+public class RpcBenchMarkTest extends AbstractBaseBenchmark {
+
+    private NettyRemotingServer nettyRemotingServer;
+
+    private IService iService;
+
+    @Setup
+    public void before() {
+        nettyRemotingServer = new NettyRemotingServer(new 
NettyServerConfig(12345));
+        nettyRemotingServer.start();
+        SpringServerMethodInvokerDiscovery springServerMethodInvokerDiscovery =
+                new SpringServerMethodInvokerDiscovery(nettyRemotingServer);
+        springServerMethodInvokerDiscovery.postProcessAfterInitialization(new 
IServiceImpl(), "iServiceImpl");
+        iService =
+                
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345", 
IService.class);
+    }
+
+    @Benchmark
+    @BenchmarkMode({Mode.Throughput, Mode.AverageTime, Mode.SampleTime})
+    @OutputTimeUnit(TimeUnit.MILLISECONDS)
+    public void sendTest(Blackhole bh) {
+        String pong = iService.ping("ping");
+        bh.consume(pong);
+    }
+
+    @TearDown
+    public void after() {
+        nettyRemotingServer.close();
+    }
+}
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 82bbef30de..a47fdac8a5 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -389,8 +389,8 @@ public class ProcessServiceImpl implements ProcessService {
                 if (update) {
                     try {
                         final ITaskInstanceExecutionEventListener 
iTaskInstanceExecutionEventListener =
-                                
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
-                                        .getProxyClient(info.getHost(), 
ITaskInstanceExecutionEventListener.class);
+                                
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(info.getHost(),
+                                        
ITaskInstanceExecutionEventListener.class);
                         final WorkflowInstanceStateChangeEvent 
workflowInstanceStateChangeEvent =
                                 new 
WorkflowInstanceStateChangeEvent(info.getId(), 0, info.getState(), 
info.getId(), 0);
                         iTaskInstanceExecutionEventListener
@@ -516,11 +516,11 @@ public class ProcessServiceImpl implements ProcessService 
{
                 continue;
             }
             if (TaskUtils.isLogicTask(taskInstance.getTaskType())) {
-                IMasterLogService masterLogService = 
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+                IMasterLogService masterLogService = 
SingletonJdkDynamicRpcClientProxyFactory
                         .getProxyClient(taskInstance.getHost(), 
IMasterLogService.class);
                 masterLogService.removeLogicTaskInstanceLog(taskLogPath);
             } else {
-                IWorkerLogService iWorkerLogService = 
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+                IWorkerLogService iWorkerLogService = 
SingletonJdkDynamicRpcClientProxyFactory
                         .getProxyClient(taskInstance.getHost(), 
IWorkerLogService.class);
                 iWorkerLogService.removeTaskInstanceLog(taskLogPath);
             }
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionFinishEventSender.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionFinishEventSender.java
index 27a9de73ae..9469ba1a69 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionFinishEventSender.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionFinishEventSender.java
@@ -33,7 +33,7 @@ public class TaskInstanceExecutionFinishEventSender
     @Override
     public void sendEvent(TaskInstanceExecutionFinishEvent 
taskInstanceExecutionFinishEvent) {
         ITaskInstanceExecutionEventListener 
iTaskInstanceExecutionEventListener =
-                SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+                SingletonJdkDynamicRpcClientProxyFactory
                         
.getProxyClient(taskInstanceExecutionFinishEvent.getHost(),
                                 ITaskInstanceExecutionEventListener.class);
         
iTaskInstanceExecutionEventListener.onTaskInstanceExecutionFinish(taskInstanceExecutionFinishEvent);
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionInfoUpdateEventSender.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionInfoUpdateEventSender.java
index e5350a7c0c..4b9e7e76b0 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionInfoUpdateEventSender.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionInfoUpdateEventSender.java
@@ -35,7 +35,7 @@ public class TaskInstanceExecutionInfoUpdateEventSender
     @Override
     public void sendEvent(TaskInstanceExecutionInfoEvent 
taskInstanceExecutionInfoEvent) {
         ITaskInstanceExecutionEventListener 
iTaskInstanceExecutionEventListener =
-                SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+                SingletonJdkDynamicRpcClientProxyFactory
                         
.getProxyClient(taskInstanceExecutionInfoEvent.getHost(),
                                 ITaskInstanceExecutionEventListener.class);
         
iTaskInstanceExecutionEventListener.onTaskInstanceExecutionInfoUpdate(taskInstanceExecutionInfoEvent);
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionRunningEventSender.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionRunningEventSender.java
index 4f40940030..4f64a94002 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionRunningEventSender.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionRunningEventSender.java
@@ -35,7 +35,7 @@ public class TaskInstanceExecutionRunningEventSender
     @Override
     public void sendEvent(TaskInstanceExecutionRunningEvent 
taskInstanceExecutionRunningEvent) {
         ITaskInstanceExecutionEventListener 
iTaskInstanceExecutionEventListener =
-                SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+                SingletonJdkDynamicRpcClientProxyFactory
                         
.getProxyClient(taskInstanceExecutionRunningEvent.getHost(),
                                 ITaskInstanceExecutionEventListener.class);
         
iTaskInstanceExecutionEventListener.onTaskInstanceExecutionRunning(taskInstanceExecutionRunningEvent);
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
index 5fadf27ef0..15141ed747 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
@@ -257,7 +257,7 @@ public abstract class WorkerTaskExecuteRunnable implements 
Runnable {
                 task.getExitStatus() == TaskExecutionStatus.SUCCESS ? 
WarningType.SUCCESS.getCode()
                         : WarningType.FAILURE.getCode());
         try {
-            IAlertOperator alertOperator = 
SingletonJdkDynamicRpcClientProxyFactory.getInstance()
+            IAlertOperator alertOperator = 
SingletonJdkDynamicRpcClientProxyFactory
                     .getProxyClient(alertServerAddress.getAddress(), 
IAlertOperator.class);
             AlertSendResponse alertSendResponse = 
alertOperator.sendAlert(alertSendRequest);
             log.info("Send alert to: {} successfully, response: {}", 
alertServerAddress, alertSendResponse);

Reply via email to