This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 43d07b52a25 Support table model insert redirection (#13550)
43d07b52a25 is described below
commit 43d07b52a25f7d111a699c60eb1129b8cfbff1fe
Author: Haonan <[email protected]>
AuthorDate: Thu Sep 26 19:16:17 2024 +0800
Support table model insert redirection (#13550)
* dev_server_side
* dev session
* fix UT&IT
* fix UT
* fix IT
* change back
* fix table mode session pool issue and add IT
* fix IT bug
* fix split tablet error and add IT
* optimize first insert profermance
* add UT
* fix remove broken connection
* fix review
---
.../pool/IoTDBInsertTableSessionPoolIT.java | 245 +++++++++++++++++++
.../iotdb/session/it/IoTDBSessionRelationalIT.java | 76 ++++++
.../org/apache/iotdb/isession/IPooledSession.java | 1 +
.../java/org/apache/iotdb/isession/ISession.java | 9 +
.../org/apache/iotdb/rpc/RedirectException.java | 15 ++
.../main/java/org/apache/iotdb/rpc/RpcUtils.java | 17 ++
.../java/org/apache/iotdb/session/Session.java | 216 +++++++++++++++-
.../apache/iotdb/session/SessionConnection.java | 4 +-
.../org/apache/iotdb/session/pool/SessionPool.java | 22 +-
.../apache/iotdb/session/pool/SessionWrapper.java | 8 +-
.../iotdb/session/SessionCacheLeaderTest.java | 271 +++++++++++++++++++--
.../iotdb/session/SessionConnectionTest.java | 2 +-
.../db/queryengine/plan/analyze/Analysis.java | 1 +
.../db/queryengine/plan/analyze/IAnalysis.java | 2 +
.../planner/plan/node/write/InsertTabletNode.java | 10 +-
.../node/write/RelationalInsertTabletNode.java | 43 ++++
.../plan/relational/analyzer/Analysis.java | 5 +
.../plan/relational/planner/TableModelPlanner.java | 40 ++-
18 files changed, 942 insertions(+), 45 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/pool/IoTDBInsertTableSessionPoolIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/pool/IoTDBInsertTableSessionPoolIT.java
new file mode 100644
index 00000000000..c4f95f846f2
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/pool/IoTDBInsertTableSessionPoolIT.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.session.pool;
+
+import org.apache.iotdb.isession.IPooledSession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.isession.pool.ISessionPool;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.RowRecord;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.record.Tablet.ColumnType;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.nio.charset.Charset;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBInsertTableSessionPoolIT {
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv().initClusterEnvironment();
+ ISessionPool sessionPool = EnvFactory.getEnv().getSessionPool(1, "table");
+ try (final IPooledSession session = sessionPool.getPooledSession()) {
+ session.executeNonQueryStatement("create database if not exists test");
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testPartialInsertTablet() {
+ ISessionPool sessionPool = EnvFactory.getEnv().getSessionPool(1, "table");
+ try (final IPooledSession session = sessionPool.getPooledSession()) {
+ session.executeNonQueryStatement("use \"test\"");
+ session.executeNonQueryStatement("SET CONFIGURATION
enable_auto_create_schema='false'");
+ session.executeNonQueryStatement(
+ "create table sg6 (id1 string id, s1 int64 measurement, s2 int64
measurement)");
+ List<IMeasurementSchema> schemaList = new ArrayList<>();
+ schemaList.add(new MeasurementSchema("id1", TSDataType.STRING));
+ schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+ schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
+ schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
+ final List<ColumnType> columnTypes =
+ Arrays.asList(
+ ColumnType.ID,
+ ColumnType.MEASUREMENT,
+ ColumnType.MEASUREMENT,
+ ColumnType.MEASUREMENT);
+ Tablet tablet = new Tablet("sg6", schemaList, columnTypes, 300);
+ long timestamp = 0;
+ for (long row = 0; row < 100; row++) {
+ int rowIndex = tablet.rowSize++;
+ tablet.addTimestamp(rowIndex, timestamp);
+ for (int s = 0; s < 4; s++) {
+ long value = timestamp;
+ if (s == 0) {
+ tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex,
"d1");
+ } else {
+ tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex,
value);
+ }
+ }
+ timestamp++;
+ }
+ timestamp = System.currentTimeMillis();
+ for (long row = 0; row < 100; row++) {
+ int rowIndex = tablet.rowSize++;
+ tablet.addTimestamp(rowIndex, timestamp);
+ for (int s = 0; s < 4; s++) {
+ long value = timestamp;
+ if (s == 0) {
+ tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex,
"d1");
+ } else {
+ tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex,
value);
+ }
+ }
+ timestamp++;
+ }
+ try {
+ session.insertTablet(tablet);
+ } catch (Exception e) {
+ if (!e.getMessage().contains("507")) {
+ fail(e.getMessage());
+ }
+ } finally {
+ session.executeNonQueryStatement("SET CONFIGURATION
enable_auto_create_schema='false'");
+ }
+ try (SessionDataSet dataSet = session.executeQueryStatement("SELECT *
FROM sg6")) {
+ assertEquals(dataSet.getColumnNames().size(), 4);
+ assertEquals(dataSet.getColumnNames().get(0), "time");
+ assertEquals(dataSet.getColumnNames().get(1), "id1");
+ assertEquals(dataSet.getColumnNames().get(2), "s1");
+ assertEquals(dataSet.getColumnNames().get(3), "s2");
+ int cnt = 0;
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ long time = rowRecord.getFields().get(0).getLongV();
+ assertEquals(time, rowRecord.getFields().get(2).getLongV());
+ assertEquals(time, rowRecord.getFields().get(3).getLongV());
+ cnt++;
+ }
+ Assert.assertEquals(200, cnt);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testInsertKeyword() throws IoTDBConnectionException,
StatementExecutionException {
+ ISessionPool sessionPool = EnvFactory.getEnv().getSessionPool(1, "table");
+ try (final IPooledSession session = sessionPool.getPooledSession()) {
+ session.executeNonQueryStatement("USE \"test\"");
+ session.executeNonQueryStatement(
+ "create table table20 ("
+ + "device_id string id,"
+ + "attribute STRING ATTRIBUTE,"
+ + "boolean boolean MEASUREMENT,"
+ + "int32 int32 MEASUREMENT,"
+ + "int64 int64 MEASUREMENT,"
+ + "float float MEASUREMENT,"
+ + "double double MEASUREMENT,"
+ + "text text MEASUREMENT,"
+ + "string string MEASUREMENT,"
+ + "blob blob MEASUREMENT,"
+ + "timestamp01 timestamp MEASUREMENT,"
+ + "date date MEASUREMENT)");
+
+ List<IMeasurementSchema> schemas = new ArrayList<>();
+ schemas.add(new MeasurementSchema("device_id", TSDataType.STRING));
+ schemas.add(new MeasurementSchema("attribute", TSDataType.STRING));
+ schemas.add(new MeasurementSchema("boolean", TSDataType.BOOLEAN));
+ schemas.add(new MeasurementSchema("int32", TSDataType.INT32));
+ schemas.add(new MeasurementSchema("int64", TSDataType.INT64));
+ schemas.add(new MeasurementSchema("float", TSDataType.FLOAT));
+ schemas.add(new MeasurementSchema("double", TSDataType.DOUBLE));
+ schemas.add(new MeasurementSchema("text", TSDataType.TEXT));
+ schemas.add(new MeasurementSchema("string", TSDataType.STRING));
+ schemas.add(new MeasurementSchema("blob", TSDataType.BLOB));
+ schemas.add(new MeasurementSchema("timestamp", TSDataType.TIMESTAMP));
+ schemas.add(new MeasurementSchema("date", TSDataType.DATE));
+ final List<ColumnType> columnTypes =
+ Arrays.asList(
+ ColumnType.ID,
+ ColumnType.ATTRIBUTE,
+ ColumnType.MEASUREMENT,
+ ColumnType.MEASUREMENT,
+ ColumnType.MEASUREMENT,
+ ColumnType.MEASUREMENT,
+ ColumnType.MEASUREMENT,
+ ColumnType.MEASUREMENT,
+ ColumnType.MEASUREMENT,
+ ColumnType.MEASUREMENT,
+ ColumnType.MEASUREMENT,
+ ColumnType.MEASUREMENT);
+
+ long timestamp = 0;
+ Tablet tablet = new Tablet("table20", schemas, columnTypes, 10);
+
+ for (long row = 0; row < 10; row++) {
+ int rowIndex = tablet.rowSize++;
+ tablet.addTimestamp(rowIndex, timestamp + row);
+ tablet.addValue("device_id", rowIndex, "1");
+ tablet.addValue("attribute", rowIndex, "1");
+ tablet.addValue("boolean", rowIndex, true);
+ tablet.addValue("int32", rowIndex, Integer.valueOf("1"));
+ tablet.addValue("int64", rowIndex, Long.valueOf("1"));
+ tablet.addValue("float", rowIndex, Float.valueOf("1.0"));
+ tablet.addValue("double", rowIndex, Double.valueOf("1.0"));
+ tablet.addValue("text", rowIndex, "true");
+ tablet.addValue("string", rowIndex, "true");
+ tablet.addValue("blob", rowIndex, new Binary("iotdb",
Charset.defaultCharset()));
+ tablet.addValue("timestamp", rowIndex, 1L);
+ tablet.addValue("date", rowIndex, LocalDate.parse("2024-08-15"));
+ }
+ session.insertTablet(tablet);
+
+ SessionDataSet rs1 =
+ session.executeQueryStatement(
+ "select time, device_id, attribute, boolean, int32, int64,
float, double, text, string, blob, timestamp, date from table20 order by time");
+ for (int i = 0; i < 10; i++) {
+ RowRecord rec = rs1.next();
+ assertEquals(i, rec.getFields().get(0).getLongV());
+ assertEquals("1", rec.getFields().get(1).getStringValue());
+ assertEquals("1", rec.getFields().get(2).getStringValue());
+ assertTrue(rec.getFields().get(3).getBoolV());
+ assertEquals(1, rec.getFields().get(4).getIntV());
+ assertEquals(1, rec.getFields().get(5).getLongV());
+ assertEquals(1.0, rec.getFields().get(6).getFloatV(), 0.001);
+ assertEquals(1.0, rec.getFields().get(7).getDoubleV(), 0.001);
+ assertEquals("true", rec.getFields().get(8).getStringValue());
+ assertEquals("true", rec.getFields().get(9).getStringValue());
+ assertEquals("0x696f746462", rec.getFields().get(10).getStringValue());
+ assertEquals(1, rec.getFields().get(11).getLongV());
+ assertEquals("20240815", rec.getFields().get(12).getStringValue());
+ }
+ assertFalse(rs1.hasNext());
+ }
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
index fc7c96daeff..d77b2c81907 100644
---
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
@@ -674,6 +674,82 @@ public class IoTDBSessionRelationalIT {
}
}
+ @Test
+ @Category({LocalStandaloneIT.class, ClusterIT.class})
+ public void insertRelationalTabletWithCacheLeaderTest()
+ throws IoTDBConnectionException, StatementExecutionException {
+ try (ISession session =
EnvFactory.getEnv().getSessionConnection(TABLE_SQL_DIALECT)) {
+ session.executeNonQueryStatement("USE \"db1\"");
+ session.executeNonQueryStatement(
+ "CREATE TABLE table5 (id1 string id, attr1 string attribute, "
+ + "m1 double "
+ + "measurement)");
+
+ List<IMeasurementSchema> schemaList = new ArrayList<>();
+ schemaList.add(new MeasurementSchema("id1", TSDataType.STRING));
+ schemaList.add(new MeasurementSchema("attr1", TSDataType.STRING));
+ schemaList.add(new MeasurementSchema("m1", TSDataType.DOUBLE));
+ final List<ColumnType> columnTypes =
+ Arrays.asList(ColumnType.ID, ColumnType.ATTRIBUTE,
ColumnType.MEASUREMENT);
+
+ long timestamp = 0;
+ Tablet tablet = new Tablet("table5", schemaList, columnTypes, 15);
+
+ for (long row = 0; row < 15; row++) {
+ int rowIndex = tablet.rowSize++;
+ tablet.addTimestamp(rowIndex, timestamp + row);
+ tablet.addValue("id1", rowIndex, "id:" + row);
+ tablet.addValue("attr1", rowIndex, "attr:" + row);
+ tablet.addValue("m1", rowIndex, row * 1.0);
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ session.insertRelationalTablet(tablet, true);
+ tablet.reset();
+ }
+ }
+
+ if (tablet.rowSize != 0) {
+ session.insertRelationalTablet(tablet);
+ tablet.reset();
+ }
+
+ session.executeNonQueryStatement("FLush");
+
+ for (long row = 15; row < 30; row++) {
+ int rowIndex = tablet.rowSize++;
+ tablet.addTimestamp(rowIndex, timestamp + row);
+ // cache leader should work for devices that have inserted before
+ tablet.addValue("id1", rowIndex, "id:" + (row - 15));
+ tablet.addValue("attr1", rowIndex, "attr:" + (row - 15));
+ tablet.addValue("m1", rowIndex, row * 1.0);
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ session.insertRelationalTablet(tablet, true);
+ tablet.reset();
+ }
+ }
+
+ if (tablet.rowSize != 0) {
+ session.insertRelationalTablet(tablet);
+ tablet.reset();
+ }
+
+ int cnt = 0;
+ SessionDataSet dataSet = session.executeQueryStatement("select * from
table5 order by time");
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ timestamp = rowRecord.getFields().get(0).getLongV();
+ assertEquals(
+ "id:" + (timestamp < 15 ? timestamp : timestamp - 15),
+ rowRecord.getFields().get(1).getBinaryV().toString());
+ assertEquals(
+ "attr:" + (timestamp < 15 ? timestamp : timestamp - 15),
+ rowRecord.getFields().get(2).getBinaryV().toString());
+ assertEquals(timestamp * 1.0,
rowRecord.getFields().get(3).getDoubleV(), 0.0001);
+ cnt++;
+ }
+ assertEquals(30, cnt);
+ }
+ }
+
@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
public void autoCreateTableTest() throws IoTDBConnectionException,
StatementExecutionException {
diff --git
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/IPooledSession.java
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/IPooledSession.java
index 04c25944a97..f8764f9a203 100644
---
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/IPooledSession.java
+++
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/IPooledSession.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
import org.apache.thrift.TException;
import org.apache.tsfile.write.record.Tablet;
+/** NOTICE: IPooledSession is specific to the table model. */
public interface IPooledSession extends AutoCloseable {
Version getVersion();
diff --git
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 7bc31731802..ed627da82ef 100644
---
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
import org.apache.thrift.TException;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.write.record.Tablet;
@@ -65,6 +66,14 @@ public interface ISession extends AutoCloseable {
INodeSupplier nodeSupplier)
throws IoTDBConnectionException;
+ void open(
+ boolean enableRPCCompression,
+ int connectionTimeoutInMs,
+ Map<String, TEndPoint> deviceIdToEndpoint,
+ Map<IDeviceID, TEndPoint> tabletModelDeviceIdToEndpoint,
+ INodeSupplier nodeSupplier)
+ throws IoTDBConnectionException;
+
void close() throws IoTDBConnectionException;
String getTimeZone();
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
index 8da65e8249c..c7db354b529 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.rpc;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
public class RedirectException extends IOException {
@@ -29,17 +30,27 @@ public class RedirectException extends IOException {
private final TEndPoint endPoint;
private final Map<String, TEndPoint> deviceEndPointMap;
+ private final List<TEndPoint> endPointList;
public RedirectException(TEndPoint endPoint) {
super("later request in same group will be redirected to " +
endPoint.toString());
this.endPoint = endPoint;
this.deviceEndPointMap = null;
+ this.endPointList = null;
}
public RedirectException(Map<String, TEndPoint> deviceEndPointMap) {
super("later request in same group will be redirected to " +
deviceEndPointMap);
this.endPoint = null;
this.deviceEndPointMap = deviceEndPointMap;
+ this.endPointList = null;
+ }
+
+ public RedirectException(List<TEndPoint> endPointList) {
+ super("later request in same group will be redirected to " + endPointList);
+ this.endPoint = null;
+ this.deviceEndPointMap = null;
+ this.endPointList = endPointList;
}
public TEndPoint getEndPoint() {
@@ -49,4 +60,8 @@ public class RedirectException extends IOException {
public Map<String, TEndPoint> getDeviceEndPointMap() {
return deviceEndPointMap;
}
+
+ public List<TEndPoint> getEndPointList() {
+ return endPointList;
+ }
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index 1a7bc9970f6..d41c1c8fd97 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -36,6 +36,7 @@ import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -112,6 +113,22 @@ public class RpcUtils {
if (status.isSetRedirectNode()) {
throw new RedirectException(status.getRedirectNode());
}
+ if (status.isSetSubStatus()) { // the resp of insertRelationalTablet may
set subStatus
+ List<TSStatus> statusSubStatus = status.getSubStatus();
+ List<TEndPoint> endPointList = new ArrayList<>(statusSubStatus.size());
+ int count = 0;
+ for (TSStatus subStatus : statusSubStatus) {
+ if (subStatus.isSetRedirectNode()) {
+ endPointList.add(subStatus.getRedirectNode());
+ count++;
+ } else {
+ endPointList.add(null);
+ }
+ }
+ if (!endPointList.isEmpty() && count != 0) {
+ throw new RedirectException(endPointList);
+ }
+ }
}
public static void verifySuccessWithRedirectionForMultiDevices(
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index 35229b54162..78ecd13242d 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -63,6 +63,7 @@ import org.apache.iotdb.session.util.ThreadUtils;
import org.apache.thrift.TException;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.utils.Binary;
@@ -164,6 +165,9 @@ public class Session implements ISession {
@SuppressWarnings("squid:S3077") // Non-primitive fields should not be
"volatile"
protected volatile Map<String, TEndPoint> deviceIdToEndpoint;
+ @SuppressWarnings("squid:S3077") // Non-primitive fields should not be
"volatile"
+ protected volatile Map<IDeviceID, TEndPoint> tableModelDeviceIdToEndpoint;
+
@SuppressWarnings("squid:S3077") // Non-primitive fields should not be
"volatile"
protected volatile Map<TEndPoint, SessionConnection>
endPointToSessionConnection;
@@ -535,6 +539,7 @@ public class Session implements ISession {
isClosed = false;
if (enableRedirection || enableQueryRedirection) {
deviceIdToEndpoint = new ConcurrentHashMap<>();
+ tableModelDeviceIdToEndpoint = new ConcurrentHashMap<>();
endPointToSessionConnection = new ConcurrentHashMap<>();
endPointToSessionConnection.put(defaultEndPoint,
defaultSessionConnection);
}
@@ -584,6 +589,33 @@ public class Session implements ISession {
isClosed = false;
if (enableRedirection || enableQueryRedirection) {
this.deviceIdToEndpoint = deviceIdToEndpoint;
+ this.tableModelDeviceIdToEndpoint = new ConcurrentHashMap<>();
+ endPointToSessionConnection = new ConcurrentHashMap<>();
+ endPointToSessionConnection.put(defaultEndPoint,
defaultSessionConnection);
+ }
+ }
+
+ @Override
+ public synchronized void open(
+ boolean enableRPCCompression,
+ int connectionTimeoutInMs,
+ Map<String, TEndPoint> deviceIdToEndpoint,
+ Map<IDeviceID, TEndPoint> tableModelDeviceIdToEndpoint,
+ INodeSupplier nodesSupplier)
+ throws IoTDBConnectionException {
+ if (!isClosed) {
+ return;
+ }
+
+ this.availableNodes = nodesSupplier;
+ this.enableRPCCompression = enableRPCCompression;
+ this.connectionTimeoutInMs = connectionTimeoutInMs;
+ defaultSessionConnection = constructSessionConnection(this,
defaultEndPoint, zoneId);
+ defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
+ isClosed = false;
+ if (enableRedirection || enableQueryRedirection) {
+ this.deviceIdToEndpoint = deviceIdToEndpoint;
+ this.tableModelDeviceIdToEndpoint = tableModelDeviceIdToEndpoint;
endPointToSessionConnection = new ConcurrentHashMap<>();
endPointToSessionConnection.put(defaultEndPoint,
defaultSessionConnection);
}
@@ -1321,6 +1353,18 @@ public class Session implements ISession {
}
}
+ private SessionConnection getSessionConnection(IDeviceID deviceId) {
+ TEndPoint endPoint;
+ if (enableRedirection
+ && tableModelDeviceIdToEndpoint != null
+ && (endPoint = tableModelDeviceIdToEndpoint.get(deviceId)) != null
+ && endPointToSessionConnection.containsKey(endPoint)) {
+ return endPointToSessionConnection.get(endPoint);
+ } else {
+ return defaultSessionConnection;
+ }
+ }
+
@Override
public String getTimestampPrecision() throws TException {
return
defaultSessionConnection.getClient().getProperties().getTimestampPrecision();
@@ -1343,8 +1387,7 @@ public class Session implements ISession {
}
}
}
-
- if (deviceIdToEndpoint != null) {
+ if (deviceIdToEndpoint != null && !deviceIdToEndpoint.isEmpty()) {
for (Iterator<Entry<String, TEndPoint>> it =
deviceIdToEndpoint.entrySet().iterator();
it.hasNext(); ) {
Entry<String, TEndPoint> entry = it.next();
@@ -1353,6 +1396,16 @@ public class Session implements ISession {
}
}
}
+ if (tableModelDeviceIdToEndpoint != null &&
!tableModelDeviceIdToEndpoint.isEmpty()) {
+ for (Iterator<Entry<IDeviceID, TEndPoint>> it =
+ tableModelDeviceIdToEndpoint.entrySet().iterator();
+ it.hasNext(); ) {
+ Entry<IDeviceID, TEndPoint> entry = it.next();
+ if (entry.getValue().equals(endPoint)) {
+ it.remove();
+ }
+ }
+ }
}
}
@@ -1362,7 +1415,6 @@ public class Session implements ISession {
if (endpoint.ip.equals("0.0.0.0")) {
return;
}
- AtomicReference<IoTDBConnectionException> exceptionReference = new
AtomicReference<>();
if (!deviceIdToEndpoint.containsKey(deviceId)
|| !deviceIdToEndpoint.get(deviceId).equals(endpoint)) {
deviceIdToEndpoint.put(deviceId, endpoint);
@@ -1374,7 +1426,6 @@ public class Session implements ISession {
try {
return constructSessionConnection(this, endpoint, zoneId);
} catch (IoTDBConnectionException ex) {
- exceptionReference.set(ex);
return null;
}
});
@@ -1384,6 +1435,32 @@ public class Session implements ISession {
}
}
+ private void handleRedirection(IDeviceID deviceId, TEndPoint endpoint) {
+ if (enableRedirection) {
+ // no need to redirection
+ if (endpoint.ip.equals("0.0.0.0")) {
+ return;
+ }
+ if (!tableModelDeviceIdToEndpoint.containsKey(deviceId)
+ || !tableModelDeviceIdToEndpoint.get(deviceId).equals(endpoint)) {
+ tableModelDeviceIdToEndpoint.put(deviceId, endpoint);
+ }
+ SessionConnection connection =
+ endPointToSessionConnection.computeIfAbsent(
+ endpoint,
+ k -> {
+ try {
+ return constructSessionConnection(this, endpoint, zoneId);
+ } catch (IoTDBConnectionException ex) {
+ return null;
+ }
+ });
+ if (connection == null) {
+ tableModelDeviceIdToEndpoint.remove(deviceId);
+ }
+ }
+ }
+
private void handleQueryRedirection(TEndPoint endPoint) throws
IoTDBConnectionException {
if (enableQueryRedirection) {
AtomicReference<IoTDBConnectionException> exceptionReference = new
AtomicReference<>();
@@ -2492,7 +2569,7 @@ public class Session implements ISession {
* <p>e.g. source: [1,2,3,4,5], index:[1,0,3,2,4], return : [2,1,4,3,5]
*
* @param source Input list
- * @param index retuen order
+ * @param index return order
* @param <T> Input type
* @return ordered list
*/
@@ -2679,11 +2756,20 @@ public class Session implements ISession {
@Override
public void insertRelationalTablet(Tablet tablet, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
- TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted, false);
- request.setWriteToTable(true);
- request.setColumnCategories(
- tablet.getColumnTypes().stream().map(t -> (byte)
t.ordinal()).collect(Collectors.toList()));
- insertTabletInternal(tablet, request);
+ if (enableRedirection) {
+ insertRelationalTabletWithLeaderCache(tablet);
+ } else {
+ TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted, false);
+ request.setWriteToTable(true);
+ request.setColumnCategories(
+ tablet.getColumnTypes().stream()
+ .map(t -> (byte) t.ordinal())
+ .collect(Collectors.toList()));
+ try {
+ defaultSessionConnection.insertTablet(request);
+ } catch (RedirectException ignored) {
+ }
+ }
}
/**
@@ -2697,6 +2783,113 @@ public class Session implements ISession {
insertRelationalTablet(tablet, false);
}
+ private void insertRelationalTabletWithLeaderCache(Tablet tablet)
+ throws IoTDBConnectionException, StatementExecutionException {
+ Map<SessionConnection, Tablet> relationalTabletGroup = new HashMap<>();
+ if (tableModelDeviceIdToEndpoint.isEmpty()) {
+ relationalTabletGroup.put(defaultSessionConnection, tablet);
+ } else {
+ for (int i = 0; i < tablet.rowSize; i++) {
+ IDeviceID iDeviceID = tablet.getDeviceID(i);
+ final SessionConnection connection = getSessionConnection(iDeviceID);
+ int finalI = i;
+ relationalTabletGroup.compute(
+ connection,
+ (k, v) -> {
+ if (v == null) {
+ v =
+ new Tablet(
+ tablet.getTableName(),
+ tablet.getSchemas(),
+ tablet.getColumnTypes(),
+ tablet.rowSize);
+ }
+ for (int j = 0; j < v.getSchemas().size(); j++) {
+ v.addValue(
+ v.getSchemas().get(j).getMeasurementId(),
+ v.rowSize,
+ tablet.getValue(finalI, j));
+ }
+ v.addTimestamp(v.rowSize, tablet.timestamps[finalI]);
+ v.rowSize++;
+ return v;
+ });
+ }
+ }
+ insertRelationalTabletByGroup(relationalTabletGroup);
+ }
+
+ @SuppressWarnings({
+ "squid:S3776"
+ }) // ignore Cognitive Complexity of methods should not be too high
+ private void insertRelationalTabletByGroup(Map<SessionConnection, Tablet>
relationalTabletGroup)
+ throws IoTDBConnectionException, StatementExecutionException {
+ List<CompletableFuture<Void>> completableFutures =
+ relationalTabletGroup.entrySet().stream()
+ .map(
+ entry -> {
+ SessionConnection connection = entry.getKey();
+ Tablet subTablet = entry.getValue();
+ return CompletableFuture.runAsync(
+ () -> {
+ TSInsertTabletReq request =
genTSInsertTabletReq(subTablet, false, false);
+ request.setWriteToTable(true);
+ request.setColumnCategories(
+ subTablet.getColumnTypes().stream()
+ .map(t -> (byte) t.ordinal())
+ .collect(Collectors.toList()));
+ InsertConsumer<TSInsertTabletReq> insertConsumer =
+ SessionConnection::insertTablet;
+ try {
+ insertConsumer.insert(connection, request);
+ } catch (RedirectException e) {
+ List<TEndPoint> endPointList = e.getEndPointList();
+ Map<IDeviceID, TEndPoint> endPointMap = new
HashMap<>();
+ for (int i = 0; i < endPointList.size(); i++) {
+ if (endPointList.get(i) != null) {
+ endPointMap.put(subTablet.getDeviceID(i),
endPointList.get(i));
+ }
+ }
+ endPointMap.forEach(this::handleRedirection);
+ } catch (StatementExecutionException e) {
+ throw new CompletionException(e);
+ } catch (IoTDBConnectionException e) {
+ // remove the broken session
+ removeBrokenSessionConnection(connection);
+ try {
+ insertConsumer.insert(defaultSessionConnection,
request);
+ } catch (IoTDBConnectionException |
StatementExecutionException ex) {
+ throw new CompletionException(ex);
+ } catch (RedirectException ignored) {
+ }
+ }
+ },
+ OPERATION_EXECUTOR);
+ })
+ .collect(Collectors.toList());
+
+ StringBuilder errMsgBuilder = new StringBuilder();
+ for (CompletableFuture<Void> completableFuture : completableFutures) {
+ try {
+ completableFuture.join();
+ } catch (CompletionException completionException) {
+ Throwable cause = completionException.getCause();
+ logger.error("Meet error when async insert!", cause);
+ if (cause instanceof IoTDBConnectionException) {
+ throw (IoTDBConnectionException) cause;
+ } else {
+ if (errMsgBuilder.length() > 0) {
+ errMsgBuilder.append(";");
+ }
+ errMsgBuilder.append(cause.getMessage());
+ }
+ }
+ }
+ if (errMsgBuilder.length() > 0) {
+ throw new StatementExecutionException(errMsgBuilder.toString());
+ }
+ }
+
/**
* insert the aligned timeseries data of a device. For each timestamp, the
number of measurements
* is the same.
@@ -3863,6 +4056,9 @@ public class Session implements ISession {
if (cause instanceof IoTDBConnectionException) {
throw (IoTDBConnectionException) cause;
} else {
+ if (errMsgBuilder.length() > 0) {
+ errMsgBuilder.append(";");
+ }
errMsgBuilder.append(cause.getMessage());
}
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 9a59e6dc50f..d9871d4b4e2 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -114,11 +114,11 @@ public class SessionConnection {
private int timeFactor = 1_000;
// TestOnly
- public SessionConnection() {
+ public SessionConnection(String sqlDialect) {
availableNodes = Collections::emptyList;
this.maxRetryCount = Math.max(0, SessionConfig.MAX_RETRY_COUNT);
this.retryIntervalInMs = Math.max(0, SessionConfig.RETRY_INTERVAL_IN_MS);
- this.sqlDialect = "tree";
+ this.sqlDialect = sqlDialect;
database = null;
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 5ef9f7616f6..3edfc396173 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.session.util.SessionUtils;
import org.apache.thrift.TException;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.write.record.Tablet;
@@ -127,6 +128,7 @@ public class SessionPool implements ISessionPool {
private boolean enableQueryRedirection = false;
private Map<String, TEndPoint> deviceIdToEndpoint;
+ private Map<IDeviceID, TEndPoint> tableModelDeviceIdToEndpoint;
private int thriftDefaultBufferSize;
private int thriftMaxFrameSize;
@@ -391,6 +393,7 @@ public class SessionPool implements ISessionPool {
this.enableRedirection = enableRedirection;
if (this.enableRedirection) {
deviceIdToEndpoint = new ConcurrentHashMap<>();
+ tableModelDeviceIdToEndpoint = new ConcurrentHashMap<>();
}
this.connectionTimeoutInMs = connectionTimeoutInMs;
this.version = version;
@@ -432,6 +435,7 @@ public class SessionPool implements ISessionPool {
this.enableRedirection = enableRedirection;
if (this.enableRedirection) {
deviceIdToEndpoint = new ConcurrentHashMap<>();
+ tableModelDeviceIdToEndpoint = new ConcurrentHashMap<>();
}
this.connectionTimeoutInMs = connectionTimeoutInMs;
this.version = version;
@@ -476,6 +480,7 @@ public class SessionPool implements ISessionPool {
this.enableRedirection = enableRedirection;
if (this.enableRedirection) {
deviceIdToEndpoint = new ConcurrentHashMap<>();
+ tableModelDeviceIdToEndpoint = new ConcurrentHashMap<>();
}
this.connectionTimeoutInMs = connectionTimeoutInMs;
this.version = version;
@@ -497,6 +502,7 @@ public class SessionPool implements ISessionPool {
this.enableRedirection = builder.enableRedirection;
if (this.enableRedirection) {
deviceIdToEndpoint = new ConcurrentHashMap<>();
+ tableModelDeviceIdToEndpoint = new ConcurrentHashMap<>();
}
this.enableRecordsAutoConvertTablet =
builder.enableRecordsAutoConvertTablet;
this.connectionTimeoutInMs = builder.connectionTimeoutInMs;
@@ -703,7 +709,13 @@ public class SessionPool implements ISessionPool {
session = constructNewSession();
try {
- session.open(enableCompression, connectionTimeoutInMs,
deviceIdToEndpoint, availableNodes);
+
+ session.open(
+ enableCompression,
+ connectionTimeoutInMs,
+ deviceIdToEndpoint,
+ tableModelDeviceIdToEndpoint,
+ availableNodes);
// avoid someone has called close() the session pool
synchronized (this) {
if (closed) {
@@ -813,7 +825,12 @@ public class SessionPool implements ISessionPool {
private void tryConstructNewSession() {
Session session = constructNewSession();
try {
- session.open(enableCompression, connectionTimeoutInMs,
deviceIdToEndpoint, availableNodes);
+ session.open(
+ enableCompression,
+ connectionTimeoutInMs,
+ deviceIdToEndpoint,
+ tableModelDeviceIdToEndpoint,
+ availableNodes);
// avoid someone has called close() the session pool
synchronized (this) {
if (closed) {
@@ -3440,6 +3457,7 @@ public class SessionPool implements ISessionPool {
this.enableRedirection = enableRedirection;
if (this.enableRedirection) {
deviceIdToEndpoint = new ConcurrentHashMap<>();
+ tableModelDeviceIdToEndpoint = new ConcurrentHashMap<>();
}
for (ISession session : queue) {
session.setEnableRedirection(enableRedirection);
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionWrapper.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionWrapper.java
index deb43f56e80..5a007921a89 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionWrapper.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionWrapper.java
@@ -37,9 +37,11 @@ import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
/**
- * used for SessionPool.getSession need to do some other things like calling
+ * NOTICE: SessionWrapper is specific to the table model.
+ *
+ * <p>used for SessionPool.getSession need to do some other things like calling
* cleanSessionAndMayThrowConnectionException in SessionPool while
encountering connection exception
- * only need to putBack to SessionPool while closing
+ * only need to putBack to SessionPool while closing.
*/
public class SessionWrapper implements IPooledSession {
@@ -144,7 +146,7 @@ public class SessionWrapper implements IPooledSession {
public void insertTablet(Tablet tablet)
throws StatementExecutionException, IoTDBConnectionException {
try {
- session.insertTablet(tablet);
+ session.insertRelationalTablet(tablet);
} catch (IoTDBConnectionException e) {
sessionPool.cleanSessionAndMayThrowConnectionException(session);
closed.set(true);
diff --git
a/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderTest.java
b/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderTest.java
index 6a7a9f83051..c83cf8c4ea4 100644
---
a/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderTest.java
+++
b/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderTest.java
@@ -33,7 +33,10 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.record.Tablet.ColumnType;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.Assert;
@@ -56,7 +59,7 @@ public class SessionCacheLeaderTest {
new ArrayList<TEndPoint>() {
{
add(new TEndPoint("127.0.0.1", 55560)); // default endpoint
- add(new TEndPoint("127.0.0.1", 55561)); // meta leader endpoint
+ add(new TEndPoint("127.0.0.1", 55561));
add(new TEndPoint("127.0.0.1", 55562));
add(new TEndPoint("127.0.0.1", 55563));
}
@@ -79,6 +82,20 @@ public class SessionCacheLeaderTest {
return endpoints.get(deviceId.hashCode() % endpoints.size());
}
+ public static TEndPoint getDeviceIdBelongedEndpoint(IDeviceID deviceId) {
+ if (deviceId.equals(new StringArrayDeviceID("table1", "id0"))) {
+ return endpoints.get(0);
+ } else if (deviceId.equals(new StringArrayDeviceID("table1", "id1"))) {
+ return endpoints.get(1);
+ } else if (deviceId.equals(new StringArrayDeviceID("table1", "id2"))) {
+ return endpoints.get(2);
+ } else if (deviceId.equals(new StringArrayDeviceID("table1", "id3"))) {
+ return endpoints.get(3);
+ }
+
+ return endpoints.get(deviceId.hashCode() % endpoints.size());
+ }
+
@Test
public void testInsertRecord() throws IoTDBConnectionException,
StatementExecutionException {
// without leader cache
@@ -607,6 +624,83 @@ public class SessionCacheLeaderTest {
session.close();
}
+ @Test
+ public void testInsertRelationalTablet()
+ throws IoTDBConnectionException, StatementExecutionException {
+ // without leader cache
+ session = new MockSession("127.0.0.1", 55560, false, "table");
+ session.open();
+ assertNull(session.tableModelDeviceIdToEndpoint);
+ assertNull(session.endPointToSessionConnection);
+
+ String tableName = "table1";
+ List<IMeasurementSchema> schemaList = new ArrayList<>();
+ List<ColumnType> columnTypeList = new ArrayList<>();
+ schemaList.add(new MeasurementSchema("id", TSDataType.STRING));
+ schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+ schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
+ columnTypeList.add(ColumnType.ID);
+ columnTypeList.add(ColumnType.MEASUREMENT);
+ columnTypeList.add(ColumnType.MEASUREMENT);
+ Tablet tablet = new Tablet(tableName, schemaList, columnTypeList, 50);
+ long timestamp = System.currentTimeMillis();
+ for (long row = 0; row < 100; row++) {
+ int rowIndex = tablet.rowSize++;
+ tablet.addTimestamp(rowIndex, timestamp);
+ tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, "id" +
(rowIndex % 4));
+ for (int s = 1; s < 3; s++) {
+ long value = new Random().nextLong();
+ tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
+ }
+
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ session.insertRelationalTablet(tablet);
+ tablet.reset();
+ }
+ timestamp++;
+ }
+
+ if (tablet.rowSize != 0) {
+ session.insertRelationalTablet(tablet);
+ tablet.reset();
+ }
+
+ assertNull(session.tableModelDeviceIdToEndpoint);
+ assertNull(session.endPointToSessionConnection);
+ session.close();
+
+ // with leader cache
+ session = new MockSession("127.0.0.1", 55560, true, "table");
+ session.open();
+ assertEquals(0, session.tableModelDeviceIdToEndpoint.size());
+ assertEquals(1, session.endPointToSessionConnection.size());
+
+ for (long row = 0; row < 100; row++) {
+ int rowIndex = tablet.rowSize++;
+ tablet.addTimestamp(rowIndex, timestamp);
+ tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, "id" +
(rowIndex % 4));
+ for (int s = 1; s < 3; s++) {
+ long value = new Random().nextLong();
+ tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
+ }
+
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ session.insertRelationalTablet(tablet, true);
+ tablet.reset();
+ }
+ timestamp++;
+ }
+
+ if (tablet.rowSize != 0) {
+ session.insertRelationalTablet(tablet);
+ tablet.reset();
+ }
+
+ assertEquals(4, session.tableModelDeviceIdToEndpoint.size());
+ assertEquals(4, session.endPointToSessionConnection.size());
+ session.close();
+ }
+
@Test
public void testInsertRecordsWithSessionBroken() throws
StatementExecutionException {
// without leader cache
@@ -658,6 +752,7 @@ public class SessionCacheLeaderTest {
if (time != 0 && time % 100 == 0) {
try {
session.insertRecords(deviceIds, timestamps, measurementsList,
typesList, valuesList);
+ Assert.fail();
} catch (IoTDBConnectionException e) {
Assert.assertEquals(
"the session connection = TEndPoint(ip:127.0.0.1, port:55560) is
broken",
@@ -672,6 +767,7 @@ public class SessionCacheLeaderTest {
try {
session.insertRecords(deviceIds, timestamps, measurementsList,
typesList, valuesList);
+ fail();
} catch (IoTDBConnectionException e) {
Assert.assertEquals(
"the session connection = TEndPoint(ip:127.0.0.1, port:55560) is
broken", e.getMessage());
@@ -686,8 +782,7 @@ public class SessionCacheLeaderTest {
try {
session.close();
} catch (IoTDBConnectionException e) {
- Assert.assertEquals(
- "the session connection = TEndPoint(ip:127.0.0.1, port:55560) is
broken", e.getMessage());
+ Assert.fail(e.getMessage());
}
// with leader cache
@@ -737,8 +832,7 @@ public class SessionCacheLeaderTest {
try {
session.insertRecords(deviceIds, timestamps, measurementsList,
typesList, valuesList);
} catch (IoTDBConnectionException e) {
- Assert.assertEquals(
- "the session connection = TEndPoint(ip:127.0.0.1, port:55561) is
broken", e.getMessage());
+ Assert.fail(e.getMessage());
}
assertEquals(3, session.deviceIdToEndpoint.size());
for (Map.Entry<String, TEndPoint> endPointMap :
session.deviceIdToEndpoint.entrySet()) {
@@ -806,6 +900,7 @@ public class SessionCacheLeaderTest {
if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
try {
session.insertTablets(tabletMap, true);
+ fail();
} catch (IoTDBConnectionException e) {
assertEquals(
"the session connection = TEndPoint(ip:127.0.0.1, port:55560) is
broken",
@@ -818,18 +913,6 @@ public class SessionCacheLeaderTest {
timestamp++;
}
- if (tablet1.rowSize != 0) {
- try {
- session.insertTablets(tabletMap, true);
- } catch (IoTDBConnectionException e) {
- Assert.fail(e.getMessage());
- }
-
- tablet1.reset();
- tablet2.reset();
- tablet3.reset();
- }
-
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
try {
@@ -912,8 +995,7 @@ public class SessionCacheLeaderTest {
try {
session.insertTablets(tabletMap, true);
} catch (IoTDBConnectionException e) {
- Assert.assertEquals(
- "the session connection = TEndPoint(ip:127.0.0.1, port:55562) is
broken", e.getMessage());
+ Assert.fail(e.getMessage());
}
tablet1.reset();
tablet2.reset();
@@ -931,6 +1013,136 @@ public class SessionCacheLeaderTest {
}
}
+ @Test
+ public void testInsertRelationalTabletWithSessionBroken() throws
StatementExecutionException {
+ // without leader cache
+ session = new MockSession("127.0.0.1", 55560, false, "table");
+ try {
+ session.open();
+ } catch (IoTDBConnectionException e) {
+ Assert.fail(e.getMessage());
+ }
+ assertNull(session.tableModelDeviceIdToEndpoint);
+ assertNull(session.endPointToSessionConnection);
+
+ // set the session connection as broken
+ ((MockSession)
session).getLastConstructedSessionConnection().setConnectionBroken(true);
+
+ String tableName = "table1";
+ List<IMeasurementSchema> schemaList = new ArrayList<>();
+ List<ColumnType> columnTypeList = new ArrayList<>();
+ schemaList.add(new MeasurementSchema("id", TSDataType.STRING));
+ schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+ schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
+ columnTypeList.add(ColumnType.ID);
+ columnTypeList.add(ColumnType.MEASUREMENT);
+ columnTypeList.add(ColumnType.MEASUREMENT);
+ Tablet tablet = new Tablet(tableName, schemaList, columnTypeList, 50);
+ long timestamp = System.currentTimeMillis();
+ for (long row = 0; row < 100; row++) {
+ int rowIndex = tablet.rowSize++;
+ tablet.addTimestamp(rowIndex, timestamp);
+ tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, "id" +
(rowIndex % 4));
+ for (int s = 1; s < 3; s++) {
+ long value = new Random().nextLong();
+ tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
+ }
+
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ try {
+ session.insertRelationalTablet(tablet);
+ fail();
+ } catch (IoTDBConnectionException e) {
+ assertEquals(
+ "the session connection = TEndPoint(ip:127.0.0.1, port:55560) is
broken",
+ e.getMessage());
+ }
+ tablet.reset();
+ }
+ timestamp++;
+ }
+
+ assertNull(session.tableModelDeviceIdToEndpoint);
+ assertNull(session.endPointToSessionConnection);
+ try {
+ session.close();
+ } catch (IoTDBConnectionException e) {
+ Assert.fail(e.getMessage());
+ }
+
+ // with leader cache
+ // rest the session connection
+ session = new MockSession("127.0.0.1", 55560, true, "table");
+ try {
+ session.open();
+ } catch (IoTDBConnectionException e) {
+ Assert.fail(e.getMessage());
+ }
+ assertEquals(0, session.tableModelDeviceIdToEndpoint.size());
+ assertEquals(1, session.endPointToSessionConnection.size());
+
+ for (long row = 0; row < 100; row++) {
+ int rowIndex = tablet.rowSize++;
+ tablet.addTimestamp(rowIndex, timestamp);
+ tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, "id" +
(rowIndex % 4));
+ for (int s = 1; s < 3; s++) {
+ long value = new Random().nextLong();
+ tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
+ }
+
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ try {
+ session.insertRelationalTablet(tablet);
+ } catch (IoTDBConnectionException e) {
+ Assert.fail(e.getMessage());
+ }
+ tablet.reset();
+ }
+ timestamp++;
+ }
+
+ // set the session connection as broken
+ ((MockSession)
session).getLastConstructedSessionConnection().setConnectionBroken(true);
+ // set connection as broken, due to we enable the cache leader, when we
called
+ // ((MockSession) session).getLastConstructedSessionConnection(), the
session's endpoint has
+ // been changed to EndPoint(ip:127.0.0.1, port:55562)
+ Assert.assertEquals(
+ "MockSessionConnection{ endPoint=TEndPoint(ip:127.0.0.1, port:55562)}",
+ ((MockSession)
session).getLastConstructedSessionConnection().toString());
+
+ for (long row = 0; row < 100; row++) {
+ int rowIndex = tablet.rowSize++;
+ tablet.addTimestamp(rowIndex, timestamp);
+ tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, "id" +
(rowIndex % 4));
+ for (int s = 1; s < 3; s++) {
+ long value = new Random().nextLong();
+ tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
+ }
+
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ try {
+ session.insertRelationalTablet(tablet);
+ } catch (IoTDBConnectionException e) {
+ Assert.fail(e.getMessage());
+ }
+ tablet.reset();
+ }
+ timestamp++;
+ }
+
+ assertEquals(3, session.tableModelDeviceIdToEndpoint.size());
+ for (Map.Entry<IDeviceID, TEndPoint> endPointEntry :
+ session.tableModelDeviceIdToEndpoint.entrySet()) {
+ assertEquals(getDeviceIdBelongedEndpoint(endPointEntry.getKey()),
endPointEntry.getValue());
+ }
+ assertEquals(3, session.endPointToSessionConnection.size());
+ try {
+ session.close();
+ } catch (IoTDBConnectionException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
private void addLine(
List<Long> times,
List<List<String>> measurements,
@@ -977,6 +1189,12 @@ public class SessionCacheLeaderTest {
this.enableAutoFetch = false;
}
+ public MockSession(String host, int rpcPort, boolean enableRedirection,
String sqlDialect) {
+ this(host, rpcPort, enableRedirection);
+ this.sqlDialect = sqlDialect;
+ this.enableAutoFetch = false;
+ }
+
@Override
public SessionConnection constructSessionConnection(
Session session, TEndPoint endpoint, ZoneId zoneId) {
@@ -996,7 +1214,7 @@ public class SessionCacheLeaderTest {
private IoTDBConnectionException ioTDBConnectionException;
public MockSessionConnection(Session session, TEndPoint endPoint, ZoneId
zoneId) {
- super();
+ super(session.sqlDialect);
this.endPoint = endPoint;
ioTDBConnectionException =
new IoTDBConnectionException(
@@ -1057,7 +1275,18 @@ public class SessionCacheLeaderTest {
if (isConnectionBroken()) {
throw ioTDBConnectionException;
}
- throw new
RedirectException(getDeviceIdBelongedEndpoint(request.prefixPath));
+ if (request.writeToTable) {
+ if (request.size >= 50) {
+ // multi devices
+ List<TEndPoint> endPoints = new ArrayList<>();
+ for (int i = 0; i < request.size; i++) {
+ endPoints.add(endpoints.get(i % 4));
+ }
+ throw new RedirectException(endPoints);
+ }
+ } else {
+ throw new
RedirectException(getDeviceIdBelongedEndpoint(request.prefixPath));
+ }
}
@Override
diff --git
a/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java
b/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java
index 0785bc800d9..a10d9b1d6f4 100644
---
a/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java
+++
b/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java
@@ -84,7 +84,7 @@ public class SessionConnectionTest {
@Before
public void setUp() throws IoTDBConnectionException,
StatementExecutionException, TException {
MockitoAnnotations.initMocks(this);
- sessionConnection = new SessionConnection();
+ sessionConnection = new SessionConnection("tree");
Whitebox.setInternalState(sessionConnection, "transport", transport);
Whitebox.setInternalState(sessionConnection, "client", client);
session =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index 11e8e42ec83..1f8905cc666 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -414,6 +414,7 @@ public class Analysis implements IAnalysis {
this.schemaTree = schemaTree;
}
+ @Override
public List<TEndPoint> getRedirectNodeList() {
return redirectNodeList;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
index 05c3638a95d..e03f75db230 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
@@ -66,6 +66,8 @@ public interface IAnalysis {
DataPartition getDataPartitionInfo();
+ List<TEndPoint> getRedirectNodeList();
+
void setRedirectNodeList(List<TEndPoint> redirectNodeList);
void addEndPointToRedirectNodeList(TEndPoint endPoint);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index d40d1c50b84..3a2027d1927 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -248,7 +248,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
return deviceIDSplitInfoMap;
}
- private Map<TRegionReplicaSet, List<Integer>> splitByReplicaSet(
+ protected Map<TRegionReplicaSet, List<Integer>> splitByReplicaSet(
Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap, IAnalysis
analysis) {
Map<TRegionReplicaSet, List<Integer>> splitMap = new HashMap<>();
@@ -1189,12 +1189,12 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
return deviceID;
}
- private static class PartitionSplitInfo {
+ protected static class PartitionSplitInfo {
// for each List in split, they are range1.start, range1.end,
range2.start, range2.end, ...
- private List<Integer> ranges = new ArrayList<>();
- private List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
- private List<TRegionReplicaSet> replicaSets;
+ List<Integer> ranges = new ArrayList<>();
+ List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
+ List<TRegionReplicaSet> replicaSets;
}
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index c2129f8d8d7..b6b97457cff 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -19,10 +19,13 @@
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.db.exception.query.OutOfTTLException;
+import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
@@ -41,7 +44,9 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.function.IntToLongFunction;
public class RelationalInsertTabletNode extends InsertTabletNode {
@@ -139,6 +144,44 @@ public class RelationalInsertTabletNode extends
InsertTabletNode {
columnCategories);
}
+ protected Map<TRegionReplicaSet, List<Integer>> splitByReplicaSet(
+ Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap, IAnalysis
analysis) {
+ Map<TRegionReplicaSet, List<Integer>> splitMap = new HashMap<>();
+ Map<IDeviceID, TEndPoint> endPointMap = new HashMap<>();
+
+ for (Map.Entry<IDeviceID, PartitionSplitInfo> entry :
deviceIDSplitInfoMap.entrySet()) {
+ final IDeviceID deviceID = entry.getKey();
+ final PartitionSplitInfo splitInfo = entry.getValue();
+ final List<TRegionReplicaSet> replicaSets =
+ analysis
+ .getDataPartitionInfo()
+ .getDataRegionReplicaSetForWriting(
+ deviceID, splitInfo.timePartitionSlots,
analysis.getDatabaseName());
+ splitInfo.replicaSets = replicaSets;
+ // collect redirectInfo
+ endPointMap.put(
+ deviceID,
+ replicaSets
+ .get(replicaSets.size() - 1)
+ .getDataNodeLocations()
+ .get(0)
+ .getClientRpcEndPoint());
+ for (int i = 0; i < replicaSets.size(); i++) {
+ List<Integer> subRanges =
+ splitMap.computeIfAbsent(replicaSets.get(i), x -> new
ArrayList<>());
+ subRanges.add(splitInfo.ranges.get(2 * i));
+ subRanges.add(splitInfo.ranges.get(2 * i + 1));
+ }
+ }
+ List<TEndPoint> redirectNodeList = new ArrayList<>(times.length);
+ for (int i = 0; i < times.length; i++) {
+ IDeviceID deviceId = getDeviceID(i);
+ redirectNodeList.add(endPointMap.get(deviceId));
+ }
+ analysis.setRedirectNodeList(redirectNodeList);
+ return splitMap;
+ }
+
public static RelationalInsertTabletNode deserialize(ByteBuffer byteBuffer) {
RelationalInsertTabletNode insertNode = new RelationalInsertTabletNode(new
PlanNodeId(""));
insertNode.subDeserialize(byteBuffer);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
index a9ddeeea37e..3ae791bd496 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
@@ -754,6 +754,11 @@ public class Analysis implements IAnalysis {
}
}
+ @Override
+ public List<TEndPoint> getRedirectNodeList() {
+ return redirectNodeList;
+ }
+
@Override
public void setRedirectNodeList(List<TEndPoint> redirectNodeList) {
this.redirectNodeList = redirectNodeList;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
index e750e2d95a8..1b90384efc3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
@@ -38,12 +38,18 @@ import
org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributedPlanner;
import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedInsertStatement;
import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
import org.apache.iotdb.db.queryengine.plan.scheduler.ClusterScheduler;
import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@@ -149,7 +155,39 @@ public class TableModelPlanner implements IPlanner {
@Override
public void setRedirectInfo(
- IAnalysis analysis, TEndPoint localEndPoint, TSStatus tsstatus,
TSStatusCode statusCode) {}
+ IAnalysis iAnalysis, TEndPoint localEndPoint, TSStatus tsstatus,
TSStatusCode statusCode) {
+ Analysis analysis = (Analysis) iAnalysis;
+ if (!(analysis.getStatement() instanceof WrappedInsertStatement)) {
+ return;
+ }
+ InsertBaseStatement insertStatement =
+ ((WrappedInsertStatement)
analysis.getStatement()).getInnerTreeStatement();
+
+ if (!analysis.isFinishQueryAfterAnalyze()) {
+ // Table Model Session only supports insertTablet
+ if (insertStatement instanceof InsertTabletStatement) {
+ if (statusCode == TSStatusCode.SUCCESS_STATUS) {
+ boolean needRedirect = false;
+ List<TEndPoint> redirectNodeList = analysis.getRedirectNodeList();
+ List<TSStatus> subStatus = new ArrayList<>(redirectNodeList.size());
+ for (TEndPoint endPoint : redirectNodeList) {
+ // redirect writing only if the redirectEndPoint is not the
current node
+ if (!localEndPoint.equals(endPoint)) {
+ subStatus.add(
+
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS).setRedirectNode(endPoint));
+ needRedirect = true;
+ } else {
+ subStatus.add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+ }
+ }
+ if (needRedirect) {
+
tsstatus.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
+ tsstatus.setSubStatus(subStatus);
+ }
+ }
+ }
+ }
+ }
private static class NopAccessControl implements AccessControl {}
}