Jashinck commented on code in PR #94:
URL: 
https://github.com/apache/rocketmq-eventbridge/pull/94#discussion_r1176114179


##########
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java:
##########
@@ -75,9 +82,23 @@ public void initAndStart() {
         
circulatorContext.initListenerMetadata(runnerConfigObserver.getTargetRunnerConfig());
         runnerConfigObserver.registerListener(circulatorContext);
         runnerConfigObserver.registerListener(eventSubscriber);
-        new EventBusListener(circulatorContext, eventSubscriber, 
errorHandler).start();
-        new EventRuleTransfer(circulatorContext, offsetManager, 
errorHandler).start();
-        new EventTargetPusher(circulatorContext, offsetManager, 
errorHandler).start();
+        EventBusListener eventBusListener = new 
EventBusListener(circulatorContext, eventSubscriber, errorHandler);
+        EventRuleTransfer eventRuleTransfer = new 
EventRuleTransfer(circulatorContext, offsetManager, errorHandler);
+        EventTargetPusher eventTargetPusher = new 
EventTargetPusher(circulatorContext, offsetManager, errorHandler);
+        ConcurrentHashMap<Thread, ExecutorService> threadThreadPoolExecutorMap 
= new ConcurrentHashMap<Thread, ExecutorService>() {
+            {
+                put(new Thread(eventBusListener, 
eventBusListener.getServiceName()), Executors.newSingleThreadExecutor());
+                put(new Thread(eventRuleTransfer, 
eventRuleTransfer.getServiceName()), Executors.newSingleThreadExecutor());
+                put(new Thread(eventTargetPusher, 
eventTargetPusher.getServiceName()), Executors.newSingleThreadExecutor());

Review Comment:
   Listener, Transfer, Pusher, Subscriber都需要单独重写或实现shutDown接口,实现各自领域的shutDown逻辑。
   



##########
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java:
##########
@@ -75,9 +82,23 @@ public void initAndStart() {
         
circulatorContext.initListenerMetadata(runnerConfigObserver.getTargetRunnerConfig());
         runnerConfigObserver.registerListener(circulatorContext);
         runnerConfigObserver.registerListener(eventSubscriber);
-        new EventBusListener(circulatorContext, eventSubscriber, 
errorHandler).start();
-        new EventRuleTransfer(circulatorContext, offsetManager, 
errorHandler).start();
-        new EventTargetPusher(circulatorContext, offsetManager, 
errorHandler).start();
+        EventBusListener eventBusListener = new 
EventBusListener(circulatorContext, eventSubscriber, errorHandler);
+        EventRuleTransfer eventRuleTransfer = new 
EventRuleTransfer(circulatorContext, offsetManager, errorHandler);
+        EventTargetPusher eventTargetPusher = new 
EventTargetPusher(circulatorContext, offsetManager, errorHandler);
+        ConcurrentHashMap<Thread, ExecutorService> threadThreadPoolExecutorMap 
= new ConcurrentHashMap<Thread, ExecutorService>() {
+            {
+                put(new Thread(eventBusListener, 
eventBusListener.getServiceName()), Executors.newSingleThreadExecutor());
+                put(new Thread(eventRuleTransfer, 
eventRuleTransfer.getServiceName()), Executors.newSingleThreadExecutor());
+                put(new Thread(eventTargetPusher, 
eventTargetPusher.getServiceName()), Executors.newSingleThreadExecutor());
+            }
+        };
+        ShutdownHookThread shutdownHookThread = new ShutdownHookThread(logger, 
() -> {

Review Comment:
   
Shutdown的触发维护在Runtimer是OK的,有必要单独起一个线程,交给ShutdownHookThread去维护么。Runtimer线程运行时,直接addShutdownHook调用各模块的shudDown接口,是否更简洁些



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to