zuston commented on code in PR #2127:
URL: 
https://github.com/apache/incubator-uniffle/pull/2127#discussion_r1764680230


##########
common/src/main/java/org/apache/uniffle/common/executor/ThreadPoolManager.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.executor;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.metrics.CommonMetrics;
+
+/** The threadPool manager which represents a manager to handle all thread 
pool executors. */
+public class ThreadPoolManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ThreadPoolManager.class);
+
+  private static final Map<Object, MeasurableThreadPoolExecutor> 
THREAD_POOL_MAP =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Add a thread pool.
+   *
+   * @param name the name of the thread pool
+   * @param corePoolSize the core pool size supplier
+   * @param maximumPoolSize the maximum pool size supplier
+   * @param keepAliveTime the keep alive time supplier
+   * @param unit the unit
+   * @param workQueue the work queue
+   * @param threadFactory the thread factory
+   * @return the registered thread pool
+   */
+  public static ThreadPoolExecutor newThreadPool(
+      String name,
+      int corePoolSize,
+      int maximumPoolSize,
+      long keepAliveTime,
+      TimeUnit unit,
+      BlockingQueue<Runnable> workQueue,
+      ThreadFactory threadFactory) {
+    ThreadPoolExecutor threadPoolExecutor =
+        new ThreadPoolExecutor(
+            corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
threadFactory);
+    registerThreadPool(name, corePoolSize, maximumPoolSize, keepAliveTime, 
threadPoolExecutor);
+    return threadPoolExecutor;
+  }
+
+  /**
+   * Add a thread pool.
+   *
+   * @param name the name of the thread pool
+   * @param corePoolSize the core pool size supplier
+   * @param maximumPoolSize the maximum pool size supplier
+   * @param keepAliveTime the keep alive time supplier
+   * @param unit the unit
+   * @param workQueue the work queue
+   * @param threadFactory the thread factory
+   * @param handler the handler to use when execution is blocked because the 
thread bounds and queue
+   *     capacities are reached
+   * @return the registered thread pool
+   */
+  public static ThreadPoolExecutor newThreadPool(
+      String name,
+      int corePoolSize,
+      int maximumPoolSize,
+      long keepAliveTime,
+      TimeUnit unit,
+      BlockingQueue<Runnable> workQueue,
+      ThreadFactory threadFactory,
+      RejectedExecutionHandler handler) {
+    ThreadPoolExecutor threadPoolExecutor =
+        new ThreadPoolExecutor(
+            corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
threadFactory, handler);
+    registerThreadPool(name, corePoolSize, maximumPoolSize, keepAliveTime, 
threadPoolExecutor);
+    return threadPoolExecutor;
+  }
+
+  /**
+   * Register a thread pool to THREAD_POOL_MAP.
+   *
+   * @param name the name of the thread pool
+   * @param corePoolSize the core pool size supplier
+   * @param maximumPoolSize the maximum pool size supplier
+   * @param keepAliveTime the keep alive time supplier
+   * @param threadPoolExecutor the thread pool which will be registered
+   */
+  public static void registerThreadPool(
+      String name,
+      int corePoolSize,
+      int maximumPoolSize,
+      long keepAliveTime,
+      ThreadPoolExecutor threadPoolExecutor) {
+    THREAD_POOL_MAP.put(
+        threadPoolExecutor, new MeasurableThreadPoolExecutor(name, 
threadPoolExecutor));
+    LOG.info(
+        "{} thread pool, core size:{}, max size:{}, keep alive time:{}",
+        name,
+        corePoolSize,
+        maximumPoolSize,
+        keepAliveTime);
+  }
+
+  /**
+   * Unregister the thread pool executor related to the given key.
+   *
+   * @param key the key of thread pool executor to unregister
+   */
+  public static void unregister(Object key) {
+    MeasurableThreadPoolExecutor measurableThreadPoolExecutor = 
THREAD_POOL_MAP.remove(key);

Review Comment:
   I think this is not thread safe. Before real closing this executor, we 
should ensure the reference of this executor pool is equals to 0. 



##########
common/src/main/java/org/apache/uniffle/common/executor/ThreadPoolManager.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.executor;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.metrics.CommonMetrics;
+
+/** The threadPool manager which represents a manager to handle all thread 
pool executors. */
+public class ThreadPoolManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ThreadPoolManager.class);
+
+  private static final Map<Object, MeasurableThreadPoolExecutor> 
THREAD_POOL_MAP =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Add a thread pool.
+   *
+   * @param name the name of the thread pool
+   * @param corePoolSize the core pool size supplier
+   * @param maximumPoolSize the maximum pool size supplier
+   * @param keepAliveTime the keep alive time supplier
+   * @param unit the unit
+   * @param workQueue the work queue
+   * @param threadFactory the thread factory
+   * @return the registered thread pool
+   */
+  public static ThreadPoolExecutor newThreadPool(
+      String name,
+      int corePoolSize,
+      int maximumPoolSize,
+      long keepAliveTime,
+      TimeUnit unit,
+      BlockingQueue<Runnable> workQueue,
+      ThreadFactory threadFactory) {
+    ThreadPoolExecutor threadPoolExecutor =
+        new ThreadPoolExecutor(
+            corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
threadFactory);
+    registerThreadPool(name, corePoolSize, maximumPoolSize, keepAliveTime, 
threadPoolExecutor);
+    return threadPoolExecutor;
+  }
+
+  /**
+   * Add a thread pool.
+   *
+   * @param name the name of the thread pool
+   * @param corePoolSize the core pool size supplier
+   * @param maximumPoolSize the maximum pool size supplier
+   * @param keepAliveTime the keep alive time supplier
+   * @param unit the unit
+   * @param workQueue the work queue
+   * @param threadFactory the thread factory
+   * @param handler the handler to use when execution is blocked because the 
thread bounds and queue
+   *     capacities are reached
+   * @return the registered thread pool
+   */
+  public static ThreadPoolExecutor newThreadPool(
+      String name,
+      int corePoolSize,
+      int maximumPoolSize,
+      long keepAliveTime,
+      TimeUnit unit,
+      BlockingQueue<Runnable> workQueue,
+      ThreadFactory threadFactory,
+      RejectedExecutionHandler handler) {
+    ThreadPoolExecutor threadPoolExecutor =
+        new ThreadPoolExecutor(
+            corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
threadFactory, handler);
+    registerThreadPool(name, corePoolSize, maximumPoolSize, keepAliveTime, 
threadPoolExecutor);
+    return threadPoolExecutor;
+  }
+
+  /**
+   * Register a thread pool to THREAD_POOL_MAP.
+   *
+   * @param name the name of the thread pool
+   * @param corePoolSize the core pool size supplier
+   * @param maximumPoolSize the maximum pool size supplier
+   * @param keepAliveTime the keep alive time supplier
+   * @param threadPoolExecutor the thread pool which will be registered
+   */
+  public static void registerThreadPool(
+      String name,
+      int corePoolSize,
+      int maximumPoolSize,
+      long keepAliveTime,
+      ThreadPoolExecutor threadPoolExecutor) {
+    THREAD_POOL_MAP.put(
+        threadPoolExecutor, new MeasurableThreadPoolExecutor(name, 
threadPoolExecutor));
+    LOG.info(
+        "{} thread pool, core size:{}, max size:{}, keep alive time:{}",
+        name,
+        corePoolSize,
+        maximumPoolSize,
+        keepAliveTime);
+  }
+
+  /**
+   * Unregister the thread pool executor related to the given key.
+   *
+   * @param key the key of thread pool executor to unregister
+   */
+  public static void unregister(Object key) {
+    MeasurableThreadPoolExecutor measurableThreadPoolExecutor = 
THREAD_POOL_MAP.remove(key);
+    if (measurableThreadPoolExecutor != null) {
+      measurableThreadPoolExecutor.close();
+    }
+  }
+
+  public static boolean exists(Object key) {
+    return THREAD_POOL_MAP.containsKey(key);
+  }
+
+  private static class MeasurableThreadPoolExecutor implements Closeable {

Review Comment:
   How to use this? Could you help describe this in the test case? 



##########
common/src/main/java/org/apache/uniffle/common/executor/ThreadPoolManager.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.executor;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.metrics.CommonMetrics;
+
+/** The threadPool manager which represents a manager to handle all thread 
pool executors. */
+public class ThreadPoolManager {

Review Comment:
   Please add some tests about this class, the following sections should be 
covered.
   
   1. The rejected metrics should be tested on the different cases
   2. The pool register/unregister under the multiple threads. Is this 
necessary? 
   3. ...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to