This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch optimize_write_redirection_logic
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ddd4b62301d8b6968f3102054aba4574f9f8e56b
Author: HTHou <[email protected]>
AuthorDate: Fri Dec 30 23:06:36 2022 +0800

    Optimize write redirection logic
---
 .../db/mpp/plan/execution/QueryExecution.java      | 77 ++++++++++++----------
 1 file changed, 41 insertions(+), 36 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 7ed5f82aae..3b5d42b234 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -18,12 +18,29 @@
  */
 package org.apache.iotdb.db.mpp.plan.execution;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Throwables.throwIfUnchecked;
+import static org.apache.iotdb.db.mpp.common.DataNodeEndPoints.isSameNode;
+import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.SCHEDULE;
+import static 
org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.WAIT_FOR_RESULT;
+import static 
org.apache.iotdb.db.mpp.metric.QueryPlanCostMetricSet.DISTRIBUTION_PLANNER;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
@@ -62,30 +79,9 @@ import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Throwables.throwIfUnchecked;
-import static org.apache.iotdb.db.mpp.common.DataNodeEndPoints.isSameNode;
-import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.SCHEDULE;
-import static 
org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.WAIT_FOR_RESULT;
-import static 
org.apache.iotdb.db.mpp.metric.QueryPlanCostMetricSet.DISTRIBUTION_PLANNER;
-
 /**
  * QueryExecution stores all the status of a query which is being prepared or running inside the MPP
  * frame. It takes three main responsibilities: 1. Prepare a query. Transform a query from statement
@@ -569,7 +565,10 @@ public class QueryExecution implements IQueryExecution {
               : TSStatusCode.EXECUTE_STATEMENT_ERROR;
     }
 
-    TSStatus tsstatus = RpcUtils.getStatus(statusCode, 
stateMachine.getFailureMessage());
+    TSStatus tsstatus =
+        RpcUtils.getStatus(
+            statusCode,
+            statusCode == TSStatusCode.SUCCESS_STATUS ? "" : 
stateMachine.getFailureMessage());
 
     // If RETRYING is triggered by this QueryExecution, the stateMachine.getFailureStatus() is also
     // not null. We should only return the failure status when QueryExecution is in Done state.
@@ -581,29 +580,35 @@ public class QueryExecution implements IQueryExecution {
     if (analysis.getStatement() instanceof InsertBaseStatement
         && !analysis.isFinishQueryAfterAnalyze()) {
       InsertBaseStatement insertStatement = (InsertBaseStatement) 
analysis.getStatement();
-      List<TEndPoint> redirectNodeList;
-      if (config.isClusterMode()) {
-        redirectNodeList = 
insertStatement.collectRedirectInfo(analysis.getDataPartitionInfo());
-      } else {
-        redirectNodeList = Collections.emptyList();
-      }
+      List<TEndPoint> redirectNodeList =
+          insertStatement.collectRedirectInfo(analysis.getDataPartitionInfo());
       if (insertStatement instanceof InsertRowsStatement
           || insertStatement instanceof InsertMultiTabletsStatement) {
         // multiple devices
         if (statusCode == TSStatusCode.SUCCESS_STATUS) {
+          boolean needRedirect = false;
           List<TSStatus> subStatus = new ArrayList<>();
-          tsstatus.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
           for (TEndPoint endPoint : redirectNodeList) {
-            subStatus.add(
-                StatusUtils.getStatus(TSStatusCode.REDIRECTION_RECOMMEND)
-                    .setRedirectNode(endPoint));
+            // redirect writing only if the redirectEndPoint is not the current node
+            if (!config.getAddressAndPort().equals(endPoint)) {
+              subStatus.add(
+                  
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS).setRedirectNode(endPoint));
+              needRedirect = true;
+            } else {
+              subStatus.add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+            }
+          }
+          if (needRedirect) {
+            
tsstatus.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
+            tsstatus.setSubStatus(subStatus);
           }
-          tsstatus.setSubStatus(subStatus);
         }
       } else {
         // single device
-        if (config.isClusterMode()) {
-          tsstatus.setRedirectNode(redirectNodeList.get(0));
+        TEndPoint redirectEndPoint = redirectNodeList.get(0);
+        // redirect writing only if the redirectEndPoint is not the current node
+        if (!config.getAddressAndPort().equals(redirectEndPoint)) {
+          tsstatus.setRedirectNode(redirectEndPoint);
         }
       }
     }

Reply via email to