This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7d4dd9e1427 Pipe: Fix insertNode.getDevicePath() is not handled
correctly for insertRowsNode (NPE) (#12569)
7d4dd9e1427 is described below
commit 7d4dd9e1427324816143192437121efe9f2a03a5
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 22 14:19:20 2024 +0800
Pipe: Fix insertNode.getDevicePath() is not handled correctly for
insertRowsNode (NPE) (#12569)
---
.../apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java | 8 ++++----
.../apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java | 5 ++---
.../evolvable/builder/PipeTransferBatchReqBuilder.java | 3 ++-
.../protocol/thrift/async/IoTDBDataRegionAsyncConnector.java | 5 ++++-
.../handler/PipeTransferTabletInsertNodeEventHandler.java | 3 ++-
.../protocol/thrift/sync/IoTDBDataRegionSyncConnector.java | 9 +++++++--
.../common/tablet/PipeInsertNodeTabletInsertionEvent.java | 11 ++++++++++-
7 files changed, 31 insertions(+), 13 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
index e90c7ecf8ad..7ebf56c6035 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
@@ -915,9 +915,9 @@ public class IoTDBPipeClusterIT extends
AbstractPipeDualAutoIT {
if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList(
- "insert into root.db.d1(time, s1) values (-123, 3)",
- "insert into root.db.d1(time, s1) values (now(), 3)",
- "flush"))) {
+ // Test the correctness of insertRowsNode transmission
+ "insert into root.db.d1(time, s1) values (-122, 3)",
+ "insert into root.db.d1(time, s1) values (-123, 3), (now(),
3)"))) {
return;
}
@@ -925,7 +925,7 @@ public class IoTDBPipeClusterIT extends
AbstractPipeDualAutoIT {
receiverEnv,
"select count(*) from root.**",
"count(root.db.d1.s1),",
- Collections.singleton("5,"));
+ Collections.singleton("6,"));
}
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
index 28852463a77..b0504f8b88e 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
@@ -632,8 +632,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
"insert into root.db.d1 (time, at1) values (2, 11)",
"insert into root.db.d2 (time, at1) values (2, 21)",
"insert into root.db.d3 (time, at1) values (2, 31)",
- "insert into root.db.d4 (time, at1) values (2, 41)",
- "flush"))) {
+ "insert into root.db.d4 (time, at1) values (2, 41), (3, 51)"))) {
return;
}
@@ -646,7 +645,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
receiverEnv,
"select count(*) from root.** where time >= 2",
"count(root.db.d4.at1),count(root.db.d2.at1),count(root.db.d3.at1),",
- Collections.singleton("1,1,0,"));
+ Collections.singleton("2,1,0,"));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
index 2b19bb1275d..087c969b89a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
@@ -114,7 +114,8 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
} else if (event instanceof PipeInsertNodeTabletInsertionEvent) {
final InsertNode insertNode =
((PipeInsertNodeTabletInsertionEvent)
event).getInsertNodeViaCacheIfPossible();
- if (Objects.nonNull(insertNode)) {
+ // insertNode.getDevicePath() is null for InsertRowsNode
+ if (Objects.nonNull(insertNode) &&
Objects.nonNull(insertNode.getDevicePath())) {
deviceId = insertNode.getDevicePath().getFullPath();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index dcab0b69b3c..632d929e372 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -196,7 +196,10 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this);
transfer(
- Objects.nonNull(insertNode) ?
insertNode.getDevicePath().getFullPath() : null,
+ // insertNode.getDevicePath() is null for InsertRowsNode
+ Objects.nonNull(insertNode) &&
Objects.nonNull(insertNode.getDevicePath())
+ ? insertNode.getDevicePath().getFullPath()
+ : null,
pipeTransferInsertNodeReqHandler);
} else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
index 0d22722c4fc..21cc976be77 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
@@ -49,7 +49,8 @@ public class PipeTransferTabletInsertNodeEventHandler
protected void updateLeaderCache(TSStatus status) {
final InsertNode insertNode =
((PipeInsertNodeTabletInsertionEvent)
event).getInsertNodeViaCacheIfPossible();
- if (insertNode != null) {
+ // insertNode.getDevicePath() is null for InsertRowsNode
+ if (insertNode != null && insertNode.getDevicePath() != null) {
connector.updateLeaderCache(
insertNode.getDevicePath().getFullPath(), status.getRedirectNode());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index cb93bffd411..12e9438f245 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -238,7 +238,11 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
insertNode =
pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
if (insertNode != null) {
- clientAndStatus =
clientManager.getClient(insertNode.getDevicePath().getFullPath());
+ clientAndStatus =
+ // insertNode.getDevicePath() is null for InsertRowsNode
+ Objects.nonNull(insertNode.getDevicePath())
+ ?
clientManager.getClient(insertNode.getDevicePath().getFullPath())
+ : clientManager.getClient();
resp =
clientAndStatus
.getLeft()
@@ -277,7 +281,8 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
pipeInsertNodeTabletInsertionEvent.coreReportMessage(), status),
pipeInsertNodeTabletInsertionEvent.toString());
}
- if (insertNode != null && status.isSetRedirectNode()) {
+ // insertNode.getDevicePath() is null for InsertRowsNode
+ if (insertNode != null && insertNode.getDevicePath() != null &&
status.isSetRedirectNode()) {
clientManager.updateLeaderCache(
insertNode.getDevicePath().getFullPath(), status.getRedirectNode());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index dad53f2b69d..437d8b97d50 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
@@ -303,7 +304,15 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
final InsertNode node = getInsertNodeViaCacheIfPossible();
return super.shouldParsePattern()
&& Objects.nonNull(pipePattern)
- && (Objects.isNull(node) ||
!pipePattern.coversDevice(node.getDevicePath().getFullPath()));
+ && (Objects.isNull(node)
+ || (node.getType() == PlanNodeType.INSERT_ROWS
+ ? ((InsertRowsNode) node)
+ .getInsertRowNodeList().stream()
+ .anyMatch(
+ insertRowNode ->
+ !pipePattern.coversDevice(
+
insertRowNode.getDevicePath().getFullPath()))
+ :
!pipePattern.coversDevice(node.getDevicePath().getFullPath())));
}
public List<PipeRawTabletInsertionEvent> toRawTabletInsertionEvents() {