This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new e8ca7b8  [ISSUE-4072] Parallel insert records in Session (#4073) 
(#4169)
e8ca7b8 is described below

commit e8ca7b8fec796af7a837a5cc6be39065fd074f59
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sat Oct 16 02:02:51 2021 +0800

    [ISSUE-4072] Parallel insert records in Session (#4073) (#4169)
---
 .../main/java/org/apache/iotdb/session/Config.java |   4 +
 .../session/{Config.java => InsertConsumer.java}   |  24 +--
 .../java/org/apache/iotdb/session/Session.java     | 237 ++++++++++-----------
 .../apache/iotdb/session/SessionConnection.java    |   1 +
 .../org/apache/iotdb/session/pool/SessionPool.java |   4 +-
 .../iotdb/session/{ => util}/SessionUtils.java     |   2 +-
 .../session/{Config.java => util/ThreadUtils.java} |  34 +--
 .../iotdb/session/util/ThreadUtilsTest.java}       |  24 +--
 8 files changed, 166 insertions(+), 164 deletions(-)

diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java 
b/session/src/main/java/org/apache/iotdb/session/Config.java
index 02e7e70..52c9543 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/session/src/main/java/org/apache/iotdb/session/Config.java
@@ -26,6 +26,10 @@ public class Config {
   public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 0;
   public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
 
+  public static final int CPU_CORES = 
Runtime.getRuntime().availableProcessors();
+  public static final int DEFAULT_SESSION_EXECUTOR_THREAD_NUM = 2 * CPU_CORES;
+  public static final int DEFAULT_SESSION_EXECUTOR_TASK_NUM = 1_000;
+
   public static final int RETRY_NUM = 3;
   public static final long RETRY_INTERVAL_MS = 1000;
 
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java 
b/session/src/main/java/org/apache/iotdb/session/InsertConsumer.java
similarity index 52%
copy from session/src/main/java/org/apache/iotdb/session/Config.java
copy to session/src/main/java/org/apache/iotdb/session/InsertConsumer.java
index 02e7e70..dcc655c 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/session/src/main/java/org/apache/iotdb/session/InsertConsumer.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,22 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.session;
-
-public class Config {
 
-  public static final String DEFAULT_USER = "root";
-  public static final String DEFAULT_PASSWORD = "root";
-  public static final int DEFAULT_FETCH_SIZE = 5000;
-  public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 0;
-  public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
+package org.apache.iotdb.session;
 
-  public static final int RETRY_NUM = 3;
-  public static final long RETRY_INTERVAL_MS = 1000;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.RedirectException;
+import org.apache.iotdb.rpc.StatementExecutionException;
 
-  /** thrift init buffer size, 1KB by default */
-  public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
+@FunctionalInterface
+public interface InsertConsumer<T> {
 
-  /** thrift max frame size (16384000 bytes by default), we change it to 64MB 
*/
-  public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
+  void insert(SessionConnection connection, T record)
+      throws IoTDBConnectionException, StatementExecutionException, 
RedirectException;
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java 
b/session/src/main/java/org/apache/iotdb/session/Session.java
index 06b3536..8c3ee40 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -36,6 +36,8 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
 import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
 import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
+import org.apache.iotdb.session.util.SessionUtils;
+import org.apache.iotdb.session.util.ThreadUtils;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -60,6 +62,12 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -72,6 +80,14 @@ public class Session {
   public static final String MSG_UNSUPPORTED_DATA_TYPE = "Unsupported data 
type:";
   public static final String MSG_DONOT_ENABLE_REDIRECT =
       "Query do not enable redirect," + " please confirm the session and 
server conf.";
+  private static final ThreadPoolExecutor OPERATION_EXECUTOR =
+      new ThreadPoolExecutor(
+          Config.DEFAULT_SESSION_EXECUTOR_THREAD_NUM,
+          Config.DEFAULT_SESSION_EXECUTOR_THREAD_NUM,
+          0,
+          TimeUnit.MILLISECONDS,
+          new LinkedBlockingQueue<>(Config.DEFAULT_SESSION_EXECUTOR_TASK_NUM),
+          ThreadUtils.createThreadFactory("SessionExecutor", true));
   protected List<String> nodeUrls;
   protected String username;
   protected String password;
@@ -97,9 +113,8 @@ public class Session {
   // Cluster version cache
   protected boolean enableCacheLeader;
   protected SessionConnection metaSessionConnection;
-  protected Map<String, EndPoint> deviceIdToEndpoint;
-  protected Map<EndPoint, SessionConnection> endPointToSessionConnection;
-  private AtomicReference<IoTDBConnectionException> tmp = new 
AtomicReference<>();
+  protected volatile Map<String, EndPoint> deviceIdToEndpoint;
+  protected volatile Map<EndPoint, SessionConnection> 
endPointToSessionConnection;
 
   protected boolean enableQueryRedirection = false;
 
@@ -327,8 +342,8 @@ public class Session {
     metaSessionConnection = defaultSessionConnection;
     isClosed = false;
     if (enableCacheLeader || enableQueryRedirection) {
-      deviceIdToEndpoint = new HashMap<>();
-      endPointToSessionConnection = new HashMap<>();
+      deviceIdToEndpoint = new ConcurrentHashMap<>();
+      endPointToSessionConnection = new ConcurrentHashMap<>();
       endPointToSessionConnection.put(defaultEndPoint, 
defaultSessionConnection);
     }
   }
@@ -771,6 +786,7 @@ public class Session {
       throws IoTDBConnectionException {
     if (enableCacheLeader) {
       logger.debug("storageGroup[{}]:{}", storageGroup, e.getMessage());
+      AtomicReference<IoTDBConnectionException> exceptionReference = new 
AtomicReference<>();
       SessionConnection connection =
           endPointToSessionConnection.computeIfAbsent(
               e.getEndPoint(),
@@ -778,12 +794,12 @@ public class Session {
                 try {
                   return constructSessionConnection(this, e.getEndPoint(), 
zoneId);
                 } catch (IoTDBConnectionException ex) {
-                  tmp.set(ex);
+                  exceptionReference.set(ex);
                   return null;
                 }
               });
       if (connection == null) {
-        throw new IoTDBConnectionException(tmp.get());
+        throw new IoTDBConnectionException(exceptionReference.get());
       }
       metaSessionConnection = connection;
     }
@@ -792,6 +808,7 @@ public class Session {
   private void handleRedirection(String deviceId, EndPoint endpoint)
       throws IoTDBConnectionException {
     if (enableCacheLeader) {
+      AtomicReference<IoTDBConnectionException> exceptionReference = new 
AtomicReference<>();
       deviceIdToEndpoint.put(deviceId, endpoint);
       SessionConnection connection =
           endPointToSessionConnection.computeIfAbsent(
@@ -800,18 +817,20 @@ public class Session {
                 try {
                   return constructSessionConnection(this, endpoint, zoneId);
                 } catch (IoTDBConnectionException ex) {
-                  tmp.set(ex);
+                  exceptionReference.set(ex);
                   return null;
                 }
               });
       if (connection == null) {
-        throw new IoTDBConnectionException(tmp.get());
+        deviceIdToEndpoint.remove(deviceId);
+        throw new IoTDBConnectionException(exceptionReference.get());
       }
     }
   }
 
   private void handleQueryRedirection(EndPoint endPoint) throws 
IoTDBConnectionException {
     if (enableQueryRedirection) {
+      AtomicReference<IoTDBConnectionException> exceptionReference = new 
AtomicReference<>();
       SessionConnection connection =
           endPointToSessionConnection.computeIfAbsent(
               endPoint,
@@ -822,12 +841,12 @@ public class Session {
                   sessionConnection.setEnableRedirect(enableQueryRedirection);
                   return sessionConnection;
                 } catch (IoTDBConnectionException ex) {
-                  tmp.set(ex);
+                  exceptionReference.set(ex);
                   return null;
                 }
               });
       if (connection == null) {
-        throw new IoTDBConnectionException(tmp.get());
+        throw new IoTDBConnectionException(exceptionReference.get());
       }
       defaultSessionConnection = connection;
     }
@@ -933,41 +952,15 @@ public class Session {
       List<List<String>> valuesList)
       throws IoTDBConnectionException, StatementExecutionException {
     Map<SessionConnection, TSInsertStringRecordsReq> recordsGroup = new 
HashMap<>();
-    EndPoint endPoint;
-    SessionConnection connection;
     for (int i = 0; i < deviceIds.size(); i++) {
-      endPoint = deviceIdToEndpoint.isEmpty() ? null : 
deviceIdToEndpoint.get(deviceIds.get(i));
-      if (endPoint != null) {
-        connection = endPointToSessionConnection.get(endPoint);
-      } else {
-        connection = defaultSessionConnection;
-      }
+      final SessionConnection connection = 
getSessionConnection(deviceIds.get(i));
       TSInsertStringRecordsReq request =
           recordsGroup.computeIfAbsent(connection, k -> new 
TSInsertStringRecordsReq());
       updateTSInsertStringRecordsReq(
           request, deviceIds.get(i), times.get(i), measurementsList.get(i), 
valuesList.get(i));
     }
-    // TODO parallel
-    StringBuilder errMsgBuilder = new StringBuilder();
-    for (Entry<SessionConnection, TSInsertStringRecordsReq> entry : 
recordsGroup.entrySet()) {
-      try {
-        entry.getKey().insertRecords(entry.getValue());
-      } catch (RedirectException e) {
-        for (Entry<String, EndPoint> deviceEndPointEntry : 
e.getDeviceEndPointMap().entrySet()) {
-          handleRedirection(deviceEndPointEntry.getKey(), 
deviceEndPointEntry.getValue());
-        }
-      } catch (StatementExecutionException e) {
-        errMsgBuilder.append(e.getMessage());
-      } catch (IoTDBConnectionException e) {
-        // remove the broken session
-        removeBrokenSessionConnection(entry.getKey());
-        throw e;
-      }
-    }
-    String errMsg = errMsgBuilder.toString();
-    if (!errMsg.isEmpty()) {
-      throw new StatementExecutionException(errMsg);
-    }
+
+    insertByGroup(recordsGroup, SessionConnection::insertRecords);
   }
 
   private TSInsertStringRecordsReq genTSInsertStringRecordsReq(
@@ -1126,13 +1119,18 @@ public class Session {
     return request;
   }
 
-  @SuppressWarnings("squid:S3740")
-  private List sortList(List source, Integer[] index) {
-    Object[] result = new Object[source.size()];
-    for (int i = 0; i < index.length; i++) {
-      result[i] = source.get(index[i]);
-    }
-    return Arrays.asList(result);
+  /**
+   * Sort the input source list.
+   *
+   * <p>e.g. source: [1,2,3,4,5], index:[1,0,3,2,4], return : [2,1,4,3,5]
+   *
+   * @param source Input list
+   * @param index retuen order
+   * @param <T> Input type
+   * @return ordered list
+   */
+  private static <T> List<T> sortList(List<T> source, Integer[] index) {
+    return Arrays.stream(index).map(source::get).collect(Collectors.toList());
   }
 
   private List<ByteBuffer> objectValuesListToByteBufferList(
@@ -1155,15 +1153,8 @@ public class Session {
       List<List<Object>> valuesList)
       throws IoTDBConnectionException, StatementExecutionException {
     Map<SessionConnection, TSInsertRecordsReq> recordsGroup = new HashMap<>();
-    EndPoint endPoint;
-    SessionConnection connection;
     for (int i = 0; i < deviceIds.size(); i++) {
-      endPoint = deviceIdToEndpoint.isEmpty() ? null : 
deviceIdToEndpoint.get(deviceIds.get(i));
-      if (endPoint != null) {
-        connection = endPointToSessionConnection.get(endPoint);
-      } else {
-        connection = defaultSessionConnection;
-      }
+      final SessionConnection connection = 
getSessionConnection(deviceIds.get(i));
       TSInsertRecordsReq request =
           recordsGroup.computeIfAbsent(connection, k -> new 
TSInsertRecordsReq());
       updateTSInsertRecordsReq(
@@ -1174,27 +1165,7 @@ public class Session {
           typesList.get(i),
           valuesList.get(i));
     }
-    // TODO parallel
-    StringBuilder errMsgBuilder = new StringBuilder();
-    for (Entry<SessionConnection, TSInsertRecordsReq> entry : 
recordsGroup.entrySet()) {
-      try {
-        entry.getKey().insertRecords(entry.getValue());
-      } catch (RedirectException e) {
-        for (Entry<String, EndPoint> deviceEndPointEntry : 
e.getDeviceEndPointMap().entrySet()) {
-          handleRedirection(deviceEndPointEntry.getKey(), 
deviceEndPointEntry.getValue());
-        }
-      } catch (StatementExecutionException e) {
-        errMsgBuilder.append(e.getMessage());
-      } catch (IoTDBConnectionException e) {
-        // remove the broken session
-        removeBrokenSessionConnection(entry.getKey());
-        throw e;
-      }
-    }
-    String errMsg = errMsgBuilder.toString();
-    if (!errMsg.isEmpty()) {
-      throw new StatementExecutionException(errMsg);
-    }
+    insertByGroup(recordsGroup, SessionConnection::insertRecords);
   }
 
   private TSInsertRecordsReq genTSInsertRecordsReq(
@@ -1241,15 +1212,8 @@ public class Session {
   public void insertTablet(Tablet tablet)
       throws StatementExecutionException, IoTDBConnectionException {
     TSInsertTabletReq request = genTSInsertTabletReq(tablet, false);
-    EndPoint endPoint;
     try {
-      if (enableCacheLeader
-          && !deviceIdToEndpoint.isEmpty()
-          && (endPoint = deviceIdToEndpoint.get(tablet.deviceId)) != null) {
-        endPointToSessionConnection.get(endPoint).insertTablet(request);
-      } else {
-        defaultSessionConnection.insertTablet(request);
-      }
+      getSessionConnection(tablet.deviceId).insertTablet(request);
     } catch (RedirectException e) {
       handleRedirection(tablet.deviceId, e.getEndPoint());
     }
@@ -1264,15 +1228,8 @@ public class Session {
   public void insertTablet(Tablet tablet, boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
     TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted);
-    EndPoint endPoint;
     try {
-      if (enableCacheLeader
-          && !deviceIdToEndpoint.isEmpty()
-          && (endPoint = deviceIdToEndpoint.get(tablet.deviceId)) != null) {
-        endPointToSessionConnection.get(endPoint).insertTablet(request);
-      } else {
-        defaultSessionConnection.insertTablet(request);
-      }
+      getSessionConnection(tablet.deviceId).insertTablet(request);
     } catch (RedirectException e) {
       handleRedirection(tablet.deviceId, e.getEndPoint());
     }
@@ -1334,41 +1291,15 @@ public class Session {
 
   private void insertTabletsWithLeaderCache(Map<String, Tablet> tablets, 
boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
-    EndPoint endPoint;
-    SessionConnection connection;
     Map<SessionConnection, TSInsertTabletsReq> tabletGroup = new HashMap<>();
     for (Entry<String, Tablet> entry : tablets.entrySet()) {
-      endPoint = deviceIdToEndpoint.isEmpty() ? null : 
deviceIdToEndpoint.get(entry.getKey());
-      if (endPoint != null) {
-        connection = endPointToSessionConnection.get(endPoint);
-      } else {
-        connection = defaultSessionConnection;
-      }
+      final SessionConnection connection = 
getSessionConnection(entry.getKey());
       TSInsertTabletsReq request =
           tabletGroup.computeIfAbsent(connection, k -> new 
TSInsertTabletsReq());
       updateTSInsertTabletsReq(request, entry.getValue(), sorted);
     }
 
-    // TODO parallel
-    StringBuilder errMsgBuilder = new StringBuilder();
-    for (Entry<SessionConnection, TSInsertTabletsReq> entry : 
tabletGroup.entrySet()) {
-      try {
-        entry.getKey().insertTablets(entry.getValue());
-      } catch (RedirectException e) {
-        for (Entry<String, EndPoint> deviceEndPointEntry : 
e.getDeviceEndPointMap().entrySet()) {
-          handleRedirection(deviceEndPointEntry.getKey(), 
deviceEndPointEntry.getValue());
-        }
-      } catch (StatementExecutionException e) {
-        errMsgBuilder.append(e.getMessage());
-      } catch (IoTDBConnectionException e) {
-        removeBrokenSessionConnection(entry.getKey());
-        throw e;
-      }
-    }
-    String errMsg = errMsgBuilder.toString();
-    if (!errMsg.isEmpty()) {
-      throw new StatementExecutionException(errMsg);
-    }
+    insertByGroup(tabletGroup, SessionConnection::insertTablets);
   }
 
   private TSInsertTabletsReq genTSInsertTabletsReq(List<Tablet> tablets, 
boolean sorted)
@@ -1738,6 +1669,72 @@ public class Session {
     }
   }
 
+  /**
+   * @param recordsGroup connection to record map
+   * @param insertConsumer insert function
+   * @param <T>
+   *     <ul>
+   *       <li>{@link TSInsertRecordsReq}
+   *       <li>{@link TSInsertStringRecordsReq}
+   *       <li>{@link TSInsertTabletsReq}
+   *     </ul>
+   *
+   * @throws IoTDBConnectionException
+   * @throws StatementExecutionException
+   */
+  private <T> void insertByGroup(
+      Map<SessionConnection, T> recordsGroup, InsertConsumer<T> insertConsumer)
+      throws IoTDBConnectionException, StatementExecutionException {
+    List<CompletableFuture<Void>> completableFutures =
+        recordsGroup.entrySet().stream()
+            .map(
+                entry -> {
+                  SessionConnection connection = entry.getKey();
+                  T recordsReq = entry.getValue();
+                  return CompletableFuture.runAsync(
+                      () -> {
+                        try {
+                          insertConsumer.insert(connection, recordsReq);
+                        } catch (RedirectException e) {
+                          e.getDeviceEndPointMap()
+                              .forEach(
+                                  (deviceId, endpoint) -> {
+                                    try {
+                                      handleRedirection(deviceId, endpoint);
+                                    } catch (IoTDBConnectionException 
ioTDBConnectionException) {
+                                      throw new 
CompletionException(ioTDBConnectionException);
+                                    }
+                                  });
+                        } catch (StatementExecutionException e) {
+                          throw new CompletionException(e);
+                        } catch (IoTDBConnectionException e) {
+                          // remove the broken session
+                          removeBrokenSessionConnection(connection);
+                          throw new CompletionException(e);
+                        }
+                      },
+                      OPERATION_EXECUTOR);
+                })
+            .collect(Collectors.toList());
+
+    StringBuilder errMsgBuilder = new StringBuilder();
+    for (CompletableFuture<Void> completableFuture : completableFutures) {
+      try {
+        completableFuture.join();
+      } catch (CompletionException completionException) {
+        Throwable cause = completionException.getCause();
+        if (cause instanceof IoTDBConnectionException) {
+          throw (IoTDBConnectionException) cause;
+        } else {
+          errMsgBuilder.append(cause.getMessage());
+        }
+      }
+    }
+    if (errMsgBuilder.length() > 0) {
+      throw new StatementExecutionException(errMsgBuilder.toString());
+    }
+  }
+
   public boolean isEnableQueryRedirection() {
     return enableQueryRedirection;
   }
diff --git 
a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java 
b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 561cf97..42d19d2 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.session.util.SessionUtils;
 
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
diff --git 
a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java 
b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index d21bf39..3afdbe7 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -218,8 +218,8 @@ public class SessionPool {
             logger.debug("no more sessions can be created, wait... 
queue.size={}", queue.size());
           }
           this.wait(1000);
-          long time = waitToGetSessionTimeoutInMs < 60_000 ? 
waitToGetSessionTimeoutInMs : 60_000;
-          if (System.currentTimeMillis() - start > time) {
+          long timeOut = Math.min(waitToGetSessionTimeoutInMs, 60_000);
+          if (System.currentTimeMillis() - start > timeOut) {
             logger.warn(
                 "the SessionPool has wait for {} seconds to get a new 
connection: {}:{} with {}, {}",
                 (System.currentTimeMillis() - start) / 1000,
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionUtils.java 
b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
similarity index 99%
rename from session/src/main/java/org/apache/iotdb/session/SessionUtils.java
rename to session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
index 66620c8..5f2d553 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionUtils.java
+++ b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.session;
+package org.apache.iotdb.session.util;
 
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java 
b/session/src/main/java/org/apache/iotdb/session/util/ThreadUtils.java
similarity index 50%
copy from session/src/main/java/org/apache/iotdb/session/Config.java
copy to session/src/main/java/org/apache/iotdb/session/util/ThreadUtils.java
index 02e7e70..db989cd 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/session/src/main/java/org/apache/iotdb/session/util/ThreadUtils.java
@@ -16,22 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.session;
 
-public class Config {
+package org.apache.iotdb.session.util;
 
-  public static final String DEFAULT_USER = "root";
-  public static final String DEFAULT_PASSWORD = "root";
-  public static final int DEFAULT_FETCH_SIZE = 5000;
-  public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 0;
-  public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
 
-  public static final int RETRY_NUM = 3;
-  public static final long RETRY_INTERVAL_MS = 1000;
+public class ThreadUtils {
 
-  /** thrift init buffer size, 1KB by default */
-  public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
+  /**
+   * @param threadNamePrefix thread name prefix, the thread name will be 
"prefix-num"
+   * @param isDaemon if true, the thread be created will be daemon
+   * @return
+   */
+  public static ThreadFactory createThreadFactory(String threadNamePrefix, 
boolean isDaemon) {
+    return new ThreadFactory() {
+      private final AtomicInteger THREAD_COUNT = new AtomicInteger(0);
 
-  /** thrift max frame size (16384000 bytes by default), we change it to 64MB 
*/
-  public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread thread = new Thread(r);
+        thread.setName(String.format("%s-%d", threadNamePrefix, 
THREAD_COUNT.getAndIncrement()));
+        thread.setDaemon(isDaemon);
+        return thread;
+      }
+    };
+  }
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java 
b/session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
similarity index 54%
copy from session/src/main/java/org/apache/iotdb/session/Config.java
copy to session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
index 02e7e70..202e60b 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
@@ -16,22 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.session;
 
-public class Config {
+package org.apache.iotdb.session.util;
 
-  public static final String DEFAULT_USER = "root";
-  public static final String DEFAULT_PASSWORD = "root";
-  public static final int DEFAULT_FETCH_SIZE = 5000;
-  public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 0;
-  public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
+import org.junit.Assert;
+import org.junit.Test;
 
-  public static final int RETRY_NUM = 3;
-  public static final long RETRY_INTERVAL_MS = 1000;
+import java.util.concurrent.ThreadFactory;
 
-  /** thrift init buffer size, 1KB by default */
-  public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
+public class ThreadUtilsTest {
 
-  /** thrift max frame size (16384000 bytes by default), we change it to 64MB 
*/
-  public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
+  @Test
+  public void createThreadFactory() {
+    ThreadFactory daemonThreadFactory = 
ThreadUtils.createThreadFactory("Test", true);
+    Thread thread = daemonThreadFactory.newThread(() -> {});
+    Assert.assertEquals("Test-0", thread.getName());
+  }
 }

Reply via email to