This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d4dd9e1427 Pipe: Fix insertNode.getDevicePath() is not handled correctly for insertRowsNode (NPE) (#12569)
7d4dd9e1427 is described below

commit 7d4dd9e1427324816143192437121efe9f2a03a5
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 22 14:19:20 2024 +0800

    Pipe: Fix insertNode.getDevicePath() is not handled correctly for insertRowsNode (NPE) (#12569)
---
 .../apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java   |  8 ++++----
 .../apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java |  5 ++---
 .../evolvable/builder/PipeTransferBatchReqBuilder.java        |  3 ++-
 .../protocol/thrift/async/IoTDBDataRegionAsyncConnector.java  |  5 ++++-
 .../handler/PipeTransferTabletInsertNodeEventHandler.java     |  3 ++-
 .../protocol/thrift/sync/IoTDBDataRegionSyncConnector.java    |  9 +++++++--
 .../common/tablet/PipeInsertNodeTabletInsertionEvent.java     | 11 ++++++++++-
 7 files changed, 31 insertions(+), 13 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
index e90c7ecf8ad..7ebf56c6035 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
@@ -915,9 +915,9 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualAutoIT {
       if (!TestUtils.tryExecuteNonQueriesWithRetry(
           senderEnv,
           Arrays.asList(
-              "insert into root.db.d1(time, s1) values (-123, 3)",
-              "insert into root.db.d1(time, s1) values (now(), 3)",
-              "flush"))) {
+              // Test the correctness of insertRowsNode transmission
+              "insert into root.db.d1(time, s1) values (-122, 3)",
+              "insert into root.db.d1(time, s1) values (-123, 3), (now(), 
3)"))) {
         return;
       }
 
@@ -925,7 +925,7 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualAutoIT {
           receiverEnv,
           "select count(*) from root.**",
           "count(root.db.d1.s1),",
-          Collections.singleton("5,"));
+          Collections.singleton("6,"));
     }
   }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
index 28852463a77..b0504f8b88e 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
@@ -632,8 +632,7 @@ public class IoTDBPipeExtractorIT extends AbstractPipeDualAutoIT {
               "insert into root.db.d1 (time, at1) values (2, 11)",
               "insert into root.db.d2 (time, at1) values (2, 21)",
               "insert into root.db.d3 (time, at1) values (2, 31)",
-              "insert into root.db.d4 (time, at1) values (2, 41)",
-              "flush"))) {
+              "insert into root.db.d4 (time, at1) values (2, 41), (3, 51)"))) {
         return;
       }
 
@@ -646,7 +645,7 @@ public class IoTDBPipeExtractorIT extends AbstractPipeDualAutoIT {
           receiverEnv,
           "select count(*) from root.** where time >= 2",
           "count(root.db.d4.at1),count(root.db.d2.at1),count(root.db.d3.at1),",
-          Collections.singleton("1,1,0,"));
+          Collections.singleton("2,1,0,"));
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
index 2b19bb1275d..087c969b89a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
@@ -114,7 +114,8 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable {
     } else if (event instanceof PipeInsertNodeTabletInsertionEvent) {
       final InsertNode insertNode =
           ((PipeInsertNodeTabletInsertionEvent) event).getInsertNodeViaCacheIfPossible();
-      if (Objects.nonNull(insertNode)) {
+      // insertNode.getDevicePath() is null for InsertRowsNode
+      if (Objects.nonNull(insertNode) && Objects.nonNull(insertNode.getDevicePath())) {
         deviceId = insertNode.getDevicePath().getFullPath();
       }
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index dcab0b69b3c..632d929e372 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -196,7 +196,10 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
                 pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this);
 
         transfer(
-            Objects.nonNull(insertNode) ? insertNode.getDevicePath().getFullPath() : null,
+            // insertNode.getDevicePath() is null for InsertRowsNode
+            Objects.nonNull(insertNode) && Objects.nonNull(insertNode.getDevicePath())
+                ? insertNode.getDevicePath().getFullPath()
+                : null,
             pipeTransferInsertNodeReqHandler);
       } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
         final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
index 0d22722c4fc..21cc976be77 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
@@ -49,7 +49,8 @@ public class PipeTransferTabletInsertNodeEventHandler
   protected void updateLeaderCache(TSStatus status) {
     final InsertNode insertNode =
         ((PipeInsertNodeTabletInsertionEvent) event).getInsertNodeViaCacheIfPossible();
-    if (insertNode != null) {
+    // insertNode.getDevicePath() is null for InsertRowsNode
+    if (insertNode != null && insertNode.getDevicePath() != null) {
       connector.updateLeaderCache(
           insertNode.getDevicePath().getFullPath(), status.getRedirectNode());
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index cb93bffd411..12e9438f245 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -238,7 +238,11 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
       insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
 
       if (insertNode != null) {
-        clientAndStatus = clientManager.getClient(insertNode.getDevicePath().getFullPath());
+        clientAndStatus =
+            // insertNode.getDevicePath() is null for InsertRowsNode
+            Objects.nonNull(insertNode.getDevicePath())
+                ? clientManager.getClient(insertNode.getDevicePath().getFullPath())
+                : clientManager.getClient();
         resp =
             clientAndStatus
                 .getLeft()
@@ -277,7 +281,8 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
               pipeInsertNodeTabletInsertionEvent.coreReportMessage(), status),
           pipeInsertNodeTabletInsertionEvent.toString());
     }
-    if (insertNode != null && status.isSetRedirectNode()) {
+    // insertNode.getDevicePath() is null for InsertRowsNode
+    if (insertNode != null && insertNode.getDevicePath() != null && status.isSetRedirectNode()) {
       clientManager.updateLeaderCache(
           insertNode.getDevicePath().getFullPath(), status.getRedirectNode());
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index dad53f2b69d..437d8b97d50 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
@@ -303,7 +304,15 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
     final InsertNode node = getInsertNodeViaCacheIfPossible();
     return super.shouldParsePattern()
         && Objects.nonNull(pipePattern)
-        && (Objects.isNull(node) || !pipePattern.coversDevice(node.getDevicePath().getFullPath()));
+        && (Objects.isNull(node)
+            || (node.getType() == PlanNodeType.INSERT_ROWS
+                ? ((InsertRowsNode) node)
+                    .getInsertRowNodeList().stream()
+                        .anyMatch(
+                            insertRowNode ->
+                                !pipePattern.coversDevice(
+                                    insertRowNode.getDevicePath().getFullPath()))
+                : !pipePattern.coversDevice(node.getDevicePath().getFullPath())));
   }
 
   public List<PipeRawTabletInsertionEvent> toRawTabletInsertionEvents() {

Reply via email to