Repository: incubator-rocketmq
Updated Branches:
  refs/heads/develop 53b98d0d8 -> e3f4251c9


[ROCKETMQ-119] Add ThreadUtils and shutdown PullMessageService properly


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/e3f4251c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/e3f4251c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/e3f4251c

Branch: refs/heads/develop
Commit: e3f4251c91a73f4e51732bcb1690554ac5fb3096
Parents: 53b98d0
Author: yukon <[email protected]>
Authored: Mon Mar 6 20:01:10 2017 +0800
Committer: yukon <[email protected]>
Committed: Mon Mar 6 20:01:10 2017 +0800

----------------------------------------------------------------------
 .../impl/consumer/PullMessageService.java       |   7 +
 .../rocketmq/common/utils/ThreadUtils.java      | 174 +++++++++++++++++++
 2 files changed, 181 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/e3f4251c/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
index 8ddd483..ed4b837 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.slf4j.Logger;
 
 public class PullMessageService extends ServiceThread {
@@ -98,6 +99,12 @@ public class PullMessageService extends ServiceThread {
     }
 
+    /**
+     * Stops this service and then shuts down its {@code scheduledExecutorService}.
+     *
+     * <p>The {@code interrupt} flag is forwarded unchanged to the superclass implementation.
+     * Afterwards the executor is handed to
+     * {@link ThreadUtils#shutdownGracefully(java.util.concurrent.ExecutorService, long, TimeUnit)}
+     * with a 1000 ms timeout; that helper may wait for the timeout twice (once after
+     * {@code shutdown()}, once after {@code shutdownNow()}), so this call can block for
+     * roughly two seconds.
+     *
+     * @param interrupt passed through to {@code super.shutdown(boolean)}
+     */
     @Override
+    public void shutdown(boolean interrupt) {
+        super.shutdown(interrupt);
+        ThreadUtils.shutdownGracefully(this.scheduledExecutorService, 1000, 
TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Returns the name of this service, which is the simple class name of
+     * {@code PullMessageService}.
+     *
+     * @return the simple class name of {@code PullMessageService}
+     */
+    @Override
     public String getServiceName() {
         return PullMessageService.class.getSimpleName();
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/e3f4251c/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java 
b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
new file mode 100644
index 0000000..8c28d70
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.utils;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static helpers for creating named executors, thread factories and threads, and for
+ * shutting threads and executors down.
+ *
+ * <p>This class is not instantiable.
+ */
+public final class ThreadUtils {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.TOOLS_LOGGER_NAME);
+
+    /**
+     * Creates a {@link ThreadPoolExecutor} whose threads come from
+     * {@link #newThreadFactory(String, boolean)}.
+     *
+     * @param corePoolSize core pool size, passed to {@link ThreadPoolExecutor}
+     * @param maximumPoolSize maximum pool size, passed to {@link ThreadPoolExecutor}
+     * @param keepAliveTime keep-alive time, passed to {@link ThreadPoolExecutor}
+     * @param unit unit of {@code keepAliveTime}
+     * @param workQueue queue holding tasks before they are executed
+     * @param processName name fragment used when naming the pool's threads
+     * @param isDaemon whether the pool's threads are daemon threads
+     * @return the new executor
+     */
+    public static ExecutorService newThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
+        TimeUnit unit, BlockingQueue<Runnable> workQueue, String processName, boolean isDaemon) {
+        return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+            newThreadFactory(processName, isDaemon));
+    }
+
+    /**
+     * Creates a single-thread executor whose thread comes from
+     * {@link #newThreadFactory(String, boolean)}.
+     *
+     * @param processName name fragment used when naming the thread
+     * @param isDaemon whether the thread is a daemon thread
+     * @return the new executor
+     */
+    public static ExecutorService newSingleThreadExecutor(String processName, boolean isDaemon) {
+        return Executors.newSingleThreadExecutor(newThreadFactory(processName, isDaemon));
+    }
+
+    /**
+     * Creates a single-thread scheduled executor whose thread comes from
+     * {@link #newThreadFactory(String, boolean)}.
+     *
+     * @param processName name fragment used when naming the thread
+     * @param isDaemon whether the thread is a daemon thread
+     * @return the new scheduled executor
+     */
+    public static ScheduledExecutorService newSingleThreadScheduledExecutor(String processName, boolean isDaemon) {
+        return Executors.newSingleThreadScheduledExecutor(newThreadFactory(processName, isDaemon));
+    }
+
+    /**
+     * Creates a scheduled thread pool with {@code nThreads} core threads whose threads come
+     * from {@link #newThreadFactory(String, boolean)}.
+     *
+     * @param nThreads number of threads, passed to {@link Executors#newScheduledThreadPool(int, ThreadFactory)}
+     * @param processName name fragment used when naming the pool's threads
+     * @param isDaemon whether the pool's threads are daemon threads
+     * @return the new scheduled executor
+     */
+    public static ScheduledExecutorService newFixedThreadScheduledPool(int nThreads, String processName,
+        boolean isDaemon) {
+        return Executors.newScheduledThreadPool(nThreads, newThreadFactory(processName, isDaemon));
+    }
+
+    /**
+     * Creates a thread factory whose threads are named {@code Remoting-<processName>_<n>},
+     * where {@code n} starts at 1 and increases per created thread.
+     *
+     * @param processName name fragment placed after the {@code "Remoting-"} prefix
+     * @param isDaemon whether created threads are daemon threads
+     * @return the new thread factory
+     */
+    public static ThreadFactory newThreadFactory(String processName, boolean isDaemon) {
+        return newGenericThreadFactory("Remoting-" + processName, isDaemon);
+    }
+
+    /**
+     * Creates a factory of non-daemon threads named {@code <processName>_<n>}.
+     *
+     * @param processName thread name prefix
+     * @return the new thread factory
+     */
+    public static ThreadFactory newGenericThreadFactory(String processName) {
+        return newGenericThreadFactory(processName, false);
+    }
+
+    /**
+     * Creates a factory of non-daemon threads named {@code <processName>_<threads>_<n>}.
+     *
+     * @param processName thread name prefix
+     * @param threads number embedded in every thread name; it does not limit how many threads are created
+     * @return the new thread factory
+     */
+    public static ThreadFactory newGenericThreadFactory(String processName, int threads) {
+        return newGenericThreadFactory(processName, threads, false);
+    }
+
+    /**
+     * Creates a factory of threads named {@code <processName>_<n>}, where {@code n} starts
+     * at 1 and increases per created thread.
+     *
+     * @param processName thread name prefix
+     * @param isDaemon whether created threads are daemon threads
+     * @return the new thread factory
+     */
+    public static ThreadFactory newGenericThreadFactory(final String processName, final boolean isDaemon) {
+        return new ThreadFactory() {
+            private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread thread = new Thread(r, String.format("%s_%d", processName, this.threadIndex.incrementAndGet()));
+                thread.setDaemon(isDaemon);
+                return thread;
+            }
+        };
+    }
+
+    /**
+     * Creates a factory of threads named {@code <processName>_<threads>_<n>}, where
+     * {@code n} starts at 1 and increases per created thread.
+     *
+     * @param processName thread name prefix
+     * @param threads number embedded in every thread name; it does not limit how many threads are created
+     * @param isDaemon whether created threads are daemon threads
+     * @return the new thread factory
+     */
+    public static ThreadFactory newGenericThreadFactory(final String processName, final int threads,
+        final boolean isDaemon) {
+        return new ThreadFactory() {
+            private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread thread = new Thread(r,
+                    String.format("%s_%d_%d", processName, threads, this.threadIndex.incrementAndGet()));
+                thread.setDaemon(isDaemon);
+                return thread;
+            }
+        };
+    }
+
+    /**
+     * Create a new thread that logs any uncaught exception through this class's logger.
+     *
+     * @param name The name of the thread
+     * @param runnable The work for the thread to do
+     * @param daemon {@code true} to mark the thread as a daemon, i.e. it will NOT keep the
+     * JVM alive; {@code false} for a user thread that blocks JVM exit while it runs
+     * @return The unstarted thread
+     */
+    public static Thread newThread(String name, Runnable runnable, boolean daemon) {
+        Thread thread = new Thread(runnable, name);
+        thread.setDaemon(daemon);
+        thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                log.error("Uncaught exception in thread '" + t.getName() + "':", e);
+            }
+        });
+        return thread;
+    }
+
+    /**
+     * Interrupts the passed thread and waits, without time limit, until it has died.
+     *
+     * @param t Thread to stop; {@code null} is ignored
+     */
+    public static void shutdownGracefully(final Thread t) {
+        shutdownGracefully(t, 0);
+    }
+
+    /**
+     * Interrupts the passed thread and joins it, repeating both steps until the thread is
+     * no longer alive.
+     *
+     * <p>If the calling thread is interrupted while waiting, the wait continues and the
+     * caller's interrupt status is restored just before this method returns.
+     *
+     * @param t Thread to stop; {@code null} is ignored
+     * @param millis How long each join waits before the thread is interrupted again.
+     * Pass 0 if we're to wait forever.
+     * @throws IllegalArgumentException if {@code millis} is negative and {@code t} is alive
+     */
+    public static void shutdownGracefully(final Thread t, final long millis) {
+        if (t == null) {
+            return;
+        }
+        boolean interrupted = false;
+        try {
+            while (t.isAlive()) {
+                t.interrupt();
+                try {
+                    t.join(millis);
+                } catch (InterruptedException e) {
+                    // Only remember the interruption here. Re-setting the flag inside the loop
+                    // would make every following join() throw immediately and turn this loop
+                    // into a busy spin until t dies.
+                    interrupted = true;
+                }
+            }
+        } finally {
+            if (interrupted) {
+                // Preserve interrupt status for the caller.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * An implementation of the graceful stop sequence recommended by
+     * {@link ExecutorService}: {@code shutdown()}, wait, then {@code shutdownNow()} and
+     * wait again. The call may therefore block for up to twice the given timeout.
+     *
+     * <p>If the calling thread is interrupted while waiting, the executor is cancelled
+     * with {@code shutdownNow()} and the interrupt status is preserved.
+     *
+     * @param executor executor to stop; {@code null} is ignored
+     * @param timeout maximum time to wait in each of the two waiting phases
+     * @param timeUnit unit of {@code timeout}
+     */
+    public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit timeUnit) {
+        if (executor == null) {
+            return;
+        }
+        // Disable new tasks from being submitted.
+        executor.shutdown();
+        try {
+            // Wait a while for existing tasks to terminate.
+            if (!executor.awaitTermination(timeout, timeUnit)) {
+                executor.shutdownNow();
+                // Wait a while for tasks to respond to being cancelled.
+                if (!executor.awaitTermination(timeout, timeUnit)) {
+                    log.warn("{} didn't terminate!", executor);
+                }
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted.
+            executor.shutdownNow();
+            // Preserve interrupt status.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * A constructor to stop this class being constructed.
+     */
+    private ThreadUtils() {
+        // Unused
+    }
+}

Reply via email to