This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 14bf48c [ISSUE-4072] Parallel insert records in Session (#4073)
14bf48c is described below
commit 14bf48cf0abaa7ec207de5cde7ff64f2b5660973
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Oct 14 16:11:59 2021 +0800
[ISSUE-4072] Parallel insert records in Session (#4073)
---
.../main/java/org/apache/iotdb/session/Config.java | 4 +
.../org/apache/iotdb/session/InsertConsumer.java | 31 +++
.../java/org/apache/iotdb/session/Session.java | 237 ++++++++++-----------
.../apache/iotdb/session/SessionConnection.java | 1 +
.../org/apache/iotdb/session/pool/SessionPool.java | 4 +-
.../iotdb/session/{ => util}/SessionUtils.java | 2 +-
.../org/apache/iotdb/session/util/ThreadUtils.java | 45 ++++
.../apache/iotdb/session/util/ThreadUtilsTest.java | 35 +++
8 files changed, 236 insertions(+), 123 deletions(-)
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java
b/session/src/main/java/org/apache/iotdb/session/Config.java
index 7b1a636..f56aa44 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/session/src/main/java/org/apache/iotdb/session/Config.java
@@ -28,6 +28,10 @@ public class Config {
public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 0;
public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
+ public static final int CPU_CORES =
Runtime.getRuntime().availableProcessors();
+ public static final int DEFAULT_SESSION_EXECUTOR_THREAD_NUM = 2 * CPU_CORES;
+ public static final int DEFAULT_SESSION_EXECUTOR_TASK_NUM = 1_000;
+
public static final int RETRY_NUM = 3;
public static final long RETRY_INTERVAL_MS = 1000;
diff --git a/session/src/main/java/org/apache/iotdb/session/InsertConsumer.java
b/session/src/main/java/org/apache/iotdb/session/InsertConsumer.java
new file mode 100644
index 0000000..dcc655c
--- /dev/null
+++ b/session/src/main/java/org/apache/iotdb/session/InsertConsumer.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.RedirectException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+@FunctionalInterface
+public interface InsertConsumer<T> {
+
+ void insert(SessionConnection connection, T record)
+ throws IoTDBConnectionException, StatementExecutionException,
RedirectException;
+}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java
b/session/src/main/java/org/apache/iotdb/session/Session.java
index 8673f7b..48833aa 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -37,6 +37,8 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
+import org.apache.iotdb.session.util.SessionUtils;
+import org.apache.iotdb.session.util.ThreadUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -63,6 +65,12 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -75,6 +83,14 @@ public class Session {
public static final String MSG_UNSUPPORTED_DATA_TYPE = "Unsupported data
type:";
public static final String MSG_DONOT_ENABLE_REDIRECT =
"Query do not enable redirect," + " please confirm the session and
server conf.";
+ private static final ThreadPoolExecutor OPERATION_EXECUTOR =
+ new ThreadPoolExecutor(
+ Config.DEFAULT_SESSION_EXECUTOR_THREAD_NUM,
+ Config.DEFAULT_SESSION_EXECUTOR_THREAD_NUM,
+ 0,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(Config.DEFAULT_SESSION_EXECUTOR_TASK_NUM),
+ ThreadUtils.createThreadFactory("SessionExecutor", true));
protected List<String> nodeUrls;
protected String username;
protected String password;
@@ -100,9 +116,8 @@ public class Session {
// Cluster version cache
protected boolean enableCacheLeader;
protected SessionConnection metaSessionConnection;
- protected Map<String, EndPoint> deviceIdToEndpoint;
- protected Map<EndPoint, SessionConnection> endPointToSessionConnection;
- private AtomicReference<IoTDBConnectionException> tmp = new
AtomicReference<>();
+ protected volatile Map<String, EndPoint> deviceIdToEndpoint;
+ protected volatile Map<EndPoint, SessionConnection>
endPointToSessionConnection;
protected boolean enableQueryRedirection = false;
@@ -335,8 +350,8 @@ public class Session {
metaSessionConnection = defaultSessionConnection;
isClosed = false;
if (enableCacheLeader || enableQueryRedirection) {
- deviceIdToEndpoint = new HashMap<>();
- endPointToSessionConnection = new HashMap<>();
+ deviceIdToEndpoint = new ConcurrentHashMap<>();
+ endPointToSessionConnection = new ConcurrentHashMap<>();
endPointToSessionConnection.put(defaultEndPoint,
defaultSessionConnection);
}
}
@@ -774,6 +789,7 @@ public class Session {
throws IoTDBConnectionException {
if (enableCacheLeader) {
logger.debug("storageGroup[{}]:{}", storageGroup, e.getMessage());
+ AtomicReference<IoTDBConnectionException> exceptionReference = new
AtomicReference<>();
SessionConnection connection =
endPointToSessionConnection.computeIfAbsent(
e.getEndPoint(),
@@ -781,12 +797,12 @@ public class Session {
try {
return constructSessionConnection(this, e.getEndPoint(),
zoneId);
} catch (IoTDBConnectionException ex) {
- tmp.set(ex);
+ exceptionReference.set(ex);
return null;
}
});
if (connection == null) {
- throw new IoTDBConnectionException(tmp.get());
+ throw new IoTDBConnectionException(exceptionReference.get());
}
metaSessionConnection = connection;
}
@@ -795,6 +811,7 @@ public class Session {
private void handleRedirection(String deviceId, EndPoint endpoint)
throws IoTDBConnectionException {
if (enableCacheLeader) {
+ AtomicReference<IoTDBConnectionException> exceptionReference = new
AtomicReference<>();
deviceIdToEndpoint.put(deviceId, endpoint);
SessionConnection connection =
endPointToSessionConnection.computeIfAbsent(
@@ -803,18 +820,20 @@ public class Session {
try {
return constructSessionConnection(this, endpoint, zoneId);
} catch (IoTDBConnectionException ex) {
- tmp.set(ex);
+ exceptionReference.set(ex);
return null;
}
});
if (connection == null) {
- throw new IoTDBConnectionException(tmp.get());
+ deviceIdToEndpoint.remove(deviceId);
+ throw new IoTDBConnectionException(exceptionReference.get());
}
}
}
private void handleQueryRedirection(EndPoint endPoint) throws
IoTDBConnectionException {
if (enableQueryRedirection) {
+ AtomicReference<IoTDBConnectionException> exceptionReference = new
AtomicReference<>();
SessionConnection connection =
endPointToSessionConnection.computeIfAbsent(
endPoint,
@@ -825,12 +844,12 @@ public class Session {
sessionConnection.setEnableRedirect(enableQueryRedirection);
return sessionConnection;
} catch (IoTDBConnectionException ex) {
- tmp.set(ex);
+ exceptionReference.set(ex);
return null;
}
});
if (connection == null) {
- throw new IoTDBConnectionException(tmp.get());
+ throw new IoTDBConnectionException(exceptionReference.get());
}
defaultSessionConnection = connection;
}
@@ -1018,42 +1037,16 @@ public class Session {
boolean isAligned)
throws IoTDBConnectionException, StatementExecutionException {
Map<SessionConnection, TSInsertStringRecordsReq> recordsGroup = new
HashMap<>();
- EndPoint endPoint;
- SessionConnection connection;
for (int i = 0; i < deviceIds.size(); i++) {
- endPoint = deviceIdToEndpoint.isEmpty() ? null :
deviceIdToEndpoint.get(deviceIds.get(i));
- if (endPoint != null) {
- connection = endPointToSessionConnection.get(endPoint);
- } else {
- connection = defaultSessionConnection;
- }
+ final SessionConnection connection =
getSessionConnection(deviceIds.get(i));
TSInsertStringRecordsReq request =
recordsGroup.computeIfAbsent(connection, k -> new
TSInsertStringRecordsReq());
request.setIsAligned(isAligned);
updateTSInsertStringRecordsReq(
request, deviceIds.get(i), times.get(i), measurementsList.get(i),
valuesList.get(i));
}
- // TODO parallel
- StringBuilder errMsgBuilder = new StringBuilder();
- for (Entry<SessionConnection, TSInsertStringRecordsReq> entry :
recordsGroup.entrySet()) {
- try {
- entry.getKey().insertRecords(entry.getValue());
- } catch (RedirectException e) {
- for (Entry<String, EndPoint> deviceEndPointEntry :
e.getDeviceEndPointMap().entrySet()) {
- handleRedirection(deviceEndPointEntry.getKey(),
deviceEndPointEntry.getValue());
- }
- } catch (StatementExecutionException e) {
- errMsgBuilder.append(e.getMessage());
- } catch (IoTDBConnectionException e) {
- // remove the broken session
- removeBrokenSessionConnection(entry.getKey());
- throw e;
- }
- }
- String errMsg = errMsgBuilder.toString();
- if (!errMsg.isEmpty()) {
- throw new StatementExecutionException(errMsg);
- }
+
+ insertByGroup(recordsGroup, SessionConnection::insertRecords);
}
private TSInsertStringRecordsReq genTSInsertStringRecordsReq(
@@ -1316,13 +1309,18 @@ public class Session {
return request;
}
- @SuppressWarnings("squid:S3740")
- private List sortList(List source, Integer[] index) {
- Object[] result = new Object[source.size()];
- for (int i = 0; i < index.length; i++) {
- result[i] = source.get(index[i]);
- }
- return Arrays.asList(result);
+ /**
+ * Sort the input source list.
+ *
+ * <p>e.g. source: [1,2,3,4,5], index:[1,0,3,2,4], return : [2,1,4,3,5]
+ *
+ * @param source Input list
+ * @param index retuen order
+ * @param <T> Input type
+ * @return ordered list
+ */
+ private static <T> List<T> sortList(List<T> source, Integer[] index) {
+ return Arrays.stream(index).map(source::get).collect(Collectors.toList());
}
private List<ByteBuffer> objectValuesListToByteBufferList(
@@ -1346,15 +1344,8 @@ public class Session {
boolean isAligned)
throws IoTDBConnectionException, StatementExecutionException {
Map<SessionConnection, TSInsertRecordsReq> recordsGroup = new HashMap<>();
- EndPoint endPoint;
- SessionConnection connection;
for (int i = 0; i < deviceIds.size(); i++) {
- endPoint = deviceIdToEndpoint.isEmpty() ? null :
deviceIdToEndpoint.get(deviceIds.get(i));
- if (endPoint != null) {
- connection = endPointToSessionConnection.get(endPoint);
- } else {
- connection = defaultSessionConnection;
- }
+ final SessionConnection connection =
getSessionConnection(deviceIds.get(i));
TSInsertRecordsReq request =
recordsGroup.computeIfAbsent(connection, k -> new
TSInsertRecordsReq());
request.setIsAligned(isAligned);
@@ -1366,27 +1357,7 @@ public class Session {
typesList.get(i),
valuesList.get(i));
}
- // TODO parallel
- StringBuilder errMsgBuilder = new StringBuilder();
- for (Entry<SessionConnection, TSInsertRecordsReq> entry :
recordsGroup.entrySet()) {
- try {
- entry.getKey().insertRecords(entry.getValue());
- } catch (RedirectException e) {
- for (Entry<String, EndPoint> deviceEndPointEntry :
e.getDeviceEndPointMap().entrySet()) {
- handleRedirection(deviceEndPointEntry.getKey(),
deviceEndPointEntry.getValue());
- }
- } catch (StatementExecutionException e) {
- errMsgBuilder.append(e.getMessage());
- } catch (IoTDBConnectionException e) {
- // remove the broken session
- removeBrokenSessionConnection(entry.getKey());
- throw e;
- }
- }
- String errMsg = errMsgBuilder.toString();
- if (!errMsg.isEmpty()) {
- throw new StatementExecutionException(errMsg);
- }
+ insertByGroup(recordsGroup, SessionConnection::insertRecords);
}
private TSInsertRecordsReq genTSInsertRecordsReq(
@@ -1435,15 +1406,8 @@ public class Session {
public void insertTablet(Tablet tablet)
throws StatementExecutionException, IoTDBConnectionException {
TSInsertTabletReq request = genTSInsertTabletReq(tablet, false);
- EndPoint endPoint;
try {
- if (enableCacheLeader
- && !deviceIdToEndpoint.isEmpty()
- && (endPoint = deviceIdToEndpoint.get(tablet.prefixPath)) != null) {
- endPointToSessionConnection.get(endPoint).insertTablet(request);
- } else {
- defaultSessionConnection.insertTablet(request);
- }
+ getSessionConnection(tablet.prefixPath).insertTablet(request);
} catch (RedirectException e) {
handleRedirection(tablet.prefixPath, e.getEndPoint());
}
@@ -1458,15 +1422,8 @@ public class Session {
public void insertTablet(Tablet tablet, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted);
- EndPoint endPoint;
try {
- if (enableCacheLeader
- && !deviceIdToEndpoint.isEmpty()
- && (endPoint = deviceIdToEndpoint.get(tablet.prefixPath)) != null) {
- endPointToSessionConnection.get(endPoint).insertTablet(request);
- } else {
- defaultSessionConnection.insertTablet(request);
- }
+ getSessionConnection(tablet.prefixPath).insertTablet(request);
} catch (RedirectException e) {
handleRedirection(tablet.prefixPath, e.getEndPoint());
}
@@ -1545,41 +1502,15 @@ public class Session {
private void insertTabletsWithLeaderCache(Map<String, Tablet> tablets,
boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
- EndPoint endPoint;
- SessionConnection connection;
Map<SessionConnection, TSInsertTabletsReq> tabletGroup = new HashMap<>();
for (Entry<String, Tablet> entry : tablets.entrySet()) {
- endPoint = deviceIdToEndpoint.isEmpty() ? null :
deviceIdToEndpoint.get(entry.getKey());
- if (endPoint != null) {
- connection = endPointToSessionConnection.get(endPoint);
- } else {
- connection = defaultSessionConnection;
- }
+ final SessionConnection connection =
getSessionConnection(entry.getKey());
TSInsertTabletsReq request =
tabletGroup.computeIfAbsent(connection, k -> new
TSInsertTabletsReq());
updateTSInsertTabletsReq(request, entry.getValue(), sorted);
}
- // TODO parallel
- StringBuilder errMsgBuilder = new StringBuilder();
- for (Entry<SessionConnection, TSInsertTabletsReq> entry :
tabletGroup.entrySet()) {
- try {
- entry.getKey().insertTablets(entry.getValue());
- } catch (RedirectException e) {
- for (Entry<String, EndPoint> deviceEndPointEntry :
e.getDeviceEndPointMap().entrySet()) {
- handleRedirection(deviceEndPointEntry.getKey(),
deviceEndPointEntry.getValue());
- }
- } catch (StatementExecutionException e) {
- errMsgBuilder.append(e.getMessage());
- } catch (IoTDBConnectionException e) {
- removeBrokenSessionConnection(entry.getKey());
- throw e;
- }
- }
- String errMsg = errMsgBuilder.toString();
- if (!errMsg.isEmpty()) {
- throw new StatementExecutionException(errMsg);
- }
+ insertByGroup(tabletGroup, SessionConnection::insertTablets);
}
private TSInsertTabletsReq genTSInsertTabletsReq(List<Tablet> tablets,
boolean sorted)
@@ -2090,6 +2021,72 @@ public class Session {
return request;
}
+ /**
+ * @param recordsGroup connection to record map
+ * @param insertConsumer insert function
+ * @param <T>
+ * <ul>
+ * <li>{@link TSInsertRecordsReq}
+ * <li>{@link TSInsertStringRecordsReq}
+ * <li>{@link TSInsertTabletsReq}
+ * </ul>
+ *
+ * @throws IoTDBConnectionException
+ * @throws StatementExecutionException
+ */
+ private <T> void insertByGroup(
+ Map<SessionConnection, T> recordsGroup, InsertConsumer<T> insertConsumer)
+ throws IoTDBConnectionException, StatementExecutionException {
+ List<CompletableFuture<Void>> completableFutures =
+ recordsGroup.entrySet().stream()
+ .map(
+ entry -> {
+ SessionConnection connection = entry.getKey();
+ T recordsReq = entry.getValue();
+ return CompletableFuture.runAsync(
+ () -> {
+ try {
+ insertConsumer.insert(connection, recordsReq);
+ } catch (RedirectException e) {
+ e.getDeviceEndPointMap()
+ .forEach(
+ (deviceId, endpoint) -> {
+ try {
+ handleRedirection(deviceId, endpoint);
+ } catch (IoTDBConnectionException
ioTDBConnectionException) {
+ throw new
CompletionException(ioTDBConnectionException);
+ }
+ });
+ } catch (StatementExecutionException e) {
+ throw new CompletionException(e);
+ } catch (IoTDBConnectionException e) {
+ // remove the broken session
+ removeBrokenSessionConnection(connection);
+ throw new CompletionException(e);
+ }
+ },
+ OPERATION_EXECUTOR);
+ })
+ .collect(Collectors.toList());
+
+ StringBuilder errMsgBuilder = new StringBuilder();
+ for (CompletableFuture<Void> completableFuture : completableFutures) {
+ try {
+ completableFuture.join();
+ } catch (CompletionException completionException) {
+ Throwable cause = completionException.getCause();
+ if (cause instanceof IoTDBConnectionException) {
+ throw (IoTDBConnectionException) cause;
+ } else {
+ errMsgBuilder.append(cause.getMessage());
+ }
+ }
+ }
+ if (errMsgBuilder.length() > 0) {
+ throw new StatementExecutionException(errMsgBuilder.toString());
+ }
+ }
+
public boolean isEnableQueryRedirection() {
return enableQueryRedirection;
}
diff --git
a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 80bb4af..9f39e7e 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.session.util.SessionUtils;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
diff --git
a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index fd8f8e7..f25d1c8 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -218,8 +218,8 @@ public class SessionPool {
logger.debug("no more sessions can be created, wait...
queue.size={}", queue.size());
}
this.wait(1000);
- long time = waitToGetSessionTimeoutInMs < 60_000 ?
waitToGetSessionTimeoutInMs : 60_000;
- if (System.currentTimeMillis() - start > time) {
+ long timeOut = Math.min(waitToGetSessionTimeoutInMs, 60_000);
+ if (System.currentTimeMillis() - start > timeOut) {
logger.warn(
"the SessionPool has wait for {} seconds to get a new
connection: {}:{} with {}, {}",
(System.currentTimeMillis() - start) / 1000,
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionUtils.java
b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
similarity index 99%
rename from session/src/main/java/org/apache/iotdb/session/SessionUtils.java
rename to session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
index 69041e1..4f27d06 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionUtils.java
+++ b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.session;
+package org.apache.iotdb.session.util;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
diff --git
a/session/src/main/java/org/apache/iotdb/session/util/ThreadUtils.java
b/session/src/main/java/org/apache/iotdb/session/util/ThreadUtils.java
new file mode 100644
index 0000000..db989cd
--- /dev/null
+++ b/session/src/main/java/org/apache/iotdb/session/util/ThreadUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.util;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ThreadUtils {
+
+ /**
+ * @param threadNamePrefix thread name prefix, the thread name will be
"prefix-num"
+ * @param isDaemon if true, the thread be created will be daemon
+ * @return
+ */
+ public static ThreadFactory createThreadFactory(String threadNamePrefix,
boolean isDaemon) {
+ return new ThreadFactory() {
+ private final AtomicInteger THREAD_COUNT = new AtomicInteger(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r);
+ thread.setName(String.format("%s-%d", threadNamePrefix,
THREAD_COUNT.getAndIncrement()));
+ thread.setDaemon(isDaemon);
+ return thread;
+ }
+ };
+ }
+}
diff --git
a/session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
b/session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
new file mode 100644
index 0000000..202e60b
--- /dev/null
+++ b/session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.ThreadFactory;
+
+public class ThreadUtilsTest {
+
+ @Test
+ public void createThreadFactory() {
+ ThreadFactory daemonThreadFactory =
ThreadUtils.createThreadFactory("Test", true);
+ Thread thread = daemonThreadFactory.newThread(() -> {});
+ Assert.assertEquals("Test-0", thread.getName());
+ }
+}