This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch support_table_model_redirect
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 73674da61587bad74cc898d53f00e87763097749
Author: HTHou <[email protected]>
AuthorDate: Thu Sep 19 18:51:12 2024 +0800

    dev session
---
 .../org/apache/iotdb/rpc/RedirectException.java    |  15 ++
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |  12 ++
 .../java/org/apache/iotdb/session/Session.java     | 164 ++++++++++++++++++++-
 .../plan/relational/planner/TableModelPlanner.java |   5 +-
 4 files changed, 189 insertions(+), 7 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
index 8da65e8249c..81833854c23 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.rpc;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 
 public class RedirectException extends IOException {
@@ -29,17 +30,27 @@ public class RedirectException extends IOException {
   private final TEndPoint endPoint;
 
   private final Map<String, TEndPoint> deviceEndPointMap;
+  private final List<TEndPoint> deviceEndPointList;
 
   public RedirectException(TEndPoint endPoint) {
     super("later request in same group will be redirected to " + 
endPoint.toString());
     this.endPoint = endPoint;
     this.deviceEndPointMap = null;
+    this.deviceEndPointList = null;
   }
 
   public RedirectException(Map<String, TEndPoint> deviceEndPointMap) {
     super("later request in same group will be redirected to " + 
deviceEndPointMap);
     this.endPoint = null;
     this.deviceEndPointMap = deviceEndPointMap;
+    this.deviceEndPointList = null;
+  }
+
+  /**
+   * Creates a redirection signal that carries a list of endpoints.
+   *
+   * <p>The single-endpoint and device-to-endpoint-map fields are {@code null} for this variant.
+   *
+   * @param deviceEndPointList endpoints later requests should be redirected to
+   */
+  public RedirectException(List<TEndPoint> deviceEndPointList) {
+    super("later request in same group will be redirected to " + deviceEndPointList);
+    this.deviceEndPointList = deviceEndPointList;
+    this.deviceEndPointMap = null;
+    this.endPoint = null;
   }
 
   public TEndPoint getEndPoint() {
@@ -49,4 +60,8 @@ public class RedirectException extends IOException {
   public Map<String, TEndPoint> getDeviceEndPointMap() {
     return deviceEndPointMap;
   }
+
+  /**
+   * Returns the endpoint list supplied at construction.
+   *
+   * @return the endpoint list, or {@code null} when this exception was created from a single
+   *     endpoint or from a device-to-endpoint map
+   */
+  public List<TEndPoint> getDeviceEndPointList() {
+    return this.deviceEndPointList;
+  }
 }
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index 1a7bc9970f6..440cbe4fbe9 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -36,6 +36,7 @@ import java.time.Instant;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -112,6 +113,17 @@ public class RpcUtils {
     if (status.isSetRedirectNode()) {
       throw new RedirectException(status.getRedirectNode());
     }
+    if (status.isSetSubStatus()) { // insertRelationalTablet may set subStatus
+      // Built with add(), not set(): a presized ArrayList is still empty, so set(i, ...)
+      // would throw IndexOutOfBoundsException. The list stays index-aligned with subStatus;
+      // null marks an entry that carries no redirect node.
+      List<TEndPoint> deviceEndPointList = new ArrayList<>();
+      for (TSStatus subStatus : status.getSubStatus()) {
+        deviceEndPointList.add(subStatus.isSetRedirectNode() ? subStatus.getRedirectNode() : null);
+      }
+      // Only signal redirection when at least one sub status actually asks for it.
+      if (deviceEndPointList.stream().anyMatch(endPoint -> endPoint != null)) {
+        throw new RedirectException(deviceEndPointList);
+      }
+    }
   }
 
   public static void verifySuccessWithRedirectionForMultiDevices(
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index 35229b54162..1769337b748 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -63,6 +63,7 @@ import org.apache.iotdb.session.util.ThreadUtils;
 
 import org.apache.thrift.TException;
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.utils.Binary;
@@ -164,6 +165,9 @@ public class Session implements ISession {
   @SuppressWarnings("squid:S3077") // Non-primitive fields should not be 
"volatile"
   protected volatile Map<String, TEndPoint> deviceIdToEndpoint;
 
+  @SuppressWarnings("squid:S3077") // Non-primitive fields should not be 
"volatile"
+  protected volatile Map<IDeviceID, TEndPoint> tableModelDeviceIdToEndpoint;
+
   @SuppressWarnings("squid:S3077") // Non-primitive fields should not be 
"volatile"
   protected volatile Map<TEndPoint, SessionConnection> 
endPointToSessionConnection;
 
@@ -535,6 +539,7 @@ public class Session implements ISession {
     isClosed = false;
     if (enableRedirection || enableQueryRedirection) {
       deviceIdToEndpoint = new ConcurrentHashMap<>();
+      tableModelDeviceIdToEndpoint = new ConcurrentHashMap<>();
       endPointToSessionConnection = new ConcurrentHashMap<>();
       endPointToSessionConnection.put(defaultEndPoint, 
defaultSessionConnection);
     }
@@ -1321,6 +1326,18 @@ public class Session implements ISession {
     }
   }
 
+  /**
+   * Picks the connection to use for a table-model device.
+   *
+   * @param deviceId device whose cached endpoint is looked up
+   * @return the connection cached for the device's endpoint when redirection is enabled and
+   *     both the endpoint and its connection are known; otherwise the default connection
+   */
+  private SessionConnection getSessionConnection(IDeviceID deviceId) {
+    if (!enableRedirection || tableModelDeviceIdToEndpoint == null) {
+      return defaultSessionConnection;
+    }
+    TEndPoint endPoint = tableModelDeviceIdToEndpoint.get(deviceId);
+    if (endPoint == null) {
+      return defaultSessionConnection;
+    }
+    // Single lookup instead of containsKey() + get(): the map is shared between threads, so
+    // the entry could be removed between the two calls and get() would then return null.
+    SessionConnection connection = endPointToSessionConnection.get(endPoint);
+    return connection != null ? connection : defaultSessionConnection;
+  }
+
   @Override
   public String getTimestampPrecision() throws TException {
     return 
defaultSessionConnection.getClient().getProperties().getTimestampPrecision();
@@ -1384,6 +1401,34 @@ public class Session implements ISession {
     }
   }
 
+  /**
+   * Records the endpoint a table-model device was redirected to and makes sure a connection
+   * to that endpoint exists.
+   *
+   * <p>Does nothing when redirection is disabled or the endpoint's ip is {@code 0.0.0.0}. If
+   * the connection cannot be established, the device's cache entry is removed again and the
+   * failure is logged.
+   *
+   * @param deviceId device that was redirected
+   * @param endpoint endpoint reported for the device
+   */
+  private void handleRedirection(IDeviceID deviceId, TEndPoint endpoint) {
+    if (!enableRedirection) {
+      return;
+    }
+    // no need to redirection
+    if ("0.0.0.0".equals(endpoint.ip)) {
+      return;
+    }
+    // put() already covers both "absent" and "mapped to a different endpoint".
+    tableModelDeviceIdToEndpoint.put(deviceId, endpoint);
+    AtomicReference<IoTDBConnectionException> exceptionReference = new AtomicReference<>();
+    SessionConnection connection =
+        endPointToSessionConnection.computeIfAbsent(
+            endpoint,
+            k -> {
+              try {
+                return constructSessionConnection(this, endpoint, zoneId);
+              } catch (IoTDBConnectionException ex) {
+                exceptionReference.set(ex);
+                return null;
+              }
+            });
+    if (connection == null) {
+      tableModelDeviceIdToEndpoint.remove(deviceId);
+      // This method declares no checked exception, so the failure is reported through the
+      // log instead of being dropped silently.
+      logger.warn(
+          "Cannot connect to redirected endpoint " + endpoint, exceptionReference.get());
+    }
+  }
+
   private void handleQueryRedirection(TEndPoint endPoint) throws 
IoTDBConnectionException {
     if (enableQueryRedirection) {
       AtomicReference<IoTDBConnectionException> exceptionReference = new 
AtomicReference<>();
@@ -2679,11 +2724,20 @@ public class Session implements ISession {
   @Override
   public void insertRelationalTablet(Tablet tablet, boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
-    TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted, false);
-    request.setWriteToTable(true);
-    request.setColumnCategories(
-        tablet.getColumnTypes().stream().map(t -> (byte) 
t.ordinal()).collect(Collectors.toList()));
-    insertTabletInternal(tablet, request);
+    if (enableRedirection) {
+      insertRelationalTabletWithLeaderCache(tablet);
+    } else {
+      TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted, false);
+      request.setWriteToTable(true);
+      request.setColumnCategories(
+          tablet.getColumnTypes().stream()
+              .map(t -> (byte) t.ordinal())
+              .collect(Collectors.toList()));
+      try {
+        defaultSessionConnection.insertTablet(request);
+      } catch (RedirectException ignored) {
+      }
+    }
   }
 
   /**
@@ -2697,6 +2751,106 @@ public class Session implements ISession {
     insertRelationalTablet(tablet, false);
   }
 
+  /**
+   * Splits a relational tablet by target connection and inserts every part.
+   *
+   * <p>Each row is routed to the connection returned by {@code getSessionConnection} for the
+   * row's device id. Rows sharing a connection are copied into one sub-tablet, which is then
+   * handed to {@code insertRelationalTabletByGroup}.
+   *
+   * @param tablet tablet whose rows are distributed
+   * @throws IoTDBConnectionException propagated from the grouped insert
+   * @throws StatementExecutionException propagated from the grouped insert
+   */
+  private void insertRelationalTabletWithLeaderCache(Tablet tablet)
+      throws IoTDBConnectionException, StatementExecutionException {
+    Map<SessionConnection, Tablet> relationalTabletGroup = new HashMap<>();
+
+    for (int i = 0; i < tablet.rowSize; i++) {
+      final int sourceRow = i;
+      final SessionConnection connection = getSessionConnection(tablet.getDeviceID(sourceRow));
+      relationalTabletGroup.compute(
+          connection,
+          (k, v) -> {
+            if (v == null) {
+              v =
+                  new Tablet(
+                      tablet.getTableName(),
+                      tablet.getSchemas(),
+                      tablet.getColumnTypes(),
+                      tablet.rowSize);
+            }
+            // Append at the sub-tablet's own next free row. Writing at the source index
+            // would leave gaps whenever the rows of one connection are not contiguous in
+            // the original tablet, while rowSize only counts the rows actually copied.
+            final int targetRow = v.rowSize;
+            for (int j = 0; j < v.getSchemas().size(); j++) {
+              v.addValue(
+                  v.getSchemas().get(j).getMeasurementId(),
+                  targetRow,
+                  tablet.getValue(sourceRow, j));
+            }
+            v.addTimestamp(targetRow, tablet.timestamps[sourceRow]);
+            v.rowSize = targetRow + 1;
+            return v;
+          });
+    }
+    insertRelationalTabletByGroup(relationalTabletGroup);
+  }
+
+  /**
+   * Sends every sub-tablet to its connection asynchronously and waits for all of them.
+   *
+   * <p>Each insert runs on {@code OPERATION_EXECUTOR}. A {@link RedirectException} updates the
+   * table-model leader cache. An {@link IoTDBConnectionException} removes the broken
+   * connection and retries once on the default connection.
+   *
+   * @param relationalTabletGroup sub-tablets keyed by the connection they are sent through
+   * @throws IoTDBConnectionException the first connection failure met while joining the tasks
+   * @throws StatementExecutionException when any task failed otherwise; the message is the
+   *     concatenation of the individual failure messages
+   */
+  @SuppressWarnings({
+    "squid:S3776"
+  }) // ignore Cognitive Complexity of methods should not be too high
+  private void insertRelationalTabletByGroup(
+      Map<SessionConnection, Tablet> relationalTabletGroup)
+      throws IoTDBConnectionException, StatementExecutionException {
+    List<CompletableFuture<Void>> completableFutures =
+        relationalTabletGroup.entrySet().stream()
+            .map(
+                entry -> {
+                  SessionConnection connection = entry.getKey();
+                  Tablet subTablet = entry.getValue();
+                  return CompletableFuture.runAsync(
+                      () -> {
+                        TSInsertTabletReq request = genTSInsertTabletReq(subTablet, false, false);
+                        request.setWriteToTable(true);
+                        request.setColumnCategories(
+                            subTablet.getColumnTypes().stream()
+                                .map(t -> (byte) t.ordinal())
+                                .collect(Collectors.toList()));
+                        InsertConsumer<TSInsertTabletReq> insertConsumer =
+                            SessionConnection::insertTablet;
+                        try {
+                          insertConsumer.insert(connection, request);
+                        } catch (RedirectException e) {
+                          cacheTableModelRedirection(subTablet, e.getDeviceEndPointList());
+                        } catch (StatementExecutionException e) {
+                          throw new CompletionException(e);
+                        } catch (IoTDBConnectionException e) {
+                          // remove the broken session
+                          removeBrokenSessionConnection(connection);
+                          try {
+                            insertConsumer.insert(defaultSessionConnection, request);
+                          } catch (IoTDBConnectionException | StatementExecutionException ex) {
+                            throw new CompletionException(ex);
+                          } catch (RedirectException ignored) {
+                            // redirect hint of the fallback attempt is intentionally dropped
+                          }
+                        }
+                      },
+                      OPERATION_EXECUTOR);
+                })
+            .collect(Collectors.toList());
+
+    StringBuilder errMsgBuilder = new StringBuilder();
+    for (CompletableFuture<Void> completableFuture : completableFutures) {
+      try {
+        completableFuture.join();
+      } catch (CompletionException completionException) {
+        Throwable cause = completionException.getCause();
+        logger.error("Meet error when async insert!", cause);
+        if (cause instanceof IoTDBConnectionException) {
+          throw (IoTDBConnectionException) cause;
+        } else {
+          errMsgBuilder.append(cause.getMessage());
+        }
+      }
+    }
+    if (errMsgBuilder.length() > 0) {
+      throw new StatementExecutionException(errMsgBuilder.toString());
+    }
+  }
+
+  /**
+   * Updates the table-model leader cache from a redirect endpoint list.
+   *
+   * @param subTablet tablet that was just inserted; entry {@code i} of the list is applied to
+   *     the device id of its row {@code i}
+   * @param endPointList redirect endpoints, may be {@code null} or contain {@code null}
+   *     entries for rows without redirection
+   */
+  private void cacheTableModelRedirection(Tablet subTablet, List<TEndPoint> endPointList) {
+    // A RedirectException built from a single endpoint or from a device map carries no
+    // list; without this guard the async task would die with a NullPointerException.
+    if (endPointList == null) {
+      return;
+    }
+    // Collect per device first so each device is handled once.
+    Map<IDeviceID, TEndPoint> endPointMap = new HashMap<>();
+    for (int i = 0; i < endPointList.size(); i++) {
+      if (endPointList.get(i) != null) {
+        endPointMap.put(subTablet.getDeviceID(i), endPointList.get(i));
+      }
+    }
+    endPointMap.forEach(this::handleRedirection);
+  }
+
   /**
    * insert the aligned timeseries data of a device. For each timestamp, the 
number of measurements
    * is the same.
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
index 762939c7c74..1b90384efc3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
@@ -157,8 +157,9 @@ public class TableModelPlanner implements IPlanner {
   public void setRedirectInfo(
       IAnalysis iAnalysis, TEndPoint localEndPoint, TSStatus tsstatus, 
TSStatusCode statusCode) {
     Analysis analysis = (Analysis) iAnalysis;
-
-    assert analysis.getStatement() instanceof WrappedInsertStatement;
+    if (!(analysis.getStatement() instanceof WrappedInsertStatement)) {
+      return;
+    }
     InsertBaseStatement insertStatement =
         ((WrappedInsertStatement) 
analysis.getStatement()).getInnerTreeStatement();
 

Reply via email to