This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 43d07b52a25 Support table model insert redirection (#13550)
43d07b52a25 is described below

commit 43d07b52a25f7d111a699c60eb1129b8cfbff1fe
Author: Haonan <[email protected]>
AuthorDate: Thu Sep 26 19:16:17 2024 +0800

    Support table model insert redirection (#13550)
    
    * dev_server_side
    
    * dev session
    
    * fix UT&IT
    
    * fix UT
    
    * fix IT
    
    * change back
    
    * fix table mode session pool issue and add IT
    
    * fix IT bug
    
    * fix split tablet error and add IT
    
    * optimize first insert profermance
    
    * add UT
    
    * fix remove broken connection
    
    * fix review
---
 .../pool/IoTDBInsertTableSessionPoolIT.java        | 245 +++++++++++++++++++
 .../iotdb/session/it/IoTDBSessionRelationalIT.java |  76 ++++++
 .../org/apache/iotdb/isession/IPooledSession.java  |   1 +
 .../java/org/apache/iotdb/isession/ISession.java   |   9 +
 .../org/apache/iotdb/rpc/RedirectException.java    |  15 ++
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |  17 ++
 .../java/org/apache/iotdb/session/Session.java     | 216 +++++++++++++++-
 .../apache/iotdb/session/SessionConnection.java    |   4 +-
 .../org/apache/iotdb/session/pool/SessionPool.java |  22 +-
 .../apache/iotdb/session/pool/SessionWrapper.java  |   8 +-
 .../iotdb/session/SessionCacheLeaderTest.java      | 271 +++++++++++++++++++--
 .../iotdb/session/SessionConnectionTest.java       |   2 +-
 .../db/queryengine/plan/analyze/Analysis.java      |   1 +
 .../db/queryengine/plan/analyze/IAnalysis.java     |   2 +
 .../planner/plan/node/write/InsertTabletNode.java  |  10 +-
 .../node/write/RelationalInsertTabletNode.java     |  43 ++++
 .../plan/relational/analyzer/Analysis.java         |   5 +
 .../plan/relational/planner/TableModelPlanner.java |  40 ++-
 18 files changed, 942 insertions(+), 45 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/pool/IoTDBInsertTableSessionPoolIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/pool/IoTDBInsertTableSessionPoolIT.java
new file mode 100644
index 00000000000..c4f95f846f2
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/pool/IoTDBInsertTableSessionPoolIT.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.session.pool;
+
+import org.apache.iotdb.isession.IPooledSession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.isession.pool.ISessionPool;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.RowRecord;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.record.Tablet.ColumnType;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.nio.charset.Charset;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBInsertTableSessionPoolIT {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().initClusterEnvironment();
+    ISessionPool sessionPool = EnvFactory.getEnv().getSessionPool(1, "table");
+    try (final IPooledSession session = sessionPool.getPooledSession()) {
+      session.executeNonQueryStatement("create database if not exists test");
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testPartialInsertTablet() {
+    ISessionPool sessionPool = EnvFactory.getEnv().getSessionPool(1, "table");
+    try (final IPooledSession session = sessionPool.getPooledSession()) {
+      session.executeNonQueryStatement("use \"test\"");
+      session.executeNonQueryStatement("SET CONFIGURATION 
enable_auto_create_schema='false'");
+      session.executeNonQueryStatement(
+          "create table sg6 (id1 string id, s1 int64 measurement, s2 int64 
measurement)");
+      List<IMeasurementSchema> schemaList = new ArrayList<>();
+      schemaList.add(new MeasurementSchema("id1", TSDataType.STRING));
+      schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+      schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
+      schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
+      final List<ColumnType> columnTypes =
+          Arrays.asList(
+              ColumnType.ID,
+              ColumnType.MEASUREMENT,
+              ColumnType.MEASUREMENT,
+              ColumnType.MEASUREMENT);
+      Tablet tablet = new Tablet("sg6", schemaList, columnTypes, 300);
+      long timestamp = 0;
+      for (long row = 0; row < 100; row++) {
+        int rowIndex = tablet.rowSize++;
+        tablet.addTimestamp(rowIndex, timestamp);
+        for (int s = 0; s < 4; s++) {
+          long value = timestamp;
+          if (s == 0) {
+            tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, 
"d1");
+          } else {
+            tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, 
value);
+          }
+        }
+        timestamp++;
+      }
+      timestamp = System.currentTimeMillis();
+      for (long row = 0; row < 100; row++) {
+        int rowIndex = tablet.rowSize++;
+        tablet.addTimestamp(rowIndex, timestamp);
+        for (int s = 0; s < 4; s++) {
+          long value = timestamp;
+          if (s == 0) {
+            tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, 
"d1");
+          } else {
+            tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, 
value);
+          }
+        }
+        timestamp++;
+      }
+      try {
+        session.insertTablet(tablet);
+      } catch (Exception e) {
+        if (!e.getMessage().contains("507")) {
+          fail(e.getMessage());
+        }
+      } finally {
+        session.executeNonQueryStatement("SET CONFIGURATION 
enable_auto_create_schema='false'");
+      }
+      try (SessionDataSet dataSet = session.executeQueryStatement("SELECT * 
FROM sg6")) {
+        assertEquals(dataSet.getColumnNames().size(), 4);
+        assertEquals(dataSet.getColumnNames().get(0), "time");
+        assertEquals(dataSet.getColumnNames().get(1), "id1");
+        assertEquals(dataSet.getColumnNames().get(2), "s1");
+        assertEquals(dataSet.getColumnNames().get(3), "s2");
+        int cnt = 0;
+        while (dataSet.hasNext()) {
+          RowRecord rowRecord = dataSet.next();
+          long time = rowRecord.getFields().get(0).getLongV();
+          assertEquals(time, rowRecord.getFields().get(2).getLongV());
+          assertEquals(time, rowRecord.getFields().get(3).getLongV());
+          cnt++;
+        }
+        Assert.assertEquals(200, cnt);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testInsertKeyword() throws IoTDBConnectionException, 
StatementExecutionException {
+    ISessionPool sessionPool = EnvFactory.getEnv().getSessionPool(1, "table");
+    try (final IPooledSession session = sessionPool.getPooledSession()) {
+      session.executeNonQueryStatement("USE \"test\"");
+      session.executeNonQueryStatement(
+          "create table table20 ("
+              + "device_id string id,"
+              + "attribute STRING ATTRIBUTE,"
+              + "boolean boolean MEASUREMENT,"
+              + "int32 int32 MEASUREMENT,"
+              + "int64 int64 MEASUREMENT,"
+              + "float float MEASUREMENT,"
+              + "double double MEASUREMENT,"
+              + "text text MEASUREMENT,"
+              + "string string MEASUREMENT,"
+              + "blob blob MEASUREMENT,"
+              + "timestamp01 timestamp MEASUREMENT,"
+              + "date date MEASUREMENT)");
+
+      List<IMeasurementSchema> schemas = new ArrayList<>();
+      schemas.add(new MeasurementSchema("device_id", TSDataType.STRING));
+      schemas.add(new MeasurementSchema("attribute", TSDataType.STRING));
+      schemas.add(new MeasurementSchema("boolean", TSDataType.BOOLEAN));
+      schemas.add(new MeasurementSchema("int32", TSDataType.INT32));
+      schemas.add(new MeasurementSchema("int64", TSDataType.INT64));
+      schemas.add(new MeasurementSchema("float", TSDataType.FLOAT));
+      schemas.add(new MeasurementSchema("double", TSDataType.DOUBLE));
+      schemas.add(new MeasurementSchema("text", TSDataType.TEXT));
+      schemas.add(new MeasurementSchema("string", TSDataType.STRING));
+      schemas.add(new MeasurementSchema("blob", TSDataType.BLOB));
+      schemas.add(new MeasurementSchema("timestamp", TSDataType.TIMESTAMP));
+      schemas.add(new MeasurementSchema("date", TSDataType.DATE));
+      final List<ColumnType> columnTypes =
+          Arrays.asList(
+              ColumnType.ID,
+              ColumnType.ATTRIBUTE,
+              ColumnType.MEASUREMENT,
+              ColumnType.MEASUREMENT,
+              ColumnType.MEASUREMENT,
+              ColumnType.MEASUREMENT,
+              ColumnType.MEASUREMENT,
+              ColumnType.MEASUREMENT,
+              ColumnType.MEASUREMENT,
+              ColumnType.MEASUREMENT,
+              ColumnType.MEASUREMENT,
+              ColumnType.MEASUREMENT);
+
+      long timestamp = 0;
+      Tablet tablet = new Tablet("table20", schemas, columnTypes, 10);
+
+      for (long row = 0; row < 10; row++) {
+        int rowIndex = tablet.rowSize++;
+        tablet.addTimestamp(rowIndex, timestamp + row);
+        tablet.addValue("device_id", rowIndex, "1");
+        tablet.addValue("attribute", rowIndex, "1");
+        tablet.addValue("boolean", rowIndex, true);
+        tablet.addValue("int32", rowIndex, Integer.valueOf("1"));
+        tablet.addValue("int64", rowIndex, Long.valueOf("1"));
+        tablet.addValue("float", rowIndex, Float.valueOf("1.0"));
+        tablet.addValue("double", rowIndex, Double.valueOf("1.0"));
+        tablet.addValue("text", rowIndex, "true");
+        tablet.addValue("string", rowIndex, "true");
+        tablet.addValue("blob", rowIndex, new Binary("iotdb", 
Charset.defaultCharset()));
+        tablet.addValue("timestamp", rowIndex, 1L);
+        tablet.addValue("date", rowIndex, LocalDate.parse("2024-08-15"));
+      }
+      session.insertTablet(tablet);
+
+      SessionDataSet rs1 =
+          session.executeQueryStatement(
+              "select time, device_id, attribute, boolean, int32, int64, 
float, double, text, string, blob, timestamp, date from table20 order by time");
+      for (int i = 0; i < 10; i++) {
+        RowRecord rec = rs1.next();
+        assertEquals(i, rec.getFields().get(0).getLongV());
+        assertEquals("1", rec.getFields().get(1).getStringValue());
+        assertEquals("1", rec.getFields().get(2).getStringValue());
+        assertTrue(rec.getFields().get(3).getBoolV());
+        assertEquals(1, rec.getFields().get(4).getIntV());
+        assertEquals(1, rec.getFields().get(5).getLongV());
+        assertEquals(1.0, rec.getFields().get(6).getFloatV(), 0.001);
+        assertEquals(1.0, rec.getFields().get(7).getDoubleV(), 0.001);
+        assertEquals("true", rec.getFields().get(8).getStringValue());
+        assertEquals("true", rec.getFields().get(9).getStringValue());
+        assertEquals("0x696f746462", rec.getFields().get(10).getStringValue());
+        assertEquals(1, rec.getFields().get(11).getLongV());
+        assertEquals("20240815", rec.getFields().get(12).getStringValue());
+      }
+      assertFalse(rs1.hasNext());
+    }
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
index fc7c96daeff..d77b2c81907 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
@@ -674,6 +674,82 @@ public class IoTDBSessionRelationalIT {
     }
   }
 
+  @Test
+  @Category({LocalStandaloneIT.class, ClusterIT.class})
+  public void insertRelationalTabletWithCacheLeaderTest()
+      throws IoTDBConnectionException, StatementExecutionException {
+    try (ISession session = 
EnvFactory.getEnv().getSessionConnection(TABLE_SQL_DIALECT)) {
+      session.executeNonQueryStatement("USE \"db1\"");
+      session.executeNonQueryStatement(
+          "CREATE TABLE table5 (id1 string id, attr1 string attribute, "
+              + "m1 double "
+              + "measurement)");
+
+      List<IMeasurementSchema> schemaList = new ArrayList<>();
+      schemaList.add(new MeasurementSchema("id1", TSDataType.STRING));
+      schemaList.add(new MeasurementSchema("attr1", TSDataType.STRING));
+      schemaList.add(new MeasurementSchema("m1", TSDataType.DOUBLE));
+      final List<ColumnType> columnTypes =
+          Arrays.asList(ColumnType.ID, ColumnType.ATTRIBUTE, 
ColumnType.MEASUREMENT);
+
+      long timestamp = 0;
+      Tablet tablet = new Tablet("table5", schemaList, columnTypes, 15);
+
+      for (long row = 0; row < 15; row++) {
+        int rowIndex = tablet.rowSize++;
+        tablet.addTimestamp(rowIndex, timestamp + row);
+        tablet.addValue("id1", rowIndex, "id:" + row);
+        tablet.addValue("attr1", rowIndex, "attr:" + row);
+        tablet.addValue("m1", rowIndex, row * 1.0);
+        if (tablet.rowSize == tablet.getMaxRowNumber()) {
+          session.insertRelationalTablet(tablet, true);
+          tablet.reset();
+        }
+      }
+
+      if (tablet.rowSize != 0) {
+        session.insertRelationalTablet(tablet);
+        tablet.reset();
+      }
+
+      session.executeNonQueryStatement("FLush");
+
+      for (long row = 15; row < 30; row++) {
+        int rowIndex = tablet.rowSize++;
+        tablet.addTimestamp(rowIndex, timestamp + row);
+        // cache leader should work for devices that have inserted before
+        tablet.addValue("id1", rowIndex, "id:" + (row - 15));
+        tablet.addValue("attr1", rowIndex, "attr:" + (row - 15));
+        tablet.addValue("m1", rowIndex, row * 1.0);
+        if (tablet.rowSize == tablet.getMaxRowNumber()) {
+          session.insertRelationalTablet(tablet, true);
+          tablet.reset();
+        }
+      }
+
+      if (tablet.rowSize != 0) {
+        session.insertRelationalTablet(tablet);
+        tablet.reset();
+      }
+
+      int cnt = 0;
+      SessionDataSet dataSet = session.executeQueryStatement("select * from 
table5 order by time");
+      while (dataSet.hasNext()) {
+        RowRecord rowRecord = dataSet.next();
+        timestamp = rowRecord.getFields().get(0).getLongV();
+        assertEquals(
+            "id:" + (timestamp < 15 ? timestamp : timestamp - 15),
+            rowRecord.getFields().get(1).getBinaryV().toString());
+        assertEquals(
+            "attr:" + (timestamp < 15 ? timestamp : timestamp - 15),
+            rowRecord.getFields().get(2).getBinaryV().toString());
+        assertEquals(timestamp * 1.0, 
rowRecord.getFields().get(3).getDoubleV(), 0.0001);
+        cnt++;
+      }
+      assertEquals(30, cnt);
+    }
+  }
+
   @Test
   @Category({LocalStandaloneIT.class, ClusterIT.class})
   public void autoCreateTableTest() throws IoTDBConnectionException, 
StatementExecutionException {
diff --git 
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/IPooledSession.java
 
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/IPooledSession.java
index 04c25944a97..f8764f9a203 100644
--- 
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/IPooledSession.java
+++ 
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/IPooledSession.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
 import org.apache.thrift.TException;
 import org.apache.tsfile.write.record.Tablet;
 
+/** NOTICE: IPooledSession is specific to the table model. */
 public interface IPooledSession extends AutoCloseable {
 
   Version getVersion();
diff --git 
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java 
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 7bc31731802..ed627da82ef 100644
--- 
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++ 
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
 
 import org.apache.thrift.TException;
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.write.record.Tablet;
@@ -65,6 +66,14 @@ public interface ISession extends AutoCloseable {
       INodeSupplier nodeSupplier)
       throws IoTDBConnectionException;
 
+  void open(
+      boolean enableRPCCompression,
+      int connectionTimeoutInMs,
+      Map<String, TEndPoint> deviceIdToEndpoint,
+      Map<IDeviceID, TEndPoint> tabletModelDeviceIdToEndpoint,
+      INodeSupplier nodeSupplier)
+      throws IoTDBConnectionException;
+
   void close() throws IoTDBConnectionException;
 
   String getTimeZone();
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
index 8da65e8249c..c7db354b529 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.rpc;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 
 public class RedirectException extends IOException {
@@ -29,17 +30,27 @@ public class RedirectException extends IOException {
   private final TEndPoint endPoint;
 
   private final Map<String, TEndPoint> deviceEndPointMap;
+  private final List<TEndPoint> endPointList;
 
   public RedirectException(TEndPoint endPoint) {
     super("later request in same group will be redirected to " + 
endPoint.toString());
     this.endPoint = endPoint;
     this.deviceEndPointMap = null;
+    this.endPointList = null;
   }
 
   public RedirectException(Map<String, TEndPoint> deviceEndPointMap) {
     super("later request in same group will be redirected to " + 
deviceEndPointMap);
     this.endPoint = null;
     this.deviceEndPointMap = deviceEndPointMap;
+    this.endPointList = null;
+  }
+
+  public RedirectException(List<TEndPoint> endPointList) {
+    super("later request in same group will be redirected to " + endPointList);
+    this.endPoint = null;
+    this.deviceEndPointMap = null;
+    this.endPointList = endPointList;
   }
 
   public TEndPoint getEndPoint() {
@@ -49,4 +60,8 @@ public class RedirectException extends IOException {
   public Map<String, TEndPoint> getDeviceEndPointMap() {
     return deviceEndPointMap;
   }
+
+  public List<TEndPoint> getEndPointList() {
+    return endPointList;
+  }
 }
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index 1a7bc9970f6..d41c1c8fd97 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -36,6 +36,7 @@ import java.time.Instant;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -112,6 +113,22 @@ public class RpcUtils {
     if (status.isSetRedirectNode()) {
       throw new RedirectException(status.getRedirectNode());
     }
+    if (status.isSetSubStatus()) { // the resp of insertRelationalTablet may 
set subStatus
+      List<TSStatus> statusSubStatus = status.getSubStatus();
+      List<TEndPoint> endPointList = new ArrayList<>(statusSubStatus.size());
+      int count = 0;
+      for (TSStatus subStatus : statusSubStatus) {
+        if (subStatus.isSetRedirectNode()) {
+          endPointList.add(subStatus.getRedirectNode());
+          count++;
+        } else {
+          endPointList.add(null);
+        }
+      }
+      if (!endPointList.isEmpty() && count != 0) {
+        throw new RedirectException(endPointList);
+      }
+    }
   }
 
   public static void verifySuccessWithRedirectionForMultiDevices(
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index 35229b54162..78ecd13242d 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -63,6 +63,7 @@ import org.apache.iotdb.session.util.ThreadUtils;
 
 import org.apache.thrift.TException;
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.utils.Binary;
@@ -164,6 +165,9 @@ public class Session implements ISession {
   @SuppressWarnings("squid:S3077") // Non-primitive fields should not be 
"volatile"
   protected volatile Map<String, TEndPoint> deviceIdToEndpoint;
 
+  @SuppressWarnings("squid:S3077") // Non-primitive fields should not be 
"volatile"
+  protected volatile Map<IDeviceID, TEndPoint> tableModelDeviceIdToEndpoint;
+
   @SuppressWarnings("squid:S3077") // Non-primitive fields should not be 
"volatile"
   protected volatile Map<TEndPoint, SessionConnection> 
endPointToSessionConnection;
 
@@ -535,6 +539,7 @@ public class Session implements ISession {
     isClosed = false;
     if (enableRedirection || enableQueryRedirection) {
       deviceIdToEndpoint = new ConcurrentHashMap<>();
+      tableModelDeviceIdToEndpoint = new ConcurrentHashMap<>();
       endPointToSessionConnection = new ConcurrentHashMap<>();
       endPointToSessionConnection.put(defaultEndPoint, 
defaultSessionConnection);
     }
@@ -584,6 +589,33 @@ public class Session implements ISession {
     isClosed = false;
     if (enableRedirection || enableQueryRedirection) {
       this.deviceIdToEndpoint = deviceIdToEndpoint;
+      this.tableModelDeviceIdToEndpoint = new ConcurrentHashMap<>();
+      endPointToSessionConnection = new ConcurrentHashMap<>();
+      endPointToSessionConnection.put(defaultEndPoint, 
defaultSessionConnection);
+    }
+  }
+
+  @Override
+  public synchronized void open(
+      boolean enableRPCCompression,
+      int connectionTimeoutInMs,
+      Map<String, TEndPoint> deviceIdToEndpoint,
+      Map<IDeviceID, TEndPoint> tableModelDeviceIdToEndpoint,
+      INodeSupplier nodesSupplier)
+      throws IoTDBConnectionException {
+    if (!isClosed) {
+      return;
+    }
+
+    this.availableNodes = nodesSupplier;
+    this.enableRPCCompression = enableRPCCompression;
+    this.connectionTimeoutInMs = connectionTimeoutInMs;
+    defaultSessionConnection = constructSessionConnection(this, 
defaultEndPoint, zoneId);
+    defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
+    isClosed = false;
+    if (enableRedirection || enableQueryRedirection) {
+      this.deviceIdToEndpoint = deviceIdToEndpoint;
+      this.tableModelDeviceIdToEndpoint = tableModelDeviceIdToEndpoint;
       endPointToSessionConnection = new ConcurrentHashMap<>();
       endPointToSessionConnection.put(defaultEndPoint, 
defaultSessionConnection);
     }
@@ -1321,6 +1353,18 @@ public class Session implements ISession {
     }
   }
 
+  private SessionConnection getSessionConnection(IDeviceID deviceId) {
+    TEndPoint endPoint;
+    if (enableRedirection
+        && tableModelDeviceIdToEndpoint != null
+        && (endPoint = tableModelDeviceIdToEndpoint.get(deviceId)) != null
+        && endPointToSessionConnection.containsKey(endPoint)) {
+      return endPointToSessionConnection.get(endPoint);
+    } else {
+      return defaultSessionConnection;
+    }
+  }
+
   @Override
   public String getTimestampPrecision() throws TException {
     return 
defaultSessionConnection.getClient().getProperties().getTimestampPrecision();
@@ -1343,8 +1387,7 @@ public class Session implements ISession {
           }
         }
       }
-
-      if (deviceIdToEndpoint != null) {
+      if (deviceIdToEndpoint != null && !deviceIdToEndpoint.isEmpty()) {
         for (Iterator<Entry<String, TEndPoint>> it = 
deviceIdToEndpoint.entrySet().iterator();
             it.hasNext(); ) {
           Entry<String, TEndPoint> entry = it.next();
@@ -1353,6 +1396,16 @@ public class Session implements ISession {
           }
         }
       }
+      if (tableModelDeviceIdToEndpoint != null && 
!tableModelDeviceIdToEndpoint.isEmpty()) {
+        for (Iterator<Entry<IDeviceID, TEndPoint>> it =
+                tableModelDeviceIdToEndpoint.entrySet().iterator();
+            it.hasNext(); ) {
+          Entry<IDeviceID, TEndPoint> entry = it.next();
+          if (entry.getValue().equals(endPoint)) {
+            it.remove();
+          }
+        }
+      }
     }
   }
 
@@ -1362,7 +1415,6 @@ public class Session implements ISession {
       if (endpoint.ip.equals("0.0.0.0")) {
         return;
       }
-      AtomicReference<IoTDBConnectionException> exceptionReference = new 
AtomicReference<>();
       if (!deviceIdToEndpoint.containsKey(deviceId)
           || !deviceIdToEndpoint.get(deviceId).equals(endpoint)) {
         deviceIdToEndpoint.put(deviceId, endpoint);
@@ -1374,7 +1426,6 @@ public class Session implements ISession {
                 try {
                   return constructSessionConnection(this, endpoint, zoneId);
                 } catch (IoTDBConnectionException ex) {
-                  exceptionReference.set(ex);
                   return null;
                 }
               });
@@ -1384,6 +1435,32 @@ public class Session implements ISession {
     }
   }
 
+  private void handleRedirection(IDeviceID deviceId, TEndPoint endpoint) {
+    if (enableRedirection) {
+      // no need to redirection
+      if (endpoint.ip.equals("0.0.0.0")) {
+        return;
+      }
+      if (!tableModelDeviceIdToEndpoint.containsKey(deviceId)
+          || !tableModelDeviceIdToEndpoint.get(deviceId).equals(endpoint)) {
+        tableModelDeviceIdToEndpoint.put(deviceId, endpoint);
+      }
+      SessionConnection connection =
+          endPointToSessionConnection.computeIfAbsent(
+              endpoint,
+              k -> {
+                try {
+                  return constructSessionConnection(this, endpoint, zoneId);
+                } catch (IoTDBConnectionException ex) {
+                  return null;
+                }
+              });
+      if (connection == null) {
+        tableModelDeviceIdToEndpoint.remove(deviceId);
+      }
+    }
+  }
+
   private void handleQueryRedirection(TEndPoint endPoint) throws 
IoTDBConnectionException {
     if (enableQueryRedirection) {
       AtomicReference<IoTDBConnectionException> exceptionReference = new 
AtomicReference<>();
@@ -2492,7 +2569,7 @@ public class Session implements ISession {
    * <p>e.g. source: [1,2,3,4,5], index:[1,0,3,2,4], return : [2,1,4,3,5]
    *
    * @param source Input list
-   * @param index retuen order
+   * @param index return order
    * @param <T> Input type
    * @return ordered list
    */
@@ -2679,11 +2756,20 @@ public class Session implements ISession {
   @Override
   public void insertRelationalTablet(Tablet tablet, boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
-    TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted, false);
-    request.setWriteToTable(true);
-    request.setColumnCategories(
-        tablet.getColumnTypes().stream().map(t -> (byte) 
t.ordinal()).collect(Collectors.toList()));
-    insertTabletInternal(tablet, request);
+    if (enableRedirection) {
+      insertRelationalTabletWithLeaderCache(tablet);
+    } else {
+      TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted, false);
+      request.setWriteToTable(true);
+      request.setColumnCategories(
+          tablet.getColumnTypes().stream()
+              .map(t -> (byte) t.ordinal())
+              .collect(Collectors.toList()));
+      try {
+        defaultSessionConnection.insertTablet(request);
+      } catch (RedirectException ignored) {
+      }
+    }
   }
 
   /**
@@ -2697,6 +2783,113 @@ public class Session implements ISession {
     insertRelationalTablet(tablet, false);
   }
 
+  private void insertRelationalTabletWithLeaderCache(Tablet tablet)
+      throws IoTDBConnectionException, StatementExecutionException {
+    Map<SessionConnection, Tablet> relationalTabletGroup = new HashMap<>();
+    if (tableModelDeviceIdToEndpoint.isEmpty()) {
+      relationalTabletGroup.put(defaultSessionConnection, tablet);
+    } else {
+      for (int i = 0; i < tablet.rowSize; i++) {
+        IDeviceID iDeviceID = tablet.getDeviceID(i);
+        final SessionConnection connection = getSessionConnection(iDeviceID);
+        int finalI = i;
+        relationalTabletGroup.compute(
+            connection,
+            (k, v) -> {
+              if (v == null) {
+                v =
+                    new Tablet(
+                        tablet.getTableName(),
+                        tablet.getSchemas(),
+                        tablet.getColumnTypes(),
+                        tablet.rowSize);
+              }
+              for (int j = 0; j < v.getSchemas().size(); j++) {
+                v.addValue(
+                    v.getSchemas().get(j).getMeasurementId(),
+                    v.rowSize,
+                    tablet.getValue(finalI, j));
+              }
+              v.addTimestamp(v.rowSize, tablet.timestamps[finalI]);
+              v.rowSize++;
+              return v;
+            });
+      }
+    }
+    insertRelationalTabletByGroup(relationalTabletGroup);
+  }
+
+  @SuppressWarnings({
+    "squid:S3776"
+  }) // ignore Cognitive Complexity of methods should not be too high
+  private void insertRelationalTabletByGroup(Map<SessionConnection, Tablet> 
relationalTabletGroup)
+      throws IoTDBConnectionException, StatementExecutionException {
+    List<CompletableFuture<Void>> completableFutures =
+        relationalTabletGroup.entrySet().stream()
+            .map(
+                entry -> {
+                  SessionConnection connection = entry.getKey();
+                  Tablet subTablet = entry.getValue();
+                  return CompletableFuture.runAsync(
+                      () -> {
+                        TSInsertTabletReq request = 
genTSInsertTabletReq(subTablet, false, false);
+                        request.setWriteToTable(true);
+                        request.setColumnCategories(
+                            subTablet.getColumnTypes().stream()
+                                .map(t -> (byte) t.ordinal())
+                                .collect(Collectors.toList()));
+                        InsertConsumer<TSInsertTabletReq> insertConsumer =
+                            SessionConnection::insertTablet;
+                        try {
+                          insertConsumer.insert(connection, request);
+                        } catch (RedirectException e) {
+                          List<TEndPoint> endPointList = e.getEndPointList();
+                          Map<IDeviceID, TEndPoint> endPointMap = new 
HashMap<>();
+                          for (int i = 0; i < endPointList.size(); i++) {
+                            if (endPointList.get(i) != null) {
+                              endPointMap.put(subTablet.getDeviceID(i), 
endPointList.get(i));
+                            }
+                          }
+                          endPointMap.forEach(this::handleRedirection);
+                        } catch (StatementExecutionException e) {
+                          throw new CompletionException(e);
+                        } catch (IoTDBConnectionException e) {
+                          // remove the broken session
+                          removeBrokenSessionConnection(connection);
+                          try {
+                            insertConsumer.insert(defaultSessionConnection, 
request);
+                          } catch (IoTDBConnectionException | 
StatementExecutionException ex) {
+                            throw new CompletionException(ex);
+                          } catch (RedirectException ignored) {
+                          }
+                        }
+                      },
+                      OPERATION_EXECUTOR);
+                })
+            .collect(Collectors.toList());
+
+    StringBuilder errMsgBuilder = new StringBuilder();
+    for (CompletableFuture<Void> completableFuture : completableFutures) {
+      try {
+        completableFuture.join();
+      } catch (CompletionException completionException) {
+        Throwable cause = completionException.getCause();
+        logger.error("Meet error when async insert!", cause);
+        if (cause instanceof IoTDBConnectionException) {
+          throw (IoTDBConnectionException) cause;
+        } else {
+          if (errMsgBuilder.length() > 0) {
+            errMsgBuilder.append(";");
+          }
+          errMsgBuilder.append(cause.getMessage());
+        }
+      }
+    }
+    if (errMsgBuilder.length() > 0) {
+      throw new StatementExecutionException(errMsgBuilder.toString());
+    }
+  }
+
   /**
    * insert the aligned timeseries data of a device. For each timestamp, the 
number of measurements
    * is the same.
@@ -3863,6 +4056,9 @@ public class Session implements ISession {
         if (cause instanceof IoTDBConnectionException) {
           throw (IoTDBConnectionException) cause;
         } else {
+          if (errMsgBuilder.length() > 0) {
+            errMsgBuilder.append(";");
+          }
           errMsgBuilder.append(cause.getMessage());
         }
       }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 9a59e6dc50f..d9871d4b4e2 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -114,11 +114,11 @@ public class SessionConnection {
   private int timeFactor = 1_000;
 
   // TestOnly
-  public SessionConnection() {
+  public SessionConnection(String sqlDialect) {
     availableNodes = Collections::emptyList;
     this.maxRetryCount = Math.max(0, SessionConfig.MAX_RETRY_COUNT);
     this.retryIntervalInMs = Math.max(0, SessionConfig.RETRY_INTERVAL_IN_MS);
-    this.sqlDialect = "tree";
+    this.sqlDialect = sqlDialect;
     database = null;
   }
 
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 5ef9f7616f6..3edfc396173 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.session.util.SessionUtils;
 
 import org.apache.thrift.TException;
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.write.record.Tablet;
@@ -127,6 +128,7 @@ public class SessionPool implements ISessionPool {
   private boolean enableQueryRedirection = false;
 
   private Map<String, TEndPoint> deviceIdToEndpoint;
+  private Map<IDeviceID, TEndPoint> tableModelDeviceIdToEndpoint;
 
   private int thriftDefaultBufferSize;
   private int thriftMaxFrameSize;
@@ -391,6 +393,7 @@ public class SessionPool implements ISessionPool {
     this.enableRedirection = enableRedirection;
     if (this.enableRedirection) {
       deviceIdToEndpoint = new ConcurrentHashMap<>();
+      tableModelDeviceIdToEndpoint = new ConcurrentHashMap<>();
     }
     this.connectionTimeoutInMs = connectionTimeoutInMs;
     this.version = version;
@@ -432,6 +435,7 @@ public class SessionPool implements ISessionPool {
     this.enableRedirection = enableRedirection;
     if (this.enableRedirection) {
       deviceIdToEndpoint = new ConcurrentHashMap<>();
+      tableModelDeviceIdToEndpoint = new ConcurrentHashMap<>();
     }
     this.connectionTimeoutInMs = connectionTimeoutInMs;
     this.version = version;
@@ -476,6 +480,7 @@ public class SessionPool implements ISessionPool {
     this.enableRedirection = enableRedirection;
     if (this.enableRedirection) {
       deviceIdToEndpoint = new ConcurrentHashMap<>();
+      tableModelDeviceIdToEndpoint = new ConcurrentHashMap<>();
     }
     this.connectionTimeoutInMs = connectionTimeoutInMs;
     this.version = version;
@@ -497,6 +502,7 @@ public class SessionPool implements ISessionPool {
     this.enableRedirection = builder.enableRedirection;
     if (this.enableRedirection) {
       deviceIdToEndpoint = new ConcurrentHashMap<>();
+      tableModelDeviceIdToEndpoint = new ConcurrentHashMap<>();
     }
     this.enableRecordsAutoConvertTablet = 
builder.enableRecordsAutoConvertTablet;
     this.connectionTimeoutInMs = builder.connectionTimeoutInMs;
@@ -703,7 +709,13 @@ public class SessionPool implements ISessionPool {
       session = constructNewSession();
 
       try {
-        session.open(enableCompression, connectionTimeoutInMs, 
deviceIdToEndpoint, availableNodes);
+
+        session.open(
+            enableCompression,
+            connectionTimeoutInMs,
+            deviceIdToEndpoint,
+            tableModelDeviceIdToEndpoint,
+            availableNodes);
         // avoid someone has called close() the session pool
         synchronized (this) {
           if (closed) {
@@ -813,7 +825,12 @@ public class SessionPool implements ISessionPool {
   private void tryConstructNewSession() {
     Session session = constructNewSession();
     try {
-      session.open(enableCompression, connectionTimeoutInMs, 
deviceIdToEndpoint, availableNodes);
+      session.open(
+          enableCompression,
+          connectionTimeoutInMs,
+          deviceIdToEndpoint,
+          tableModelDeviceIdToEndpoint,
+          availableNodes);
       // avoid someone has called close() the session pool
       synchronized (this) {
         if (closed) {
@@ -3440,6 +3457,7 @@ public class SessionPool implements ISessionPool {
     this.enableRedirection = enableRedirection;
     if (this.enableRedirection) {
       deviceIdToEndpoint = new ConcurrentHashMap<>();
+      tableModelDeviceIdToEndpoint = new ConcurrentHashMap<>();
     }
     for (ISession session : queue) {
       session.setEnableRedirection(enableRedirection);
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionWrapper.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionWrapper.java
index deb43f56e80..5a007921a89 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionWrapper.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionWrapper.java
@@ -37,9 +37,11 @@ import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * used for SessionPool.getSession need to do some other things like calling
+ * NOTICE: SessionWrapper is specific to the table model.
+ *
+ * <p>used for SessionPool.getSession need to do some other things like calling
  * cleanSessionAndMayThrowConnectionException in SessionPool while 
encountering connection exception
- * only need to putBack to SessionPool while closing
+ * only need to putBack to SessionPool while closing.
  */
 public class SessionWrapper implements IPooledSession {
 
@@ -144,7 +146,7 @@ public class SessionWrapper implements IPooledSession {
   public void insertTablet(Tablet tablet)
       throws StatementExecutionException, IoTDBConnectionException {
     try {
-      session.insertTablet(tablet);
+      session.insertRelationalTablet(tablet);
     } catch (IoTDBConnectionException e) {
       sessionPool.cleanSessionAndMayThrowConnectionException(session);
       closed.set(true);
diff --git 
a/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderTest.java
 
b/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderTest.java
index 6a7a9f83051..c83cf8c4ea4 100644
--- 
a/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderTest.java
+++ 
b/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderTest.java
@@ -33,7 +33,10 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
 
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
 import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.record.Tablet.ColumnType;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.junit.Assert;
@@ -56,7 +59,7 @@ public class SessionCacheLeaderTest {
       new ArrayList<TEndPoint>() {
         {
           add(new TEndPoint("127.0.0.1", 55560)); // default endpoint
-          add(new TEndPoint("127.0.0.1", 55561)); // meta leader endpoint
+          add(new TEndPoint("127.0.0.1", 55561));
           add(new TEndPoint("127.0.0.1", 55562));
           add(new TEndPoint("127.0.0.1", 55563));
         }
@@ -79,6 +82,20 @@ public class SessionCacheLeaderTest {
     return endpoints.get(deviceId.hashCode() % endpoints.size());
   }
 
+  public static TEndPoint getDeviceIdBelongedEndpoint(IDeviceID deviceId) {
+    if (deviceId.equals(new StringArrayDeviceID("table1", "id0"))) {
+      return endpoints.get(0);
+    } else if (deviceId.equals(new StringArrayDeviceID("table1", "id1"))) {
+      return endpoints.get(1);
+    } else if (deviceId.equals(new StringArrayDeviceID("table1", "id2"))) {
+      return endpoints.get(2);
+    } else if (deviceId.equals(new StringArrayDeviceID("table1", "id3"))) {
+      return endpoints.get(3);
+    }
+
+    return endpoints.get(deviceId.hashCode() % endpoints.size());
+  }
+
   @Test
   public void testInsertRecord() throws IoTDBConnectionException, 
StatementExecutionException {
     // without leader cache
@@ -607,6 +624,83 @@ public class SessionCacheLeaderTest {
     session.close();
   }
 
+  @Test
+  public void testInsertRelationalTablet()
+      throws IoTDBConnectionException, StatementExecutionException {
+    // without leader cache
+    session = new MockSession("127.0.0.1", 55560, false, "table");
+    session.open();
+    assertNull(session.tableModelDeviceIdToEndpoint);
+    assertNull(session.endPointToSessionConnection);
+
+    String tableName = "table1";
+    List<IMeasurementSchema> schemaList = new ArrayList<>();
+    List<ColumnType> columnTypeList = new ArrayList<>();
+    schemaList.add(new MeasurementSchema("id", TSDataType.STRING));
+    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
+    columnTypeList.add(ColumnType.ID);
+    columnTypeList.add(ColumnType.MEASUREMENT);
+    columnTypeList.add(ColumnType.MEASUREMENT);
+    Tablet tablet = new Tablet(tableName, schemaList, columnTypeList, 50);
+    long timestamp = System.currentTimeMillis();
+    for (long row = 0; row < 100; row++) {
+      int rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, timestamp);
+      tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, "id" + 
(rowIndex % 4));
+      for (int s = 1; s < 3; s++) {
+        long value = new Random().nextLong();
+        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
+      }
+
+      if (tablet.rowSize == tablet.getMaxRowNumber()) {
+        session.insertRelationalTablet(tablet);
+        tablet.reset();
+      }
+      timestamp++;
+    }
+
+    if (tablet.rowSize != 0) {
+      session.insertRelationalTablet(tablet);
+      tablet.reset();
+    }
+
+    assertNull(session.tableModelDeviceIdToEndpoint);
+    assertNull(session.endPointToSessionConnection);
+    session.close();
+
+    // with leader cache
+    session = new MockSession("127.0.0.1", 55560, true, "table");
+    session.open();
+    assertEquals(0, session.tableModelDeviceIdToEndpoint.size());
+    assertEquals(1, session.endPointToSessionConnection.size());
+
+    for (long row = 0; row < 100; row++) {
+      int rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, timestamp);
+      tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, "id" + 
(rowIndex % 4));
+      for (int s = 1; s < 3; s++) {
+        long value = new Random().nextLong();
+        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
+      }
+
+      if (tablet.rowSize == tablet.getMaxRowNumber()) {
+        session.insertRelationalTablet(tablet, true);
+        tablet.reset();
+      }
+      timestamp++;
+    }
+
+    if (tablet.rowSize != 0) {
+      session.insertRelationalTablet(tablet);
+      tablet.reset();
+    }
+
+    assertEquals(4, session.tableModelDeviceIdToEndpoint.size());
+    assertEquals(4, session.endPointToSessionConnection.size());
+    session.close();
+  }
+
   @Test
   public void testInsertRecordsWithSessionBroken() throws 
StatementExecutionException {
     // without leader cache
@@ -658,6 +752,7 @@ public class SessionCacheLeaderTest {
       if (time != 0 && time % 100 == 0) {
         try {
           session.insertRecords(deviceIds, timestamps, measurementsList, 
typesList, valuesList);
+          Assert.fail();
         } catch (IoTDBConnectionException e) {
           Assert.assertEquals(
               "the session connection = TEndPoint(ip:127.0.0.1, port:55560) is 
broken",
@@ -672,6 +767,7 @@ public class SessionCacheLeaderTest {
 
     try {
       session.insertRecords(deviceIds, timestamps, measurementsList, 
typesList, valuesList);
+      fail();
     } catch (IoTDBConnectionException e) {
       Assert.assertEquals(
           "the session connection = TEndPoint(ip:127.0.0.1, port:55560) is 
broken", e.getMessage());
@@ -686,8 +782,7 @@ public class SessionCacheLeaderTest {
     try {
       session.close();
     } catch (IoTDBConnectionException e) {
-      Assert.assertEquals(
-          "the session connection = TEndPoint(ip:127.0.0.1, port:55560) is 
broken", e.getMessage());
+      Assert.fail(e.getMessage());
     }
 
     // with leader cache
@@ -737,8 +832,7 @@ public class SessionCacheLeaderTest {
     try {
       session.insertRecords(deviceIds, timestamps, measurementsList, 
typesList, valuesList);
     } catch (IoTDBConnectionException e) {
-      Assert.assertEquals(
-          "the session connection = TEndPoint(ip:127.0.0.1, port:55561) is 
broken", e.getMessage());
+      Assert.fail(e.getMessage());
     }
     assertEquals(3, session.deviceIdToEndpoint.size());
     for (Map.Entry<String, TEndPoint> endPointMap : 
session.deviceIdToEndpoint.entrySet()) {
@@ -806,6 +900,7 @@ public class SessionCacheLeaderTest {
       if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
         try {
           session.insertTablets(tabletMap, true);
+          fail();
         } catch (IoTDBConnectionException e) {
           assertEquals(
               "the session connection = TEndPoint(ip:127.0.0.1, port:55560) is 
broken",
@@ -818,18 +913,6 @@ public class SessionCacheLeaderTest {
       timestamp++;
     }
 
-    if (tablet1.rowSize != 0) {
-      try {
-        session.insertTablets(tabletMap, true);
-      } catch (IoTDBConnectionException e) {
-        Assert.fail(e.getMessage());
-      }
-
-      tablet1.reset();
-      tablet2.reset();
-      tablet3.reset();
-    }
-
     assertNull(session.deviceIdToEndpoint);
     assertNull(session.endPointToSessionConnection);
     try {
@@ -912,8 +995,7 @@ public class SessionCacheLeaderTest {
     try {
       session.insertTablets(tabletMap, true);
     } catch (IoTDBConnectionException e) {
-      Assert.assertEquals(
-          "the session connection = TEndPoint(ip:127.0.0.1, port:55562) is 
broken", e.getMessage());
+      Assert.fail(e.getMessage());
     }
     tablet1.reset();
     tablet2.reset();
@@ -931,6 +1013,136 @@ public class SessionCacheLeaderTest {
     }
   }
 
+  @Test
+  public void testInsertRelationalTabletWithSessionBroken() throws 
StatementExecutionException {
+    // without leader cache
+    session = new MockSession("127.0.0.1", 55560, false, "table");
+    try {
+      session.open();
+    } catch (IoTDBConnectionException e) {
+      Assert.fail(e.getMessage());
+    }
+    assertNull(session.tableModelDeviceIdToEndpoint);
+    assertNull(session.endPointToSessionConnection);
+
+    // set the session connection as broken
+    ((MockSession) 
session).getLastConstructedSessionConnection().setConnectionBroken(true);
+
+    String tableName = "table1";
+    List<IMeasurementSchema> schemaList = new ArrayList<>();
+    List<ColumnType> columnTypeList = new ArrayList<>();
+    schemaList.add(new MeasurementSchema("id", TSDataType.STRING));
+    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
+    columnTypeList.add(ColumnType.ID);
+    columnTypeList.add(ColumnType.MEASUREMENT);
+    columnTypeList.add(ColumnType.MEASUREMENT);
+    Tablet tablet = new Tablet(tableName, schemaList, columnTypeList, 50);
+    long timestamp = System.currentTimeMillis();
+    for (long row = 0; row < 100; row++) {
+      int rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, timestamp);
+      tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, "id" + 
(rowIndex % 4));
+      for (int s = 1; s < 3; s++) {
+        long value = new Random().nextLong();
+        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
+      }
+
+      if (tablet.rowSize == tablet.getMaxRowNumber()) {
+        try {
+          session.insertRelationalTablet(tablet);
+          fail();
+        } catch (IoTDBConnectionException e) {
+          assertEquals(
+              "the session connection = TEndPoint(ip:127.0.0.1, port:55560) is 
broken",
+              e.getMessage());
+        }
+        tablet.reset();
+      }
+      timestamp++;
+    }
+
+    assertNull(session.tableModelDeviceIdToEndpoint);
+    assertNull(session.endPointToSessionConnection);
+    try {
+      session.close();
+    } catch (IoTDBConnectionException e) {
+      Assert.fail(e.getMessage());
+    }
+
+    // with leader cache
+    // rest the session connection
+    session = new MockSession("127.0.0.1", 55560, true, "table");
+    try {
+      session.open();
+    } catch (IoTDBConnectionException e) {
+      Assert.fail(e.getMessage());
+    }
+    assertEquals(0, session.tableModelDeviceIdToEndpoint.size());
+    assertEquals(1, session.endPointToSessionConnection.size());
+
+    for (long row = 0; row < 100; row++) {
+      int rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, timestamp);
+      tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, "id" + 
(rowIndex % 4));
+      for (int s = 1; s < 3; s++) {
+        long value = new Random().nextLong();
+        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
+      }
+
+      if (tablet.rowSize == tablet.getMaxRowNumber()) {
+        try {
+          session.insertRelationalTablet(tablet);
+        } catch (IoTDBConnectionException e) {
+          Assert.fail(e.getMessage());
+        }
+        tablet.reset();
+      }
+      timestamp++;
+    }
+
+    // set the session connection as broken
+    ((MockSession) 
session).getLastConstructedSessionConnection().setConnectionBroken(true);
+    // set connection as broken, due to we enable the cache leader, when we 
called
+    // ((MockSession) session).getLastConstructedSessionConnection(), the 
session's endpoint has
+    // been changed to EndPoint(ip:127.0.0.1, port:55562)
+    Assert.assertEquals(
+        "MockSessionConnection{ endPoint=TEndPoint(ip:127.0.0.1, port:55562)}",
+        ((MockSession) 
session).getLastConstructedSessionConnection().toString());
+
+    for (long row = 0; row < 100; row++) {
+      int rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, timestamp);
+      tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, "id" + 
(rowIndex % 4));
+      for (int s = 1; s < 3; s++) {
+        long value = new Random().nextLong();
+        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
+      }
+
+      if (tablet.rowSize == tablet.getMaxRowNumber()) {
+        try {
+          session.insertRelationalTablet(tablet);
+        } catch (IoTDBConnectionException e) {
+          Assert.fail(e.getMessage());
+        }
+        tablet.reset();
+      }
+      timestamp++;
+    }
+
+    assertEquals(3, session.tableModelDeviceIdToEndpoint.size());
+    for (Map.Entry<IDeviceID, TEndPoint> endPointEntry :
+        session.tableModelDeviceIdToEndpoint.entrySet()) {
+      assertEquals(getDeviceIdBelongedEndpoint(endPointEntry.getKey()), 
endPointEntry.getValue());
+    }
+    assertEquals(3, session.endPointToSessionConnection.size());
+    try {
+      session.close();
+    } catch (IoTDBConnectionException e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+
   private void addLine(
       List<Long> times,
       List<List<String>> measurements,
@@ -977,6 +1189,12 @@ public class SessionCacheLeaderTest {
       this.enableAutoFetch = false;
     }
 
+    public MockSession(String host, int rpcPort, boolean enableRedirection, 
String sqlDialect) {
+      this(host, rpcPort, enableRedirection);
+      this.sqlDialect = sqlDialect;
+      this.enableAutoFetch = false;
+    }
+
     @Override
     public SessionConnection constructSessionConnection(
         Session session, TEndPoint endpoint, ZoneId zoneId) {
@@ -996,7 +1214,7 @@ public class SessionCacheLeaderTest {
     private IoTDBConnectionException ioTDBConnectionException;
 
     public MockSessionConnection(Session session, TEndPoint endPoint, ZoneId 
zoneId) {
-      super();
+      super(session.sqlDialect);
       this.endPoint = endPoint;
       ioTDBConnectionException =
           new IoTDBConnectionException(
@@ -1057,7 +1275,18 @@ public class SessionCacheLeaderTest {
       if (isConnectionBroken()) {
         throw ioTDBConnectionException;
       }
-      throw new 
RedirectException(getDeviceIdBelongedEndpoint(request.prefixPath));
+      if (request.writeToTable) {
+        if (request.size >= 50) {
+          // multi devices
+          List<TEndPoint> endPoints = new ArrayList<>();
+          for (int i = 0; i < request.size; i++) {
+            endPoints.add(endpoints.get(i % 4));
+          }
+          throw new RedirectException(endPoints);
+        }
+      } else {
+        throw new 
RedirectException(getDeviceIdBelongedEndpoint(request.prefixPath));
+      }
     }
 
     @Override
diff --git 
a/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java
 
b/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java
index 0785bc800d9..a10d9b1d6f4 100644
--- 
a/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java
+++ 
b/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java
@@ -84,7 +84,7 @@ public class SessionConnectionTest {
   @Before
   public void setUp() throws IoTDBConnectionException, 
StatementExecutionException, TException {
     MockitoAnnotations.initMocks(this);
-    sessionConnection = new SessionConnection();
+    sessionConnection = new SessionConnection("tree");
     Whitebox.setInternalState(sessionConnection, "transport", transport);
     Whitebox.setInternalState(sessionConnection, "client", client);
     session =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index 11e8e42ec83..1f8905cc666 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -414,6 +414,7 @@ public class Analysis implements IAnalysis {
     this.schemaTree = schemaTree;
   }
 
+  @Override
   public List<TEndPoint> getRedirectNodeList() {
     return redirectNodeList;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
index 05c3638a95d..e03f75db230 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
@@ -66,6 +66,8 @@ public interface IAnalysis {
 
   DataPartition getDataPartitionInfo();
 
+  List<TEndPoint> getRedirectNodeList();
+
   void setRedirectNodeList(List<TEndPoint> redirectNodeList);
 
   void addEndPointToRedirectNodeList(TEndPoint endPoint);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index d40d1c50b84..3a2027d1927 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -248,7 +248,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
     return deviceIDSplitInfoMap;
   }
 
-  private Map<TRegionReplicaSet, List<Integer>> splitByReplicaSet(
+  protected Map<TRegionReplicaSet, List<Integer>> splitByReplicaSet(
       Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap, IAnalysis 
analysis) {
     Map<TRegionReplicaSet, List<Integer>> splitMap = new HashMap<>();
 
@@ -1189,12 +1189,12 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
     return deviceID;
   }
 
-  private static class PartitionSplitInfo {
+  protected static class PartitionSplitInfo {
 
     // for each List in split, they are range1.start, range1.end, 
range2.start, range2.end, ...
-    private List<Integer> ranges = new ArrayList<>();
-    private List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
-    private List<TRegionReplicaSet> replicaSets;
+    List<Integer> ranges = new ArrayList<>();
+    List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
+    List<TRegionReplicaSet> replicaSets;
   }
 
   /**
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index c2129f8d8d7..b6b97457cff 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -19,10 +19,13 @@
 
 package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
+import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
@@ -41,7 +44,9 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.function.IntToLongFunction;
 
 public class RelationalInsertTabletNode extends InsertTabletNode {
@@ -139,6 +144,44 @@ public class RelationalInsertTabletNode extends 
InsertTabletNode {
         columnCategories);
   }
 
+  protected Map<TRegionReplicaSet, List<Integer>> splitByReplicaSet(
+      Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap, IAnalysis 
analysis) {
+    Map<TRegionReplicaSet, List<Integer>> splitMap = new HashMap<>();
+    Map<IDeviceID, TEndPoint> endPointMap = new HashMap<>();
+
+    for (Map.Entry<IDeviceID, PartitionSplitInfo> entry : 
deviceIDSplitInfoMap.entrySet()) {
+      final IDeviceID deviceID = entry.getKey();
+      final PartitionSplitInfo splitInfo = entry.getValue();
+      final List<TRegionReplicaSet> replicaSets =
+          analysis
+              .getDataPartitionInfo()
+              .getDataRegionReplicaSetForWriting(
+                  deviceID, splitInfo.timePartitionSlots, 
analysis.getDatabaseName());
+      splitInfo.replicaSets = replicaSets;
+      // collect redirectInfo
+      endPointMap.put(
+          deviceID,
+          replicaSets
+              .get(replicaSets.size() - 1)
+              .getDataNodeLocations()
+              .get(0)
+              .getClientRpcEndPoint());
+      for (int i = 0; i < replicaSets.size(); i++) {
+        List<Integer> subRanges =
+            splitMap.computeIfAbsent(replicaSets.get(i), x -> new 
ArrayList<>());
+        subRanges.add(splitInfo.ranges.get(2 * i));
+        subRanges.add(splitInfo.ranges.get(2 * i + 1));
+      }
+    }
+    List<TEndPoint> redirectNodeList = new ArrayList<>(times.length);
+    for (int i = 0; i < times.length; i++) {
+      IDeviceID deviceId = getDeviceID(i);
+      redirectNodeList.add(endPointMap.get(deviceId));
+    }
+    analysis.setRedirectNodeList(redirectNodeList);
+    return splitMap;
+  }
+
   public static RelationalInsertTabletNode deserialize(ByteBuffer byteBuffer) {
     RelationalInsertTabletNode insertNode = new RelationalInsertTabletNode(new 
PlanNodeId(""));
     insertNode.subDeserialize(byteBuffer);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
index a9ddeeea37e..3ae791bd496 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
@@ -754,6 +754,11 @@ public class Analysis implements IAnalysis {
     }
   }
 
+  @Override
+  public List<TEndPoint> getRedirectNodeList() {
+    return redirectNodeList;
+  }
+
   @Override
   public void setRedirectNodeList(List<TEndPoint> redirectNodeList) {
     this.redirectNodeList = redirectNodeList;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
index e750e2d95a8..1b90384efc3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
@@ -38,12 +38,18 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributedPlanner;
 import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedInsertStatement;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
 import org.apache.iotdb.db.queryengine.plan.scheduler.ClusterScheduler;
 import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -149,7 +155,39 @@ public class TableModelPlanner implements IPlanner {
 
   @Override
   public void setRedirectInfo(
-      IAnalysis analysis, TEndPoint localEndPoint, TSStatus tsstatus, 
TSStatusCode statusCode) {}
+      IAnalysis iAnalysis, TEndPoint localEndPoint, TSStatus tsstatus, 
TSStatusCode statusCode) {
+    Analysis analysis = (Analysis) iAnalysis;
+    if (!(analysis.getStatement() instanceof WrappedInsertStatement)) {
+      return;
+    }
+    InsertBaseStatement insertStatement =
+        ((WrappedInsertStatement) 
analysis.getStatement()).getInnerTreeStatement();
+
+    if (!analysis.isFinishQueryAfterAnalyze()) {
+      // Table Model Session only supports insertTablet
+      if (insertStatement instanceof InsertTabletStatement) {
+        if (statusCode == TSStatusCode.SUCCESS_STATUS) {
+          boolean needRedirect = false;
+          List<TEndPoint> redirectNodeList = analysis.getRedirectNodeList();
+          List<TSStatus> subStatus = new ArrayList<>(redirectNodeList.size());
+          for (TEndPoint endPoint : redirectNodeList) {
+            // redirect writing only if the redirectEndPoint is not the 
current node
+            if (!localEndPoint.equals(endPoint)) {
+              subStatus.add(
+                  
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS).setRedirectNode(endPoint));
+              needRedirect = true;
+            } else {
+              subStatus.add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+            }
+          }
+          if (needRedirect) {
+            
tsstatus.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
+            tsstatus.setSubStatus(subStatus);
+          }
+        }
+      }
+    }
+  }
 
   private static class NopAccessControl implements AccessControl {}
 }

Reply via email to