This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new e8ca7b8 [ISSUE-4072] Parallel insert records in Session (#4073)
(#4169)
e8ca7b8 is described below
commit e8ca7b8fec796af7a837a5cc6be39065fd074f59
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sat Oct 16 02:02:51 2021 +0800
[ISSUE-4072] Parallel insert records in Session (#4073) (#4169)
---
.../main/java/org/apache/iotdb/session/Config.java | 4 +
.../session/{Config.java => InsertConsumer.java} | 24 +--
.../java/org/apache/iotdb/session/Session.java | 237 ++++++++++-----------
.../apache/iotdb/session/SessionConnection.java | 1 +
.../org/apache/iotdb/session/pool/SessionPool.java | 4 +-
.../iotdb/session/{ => util}/SessionUtils.java | 2 +-
.../session/{Config.java => util/ThreadUtils.java} | 34 +--
.../iotdb/session/util/ThreadUtilsTest.java} | 24 +--
8 files changed, 166 insertions(+), 164 deletions(-)
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java
b/session/src/main/java/org/apache/iotdb/session/Config.java
index 02e7e70..52c9543 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/session/src/main/java/org/apache/iotdb/session/Config.java
@@ -26,6 +26,10 @@ public class Config {
public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 0;
public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
+ public static final int CPU_CORES =
Runtime.getRuntime().availableProcessors();
+ public static final int DEFAULT_SESSION_EXECUTOR_THREAD_NUM = 2 * CPU_CORES;
+ public static final int DEFAULT_SESSION_EXECUTOR_TASK_NUM = 1_000;
+
public static final int RETRY_NUM = 3;
public static final long RETRY_INTERVAL_MS = 1000;
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java
b/session/src/main/java/org/apache/iotdb/session/InsertConsumer.java
similarity index 52%
copy from session/src/main/java/org/apache/iotdb/session/Config.java
copy to session/src/main/java/org/apache/iotdb/session/InsertConsumer.java
index 02e7e70..dcc655c 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/session/src/main/java/org/apache/iotdb/session/InsertConsumer.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -16,22 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.session;
-
-public class Config {
- public static final String DEFAULT_USER = "root";
- public static final String DEFAULT_PASSWORD = "root";
- public static final int DEFAULT_FETCH_SIZE = 5000;
- public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 0;
- public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
+package org.apache.iotdb.session;
- public static final int RETRY_NUM = 3;
- public static final long RETRY_INTERVAL_MS = 1000;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.RedirectException;
+import org.apache.iotdb.rpc.StatementExecutionException;
- /** thrift init buffer size, 1KB by default */
- public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
+@FunctionalInterface
+public interface InsertConsumer<T> {
- /** thrift max frame size (16384000 bytes by default), we change it to 64MB
*/
- public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
+ void insert(SessionConnection connection, T record)
+ throws IoTDBConnectionException, StatementExecutionException,
RedirectException;
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java
b/session/src/main/java/org/apache/iotdb/session/Session.java
index 06b3536..8c3ee40 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -36,6 +36,8 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
+import org.apache.iotdb.session.util.SessionUtils;
+import org.apache.iotdb.session.util.ThreadUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -60,6 +62,12 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -72,6 +80,14 @@ public class Session {
public static final String MSG_UNSUPPORTED_DATA_TYPE = "Unsupported data
type:";
public static final String MSG_DONOT_ENABLE_REDIRECT =
"Query do not enable redirect," + " please confirm the session and
server conf.";
+ private static final ThreadPoolExecutor OPERATION_EXECUTOR =
+ new ThreadPoolExecutor(
+ Config.DEFAULT_SESSION_EXECUTOR_THREAD_NUM,
+ Config.DEFAULT_SESSION_EXECUTOR_THREAD_NUM,
+ 0,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(Config.DEFAULT_SESSION_EXECUTOR_TASK_NUM),
+ ThreadUtils.createThreadFactory("SessionExecutor", true));
protected List<String> nodeUrls;
protected String username;
protected String password;
@@ -97,9 +113,8 @@ public class Session {
// Cluster version cache
protected boolean enableCacheLeader;
protected SessionConnection metaSessionConnection;
- protected Map<String, EndPoint> deviceIdToEndpoint;
- protected Map<EndPoint, SessionConnection> endPointToSessionConnection;
- private AtomicReference<IoTDBConnectionException> tmp = new
AtomicReference<>();
+ protected volatile Map<String, EndPoint> deviceIdToEndpoint;
+ protected volatile Map<EndPoint, SessionConnection>
endPointToSessionConnection;
protected boolean enableQueryRedirection = false;
@@ -327,8 +342,8 @@ public class Session {
metaSessionConnection = defaultSessionConnection;
isClosed = false;
if (enableCacheLeader || enableQueryRedirection) {
- deviceIdToEndpoint = new HashMap<>();
- endPointToSessionConnection = new HashMap<>();
+ deviceIdToEndpoint = new ConcurrentHashMap<>();
+ endPointToSessionConnection = new ConcurrentHashMap<>();
endPointToSessionConnection.put(defaultEndPoint,
defaultSessionConnection);
}
}
@@ -771,6 +786,7 @@ public class Session {
throws IoTDBConnectionException {
if (enableCacheLeader) {
logger.debug("storageGroup[{}]:{}", storageGroup, e.getMessage());
+ AtomicReference<IoTDBConnectionException> exceptionReference = new
AtomicReference<>();
SessionConnection connection =
endPointToSessionConnection.computeIfAbsent(
e.getEndPoint(),
@@ -778,12 +794,12 @@ public class Session {
try {
return constructSessionConnection(this, e.getEndPoint(),
zoneId);
} catch (IoTDBConnectionException ex) {
- tmp.set(ex);
+ exceptionReference.set(ex);
return null;
}
});
if (connection == null) {
- throw new IoTDBConnectionException(tmp.get());
+ throw new IoTDBConnectionException(exceptionReference.get());
}
metaSessionConnection = connection;
}
@@ -792,6 +808,7 @@ public class Session {
private void handleRedirection(String deviceId, EndPoint endpoint)
throws IoTDBConnectionException {
if (enableCacheLeader) {
+ AtomicReference<IoTDBConnectionException> exceptionReference = new
AtomicReference<>();
deviceIdToEndpoint.put(deviceId, endpoint);
SessionConnection connection =
endPointToSessionConnection.computeIfAbsent(
@@ -800,18 +817,20 @@ public class Session {
try {
return constructSessionConnection(this, endpoint, zoneId);
} catch (IoTDBConnectionException ex) {
- tmp.set(ex);
+ exceptionReference.set(ex);
return null;
}
});
if (connection == null) {
- throw new IoTDBConnectionException(tmp.get());
+ deviceIdToEndpoint.remove(deviceId);
+ throw new IoTDBConnectionException(exceptionReference.get());
}
}
}
private void handleQueryRedirection(EndPoint endPoint) throws
IoTDBConnectionException {
if (enableQueryRedirection) {
+ AtomicReference<IoTDBConnectionException> exceptionReference = new
AtomicReference<>();
SessionConnection connection =
endPointToSessionConnection.computeIfAbsent(
endPoint,
@@ -822,12 +841,12 @@ public class Session {
sessionConnection.setEnableRedirect(enableQueryRedirection);
return sessionConnection;
} catch (IoTDBConnectionException ex) {
- tmp.set(ex);
+ exceptionReference.set(ex);
return null;
}
});
if (connection == null) {
- throw new IoTDBConnectionException(tmp.get());
+ throw new IoTDBConnectionException(exceptionReference.get());
}
defaultSessionConnection = connection;
}
@@ -933,41 +952,15 @@ public class Session {
List<List<String>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
Map<SessionConnection, TSInsertStringRecordsReq> recordsGroup = new
HashMap<>();
- EndPoint endPoint;
- SessionConnection connection;
for (int i = 0; i < deviceIds.size(); i++) {
- endPoint = deviceIdToEndpoint.isEmpty() ? null :
deviceIdToEndpoint.get(deviceIds.get(i));
- if (endPoint != null) {
- connection = endPointToSessionConnection.get(endPoint);
- } else {
- connection = defaultSessionConnection;
- }
+ final SessionConnection connection =
getSessionConnection(deviceIds.get(i));
TSInsertStringRecordsReq request =
recordsGroup.computeIfAbsent(connection, k -> new
TSInsertStringRecordsReq());
updateTSInsertStringRecordsReq(
request, deviceIds.get(i), times.get(i), measurementsList.get(i),
valuesList.get(i));
}
- // TODO parallel
- StringBuilder errMsgBuilder = new StringBuilder();
- for (Entry<SessionConnection, TSInsertStringRecordsReq> entry :
recordsGroup.entrySet()) {
- try {
- entry.getKey().insertRecords(entry.getValue());
- } catch (RedirectException e) {
- for (Entry<String, EndPoint> deviceEndPointEntry :
e.getDeviceEndPointMap().entrySet()) {
- handleRedirection(deviceEndPointEntry.getKey(),
deviceEndPointEntry.getValue());
- }
- } catch (StatementExecutionException e) {
- errMsgBuilder.append(e.getMessage());
- } catch (IoTDBConnectionException e) {
- // remove the broken session
- removeBrokenSessionConnection(entry.getKey());
- throw e;
- }
- }
- String errMsg = errMsgBuilder.toString();
- if (!errMsg.isEmpty()) {
- throw new StatementExecutionException(errMsg);
- }
+
+ insertByGroup(recordsGroup, SessionConnection::insertRecords);
}
private TSInsertStringRecordsReq genTSInsertStringRecordsReq(
@@ -1126,13 +1119,18 @@ public class Session {
return request;
}
- @SuppressWarnings("squid:S3740")
- private List sortList(List source, Integer[] index) {
- Object[] result = new Object[source.size()];
- for (int i = 0; i < index.length; i++) {
- result[i] = source.get(index[i]);
- }
- return Arrays.asList(result);
+ /**
+ * Sort the input source list.
+ *
+ * <p>e.g. source: [1,2,3,4,5], index:[1,0,3,2,4], return : [2,1,4,3,5]
+ *
+ * @param source Input list
+ * @param index retuen order
+ * @param <T> Input type
+ * @return ordered list
+ */
+ private static <T> List<T> sortList(List<T> source, Integer[] index) {
+ return Arrays.stream(index).map(source::get).collect(Collectors.toList());
}
private List<ByteBuffer> objectValuesListToByteBufferList(
@@ -1155,15 +1153,8 @@ public class Session {
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
Map<SessionConnection, TSInsertRecordsReq> recordsGroup = new HashMap<>();
- EndPoint endPoint;
- SessionConnection connection;
for (int i = 0; i < deviceIds.size(); i++) {
- endPoint = deviceIdToEndpoint.isEmpty() ? null :
deviceIdToEndpoint.get(deviceIds.get(i));
- if (endPoint != null) {
- connection = endPointToSessionConnection.get(endPoint);
- } else {
- connection = defaultSessionConnection;
- }
+ final SessionConnection connection =
getSessionConnection(deviceIds.get(i));
TSInsertRecordsReq request =
recordsGroup.computeIfAbsent(connection, k -> new
TSInsertRecordsReq());
updateTSInsertRecordsReq(
@@ -1174,27 +1165,7 @@ public class Session {
typesList.get(i),
valuesList.get(i));
}
- // TODO parallel
- StringBuilder errMsgBuilder = new StringBuilder();
- for (Entry<SessionConnection, TSInsertRecordsReq> entry :
recordsGroup.entrySet()) {
- try {
- entry.getKey().insertRecords(entry.getValue());
- } catch (RedirectException e) {
- for (Entry<String, EndPoint> deviceEndPointEntry :
e.getDeviceEndPointMap().entrySet()) {
- handleRedirection(deviceEndPointEntry.getKey(),
deviceEndPointEntry.getValue());
- }
- } catch (StatementExecutionException e) {
- errMsgBuilder.append(e.getMessage());
- } catch (IoTDBConnectionException e) {
- // remove the broken session
- removeBrokenSessionConnection(entry.getKey());
- throw e;
- }
- }
- String errMsg = errMsgBuilder.toString();
- if (!errMsg.isEmpty()) {
- throw new StatementExecutionException(errMsg);
- }
+ insertByGroup(recordsGroup, SessionConnection::insertRecords);
}
private TSInsertRecordsReq genTSInsertRecordsReq(
@@ -1241,15 +1212,8 @@ public class Session {
public void insertTablet(Tablet tablet)
throws StatementExecutionException, IoTDBConnectionException {
TSInsertTabletReq request = genTSInsertTabletReq(tablet, false);
- EndPoint endPoint;
try {
- if (enableCacheLeader
- && !deviceIdToEndpoint.isEmpty()
- && (endPoint = deviceIdToEndpoint.get(tablet.deviceId)) != null) {
- endPointToSessionConnection.get(endPoint).insertTablet(request);
- } else {
- defaultSessionConnection.insertTablet(request);
- }
+ getSessionConnection(tablet.deviceId).insertTablet(request);
} catch (RedirectException e) {
handleRedirection(tablet.deviceId, e.getEndPoint());
}
@@ -1264,15 +1228,8 @@ public class Session {
public void insertTablet(Tablet tablet, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted);
- EndPoint endPoint;
try {
- if (enableCacheLeader
- && !deviceIdToEndpoint.isEmpty()
- && (endPoint = deviceIdToEndpoint.get(tablet.deviceId)) != null) {
- endPointToSessionConnection.get(endPoint).insertTablet(request);
- } else {
- defaultSessionConnection.insertTablet(request);
- }
+ getSessionConnection(tablet.deviceId).insertTablet(request);
} catch (RedirectException e) {
handleRedirection(tablet.deviceId, e.getEndPoint());
}
@@ -1334,41 +1291,15 @@ public class Session {
private void insertTabletsWithLeaderCache(Map<String, Tablet> tablets,
boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
- EndPoint endPoint;
- SessionConnection connection;
Map<SessionConnection, TSInsertTabletsReq> tabletGroup = new HashMap<>();
for (Entry<String, Tablet> entry : tablets.entrySet()) {
- endPoint = deviceIdToEndpoint.isEmpty() ? null :
deviceIdToEndpoint.get(entry.getKey());
- if (endPoint != null) {
- connection = endPointToSessionConnection.get(endPoint);
- } else {
- connection = defaultSessionConnection;
- }
+ final SessionConnection connection =
getSessionConnection(entry.getKey());
TSInsertTabletsReq request =
tabletGroup.computeIfAbsent(connection, k -> new
TSInsertTabletsReq());
updateTSInsertTabletsReq(request, entry.getValue(), sorted);
}
- // TODO parallel
- StringBuilder errMsgBuilder = new StringBuilder();
- for (Entry<SessionConnection, TSInsertTabletsReq> entry :
tabletGroup.entrySet()) {
- try {
- entry.getKey().insertTablets(entry.getValue());
- } catch (RedirectException e) {
- for (Entry<String, EndPoint> deviceEndPointEntry :
e.getDeviceEndPointMap().entrySet()) {
- handleRedirection(deviceEndPointEntry.getKey(),
deviceEndPointEntry.getValue());
- }
- } catch (StatementExecutionException e) {
- errMsgBuilder.append(e.getMessage());
- } catch (IoTDBConnectionException e) {
- removeBrokenSessionConnection(entry.getKey());
- throw e;
- }
- }
- String errMsg = errMsgBuilder.toString();
- if (!errMsg.isEmpty()) {
- throw new StatementExecutionException(errMsg);
- }
+ insertByGroup(tabletGroup, SessionConnection::insertTablets);
}
private TSInsertTabletsReq genTSInsertTabletsReq(List<Tablet> tablets,
boolean sorted)
@@ -1738,6 +1669,72 @@ public class Session {
}
}
+ /**
+ * @param recordsGroup connection to record map
+ * @param insertConsumer insert function
+ * @param <T>
+ * <ul>
+ * <li>{@link TSInsertRecordsReq}
+ * <li>{@link TSInsertStringRecordsReq}
+ * <li>{@link TSInsertTabletsReq}
+ * </ul>
+ *
+ * @throws IoTDBConnectionException
+ * @throws StatementExecutionException
+ */
+ private <T> void insertByGroup(
+ Map<SessionConnection, T> recordsGroup, InsertConsumer<T> insertConsumer)
+ throws IoTDBConnectionException, StatementExecutionException {
+ List<CompletableFuture<Void>> completableFutures =
+ recordsGroup.entrySet().stream()
+ .map(
+ entry -> {
+ SessionConnection connection = entry.getKey();
+ T recordsReq = entry.getValue();
+ return CompletableFuture.runAsync(
+ () -> {
+ try {
+ insertConsumer.insert(connection, recordsReq);
+ } catch (RedirectException e) {
+ e.getDeviceEndPointMap()
+ .forEach(
+ (deviceId, endpoint) -> {
+ try {
+ handleRedirection(deviceId, endpoint);
+ } catch (IoTDBConnectionException
ioTDBConnectionException) {
+ throw new
CompletionException(ioTDBConnectionException);
+ }
+ });
+ } catch (StatementExecutionException e) {
+ throw new CompletionException(e);
+ } catch (IoTDBConnectionException e) {
+ // remove the broken session
+ removeBrokenSessionConnection(connection);
+ throw new CompletionException(e);
+ }
+ },
+ OPERATION_EXECUTOR);
+ })
+ .collect(Collectors.toList());
+
+ StringBuilder errMsgBuilder = new StringBuilder();
+ for (CompletableFuture<Void> completableFuture : completableFutures) {
+ try {
+ completableFuture.join();
+ } catch (CompletionException completionException) {
+ Throwable cause = completionException.getCause();
+ if (cause instanceof IoTDBConnectionException) {
+ throw (IoTDBConnectionException) cause;
+ } else {
+ errMsgBuilder.append(cause.getMessage());
+ }
+ }
+ }
+ if (errMsgBuilder.length() > 0) {
+ throw new StatementExecutionException(errMsgBuilder.toString());
+ }
+ }
+
public boolean isEnableQueryRedirection() {
return enableQueryRedirection;
}
diff --git
a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 561cf97..42d19d2 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.session.util.SessionUtils;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
diff --git
a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index d21bf39..3afdbe7 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -218,8 +218,8 @@ public class SessionPool {
logger.debug("no more sessions can be created, wait...
queue.size={}", queue.size());
}
this.wait(1000);
- long time = waitToGetSessionTimeoutInMs < 60_000 ?
waitToGetSessionTimeoutInMs : 60_000;
- if (System.currentTimeMillis() - start > time) {
+ long timeOut = Math.min(waitToGetSessionTimeoutInMs, 60_000);
+ if (System.currentTimeMillis() - start > timeOut) {
logger.warn(
"the SessionPool has wait for {} seconds to get a new
connection: {}:{} with {}, {}",
(System.currentTimeMillis() - start) / 1000,
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionUtils.java
b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
similarity index 99%
rename from session/src/main/java/org/apache/iotdb/session/SessionUtils.java
rename to session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
index 66620c8..5f2d553 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionUtils.java
+++ b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.session;
+package org.apache.iotdb.session.util;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java
b/session/src/main/java/org/apache/iotdb/session/util/ThreadUtils.java
similarity index 50%
copy from session/src/main/java/org/apache/iotdb/session/Config.java
copy to session/src/main/java/org/apache/iotdb/session/util/ThreadUtils.java
index 02e7e70..db989cd 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/session/src/main/java/org/apache/iotdb/session/util/ThreadUtils.java
@@ -16,22 +16,30 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.session;
-public class Config {
+package org.apache.iotdb.session.util;
- public static final String DEFAULT_USER = "root";
- public static final String DEFAULT_PASSWORD = "root";
- public static final int DEFAULT_FETCH_SIZE = 5000;
- public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 0;
- public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
- public static final int RETRY_NUM = 3;
- public static final long RETRY_INTERVAL_MS = 1000;
+public class ThreadUtils {
- /** thrift init buffer size, 1KB by default */
- public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
+ /**
+ * @param threadNamePrefix thread name prefix, the thread name will be
"prefix-num"
+ * @param isDaemon if true, the thread be created will be daemon
+ * @return
+ */
+ public static ThreadFactory createThreadFactory(String threadNamePrefix,
boolean isDaemon) {
+ return new ThreadFactory() {
+ private final AtomicInteger THREAD_COUNT = new AtomicInteger(0);
- /** thrift max frame size (16384000 bytes by default), we change it to 64MB
*/
- public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r);
+ thread.setName(String.format("%s-%d", threadNamePrefix,
THREAD_COUNT.getAndIncrement()));
+ thread.setDaemon(isDaemon);
+ return thread;
+ }
+ };
+ }
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java
b/session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
similarity index 54%
copy from session/src/main/java/org/apache/iotdb/session/Config.java
copy to session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
index 02e7e70..202e60b 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
@@ -16,22 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.session;
-public class Config {
+package org.apache.iotdb.session.util;
- public static final String DEFAULT_USER = "root";
- public static final String DEFAULT_PASSWORD = "root";
- public static final int DEFAULT_FETCH_SIZE = 5000;
- public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 0;
- public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
+import org.junit.Assert;
+import org.junit.Test;
- public static final int RETRY_NUM = 3;
- public static final long RETRY_INTERVAL_MS = 1000;
+import java.util.concurrent.ThreadFactory;
- /** thrift init buffer size, 1KB by default */
- public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
+public class ThreadUtilsTest {
- /** thrift max frame size (16384000 bytes by default), we change it to 64MB
*/
- public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
+ @Test
+ public void createThreadFactory() {
+ ThreadFactory daemonThreadFactory =
ThreadUtils.createThreadFactory("Test", true);
+ Thread thread = daemonThreadFactory.newThread(() -> {});
+ Assert.assertEquals("Test-0", thread.getName());
+ }
}