This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git

commit aab58d6cb8811710351a3867e99cb2d98c0d0a52
Author: windwheel <[email protected]>
AuthorDate: Sun May 21 16:09:02 2023 +0800

    refactor shutdown
---
 .../eventbridge/adapter/runtime/Runtime.java       | 39 +++++++++++++++++++---
 .../adapter/runtime/boot/EventBusListener.java     |  5 +++
 .../adapter/runtime/boot/EventRuleTransfer.java    | 27 +++++++++++++--
 .../adapter/runtime/boot/EventTargetTrigger.java   | 19 +++++++++++
 .../runtime/boot/common/CirculatorContext.java     | 25 ++++++++++++++
 .../boot/hook/AbstractStartAndShutdown.java        |  2 +-
 .../adapter/runtime}/boot/hook/Shutdown.java       |  2 +-
 .../adapter/runtime}/boot/hook/Start.java          |  2 +-
 .../runtime}/boot/hook/StartAndShutdown.java       |  4 +--
 .../runtime/boot/listener/EventSubscriber.java     |  5 +++
 .../adapter/runtime/common/ServiceThread.java      | 12 ++++++-
 .../adapter/runtime}/utils/ShutdownUtils.java      |  2 +-
 .../rocketmq/runtimer/RocketMQEventSubscriber.java |  8 +++++
 13 files changed, 137 insertions(+), 15 deletions(-)

diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java
index e5bd487..aae2f70 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java
@@ -31,6 +31,10 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import 
org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook.StartAndShutdown;
+import 
org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook.AbstractStartAndShutdown;
+
+
 import javax.annotation.PostConstruct;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -46,6 +50,8 @@ public class Runtime {
 
     private AtomicReference<RuntimeState> runtimerState;
 
+    private static final RuntimeStartAndShutdown RUNTIME_START_AND_SHUTDOWN = 
new RuntimeStartAndShutdown();
+
     @Autowired
     private CirculatorContext circulatorContext;
     @Autowired
@@ -58,17 +64,40 @@ public class Runtime {
     private ErrorHandler errorHandler;
 
     @PostConstruct
-    public void initAndStart() {
-        logger.info("Start init runtime.");
+    public void initAndStart() throws Exception {
+        logger.info("Start init runtimer.");
         
circulatorContext.initCirculatorContext(runnerConfigObserver.getTargetRunnerConfig());
         runnerConfigObserver.registerListener(circulatorContext);
         runnerConfigObserver.registerListener(eventSubscriber);
-        new EventBusListener(circulatorContext, eventSubscriber, 
errorHandler).start();
-        new EventRuleTransfer(circulatorContext, offsetManager, 
errorHandler).start();
-        new EventTargetTrigger(circulatorContext, offsetManager, 
errorHandler).start();
+        EventBusListener eventBusListener = new 
EventBusListener(circulatorContext, eventSubscriber, errorHandler);
+        EventRuleTransfer eventRuleTransfer = new 
EventRuleTransfer(circulatorContext, offsetManager, errorHandler);
+        EventTargetTrigger eventTargetPusher = new 
EventTargetTrigger(circulatorContext, offsetManager, errorHandler);
+        RUNTIME_START_AND_SHUTDOWN.appendStartAndShutdown(eventBusListener);
+        RUNTIME_START_AND_SHUTDOWN.appendStartAndShutdown(eventRuleTransfer);
+        RUNTIME_START_AND_SHUTDOWN.appendStartAndShutdown(eventTargetPusher);
+
+        // start servers one by one.
+        RUNTIME_START_AND_SHUTDOWN.start();
+
+        java.lang.Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            logger.info("try to shutdown server");
+            try {
+                RUNTIME_START_AND_SHUTDOWN.shutdown();
+            } catch (Exception e) {
+                logger.error("err when shutdown rocketmq-proxy", e);
+            }
+        }));
+
         startRuntimer();
     }
 
+    private static class RuntimeStartAndShutdown extends 
AbstractStartAndShutdown {
+        @Override
+        protected void appendStartAndShutdown(StartAndShutdown 
startAndShutdown) {
+            super.appendStartAndShutdown(startAndShutdown);
+        }
+    }
+
     public void startRuntimer() {
         runtimerState = new AtomicReference<>(RuntimeState.START);
     }
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
index e6f06c2..48af27f 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
@@ -70,4 +70,9 @@ public class EventBusListener extends ServiceThread {
     public String getServiceName() {
         return EventBusListener.class.getSimpleName();
     }
+
+    @Override
+    public void shutdown() {
+        eventSubscriber.close();
+    }
 }
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
index 8612733..4322187 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
@@ -32,9 +32,9 @@ import 
org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorCon
 import 
org.apache.rocketmq.eventbridge.adapter.runtime.boot.transfer.TransformEngine;
 import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread;
 import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler;
+import org.apache.rocketmq.eventbridge.adapter.runtime.utils.ShutdownUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 
 /**
  * receive event and transfer the rule to pusher
@@ -48,6 +48,8 @@ public class EventRuleTransfer extends ServiceThread {
     private final CirculatorContext circulatorContext;
     private final OffsetManager offsetManager;
     private final ErrorHandler errorHandler;
+    private Map<String, TransformEngine<ConnectRecord>> latestTransformMap;
+    private List<CompletableFuture<Void>> completableFutures;
 
     public EventRuleTransfer(CirculatorContext circulatorContext, 
OffsetManager offsetManager,
         ErrorHandler errorHandler) {
@@ -75,7 +77,7 @@ public class EventRuleTransfer extends ServiceThread {
                 this.waitForRunning(1000);
                 continue;
             }
-            Map<String, TransformEngine<ConnectRecord>> latestTransformMap = 
circulatorContext.getTaskTransformMap();
+            latestTransformMap = circulatorContext.getTaskTransformMap();
             if (MapUtils.isEmpty(latestTransformMap)) {
                 logger.warn("latest transform engine is empty, continue by 
curTime - {}", System.currentTimeMillis());
                 this.waitForRunning(3000);
@@ -83,7 +85,7 @@ public class EventRuleTransfer extends ServiceThread {
             }
 
             List<ConnectRecord> afterTransformConnect = Lists.newArrayList();
-            List<CompletableFuture<Void>> completableFutures = 
Lists.newArrayList();
+            completableFutures = Lists.newArrayList();
             for(String runnerName: eventRecordMap.keySet()){
                 TransformEngine<ConnectRecord> curTransformEngine = 
latestTransformMap.get(runnerName);
                 List<ConnectRecord> curEventRecords = 
eventRecordMap.get(runnerName);
@@ -117,4 +119,23 @@ public class EventRuleTransfer extends ServiceThread {
         }
     }
 
+    @Override
+    public void start() {
+        thread.start();
+    }
+
+    @Override
+    public void shutdown() {
+        try {
+            for (Map.Entry<String, TransformEngine<ConnectRecord>> 
taskTransform : latestTransformMap.entrySet()) {
+                TransformEngine<ConnectRecord> transformEngine = 
taskTransform.getValue();
+                transformEngine.close();
+            }
+            ShutdownUtils.completedFuture(completableFutures);
+            circulatorContext.releaseTaskTransform();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
 }
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
index a54ee39..c4ff349 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
@@ -89,4 +89,23 @@ public class EventTargetTrigger extends ServiceThread {
         return EventTargetTrigger.class.getSimpleName();
     }
 
+    @Override
+    public void start() {
+        thread.start();
+    }
+
+    @Override
+    public void shutdown() {
+        Map<String, SinkTask> sinkTaskMap =  
circulatorContext.getPusherTaskMap();
+        for (Map.Entry<String, SinkTask> item : sinkTaskMap.entrySet()) {
+            SinkTask sinkTask = item.getValue();
+            sinkTask.stop();
+        }
+        try {
+            circulatorContext.releaseExecutorService();
+            circulatorContext.releaseTriggerTask();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
 }
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
index 203f43a..7b467d8 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
@@ -32,6 +32,7 @@ import 
org.apache.rocketmq.eventbridge.adapter.runtime.common.enums.RefreshTypeE
 import org.apache.rocketmq.eventbridge.adapter.runtime.common.plugin.Plugin;
 import 
org.apache.rocketmq.eventbridge.adapter.runtime.common.plugin.PluginClassLoader;
 import 
org.apache.rocketmq.eventbridge.adapter.runtime.config.RuntimeConfigDefine;
+import org.apache.rocketmq.eventbridge.adapter.runtime.utils.ShutdownUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -291,4 +292,28 @@ public class CirculatorContext implements 
TargetRunnerListener {
         return null;
     }
 
+
+    public void releaseTaskTransform() {
+        for (Map.Entry<String, TransformEngine<ConnectRecord>> taskTransform : 
taskTransformMap.entrySet()) {
+            String runnerName = taskTransform.getKey();
+            taskTransformMap.remove(runnerName);
+        }
+    }
+
+    public void releaseTriggerTask() {
+        for (Map.Entry<String, SinkTask> triggerTask: 
pusherTaskMap.entrySet()) {
+            SinkTask sinkTask = triggerTask.getValue();
+            String runnerName = triggerTask.getKey();
+            sinkTask.stop();
+            pusherTaskMap.remove(runnerName);
+        }
+    }
+
+    public void releaseExecutorService() throws Exception {
+        for (Map.Entry<String, ExecutorService> pusherExecutor: 
pusherExecutorMap.entrySet()) {
+            ExecutorService pusher = pusherExecutor.getValue();
+            ShutdownUtils.shutdownThreadPool(pusher);
+        }
+    }
+
 }
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/AbstractStartAndShutdown.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/AbstractStartAndShutdown.java
similarity index 97%
rename from 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/AbstractStartAndShutdown.java
rename to 
adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/AbstractStartAndShutdown.java
index 051fe5a..9a1f33e 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/AbstractStartAndShutdown.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/AbstractStartAndShutdown.java
@@ -15,7 +15,7 @@
  *  limitations under the License.
  */
 
-package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.hook;
+package org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook;
 
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/Shutdown.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/Shutdown.java
similarity index 92%
rename from 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/Shutdown.java
rename to 
adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/Shutdown.java
index f3ac5f3..854cd19 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/Shutdown.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/Shutdown.java
@@ -15,7 +15,7 @@
  *  limitations under the License.
  */
 
-package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.hook;
+package org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook;
 
 public interface Shutdown {
     void shutdown() throws Exception;
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/Start.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/Start.java
similarity index 92%
rename from 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/Start.java
rename to 
adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/Start.java
index b44d86a..353255f 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/Start.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/Start.java
@@ -14,7 +14,7 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.hook;
+package org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook;
 
 public interface Start {
     void start() throws Exception;
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/StartAndShutdown.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/StartAndShutdown.java
similarity index 86%
rename from 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/StartAndShutdown.java
rename to 
adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/StartAndShutdown.java
index 242c1b0..cc80740 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/hook/StartAndShutdown.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/hook/StartAndShutdown.java
@@ -15,7 +15,7 @@
  *  limitations under the License.
  */
 
-package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.hook;
+package org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook;
 
-public interface StartAndShutdown extends Start,Shutdown {
+public interface StartAndShutdown extends Start, Shutdown {
 }
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java
index 0d99fce..be4db9b 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java
@@ -48,6 +48,11 @@ public abstract class EventSubscriber implements 
TargetRunnerListener {
      */
     public abstract void commit(List<ConnectRecord> connectRecordList);
 
+    /**
+     * close resource such as consumer
+     */
+    public abstract void close();
+
     /**
      * Put the connect record to the eventbus.
      * @param eventBusName
diff --git 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java
index 48835c1..400cf47 100644
--- 
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.eventbridge.adapter.runtime.common;
 
+import 
org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook.AbstractStartAndShutdown;
 import org.apache.rocketmq.common.CountDownLatch2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -24,7 +25,7 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public abstract class ServiceThread implements Runnable {
+public abstract class ServiceThread extends AbstractStartAndShutdown 
implements Runnable {
 
     private static final Logger logger = 
LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
 
@@ -34,6 +35,8 @@ public abstract class ServiceThread implements Runnable {
     protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
     protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
     protected volatile boolean stopped = false;
+    protected boolean isDaemon = false;
+
 
     public ServiceThread() {
         this.thread = new Thread(this, this.getServiceName());
@@ -42,7 +45,14 @@ public abstract class ServiceThread implements Runnable {
     public abstract String getServiceName();
 
     public void start() {
+        logger.info("Try to start service thread:{} started:{} lastThread:{}", 
getServiceName(), hasNotified.get(), thread);
+        if (!hasNotified.compareAndSet(false, true)) {
+            return;
+        }
+        stopped = false;
+        this.thread.setDaemon(isDaemon);
         this.thread.start();
+        logger.info("Start service thread:{} started:{} lastThread:{}", 
getServiceName(), hasNotified.get(), thread);
     }
 
     public void shutdown() {
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/utils/ShutdownUtils.java
 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java
similarity index 96%
rename from 
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/utils/ShutdownUtils.java
rename to 
adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java
index 980c242..7feb906 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/utils/ShutdownUtils.java
+++ 
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java
@@ -15,7 +15,7 @@
  *  limitations under the License.
  */
 
-package org.apache.rocketmq.eventbridge.adapter.runtimer.utils;
+package org.apache.rocketmq.eventbridge.adapter.runtime.utils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
 
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
index f6b08cd..3005486 100644
--- 
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
+++ 
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
@@ -158,6 +158,14 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
         consumeWorkerMap.get(runnerName).commit(msgIds);
     }
 
+    @Override
+    public void close() {
+        for (Map.Entry<String, ConsumeWorker> item : 
consumeWorkerMap.entrySet()) {
+            ConsumeWorker consumeWorker =  item.getValue();
+            consumeWorker.shutdown();
+        }
+    }
+
     /**
      * init rocketmq ref config
      */

Reply via email to