This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 340fa99a redress running status (#423)
340fa99a is described below

commit 340fa99a8ae25cd1c9b0aefbfcd919dc75981e0d
Author: zhoubo <[email protected]>
AuthorDate: Fri Feb 17 13:34:26 2023 +0800

    redress running status (#423)
---
 .../connect/runtime/connectorwrapper/Worker.java   | 63 +++++++++++++++-------
 1 file changed, 44 insertions(+), 19 deletions(-)

diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index 097a0fdd..b52dd83a 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -27,6 +27,25 @@ import 
io.openmessaging.connector.api.component.task.source.SourceTask;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.RecordConverter;
 import io.openmessaging.connector.api.errors.ConnectException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -34,6 +53,8 @@ import 
org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
 import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
+import 
org.apache.rocketmq.connect.runtime.connectorwrapper.status.ConnectorStatus;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.status.TaskStatus;
 import 
org.apache.rocketmq.connect.runtime.connectorwrapper.status.WrapperStatusListener;
 import 
org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
 import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
@@ -55,25 +76,8 @@ import org.apache.rocketmq.connect.runtime.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
+import static 
org.apache.rocketmq.connect.runtime.connectorwrapper.status.AbstractStatus.State.PAUSED;
+import static 
org.apache.rocketmq.connect.runtime.connectorwrapper.status.AbstractStatus.State.RUNNING;
 
 /**
  * A worker to schedule all connectors and tasks in a process.
@@ -124,6 +128,9 @@ public class Worker {
     private AtomicReference<WorkerState> workerState;
     private final StateMachineService stateMachineService = new 
StateMachineService();
 
+    private final StateManagementService stateManagementService;
+
+
     public Worker(WorkerConfig workerConfig,
                   PositionManagementService positionManagementService,
                   ConfigManagementService configManagementService,
@@ -141,6 +148,7 @@ public class Worker {
         this.statusListener = new 
WrapperStatusListener(stateManagementService, workerConfig.getWorkerId());
         this.executor = Executors.newCachedThreadPool();
         this.connectMetrics = new ConnectMetrics(workerConfig);
+        this.stateManagementService = stateManagementService;
     }
 
     public void start() {
@@ -640,6 +648,8 @@ public class Worker {
                         runningTasks.remove(runnable);
                         stoppingTasks.put(runnable, 
System.currentTimeMillis());
                     } else {
+                        //status redress
+                        redressRunningStatus(workerTask);
                         // set target state
                         TargetState targetState = 
configManagementService.snapshot().targetState(connectorName);
                         if (targetState != null) {
@@ -900,6 +910,21 @@ public class Worker {
         }
     }
 
+    private void redressRunningStatus(WorkerTask workerTask) {
+        TaskStatus taskStatus = stateManagementService.get(workerTask.id);
+        if (taskStatus != null && taskStatus.getState() != RUNNING && 
taskStatus.getState() != PAUSED) {
+            ConnectorStatus connectorStatus = 
stateManagementService.get(workerTask.id.connector());
+            TaskStatus newTaskStatus;
+            if (null != connectorStatus && connectorStatus.getState() == 
PAUSED) {
+                newTaskStatus = new TaskStatus(workerTask.id, PAUSED, 
workerConfig.getWorkerId(), System.currentTimeMillis());
+            } else {
+                newTaskStatus = new TaskStatus(workerTask.id, RUNNING, 
workerConfig.getWorkerId(), System.currentTimeMillis());
+            }
+            log.warn("Task {}, Old task status is {}, new task status {}", 
workerTask.id, taskStatus, newTaskStatus);
+            stateManagementService.put(newTaskStatus);
+        }
+    }
+
     private Map<String, List<ConnectKeyValue>> newTasks(Map<String, 
List<ConnectKeyValue>> taskConfigs) {
         Map<String, List<ConnectKeyValue>> newTasks = new HashMap<>();
         for (String connectorName : taskConfigs.keySet()) {

Reply via email to