This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch optimize_write_redirection_logic in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ddd4b62301d8b6968f3102054aba4574f9f8e56b Author: HTHou <[email protected]> AuthorDate: Fri Dec 30 23:06:36 2022 +0800 Optimize write redirection logic --- .../db/mpp/plan/execution/QueryExecution.java | 77 ++++++++++++---------- 1 file changed, 41 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java index 7ed5f82aae..3b5d42b234 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java @@ -18,12 +18,29 @@ */ package org.apache.iotdb.db.mpp.plan.execution; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Throwables.throwIfUnchecked; +import static org.apache.iotdb.db.mpp.common.DataNodeEndPoints.isSameNode; +import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.SCHEDULE; +import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.WAIT_FOR_RESULT; +import static org.apache.iotdb.db.mpp.metric.QueryPlanCostMetricSet.DISTRIBUTION_PLANNER; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.exception.IoTDBException; -import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException; @@ -62,30 +79,9 @@ import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.read.common.block.TsBlock; - -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Throwables.throwIfUnchecked; -import static org.apache.iotdb.db.mpp.common.DataNodeEndPoints.isSameNode; -import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.SCHEDULE; -import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.WAIT_FOR_RESULT; -import static org.apache.iotdb.db.mpp.metric.QueryPlanCostMetricSet.DISTRIBUTION_PLANNER; - /** * QueryExecution stores all the status of a query which is being prepared or running inside the MPP * frame. It takes three main responsibilities: 1. Prepare a query. Transform a query from statement @@ -569,7 +565,10 @@ public class QueryExecution implements IQueryExecution { : TSStatusCode.EXECUTE_STATEMENT_ERROR; } - TSStatus tsstatus = RpcUtils.getStatus(statusCode, stateMachine.getFailureMessage()); + TSStatus tsstatus = + RpcUtils.getStatus( + statusCode, + statusCode == TSStatusCode.SUCCESS_STATUS ? "" : stateMachine.getFailureMessage()); // If RETRYING is triggered by this QueryExecution, the stateMachine.getFailureStatus() is also // not null. We should only return the failure status when QueryExecution is in Done state. @@ -581,29 +580,35 @@ public class QueryExecution implements IQueryExecution { if (analysis.getStatement() instanceof InsertBaseStatement && !analysis.isFinishQueryAfterAnalyze()) { InsertBaseStatement insertStatement = (InsertBaseStatement) analysis.getStatement(); - List<TEndPoint> redirectNodeList; - if (config.isClusterMode()) { - redirectNodeList = insertStatement.collectRedirectInfo(analysis.getDataPartitionInfo()); - } else { - redirectNodeList = Collections.emptyList(); - } + List<TEndPoint> redirectNodeList = + insertStatement.collectRedirectInfo(analysis.getDataPartitionInfo()); if (insertStatement instanceof InsertRowsStatement || insertStatement instanceof InsertMultiTabletsStatement) { // multiple devices if (statusCode == TSStatusCode.SUCCESS_STATUS) { + boolean needRedirect = false; List<TSStatus> subStatus = new ArrayList<>(); - tsstatus.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()); for (TEndPoint endPoint : redirectNodeList) { - subStatus.add( - StatusUtils.getStatus(TSStatusCode.REDIRECTION_RECOMMEND) - .setRedirectNode(endPoint)); + // redirect writing only if the redirectEndPoint is not the current node + if (!config.getAddressAndPort().equals(endPoint)) { + subStatus.add( + RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS).setRedirectNode(endPoint)); + needRedirect = true; + } else { + subStatus.add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + } + } + if (needRedirect) { + tsstatus.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()); + tsstatus.setSubStatus(subStatus); } - tsstatus.setSubStatus(subStatus); } } else { // single device - if (config.isClusterMode()) { - tsstatus.setRedirectNode(redirectNodeList.get(0)); + TEndPoint redirectEndPoint = redirectNodeList.get(0); + // redirect writing only if the redirectEndPoint is not the current node + if (!config.getAddressAndPort().equals(redirectEndPoint)) { + tsstatus.setRedirectNode(redirectEndPoint); } } }
