This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4d826ef2a64 invert insertRecords to insertTablets (#12113)
4d826ef2a64 is described below
commit 4d826ef2a64908079d84435d12fd088b73a4e09e
Author: YuFengLiu <[email protected]>
AuthorDate: Fri Mar 29 18:36:23 2024 +0800
invert insertRecords to insertTablets (#12113)
---
.../iotdb/session/it/IoTDBSessionSimpleIT.java | 37 +++
.../org/apache/iotdb/isession/SessionConfig.java | 5 +-
.../java/org/apache/iotdb/session/Session.java | 250 ++++++++++++++++++++-
.../org/apache/iotdb/session/pool/SessionPool.java | 11 +
.../iotdb/session/SessionCacheLeaderTest.java | 6 +-
.../java/org/apache/iotdb/session/SessionTest.java | 5 +-
.../apache/iotdb/session/pool/SessionPoolTest.java | 2 +
7 files changed, 304 insertions(+), 12 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java
index 120b65797a5..bb52fffda5d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.session.it;
+import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.protocol.thrift.OperationType;
import org.apache.iotdb.isession.ISession;
@@ -1635,4 +1636,40 @@ public class IoTDBSessionSimpleIT {
fail(e.getMessage());
}
}
+
+ @Test
+ @Category({LocalStandaloneIT.class, ClusterIT.class})
+ public void convertRecordsToTabletsTest() {
+ List<String> measurements = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
+ List<Object> value = new ArrayList<>();
+ for (int measurement = 0; measurement < 100; measurement++) {
+ types.add(TSDataType.INT64);
+ measurements.add("s" + measurement);
+ value.add((long) measurement);
+ }
+ List<String> devices = new ArrayList<>();
+ List<List<String>> measurementsList = new ArrayList<>();
+ List<List<TSDataType>> typeList = new ArrayList<>();
+ List<List<Object>> values = new ArrayList<>();
+ List<Long> timestamps = new ArrayList<>();
+ for (long row = 0; row < 1000; row++) {
+ devices.add("root.sg1.d1");
+ measurementsList.add(measurements);
+ typeList.add(types);
+ values.add(value);
+ timestamps.add(row);
+ }
+ List<String> queryMeasurement = new ArrayList<>();
+ queryMeasurement.add("root.sg1.d1.s1");
+ List<TAggregationType> queryType = new ArrayList<>();
+ queryType.add(TAggregationType.COUNT);
+ try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+ session.insertRecords(devices, timestamps, measurementsList, typeList,
values);
+ SessionDataSet dataSet =
session.executeAggregationQuery(queryMeasurement, queryType);
+ assertEquals(1000, dataSet.next().getFields().get(0).getLongV());
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ e.printStackTrace();
+ }
+ }
}
diff --git
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java
index ac14a99c80f..e43b771d3c6 100644
---
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java
+++
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java
@@ -30,16 +30,13 @@ public class SessionConfig {
public static final int DEFAULT_FETCH_SIZE = 5000;
public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 0;
public static final boolean DEFAULT_REDIRECTION_MODE = true;
-
+ public static final boolean DEFAULT_RECORDS_AUTO_CONVERT_TABLET = true;
public static final int CPU_CORES =
Runtime.getRuntime().availableProcessors();
public static final int DEFAULT_SESSION_EXECUTOR_THREAD_NUM = 2 * CPU_CORES;
public static final int DEFAULT_SESSION_EXECUTOR_TASK_NUM = 1_000;
-
public static final int RETRY_NUM = 3;
public static final long RETRY_INTERVAL_MS = 1000;
-
public static final long DEFAULT_QUERY_TIME_OUT = 60000;
-
/** thrift init buffer size, 1KB by default */
public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index 85759d6bea9..b87d8937a1e 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -84,16 +84,19 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -132,16 +135,19 @@ public class Session implements ISession {
protected boolean enableRPCCompression;
protected int connectionTimeoutInMs;
protected ZoneId zoneId;
-
protected int thriftDefaultBufferSize;
protected int thriftMaxFrameSize;
-
protected TEndPoint defaultEndPoint;
protected SessionConnection defaultSessionConnection;
private boolean isClosed = true;
// Cluster version cache
protected boolean enableRedirection;
+ protected boolean enableRecordsAutoConvertTablet =
+ SessionConfig.DEFAULT_RECORDS_AUTO_CONVERT_TABLET;
+ private static final double CONVERT_THRESHOLD = 0.5;
+ private static final double SAMPLE_PROPORTION = 0.05;
+ private static final int MIN_RECORDS_SIZE = 40;
@SuppressWarnings("squid:S3077") // Non-primitive fields should not be
"volatile"
protected volatile Map<String, TEndPoint> deviceIdToEndpoint;
@@ -414,6 +420,7 @@ public class Session implements ISession {
this.enableQueryRedirection = builder.enableRedirection;
}
this.enableRedirection = builder.enableRedirection;
+ this.enableRecordsAutoConvertTablet =
builder.enableRecordsAutoConvertTablet;
this.username = builder.username;
this.password = builder.pw;
this.fetchSize = builder.fetchSize;
@@ -1905,6 +1912,17 @@ public class Session implements ISession {
throw new IllegalArgumentException(
"deviceIds, times, measurementsList and valuesList's size should be
equal");
}
+ // judge if convert records to tablets.
+ if (enableRecordsAutoConvertTablet && len >= MIN_RECORDS_SIZE) {
+ Set<String> deviceSet = new HashSet<>(deviceIds);
+ if ((double) deviceSet.size() / deviceIds.size() <= CONVERT_THRESHOLD
+ && judgeConvertOfMultiDevice(deviceIds, measurementsList)) {
+ convertToTabletsAndInsert(
+ deviceIds, times, measurementsList, typesList, valuesList,
deviceSet.size(), false);
+ return;
+ }
+ }
+ // insert records
if (enableRedirection) {
insertRecordsWithLeaderCache(
deviceIds, times, measurementsList, typesList, valuesList, false);
@@ -1948,6 +1966,17 @@ public class Session implements ISession {
throw new IllegalArgumentException(
"prefixPaths, times, subMeasurementsList and valuesList's size
should be equal");
}
+ // judge if convert records to tablets.
+ if (enableRecordsAutoConvertTablet && len >= MIN_RECORDS_SIZE) {
+ Set<String> deviceSet = new HashSet<>(deviceIds);
+ if ((double) deviceSet.size() / deviceIds.size() <= CONVERT_THRESHOLD
+ && judgeConvertOfMultiDevice(deviceIds, measurementsList)) {
+
+ convertToTabletsAndInsert(
+ deviceIds, times, measurementsList, typesList, valuesList,
deviceSet.size(), true);
+ return;
+ }
+ }
if (enableRedirection) {
insertRecordsWithLeaderCache(deviceIds, times, measurementsList,
typesList, valuesList, true);
} else {
@@ -2010,6 +2039,12 @@ public class Session implements ISession {
if (len != measurementsList.size() || len != valuesList.size()) {
throw new IllegalArgumentException(VALUES_SIZE_SHOULD_BE_EQUAL);
}
+ if (enableRecordsAutoConvertTablet
+ && len >= MIN_RECORDS_SIZE
+ && judgeConvertOfOneDevice(measurementsList)) {
+ convertToTabletAndInsert(deviceId, times, measurementsList, typesList,
valuesList, false);
+ return;
+ }
TSInsertRecordsOfOneDeviceReq request;
try {
request =
@@ -2157,6 +2192,12 @@ public class Session implements ISession {
throw new IllegalArgumentException(
"times, subMeasurementsList and valuesList's size should be equal");
}
+ if (enableRecordsAutoConvertTablet
+ && len >= MIN_RECORDS_SIZE
+ && judgeConvertOfOneDevice(measurementsList)) {
+ convertToTabletAndInsert(deviceId, times, measurementsList, typesList,
valuesList, true);
+ return;
+ }
TSInsertRecordsOfOneDeviceReq request;
try {
request =
@@ -2741,6 +2782,203 @@ public class Session implements ISession {
request.addToSizeList(tablet.rowSize);
}
+ // sample some records and judge weather need to add too many null values to
convert to tablet.
+ private boolean judgeConvertOfOneDevice(List<List<String>> measurementsList)
{
+ int size = measurementsList.size();
+ int sampleNum = (int) (size * SAMPLE_PROPORTION);
+ List<Integer> indexList =
+ ThreadLocalRandom.current()
+ .ints(0, size)
+ .distinct()
+ .limit(sampleNum)
+ .boxed()
+ .collect(Collectors.toList());
+ Set<String> allMeasurement =
+ new HashSet<>(measurementsList.get(indexList.get(0)).size() + 1, 1);
+ for (int i = 0; i < sampleNum; i++) {
+ allMeasurement.addAll(measurementsList.get(indexList.get(i)));
+ }
+ for (int i = 0; i < sampleNum; i++) {
+ if ((double) measurementsList.get(indexList.get(i)).size() /
allMeasurement.size()
+ < CONVERT_THRESHOLD) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // convert records of one device to tablet and insert
+ private void convertToTabletAndInsert(
+ String deviceId,
+ List<Long> times,
+ List<List<String>> measurementsList,
+ List<List<TSDataType>> typesList,
+ List<List<Object>> valuesList,
+ boolean isAligned)
+ throws IoTDBConnectionException, StatementExecutionException {
+ // measurement -> <type,if null>
+ Map<String, Pair<TSDataType, Boolean>> measuremenMap =
+ new HashMap<>(measurementsList.get(0).size() + 1, 1);
+ // build measurementType
+ for (int rowIndex = 0; rowIndex < measurementsList.size(); rowIndex++) {
+ List<String> measurements = measurementsList.get(rowIndex);
+ List<TSDataType> types = typesList.get(rowIndex);
+ for (int colIndex = 0; colIndex < measurements.size(); colIndex++) {
+ String measurement = measurements.get(colIndex);
+ if (!measuremenMap.containsKey(measurement)) {
+ measuremenMap.put(measurement, new Pair<>(types.get(colIndex),
true));
+ }
+ }
+ }
+ List<MeasurementSchema> schemaList = new ArrayList<>(measuremenMap.size());
+ // use measurementType to build schemaList
+ for (Entry<String, Pair<TSDataType, Boolean>> entry :
measuremenMap.entrySet()) {
+ schemaList.add(new MeasurementSchema(entry.getKey(),
entry.getValue().getLeft()));
+ }
+ // build tablet and insert
+ Tablet tablet = new Tablet(deviceId, schemaList, times.size());
+ for (int rowIndex = 0; rowIndex < times.size(); rowIndex++) {
+ addRecordToTablet(
+ tablet,
+ times.get(rowIndex),
+ measurementsList.get(rowIndex),
+ valuesList.get(rowIndex),
+ measuremenMap);
+ }
+ if (isAligned) {
+ insertAlignedTablet(tablet);
+ } else {
+ insertTablet(tablet);
+ }
+ }
+
+ // sample some records and judge weather need to add too many null values to
convert to tablet.
+ private boolean judgeConvertOfMultiDevice(
+ List<String> deviceIds, List<List<String>> measurementsList) {
+ int size = deviceIds.size();
+ int sampleNum = (int) (size * SAMPLE_PROPORTION);
+ Map<String, Set<String>> measurementMap = new HashMap<>(sampleNum + 1, 1);
+ List<Integer> indexList =
+ ThreadLocalRandom.current()
+ .ints(0, size)
+ .distinct()
+ .limit(sampleNum)
+ .boxed()
+ .collect(Collectors.toList());
+ for (int i = 0; i < sampleNum; i++) {
+ int index = indexList.get(i);
+ List<String> measurements = measurementsList.get(index);
+ Set<String> allMeasurement =
+ measurementMap.computeIfAbsent(
+ deviceIds.get(index), k -> new HashSet<>(measurements.size() +
1, 1));
+ allMeasurement.addAll(measurements);
+ }
+ for (int i = 0; i < sampleNum; i++) {
+ int index = indexList.get(i);
+ Set<String> allMeasurement = measurementMap.get(deviceIds.get(index));
+ if ((double) measurementsList.get(index).size() / allMeasurement.size()
< CONVERT_THRESHOLD) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // convert records of multiple devices to tablets and insert
+ private void convertToTabletsAndInsert(
+ List<String> deviceIds,
+ List<Long> times,
+ List<List<String>> measurementsList,
+ List<List<TSDataType>> typesList,
+ List<List<Object>> valuesList,
+ int deviceSize,
+ boolean isAligned)
+ throws IoTDBConnectionException, StatementExecutionException {
+ // device -> measurement -> <type,if null>
+ Map<String, Map<String, Pair<TSDataType, Boolean>>> deviceMeasuremenMap =
+ new HashMap<>(deviceSize + 1, 1);
+ // device -> row count
+ Map<String, Integer> rowMap = new HashMap<>(deviceSize + 1, 1);
+ // first build measurementTypeMap and rowMap
+ for (int rowIndex = 0; rowIndex < deviceIds.size(); rowIndex++) {
+ String device = deviceIds.get(rowIndex);
+ List<String> measurements = measurementsList.get(rowIndex);
+ List<TSDataType> types = typesList.get(rowIndex);
+ Map<String, Pair<TSDataType, Boolean>> measurementMap =
+ deviceMeasuremenMap.computeIfAbsent(
+ device, k -> new HashMap<>(measurements.size() + 1, 1));
+ for (int colIndex = 0; colIndex < measurements.size(); colIndex++) {
+ String measurement = measurements.get(colIndex);
+ if (!measurementMap.containsKey(measurement)) {
+ measurementMap.put(measurement, new Pair<>(types.get(colIndex),
true));
+ }
+ }
+ rowMap.merge(device, 1, Integer::sum);
+ }
+ // device -> schema
+ Map<String, List<MeasurementSchema>> schemaMap = new HashMap<>(deviceSize
+ 1, 1);
+ // use measurementTypeMap to build schemaMap
+ for (Map.Entry<String, Map<String, Pair<TSDataType, Boolean>>> entry :
+ deviceMeasuremenMap.entrySet()) {
+ List<MeasurementSchema> schemaList = new
ArrayList<>(entry.getValue().size() + 1);
+ for (Map.Entry<String, Pair<TSDataType, Boolean>> schemaEntry :
entry.getValue().entrySet()) {
+ schemaList.add(
+ new MeasurementSchema(schemaEntry.getKey(),
schemaEntry.getValue().getLeft()));
+ }
+ schemaMap.put(entry.getKey(), schemaList);
+ }
+ // device -> tablet
+ Map<String, Tablet> tablets = new HashMap<>(deviceSize + 1, 1);
+ // use schemaMap and rowMap to build tablets and insert
+ for (int rowIndex = 0; rowIndex < deviceIds.size(); rowIndex++) {
+ String device = deviceIds.get(rowIndex);
+ Tablet tablet =
+ tablets.computeIfAbsent(
+ device, k -> new Tablet(device, schemaMap.get(device),
rowMap.get(device)));
+ addRecordToTablet(
+ tablet,
+ times.get(rowIndex),
+ measurementsList.get(rowIndex),
+ valuesList.get(rowIndex),
+ deviceMeasuremenMap.get(device));
+ }
+ if (isAligned) {
+ insertAlignedTablets(tablets);
+ } else {
+ insertTablets(tablets);
+ }
+ }
+
+ // add one record to tablet.
+ private void addRecordToTablet(
+ Tablet tablet,
+ Long timestamp,
+ List<String> measurements,
+ List<Object> values,
+ Map<String, Pair<TSDataType, Boolean>> allMeasurementMap) {
+ int row = tablet.rowSize++;
+ tablet.addTimestamp(row, timestamp);
+ // tablet without null value
+ if (measurements.size() == allMeasurementMap.size()) {
+ for (int i = 0; i < measurements.size(); i++) {
+ tablet.addValue(measurements.get(i), row, values.get(i));
+ }
+ return;
+ }
+ // tablet with null value
+ for (int i = 0; i < measurements.size(); i++) {
+ String measurement = measurements.get(i);
+ tablet.addValue(measurement, row, values.get(i));
+ allMeasurementMap.get(measurement).setRight(false);
+ }
+ for (Entry<String, Pair<TSDataType, Boolean>> entry :
allMeasurementMap.entrySet()) {
+ if (entry.getValue().getRight()) {
+ tablet.addValue(entry.getKey(), row, null);
+ } else {
+ entry.getValue().setRight(true);
+ }
+ }
+ }
+
/**
* This method NOT insert data into database and the server just return
after accept the request,
* this method should be used to test other time cost in client
@@ -3553,9 +3791,10 @@ public class Session implements ISession {
private int thriftDefaultBufferSize =
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY;
private int thriftMaxFrameSize = SessionConfig.DEFAULT_MAX_FRAME_SIZE;
private boolean enableRedirection = SessionConfig.DEFAULT_REDIRECTION_MODE;
+ private boolean enableRecordsAutoConvertTablet =
+ SessionConfig.DEFAULT_RECORDS_AUTO_CONVERT_TABLET;
private Version version = SessionConfig.DEFAULT_VERSION;
private long timeOut = SessionConfig.DEFAULT_QUERY_TIME_OUT;
-
private boolean enableAutoFetch = SessionConfig.DEFAULT_ENABLE_AUTO_FETCH;
private boolean useSSL = false;
@@ -3628,6 +3867,11 @@ public class Session implements ISession {
return this;
}
+ public Builder enableRecordsAutoConvertTablet(boolean
enableRecordsAutoConvertTablet) {
+ this.enableRecordsAutoConvertTablet = enableRecordsAutoConvertTablet;
+ return this;
+ }
+
public Builder nodeUrls(List<String> nodeUrls) {
this.nodeUrls = nodeUrls;
return this;
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 4d48e7106c3..6db8113d7ba 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -112,6 +112,7 @@ public class SessionPool implements ISessionPool {
private String trustStorePwd;
private ZoneId zoneId;
private boolean enableRedirection;
+ private boolean enableRecordsAutoConvertTablet;
private boolean enableQueryRedirection = false;
private Map<String, TEndPoint> deviceIdToEndpoint;
@@ -475,6 +476,7 @@ public class SessionPool implements ISessionPool {
if (this.enableRedirection) {
deviceIdToEndpoint = new ConcurrentHashMap<>();
}
+ this.enableRecordsAutoConvertTablet =
builder.enableRecordsAutoConvertTablet;
this.connectionTimeoutInMs = builder.connectionTimeoutInMs;
this.version = builder.version;
this.thriftDefaultBufferSize = builder.thriftDefaultBufferSize;
@@ -532,6 +534,7 @@ public class SessionPool implements ISessionPool {
.thriftDefaultBufferSize(thriftDefaultBufferSize)
.thriftMaxFrameSize(thriftMaxFrameSize)
.enableRedirection(enableRedirection)
+ .enableRecordsAutoConvertTablet(enableRecordsAutoConvertTablet)
.version(version)
.useSSL(useSSL)
.trustStore(trustStore)
@@ -551,6 +554,7 @@ public class SessionPool implements ISessionPool {
.thriftDefaultBufferSize(thriftDefaultBufferSize)
.thriftMaxFrameSize(thriftMaxFrameSize)
.enableRedirection(enableRedirection)
+ .enableRecordsAutoConvertTablet(enableRecordsAutoConvertTablet)
.version(version)
.useSSL(useSSL)
.trustStore(trustStore)
@@ -3505,6 +3509,8 @@ public class SessionPool implements ISessionPool {
private boolean enableCompression = false;
private ZoneId zoneId = null;
private boolean enableRedirection = SessionConfig.DEFAULT_REDIRECTION_MODE;
+ private boolean enableRecordsAutoConvertTablet =
+ SessionConfig.DEFAULT_RECORDS_AUTO_CONVERT_TABLET;
private int connectionTimeoutInMs =
SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS;
private Version version = SessionConfig.DEFAULT_VERSION;
@@ -3598,6 +3604,11 @@ public class SessionPool implements ISessionPool {
return this;
}
+ public Builder enableRecordsAutoConvertTablet(boolean
enableRecordsAutoConvertTablet) {
+ this.enableRecordsAutoConvertTablet = enableRecordsAutoConvertTablet;
+ return this;
+ }
+
public Builder connectionTimeoutInMs(int connectionTimeoutInMs) {
this.connectionTimeoutInMs = connectionTimeoutInMs;
return this;
diff --git
a/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderTest.java
b/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderTest.java
index ea148312623..ef42f358283 100644
---
a/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderTest.java
+++
b/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderTest.java
@@ -728,16 +728,16 @@ public class SessionCacheLeaderTest {
// set connection as broken, due to we enable the cache leader, when we
called
// ((MockSession) session).getLastConstructedSessionConnection(), the
session's endpoint has
- // been changed to TEndPoint(ip:127.0.0.1, port:55562)
+ // been changed to TEndPoint(ip:127.0.0.1, port:55561)
Assert.assertEquals(
- "MockSessionConnection{ endPoint=TEndPoint(ip:127.0.0.1, port:55562)}",
+ "MockSessionConnection{ endPoint=TEndPoint(ip:127.0.0.1, port:55561)}",
((MockSession)
session).getLastConstructedSessionConnection().toString());
((MockSession)
session).getLastConstructedSessionConnection().setConnectionBroken(true);
try {
session.insertRecords(deviceIds, timestamps, measurementsList,
typesList, valuesList);
} catch (IoTDBConnectionException e) {
Assert.assertEquals(
- "the session connection = TEndPoint(ip:127.0.0.1, port:55562) is
broken", e.getMessage());
+ "the session connection = TEndPoint(ip:127.0.0.1, port:55561) is
broken", e.getMessage());
}
assertEquals(3, session.deviceIdToEndpoint.size());
for (Map.Entry<String, TEndPoint> endPointMap :
session.deviceIdToEndpoint.entrySet()) {
diff --git
a/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionTest.java
b/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionTest.java
index c1c8a76a244..09e8f24ad31 100644
---
a/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionTest.java
+++
b/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionTest.java
@@ -113,6 +113,7 @@ public class SessionTest {
.fetchSize(1000)
.zoneId(ZoneId.systemDefault())
.enableRedirection(true)
+ .enableRecordsAutoConvertTablet(true)
.thriftMaxFrameSize(SessionConfig.DEFAULT_MAX_FRAME_SIZE)
.thriftDefaultBufferSize(SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY)
.version(Version.V_0_13)
@@ -964,9 +965,9 @@ public class SessionTest {
map.put("one", tablet);
session.insertAlignedTablets(map, false);
session.setEnableRedirection(true);
- Assert.assertEquals(true, session.isEnableRedirection());
+ Assert.assertTrue(session.isEnableRedirection());
session.setEnableQueryRedirection(true);
- Assert.assertEquals(true, session.isEnableQueryRedirection());
+ Assert.assertTrue(session.isEnableQueryRedirection());
}
@Test
diff --git
a/iotdb-client/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
b/iotdb-client/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
index 61ff19bd36b..6ffecc961c5 100644
---
a/iotdb-client/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
+++
b/iotdb-client/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -109,6 +109,7 @@ public class SessionPoolTest {
.fetchSize(1)
.waitToGetSessionTimeoutInMs(2)
.enableRedirection(true)
+ .enableRecordsAutoConvertTablet(true)
.enableCompression(true)
.zoneId(ZoneOffset.UTC)
.connectionTimeoutInMs(3)
@@ -140,6 +141,7 @@ public class SessionPoolTest {
.fetchSize(1)
.waitToGetSessionTimeoutInMs(2)
.enableRedirection(true)
+ .enableRecordsAutoConvertTablet(true)
.enableCompression(true)
.zoneId(ZoneOffset.UTC)
.connectionTimeoutInMs(3)