This is an automated email from the ASF dual-hosted git repository.

weiraowang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 575b89e2f9 [Bug][Registry] Optimizing waiting strategy (#15223)
575b89e2f9 is described below

commit 575b89e2f9342f687c1b43e7b632abf34d00d553
Author: Gallardot <[email protected]>
AuthorDate: Tue Jan 2 21:41:50 2024 +0800

    [Bug][Registry] Optimizing waiting strategy (#15223)
    
    * [Improvement][Registry] Optimizing waiting strategy
    
    Signed-off-by: Gallardot <[email protected]>
---
 .../server/master/registry/MasterWaitingStrategy.java     | 12 ------------
 .../server/worker/registry/WorkerWaitingStrategy.java     | 15 +--------------
 .../operator/TaskInstanceDispatchOperationFunction.java   |  9 +++++++++
 3 files changed, 10 insertions(+), 26 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java
index 4089acfd02..2647542a18 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java
@@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.registry.api.StrategyType;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue;
-import org.apache.dolphinscheduler.server.master.rpc.MasterRpcServer;
 import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
 
 import java.time.Duration;
@@ -51,8 +50,6 @@ public class MasterWaitingStrategy implements MasterConnectStrategy {
     @Autowired
     private RegistryClient registryClient;
     @Autowired
-    private MasterRpcServer masterRPCServer;
-    @Autowired
     private WorkflowEventQueue workflowEventQueue;
     @Autowired
     private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@@ -97,7 +94,6 @@ public class MasterWaitingStrategy implements MasterConnectStrategy {
         } else {
             try {
                 ServerLifeCycleManager.recoverFromWaiting();
-                reStartMasterResource();
                 log.info("Recover from waiting success, the current server status is {}",
                         ServerLifeCycleManager.getServerStatus());
             } catch (Exception e) {
@@ -117,9 +113,6 @@ public class MasterWaitingStrategy implements MasterConnectStrategy {
     }
 
     private void clearMasterResource() {
-        // close the worker resource, if close failed should stop the worker server
-        masterRPCServer.close();
-        log.warn("Master closed RPC server due to lost registry connection");
         workflowEventQueue.clearWorkflowEventQueue();
         log.warn("Master clear workflow event queue due to lost registry connection");
         processInstanceExecCacheManager.clearCache();
@@ -129,9 +122,4 @@ public class MasterWaitingStrategy implements MasterConnectStrategy {
 
     }
 
-    private void reStartMasterResource() {
-        // reopen the resource, if reopen failed should stop the worker server
-        masterRPCServer.start();
-        log.warn("Master restarted RPC server due to reconnect to registry");
-    }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
index b2c3d0b606..4437e3545d 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
@@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.registry.api.RegistryException;
 import org.apache.dolphinscheduler.registry.api.StrategyType;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
-import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
 
@@ -47,9 +46,6 @@ public class WorkerWaitingStrategy implements WorkerConnectStrategy {
     @Autowired
     private RegistryClient registryClient;
 
-    @Autowired
-    private WorkerRpcServer workerRpcServer;
-
     @Autowired
     private MessageRetryRunner messageRetryRunner;
 
@@ -70,6 +66,7 @@ public class WorkerWaitingStrategy implements WorkerConnectStrategy {
                 throw new ServerLifeCycleException(
                         String.format("Waiting to reconnect to registry in %s failed", maxWaitingTime), ex);
             }
+
         } catch (ServerLifeCycleException e) {
             String errorMessage = String.format(
                     "Disconnect from registry and change the current status to waiting error, the current server state is %s, will stop the current server",
@@ -94,7 +91,6 @@ public class WorkerWaitingStrategy implements WorkerConnectStrategy {
         } else {
             try {
                 ServerLifeCycleManager.recoverFromWaiting();
-                reStartWorkerResource();
                 log.info("Recover from waiting success, the current server status is {}",
                         ServerLifeCycleManager.getServerStatus());
             } catch (Exception e) {
@@ -114,20 +110,11 @@ public class WorkerWaitingStrategy implements WorkerConnectStrategy {
     }
 
     private void clearWorkerResource() {
-        // close the worker resource, if close failed should stop the worker server
-        workerRpcServer.close();
-        log.warn("Worker server close the RPC server due to lost connection from registry");
         workerManagerThread.clearTask();
         WorkerTaskExecutorHolder.clear();
         log.warn("Worker server clear the tasks due to lost connection from registry");
         messageRetryRunner.clearMessage();
         log.warn("Worker server clear the retry message due to lost connection from registry");
-
     }
 
-    private void reStartWorkerResource() {
-        // reopen the resource, if reopen failed should stop the worker server
-        workerRpcServer.start();
-        log.warn("Worker server restart PRC server due to reconnect to registry");
-    }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
index 8c6825df63..e6d259412f 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.worker.runner.operator;
 
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceDispatchRequest;
 import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceDispatchResponse;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -57,6 +58,14 @@ public class TaskInstanceDispatchOperationFunction
 
             LogUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
                     taskExecutionContext.getTaskInstanceId());
+
+            // check server status, if server is not running, return failed to reject this task
+            if (!ServerLifeCycleManager.isRunning()) {
+                log.error("server is not running. reject task: {}", taskExecutionContext.getProcessInstanceId());
+                return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),
+                        "server is not running");
+            }
+
             TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
 
             WorkerTaskExecutor workerTaskExecutor = workerTaskExecutorFactoryBuilder

Reply via email to