This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e49a07d1129 Pipe: fix PipeEnrichedStatement can't contain redirect
info (#12579)
e49a07d1129 is described below
commit e49a07d11297426c4652f8bde7fad142e23afec3
Author: Zikun Ma <[email protected]>
AuthorDate: Fri May 24 14:35:53 2024 +0800
Pipe: fix PipeEnrichedStatement can't contain redirect info (#12579)
---
.../evolvable/builder/PipeTransferBatchReqBuilder.java | 8 +-------
.../handler/PipeTransferTabletInsertNodeEventHandler.java | 8 ++------
.../thrift/sync/IoTDBDataRegionSyncConnector.java | 9 ++-------
.../common/tablet/PipeInsertNodeTabletInsertionEvent.java | 13 +++++++++++++
.../db/pipe/event/realtime/PipeRealtimeEventFactory.java | 1 +
.../db/queryengine/plan/planner/TreeModelPlanner.java | 15 +++++++++++++--
6 files changed, 32 insertions(+), 22 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
index 087c969b89a..33b524b1a9a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeCacheLeaderClientManager;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
@@ -112,12 +111,7 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
if (event instanceof PipeRawTabletInsertionEvent) {
deviceId = ((PipeRawTabletInsertionEvent) event).getDeviceId();
} else if (event instanceof PipeInsertNodeTabletInsertionEvent) {
- final InsertNode insertNode =
- ((PipeInsertNodeTabletInsertionEvent)
event).getInsertNodeViaCacheIfPossible();
- // insertNode.getDevicePath() is null for InsertRowsNode
- if (Objects.nonNull(insertNode) &&
Objects.nonNull(insertNode.getDevicePath())) {
- deviceId = insertNode.getDevicePath().getFullPath();
- }
+ deviceId = ((PipeInsertNodeTabletInsertionEvent) event).getDeviceId();
}
if (Objects.isNull(deviceId)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
index 21cc976be77..660b2677ad5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
@@ -47,12 +46,9 @@ public class PipeTransferTabletInsertNodeEventHandler
@Override
protected void updateLeaderCache(TSStatus status) {
- final InsertNode insertNode =
- ((PipeInsertNodeTabletInsertionEvent)
event).getInsertNodeViaCacheIfPossible();
- // insertNode.getDevicePath() is null for InsertRowsNode
- if (insertNode != null && insertNode.getDevicePath() != null) {
+ if (((PipeInsertNodeTabletInsertionEvent) event).getDeviceId() != null) {
connector.updateLeaderCache(
- insertNode.getDevicePath().getFullPath(), status.getRedirectNode());
+ ((PipeInsertNodeTabletInsertionEvent) event).getDeviceId(),
status.getRedirectNode());
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 12e9438f245..8ee65ef94ed 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -236,13 +236,9 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
try {
insertNode =
pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
-
+ // getDeviceId() may return null for InsertRowsNode, will be equal to
getClient(null)
+ clientAndStatus =
clientManager.getClient(pipeInsertNodeTabletInsertionEvent.getDeviceId());
if (insertNode != null) {
- clientAndStatus =
- // insertNode.getDevicePath() is null for InsertRowsNode
- Objects.nonNull(insertNode.getDevicePath())
- ?
clientManager.getClient(insertNode.getDevicePath().getFullPath())
- : clientManager.getClient();
resp =
clientAndStatus
.getLeft()
@@ -250,7 +246,6 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
compressIfNeeded(
PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode)));
} else {
- clientAndStatus = clientManager.getClient();
resp =
clientAndStatus
.getLeft()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 437d8b97d50..11711fa1d03 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.event.common.tablet;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
@@ -62,15 +63,19 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
private List<TabletInsertionDataContainer> dataContainers;
+ private final PartialPath devicePath;
+
private ProgressIndex progressIndex;
public PipeInsertNodeTabletInsertionEvent(
WALEntryHandler walEntryHandler,
+ PartialPath devicePath,
ProgressIndex progressIndex,
boolean isAligned,
boolean isGeneratedByPipe) {
this(
walEntryHandler,
+ devicePath,
progressIndex,
isAligned,
isGeneratedByPipe,
@@ -83,6 +88,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
private PipeInsertNodeTabletInsertionEvent(
WALEntryHandler walEntryHandler,
+ PartialPath devicePath,
ProgressIndex progressIndex,
boolean isAligned,
boolean isGeneratedByPipe,
@@ -93,6 +99,8 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
long endTime) {
super(pipeName, pipeTaskMeta, pattern, startTime, endTime);
this.walEntryHandler = walEntryHandler;
+ // Record device path here so there's no need to get it from InsertNode
cache later.
+ this.devicePath = devicePath;
this.progressIndex = progressIndex;
this.isAligned = isAligned;
this.isGeneratedByPipe = isGeneratedByPipe;
@@ -114,6 +122,10 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
return walEntryHandler.getInsertNodeViaCacheIfPossible();
}
+ public String getDeviceId() {
+ return Objects.nonNull(devicePath) ? devicePath.getFullPath() : null;
+ }
+
/////////////////////////// EnrichedEvent ///////////////////////////
@Override
@@ -170,6 +182,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
long endTime) {
return new PipeInsertNodeTabletInsertionEvent(
walEntryHandler,
+ devicePath,
progressIndex,
isAligned,
isGeneratedByPipe,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index 32be413e660..94793b74bc1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -44,6 +44,7 @@ public class PipeRealtimeEventFactory {
return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent(
new PipeInsertNodeTabletInsertionEvent(
walEntryHandler,
+ insertNode.getDevicePath(),
insertNode.getProgressIndex(),
insertNode.isAligned(),
insertNode.isGeneratedByPipe()),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
index a42ff843f30..4ea3048f2c3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
@@ -46,12 +46,16 @@ import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
public class TreeModelPlanner implements IPlanner {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TreeModelPlanner.class);
private final Statement statement;
@@ -159,9 +163,16 @@ public class TreeModelPlanner implements IPlanner {
public void setRedirectInfo(
IAnalysis iAnalysis, TEndPoint localEndPoint, TSStatus tsstatus,
TSStatusCode statusCode) {
Analysis analysis = (Analysis) iAnalysis;
- if (analysis.getStatement() instanceof InsertBaseStatement
+
+ // Get the inner statement of PipeEnrichedStatement
+ Statement statementToRedirect =
+ analysis.getStatement() instanceof PipeEnrichedStatement
+ ? ((PipeEnrichedStatement)
analysis.getStatement()).getInnerStatement()
+ : analysis.getStatement();
+
+ if (statementToRedirect instanceof InsertBaseStatement
&& !analysis.isFinishQueryAfterAnalyze()) {
- InsertBaseStatement insertStatement = (InsertBaseStatement)
analysis.getStatement();
+ InsertBaseStatement insertStatement = (InsertBaseStatement)
statementToRedirect;
List<TEndPoint> redirectNodeList = analysis.getRedirectNodeList();
if (insertStatement instanceof InsertRowsStatement
|| insertStatement instanceof InsertMultiTabletsStatement) {