areyouok commented on a change in pull request #3639:
URL: https://github.com/apache/rocketmq/pull/3639#discussion_r769190299



##########
File path: 
client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java
##########
@@ -74,14 +73,42 @@ public void scanExpiredRequest() {
         }
     }
 
-    private RequestFutureHolder() {
+    public synchronized void startScheduledTask(DefaultMQProducerImpl 
producer) {
+        this.producerSet.add(producer);
+        if (this.producerSet.size() >= 1 && this.serviceState != 
ServiceState.RUNNING) {
+            this.serviceState = ServiceState.START_FAILED;
+
+            this.getScheduledExecutorService().scheduleAtFixedRate(new 
Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        RequestFutureHolder.getInstance().scanExpiredRequest();
+                    } catch (Throwable e) {
+                        log.error("scan RequestFutureTable exception", e);
+                    }
+                }
+            }, 1000 * 3, 1000, TimeUnit.MILLISECONDS);
+
+            this.serviceState = ServiceState.RUNNING;
+        }
     }
 
-    public AtomicInteger getProducerNum() {
-        return producerNum;
+    public synchronized void shutdown(DefaultMQProducerImpl producer) {
+        this.producerSet.remove(producer);
+        if (this.producerSet.size() <= 0 && null != 
this.scheduledExecutorService && this.serviceState != 
ServiceState.SHUTDOWN_ALREADY) {
+            this.scheduledExecutorService.shutdown();
+            this.scheduledExecutorService = null;
+            this.serviceState = ServiceState.SHUTDOWN_ALREADY;
+        }
     }
 
-    public ScheduledExecutorService getScheduledExecutorService() {
+    private RequestFutureHolder() {
+    }
+
+    private ScheduledExecutorService getScheduledExecutorService() {

Review comment:
       This method should be inlined.

##########
File path: 
client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java
##########
@@ -74,14 +73,42 @@ public void scanExpiredRequest() {
         }
     }
 
-    private RequestFutureHolder() {
+    public synchronized void startScheduledTask(DefaultMQProducerImpl 
producer) {
+        this.producerSet.add(producer);
+        if (this.producerSet.size() >= 1 && this.serviceState != 
ServiceState.RUNNING) {

Review comment:
       There is no need to check producerSet.size() here.

##########
File path: 
client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java
##########
@@ -17,39 +17,38 @@
 
 package org.apache.rocketmq.client.producer;
 
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.rocketmq.client.common.ClientErrorCode;
 import org.apache.rocketmq.client.exception.RequestTimeoutException;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
 import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.ServiceState;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.logging.InternalLogger;
 
 public class RequestFutureHolder {
     private static InternalLogger log = ClientLogger.getLog();
     private static final RequestFutureHolder INSTANCE = new 
RequestFutureHolder();
     private ConcurrentHashMap<String, RequestResponseFuture> 
requestFutureTable = new ConcurrentHashMap<String, RequestResponseFuture>();
-    private final AtomicInteger producerNum = new AtomicInteger(0);
-    private final ScheduledExecutorService scheduledExecutorService = Executors
-        .newSingleThreadScheduledExecutor(new ThreadFactory() {
-            @Override
-            public Thread newThread(Runnable r) {
-                return new Thread(r, "RequestHouseKeepingService");
-            }
-        });
+    private final Set<DefaultMQProducerImpl> producerSet = new HashSet<>();
+    private ScheduledExecutorService scheduledExecutorService = null;
+    private ServiceState serviceState = ServiceState.CREATE_JUST;

Review comment:
       This serviceState field is redundant. We can check 
scheduledExecutorService == null instead.

##########
File path: 
client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java
##########
@@ -74,14 +69,40 @@ public void scanExpiredRequest() {
         }
     }
 
-    private RequestFutureHolder() {
+    public synchronized void startScheduledTask() {
+        if (this.producerNum.incrementAndGet() == 1) {
+            this.getScheduledExecutorService().scheduleAtFixedRate(new 
Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        RequestFutureHolder.getInstance().scanExpiredRequest();
+                    } catch (Throwable e) {
+                        log.error("scan RequestFutureTable exception", e);
+                    }
+                }
+            }, 1000 * 3, 1000, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    public synchronized void shutdown() {
+        if (this.producerNum.decrementAndGet() == 0) {
+            this.scheduledExecutorService.shutdown();

Review comment:
       Could shutdown() throw an exception? 
   
   Would the code below be better?
   ```
   ScheduledExecutorService ses = this.scheduledExecutorService;
   this.scheduledExecutorService = null;
   ses.shutdown();
   ```
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to