This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e49a07d1129 Pipe: fix PipeEnrichedStatement can't contain redirect 
info (#12579)
e49a07d1129 is described below

commit e49a07d11297426c4652f8bde7fad142e23afec3
Author: Zikun Ma <[email protected]>
AuthorDate: Fri May 24 14:35:53 2024 +0800

    Pipe: fix PipeEnrichedStatement can't contain redirect info (#12579)
---
 .../evolvable/builder/PipeTransferBatchReqBuilder.java    |  8 +-------
 .../handler/PipeTransferTabletInsertNodeEventHandler.java |  8 ++------
 .../thrift/sync/IoTDBDataRegionSyncConnector.java         |  9 ++-------
 .../common/tablet/PipeInsertNodeTabletInsertionEvent.java | 13 +++++++++++++
 .../db/pipe/event/realtime/PipeRealtimeEventFactory.java  |  1 +
 .../db/queryengine/plan/planner/TreeModelPlanner.java     | 15 +++++++++++++--
 6 files changed, 32 insertions(+), 22 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
index 087c969b89a..33b524b1a9a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeCacheLeaderClientManager;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -112,12 +111,7 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
     if (event instanceof PipeRawTabletInsertionEvent) {
       deviceId = ((PipeRawTabletInsertionEvent) event).getDeviceId();
     } else if (event instanceof PipeInsertNodeTabletInsertionEvent) {
-      final InsertNode insertNode =
-          ((PipeInsertNodeTabletInsertionEvent) 
event).getInsertNodeViaCacheIfPossible();
-      // insertNode.getDevicePath() is null for InsertRowsNode
-      if (Objects.nonNull(insertNode) && 
Objects.nonNull(insertNode.getDevicePath())) {
-        deviceId = insertNode.getDevicePath().getFullPath();
-      }
+      deviceId = ((PipeInsertNodeTabletInsertionEvent) event).getDeviceId();
     }
 
     if (Objects.isNull(deviceId)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
index 21cc976be77..660b2677ad5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
@@ -47,12 +46,9 @@ public class PipeTransferTabletInsertNodeEventHandler
 
   @Override
   protected void updateLeaderCache(TSStatus status) {
-    final InsertNode insertNode =
-        ((PipeInsertNodeTabletInsertionEvent) 
event).getInsertNodeViaCacheIfPossible();
-    // insertNode.getDevicePath() is null for InsertRowsNode
-    if (insertNode != null && insertNode.getDevicePath() != null) {
+    if (((PipeInsertNodeTabletInsertionEvent) event).getDeviceId() != null) {
       connector.updateLeaderCache(
-          insertNode.getDevicePath().getFullPath(), status.getRedirectNode());
+          ((PipeInsertNodeTabletInsertionEvent) event).getDeviceId(), 
status.getRedirectNode());
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 12e9438f245..8ee65ef94ed 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -236,13 +236,9 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
 
     try {
       insertNode = 
pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
-
+      // getDeviceId() may return null for InsertRowsNode, will be equal to 
getClient(null)
+      clientAndStatus = 
clientManager.getClient(pipeInsertNodeTabletInsertionEvent.getDeviceId());
       if (insertNode != null) {
-        clientAndStatus =
-            // insertNode.getDevicePath() is null for InsertRowsNode
-            Objects.nonNull(insertNode.getDevicePath())
-                ? 
clientManager.getClient(insertNode.getDevicePath().getFullPath())
-                : clientManager.getClient();
         resp =
             clientAndStatus
                 .getLeft()
@@ -250,7 +246,6 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
                     compressIfNeeded(
                         
PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode)));
       } else {
-        clientAndStatus = clientManager.getClient();
         resp =
             clientAndStatus
                 .getLeft()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 437d8b97d50..11711fa1d03 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.event.common.tablet;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
@@ -62,15 +63,19 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
 
   private List<TabletInsertionDataContainer> dataContainers;
 
+  private final PartialPath devicePath;
+
   private ProgressIndex progressIndex;
 
   public PipeInsertNodeTabletInsertionEvent(
       WALEntryHandler walEntryHandler,
+      PartialPath devicePath,
       ProgressIndex progressIndex,
       boolean isAligned,
       boolean isGeneratedByPipe) {
     this(
         walEntryHandler,
+        devicePath,
         progressIndex,
         isAligned,
         isGeneratedByPipe,
@@ -83,6 +88,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
 
   private PipeInsertNodeTabletInsertionEvent(
       WALEntryHandler walEntryHandler,
+      PartialPath devicePath,
       ProgressIndex progressIndex,
       boolean isAligned,
       boolean isGeneratedByPipe,
@@ -93,6 +99,8 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
       long endTime) {
     super(pipeName, pipeTaskMeta, pattern, startTime, endTime);
     this.walEntryHandler = walEntryHandler;
+    // Record device path here so there's no need to get it from InsertNode 
cache later.
+    this.devicePath = devicePath;
     this.progressIndex = progressIndex;
     this.isAligned = isAligned;
     this.isGeneratedByPipe = isGeneratedByPipe;
@@ -114,6 +122,10 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
     return walEntryHandler.getInsertNodeViaCacheIfPossible();
   }
 
+  public String getDeviceId() {
+    return Objects.nonNull(devicePath) ? devicePath.getFullPath() : null;
+  }
+
   /////////////////////////// EnrichedEvent ///////////////////////////
 
   @Override
@@ -170,6 +182,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
       long endTime) {
     return new PipeInsertNodeTabletInsertionEvent(
         walEntryHandler,
+        devicePath,
         progressIndex,
         isAligned,
         isGeneratedByPipe,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index 32be413e660..94793b74bc1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -44,6 +44,7 @@ public class PipeRealtimeEventFactory {
     return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent(
         new PipeInsertNodeTabletInsertionEvent(
             walEntryHandler,
+            insertNode.getDevicePath(),
             insertNode.getProgressIndex(),
             insertNode.isAligned(),
             insertNode.isGeneratedByPipe()),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
index a42ff843f30..4ea3048f2c3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
@@ -46,12 +46,16 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 
 public class TreeModelPlanner implements IPlanner {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TreeModelPlanner.class);
 
   private final Statement statement;
 
@@ -159,9 +163,16 @@ public class TreeModelPlanner implements IPlanner {
   public void setRedirectInfo(
       IAnalysis iAnalysis, TEndPoint localEndPoint, TSStatus tsstatus, 
TSStatusCode statusCode) {
     Analysis analysis = (Analysis) iAnalysis;
-    if (analysis.getStatement() instanceof InsertBaseStatement
+
+    // Get the inner statement of PipeEnrichedStatement
+    Statement statementToRedirect =
+        analysis.getStatement() instanceof PipeEnrichedStatement
+            ? ((PipeEnrichedStatement) 
analysis.getStatement()).getInnerStatement()
+            : analysis.getStatement();
+
+    if (statementToRedirect instanceof InsertBaseStatement
         && !analysis.isFinishQueryAfterAnalyze()) {
-      InsertBaseStatement insertStatement = (InsertBaseStatement) 
analysis.getStatement();
+      InsertBaseStatement insertStatement = (InsertBaseStatement) 
statementToRedirect;
       List<TEndPoint> redirectNodeList = analysis.getRedirectNodeList();
       if (insertStatement instanceof InsertRowsStatement
           || insertStatement instanceof InsertMultiTabletsStatement) {

Reply via email to