This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 14bf48c  [ISSUE-4072] Parallel insert records in Session (#4073)
14bf48c is described below

commit 14bf48cf0abaa7ec207de5cde7ff64f2b5660973
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Oct 14 16:11:59 2021 +0800

    [ISSUE-4072] Parallel insert records in Session (#4073)
---
 .../main/java/org/apache/iotdb/session/Config.java |   4 +
 .../org/apache/iotdb/session/InsertConsumer.java   |  31 +++
 .../java/org/apache/iotdb/session/Session.java     | 237 ++++++++++-----------
 .../apache/iotdb/session/SessionConnection.java    |   1 +
 .../org/apache/iotdb/session/pool/SessionPool.java |   4 +-
 .../iotdb/session/{ => util}/SessionUtils.java     |   2 +-
 .../org/apache/iotdb/session/util/ThreadUtils.java |  45 ++++
 .../apache/iotdb/session/util/ThreadUtilsTest.java |  35 +++
 8 files changed, 236 insertions(+), 123 deletions(-)

diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java 
b/session/src/main/java/org/apache/iotdb/session/Config.java
index 7b1a636..f56aa44 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/session/src/main/java/org/apache/iotdb/session/Config.java
@@ -28,6 +28,10 @@ public class Config {
   public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 0;
   public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
 
+  public static final int CPU_CORES = 
Runtime.getRuntime().availableProcessors();
+  public static final int DEFAULT_SESSION_EXECUTOR_THREAD_NUM = 2 * CPU_CORES;
+  public static final int DEFAULT_SESSION_EXECUTOR_TASK_NUM = 1_000;
+
   public static final int RETRY_NUM = 3;
   public static final long RETRY_INTERVAL_MS = 1000;
 
diff --git a/session/src/main/java/org/apache/iotdb/session/InsertConsumer.java 
b/session/src/main/java/org/apache/iotdb/session/InsertConsumer.java
new file mode 100644
index 0000000..dcc655c
--- /dev/null
+++ b/session/src/main/java/org/apache/iotdb/session/InsertConsumer.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.RedirectException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+@FunctionalInterface
+public interface InsertConsumer<T> {
+
+  void insert(SessionConnection connection, T record)
+      throws IoTDBConnectionException, StatementExecutionException, 
RedirectException;
+}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java 
b/session/src/main/java/org/apache/iotdb/session/Session.java
index 8673f7b..48833aa 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -37,6 +37,8 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
 import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
 import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
+import org.apache.iotdb.session.util.SessionUtils;
+import org.apache.iotdb.session.util.ThreadUtils;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -63,6 +65,12 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -75,6 +83,14 @@ public class Session {
   public static final String MSG_UNSUPPORTED_DATA_TYPE = "Unsupported data 
type:";
   public static final String MSG_DONOT_ENABLE_REDIRECT =
       "Query do not enable redirect," + " please confirm the session and 
server conf.";
+  private static final ThreadPoolExecutor OPERATION_EXECUTOR =
+      new ThreadPoolExecutor(
+          Config.DEFAULT_SESSION_EXECUTOR_THREAD_NUM,
+          Config.DEFAULT_SESSION_EXECUTOR_THREAD_NUM,
+          0,
+          TimeUnit.MILLISECONDS,
+          new LinkedBlockingQueue<>(Config.DEFAULT_SESSION_EXECUTOR_TASK_NUM),
+          ThreadUtils.createThreadFactory("SessionExecutor", true));
   protected List<String> nodeUrls;
   protected String username;
   protected String password;
@@ -100,9 +116,8 @@ public class Session {
   // Cluster version cache
   protected boolean enableCacheLeader;
   protected SessionConnection metaSessionConnection;
-  protected Map<String, EndPoint> deviceIdToEndpoint;
-  protected Map<EndPoint, SessionConnection> endPointToSessionConnection;
-  private AtomicReference<IoTDBConnectionException> tmp = new 
AtomicReference<>();
+  protected volatile Map<String, EndPoint> deviceIdToEndpoint;
+  protected volatile Map<EndPoint, SessionConnection> 
endPointToSessionConnection;
 
   protected boolean enableQueryRedirection = false;
 
@@ -335,8 +350,8 @@ public class Session {
     metaSessionConnection = defaultSessionConnection;
     isClosed = false;
     if (enableCacheLeader || enableQueryRedirection) {
-      deviceIdToEndpoint = new HashMap<>();
-      endPointToSessionConnection = new HashMap<>();
+      deviceIdToEndpoint = new ConcurrentHashMap<>();
+      endPointToSessionConnection = new ConcurrentHashMap<>();
       endPointToSessionConnection.put(defaultEndPoint, 
defaultSessionConnection);
     }
   }
@@ -774,6 +789,7 @@ public class Session {
       throws IoTDBConnectionException {
     if (enableCacheLeader) {
       logger.debug("storageGroup[{}]:{}", storageGroup, e.getMessage());
+      AtomicReference<IoTDBConnectionException> exceptionReference = new 
AtomicReference<>();
       SessionConnection connection =
           endPointToSessionConnection.computeIfAbsent(
               e.getEndPoint(),
@@ -781,12 +797,12 @@ public class Session {
                 try {
                   return constructSessionConnection(this, e.getEndPoint(), 
zoneId);
                 } catch (IoTDBConnectionException ex) {
-                  tmp.set(ex);
+                  exceptionReference.set(ex);
                   return null;
                 }
               });
       if (connection == null) {
-        throw new IoTDBConnectionException(tmp.get());
+        throw new IoTDBConnectionException(exceptionReference.get());
       }
       metaSessionConnection = connection;
     }
@@ -795,6 +811,7 @@ public class Session {
   private void handleRedirection(String deviceId, EndPoint endpoint)
       throws IoTDBConnectionException {
     if (enableCacheLeader) {
+      AtomicReference<IoTDBConnectionException> exceptionReference = new 
AtomicReference<>();
       deviceIdToEndpoint.put(deviceId, endpoint);
       SessionConnection connection =
           endPointToSessionConnection.computeIfAbsent(
@@ -803,18 +820,20 @@ public class Session {
                 try {
                   return constructSessionConnection(this, endpoint, zoneId);
                 } catch (IoTDBConnectionException ex) {
-                  tmp.set(ex);
+                  exceptionReference.set(ex);
                   return null;
                 }
               });
       if (connection == null) {
-        throw new IoTDBConnectionException(tmp.get());
+        deviceIdToEndpoint.remove(deviceId);
+        throw new IoTDBConnectionException(exceptionReference.get());
       }
     }
   }
 
   private void handleQueryRedirection(EndPoint endPoint) throws 
IoTDBConnectionException {
     if (enableQueryRedirection) {
+      AtomicReference<IoTDBConnectionException> exceptionReference = new 
AtomicReference<>();
       SessionConnection connection =
           endPointToSessionConnection.computeIfAbsent(
               endPoint,
@@ -825,12 +844,12 @@ public class Session {
                   sessionConnection.setEnableRedirect(enableQueryRedirection);
                   return sessionConnection;
                 } catch (IoTDBConnectionException ex) {
-                  tmp.set(ex);
+                  exceptionReference.set(ex);
                   return null;
                 }
               });
       if (connection == null) {
-        throw new IoTDBConnectionException(tmp.get());
+        throw new IoTDBConnectionException(exceptionReference.get());
       }
       defaultSessionConnection = connection;
     }
@@ -1018,42 +1037,16 @@ public class Session {
       boolean isAligned)
       throws IoTDBConnectionException, StatementExecutionException {
     Map<SessionConnection, TSInsertStringRecordsReq> recordsGroup = new 
HashMap<>();
-    EndPoint endPoint;
-    SessionConnection connection;
     for (int i = 0; i < deviceIds.size(); i++) {
-      endPoint = deviceIdToEndpoint.isEmpty() ? null : 
deviceIdToEndpoint.get(deviceIds.get(i));
-      if (endPoint != null) {
-        connection = endPointToSessionConnection.get(endPoint);
-      } else {
-        connection = defaultSessionConnection;
-      }
+      final SessionConnection connection = 
getSessionConnection(deviceIds.get(i));
       TSInsertStringRecordsReq request =
           recordsGroup.computeIfAbsent(connection, k -> new 
TSInsertStringRecordsReq());
       request.setIsAligned(isAligned);
       updateTSInsertStringRecordsReq(
           request, deviceIds.get(i), times.get(i), measurementsList.get(i), 
valuesList.get(i));
     }
-    // TODO parallel
-    StringBuilder errMsgBuilder = new StringBuilder();
-    for (Entry<SessionConnection, TSInsertStringRecordsReq> entry : 
recordsGroup.entrySet()) {
-      try {
-        entry.getKey().insertRecords(entry.getValue());
-      } catch (RedirectException e) {
-        for (Entry<String, EndPoint> deviceEndPointEntry : 
e.getDeviceEndPointMap().entrySet()) {
-          handleRedirection(deviceEndPointEntry.getKey(), 
deviceEndPointEntry.getValue());
-        }
-      } catch (StatementExecutionException e) {
-        errMsgBuilder.append(e.getMessage());
-      } catch (IoTDBConnectionException e) {
-        // remove the broken session
-        removeBrokenSessionConnection(entry.getKey());
-        throw e;
-      }
-    }
-    String errMsg = errMsgBuilder.toString();
-    if (!errMsg.isEmpty()) {
-      throw new StatementExecutionException(errMsg);
-    }
+
+    insertByGroup(recordsGroup, SessionConnection::insertRecords);
   }
 
   private TSInsertStringRecordsReq genTSInsertStringRecordsReq(
@@ -1316,13 +1309,18 @@ public class Session {
     return request;
   }
 
-  @SuppressWarnings("squid:S3740")
-  private List sortList(List source, Integer[] index) {
-    Object[] result = new Object[source.size()];
-    for (int i = 0; i < index.length; i++) {
-      result[i] = source.get(index[i]);
-    }
-    return Arrays.asList(result);
+  /**
+   * Sort the input source list.
+   *
+   * <p>e.g. source: [1,2,3,4,5], index:[1,0,3,2,4], return : [2,1,4,3,5]
+   *
+   * @param source Input list
+   * @param index indices into source, in the desired return order
+   * @param <T> Input type
+   * @return ordered list
+   */
+  private static <T> List<T> sortList(List<T> source, Integer[] index) {
+    return Arrays.stream(index).map(source::get).collect(Collectors.toList());
   }
 
   private List<ByteBuffer> objectValuesListToByteBufferList(
@@ -1346,15 +1344,8 @@ public class Session {
       boolean isAligned)
       throws IoTDBConnectionException, StatementExecutionException {
     Map<SessionConnection, TSInsertRecordsReq> recordsGroup = new HashMap<>();
-    EndPoint endPoint;
-    SessionConnection connection;
     for (int i = 0; i < deviceIds.size(); i++) {
-      endPoint = deviceIdToEndpoint.isEmpty() ? null : 
deviceIdToEndpoint.get(deviceIds.get(i));
-      if (endPoint != null) {
-        connection = endPointToSessionConnection.get(endPoint);
-      } else {
-        connection = defaultSessionConnection;
-      }
+      final SessionConnection connection = 
getSessionConnection(deviceIds.get(i));
       TSInsertRecordsReq request =
           recordsGroup.computeIfAbsent(connection, k -> new 
TSInsertRecordsReq());
       request.setIsAligned(isAligned);
@@ -1366,27 +1357,7 @@ public class Session {
           typesList.get(i),
           valuesList.get(i));
     }
-    // TODO parallel
-    StringBuilder errMsgBuilder = new StringBuilder();
-    for (Entry<SessionConnection, TSInsertRecordsReq> entry : 
recordsGroup.entrySet()) {
-      try {
-        entry.getKey().insertRecords(entry.getValue());
-      } catch (RedirectException e) {
-        for (Entry<String, EndPoint> deviceEndPointEntry : 
e.getDeviceEndPointMap().entrySet()) {
-          handleRedirection(deviceEndPointEntry.getKey(), 
deviceEndPointEntry.getValue());
-        }
-      } catch (StatementExecutionException e) {
-        errMsgBuilder.append(e.getMessage());
-      } catch (IoTDBConnectionException e) {
-        // remove the broken session
-        removeBrokenSessionConnection(entry.getKey());
-        throw e;
-      }
-    }
-    String errMsg = errMsgBuilder.toString();
-    if (!errMsg.isEmpty()) {
-      throw new StatementExecutionException(errMsg);
-    }
+    insertByGroup(recordsGroup, SessionConnection::insertRecords);
   }
 
   private TSInsertRecordsReq genTSInsertRecordsReq(
@@ -1435,15 +1406,8 @@ public class Session {
   public void insertTablet(Tablet tablet)
       throws StatementExecutionException, IoTDBConnectionException {
     TSInsertTabletReq request = genTSInsertTabletReq(tablet, false);
-    EndPoint endPoint;
     try {
-      if (enableCacheLeader
-          && !deviceIdToEndpoint.isEmpty()
-          && (endPoint = deviceIdToEndpoint.get(tablet.prefixPath)) != null) {
-        endPointToSessionConnection.get(endPoint).insertTablet(request);
-      } else {
-        defaultSessionConnection.insertTablet(request);
-      }
+      getSessionConnection(tablet.prefixPath).insertTablet(request);
     } catch (RedirectException e) {
       handleRedirection(tablet.prefixPath, e.getEndPoint());
     }
@@ -1458,15 +1422,8 @@ public class Session {
   public void insertTablet(Tablet tablet, boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
     TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted);
-    EndPoint endPoint;
     try {
-      if (enableCacheLeader
-          && !deviceIdToEndpoint.isEmpty()
-          && (endPoint = deviceIdToEndpoint.get(tablet.prefixPath)) != null) {
-        endPointToSessionConnection.get(endPoint).insertTablet(request);
-      } else {
-        defaultSessionConnection.insertTablet(request);
-      }
+      getSessionConnection(tablet.prefixPath).insertTablet(request);
     } catch (RedirectException e) {
       handleRedirection(tablet.prefixPath, e.getEndPoint());
     }
@@ -1545,41 +1502,15 @@ public class Session {
 
   private void insertTabletsWithLeaderCache(Map<String, Tablet> tablets, 
boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
-    EndPoint endPoint;
-    SessionConnection connection;
     Map<SessionConnection, TSInsertTabletsReq> tabletGroup = new HashMap<>();
     for (Entry<String, Tablet> entry : tablets.entrySet()) {
-      endPoint = deviceIdToEndpoint.isEmpty() ? null : 
deviceIdToEndpoint.get(entry.getKey());
-      if (endPoint != null) {
-        connection = endPointToSessionConnection.get(endPoint);
-      } else {
-        connection = defaultSessionConnection;
-      }
+      final SessionConnection connection = 
getSessionConnection(entry.getKey());
       TSInsertTabletsReq request =
           tabletGroup.computeIfAbsent(connection, k -> new 
TSInsertTabletsReq());
       updateTSInsertTabletsReq(request, entry.getValue(), sorted);
     }
 
-    // TODO parallel
-    StringBuilder errMsgBuilder = new StringBuilder();
-    for (Entry<SessionConnection, TSInsertTabletsReq> entry : 
tabletGroup.entrySet()) {
-      try {
-        entry.getKey().insertTablets(entry.getValue());
-      } catch (RedirectException e) {
-        for (Entry<String, EndPoint> deviceEndPointEntry : 
e.getDeviceEndPointMap().entrySet()) {
-          handleRedirection(deviceEndPointEntry.getKey(), 
deviceEndPointEntry.getValue());
-        }
-      } catch (StatementExecutionException e) {
-        errMsgBuilder.append(e.getMessage());
-      } catch (IoTDBConnectionException e) {
-        removeBrokenSessionConnection(entry.getKey());
-        throw e;
-      }
-    }
-    String errMsg = errMsgBuilder.toString();
-    if (!errMsg.isEmpty()) {
-      throw new StatementExecutionException(errMsg);
-    }
+    insertByGroup(tabletGroup, SessionConnection::insertTablets);
   }
 
   private TSInsertTabletsReq genTSInsertTabletsReq(List<Tablet> tablets, 
boolean sorted)
@@ -2090,6 +2021,72 @@ public class Session {
     return request;
   }
 
+  /**
+   * @param recordsGroup connection to record map
+   * @param insertConsumer insert function
+   * @param <T>
+   *     <ul>
+   *       <li>{@link TSInsertRecordsReq}
+   *       <li>{@link TSInsertStringRecordsReq}
+   *       <li>{@link TSInsertTabletsReq}
+   *     </ul>
+   *
+   * @throws IoTDBConnectionException
+   * @throws StatementExecutionException
+   */
+  private <T> void insertByGroup(
+      Map<SessionConnection, T> recordsGroup, InsertConsumer<T> insertConsumer)
+      throws IoTDBConnectionException, StatementExecutionException {
+    List<CompletableFuture<Void>> completableFutures =
+        recordsGroup.entrySet().stream()
+            .map(
+                entry -> {
+                  SessionConnection connection = entry.getKey();
+                  T recordsReq = entry.getValue();
+                  return CompletableFuture.runAsync(
+                      () -> {
+                        try {
+                          insertConsumer.insert(connection, recordsReq);
+                        } catch (RedirectException e) {
+                          e.getDeviceEndPointMap()
+                              .forEach(
+                                  (deviceId, endpoint) -> {
+                                    try {
+                                      handleRedirection(deviceId, endpoint);
+                                    } catch (IoTDBConnectionException 
ioTDBConnectionException) {
+                                      throw new 
CompletionException(ioTDBConnectionException);
+                                    }
+                                  });
+                        } catch (StatementExecutionException e) {
+                          throw new CompletionException(e);
+                        } catch (IoTDBConnectionException e) {
+                          // remove the broken session
+                          removeBrokenSessionConnection(connection);
+                          throw new CompletionException(e);
+                        }
+                      },
+                      OPERATION_EXECUTOR);
+                })
+            .collect(Collectors.toList());
+
+    StringBuilder errMsgBuilder = new StringBuilder();
+    for (CompletableFuture<Void> completableFuture : completableFutures) {
+      try {
+        completableFuture.join();
+      } catch (CompletionException completionException) {
+        Throwable cause = completionException.getCause();
+        if (cause instanceof IoTDBConnectionException) {
+          throw (IoTDBConnectionException) cause;
+        } else {
+          errMsgBuilder.append(cause.getMessage());
+        }
+      }
+    }
+    if (errMsgBuilder.length() > 0) {
+      throw new StatementExecutionException(errMsgBuilder.toString());
+    }
+  }
+
   public boolean isEnableQueryRedirection() {
     return enableQueryRedirection;
   }
diff --git 
a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java 
b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 80bb4af..9f39e7e 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.session.util.SessionUtils;
 
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
diff --git 
a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java 
b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index fd8f8e7..f25d1c8 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -218,8 +218,8 @@ public class SessionPool {
             logger.debug("no more sessions can be created, wait... 
queue.size={}", queue.size());
           }
           this.wait(1000);
-          long time = waitToGetSessionTimeoutInMs < 60_000 ? 
waitToGetSessionTimeoutInMs : 60_000;
-          if (System.currentTimeMillis() - start > time) {
+          long timeOut = Math.min(waitToGetSessionTimeoutInMs, 60_000);
+          if (System.currentTimeMillis() - start > timeOut) {
             logger.warn(
                 "the SessionPool has wait for {} seconds to get a new 
connection: {}:{} with {}, {}",
                 (System.currentTimeMillis() - start) / 1000,
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionUtils.java 
b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
similarity index 99%
rename from session/src/main/java/org/apache/iotdb/session/SessionUtils.java
rename to session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
index 69041e1..4f27d06 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionUtils.java
+++ b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.session;
+package org.apache.iotdb.session.util;
 
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
diff --git 
a/session/src/main/java/org/apache/iotdb/session/util/ThreadUtils.java 
b/session/src/main/java/org/apache/iotdb/session/util/ThreadUtils.java
new file mode 100644
index 0000000..db989cd
--- /dev/null
+++ b/session/src/main/java/org/apache/iotdb/session/util/ThreadUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.util;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ThreadUtils {
+
+  /**
+   * @param threadNamePrefix thread name prefix, the thread name will be 
"prefix-num"
+   * @param isDaemon if true, the created threads will be daemon threads
+   * @return a factory creating threads named "prefix-num" with the given daemon flag
+   */
+  public static ThreadFactory createThreadFactory(String threadNamePrefix, 
boolean isDaemon) {
+    return new ThreadFactory() {
+      private final AtomicInteger THREAD_COUNT = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread thread = new Thread(r);
+        thread.setName(String.format("%s-%d", threadNamePrefix, 
THREAD_COUNT.getAndIncrement()));
+        thread.setDaemon(isDaemon);
+        return thread;
+      }
+    };
+  }
+}
diff --git 
a/session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java 
b/session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
new file mode 100644
index 0000000..202e60b
--- /dev/null
+++ b/session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.ThreadFactory;
+
+public class ThreadUtilsTest {
+
+  @Test
+  public void createThreadFactory() {
+    ThreadFactory daemonThreadFactory = 
ThreadUtils.createThreadFactory("Test", true);
+    Thread thread = daemonThreadFactory.newThread(() -> {});
+    Assert.assertEquals("Test-0", thread.getName());
+  }
+}

Reply via email to