This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9b0e48f2f42 Pipe: Fixed the shouldMarkAsPipeRequest for CreateTable
and AlterLogicalView Sync (#16619)
9b0e48f2f42 is described below
commit 9b0e48f2f427b6e44e026c3855dd35e6be7d29b8
Author: Caideyipi <[email protected]>
AuthorDate: Thu Oct 23 14:14:42 2025 +0800
Pipe: Fixed the shouldMarkAsPipeRequest for CreateTable and
AlterLogicalView Sync (#16619)
* unwebbed
* fix
---
.../pipe/receiver/protocol/IoTDBConfigNodeReceiver.java | 14 +++++++++-----
.../receiver/protocol/thrift/IoTDBDataNodeReceiver.java | 3 ++-
.../config/executor/ClusterConfigTaskExecutor.java | 5 +++--
.../execution/config/executor/IConfigTaskExecutor.java | 3 ++-
4 files changed, 16 insertions(+), 9 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index fb864d449bd..e4ccb60332c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -654,7 +654,7 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
case DeleteTriggerInTable:
return configManager.dropTrigger(
new TDropTriggerReq(((DeleteTriggerInTablePlan)
plan).getTriggerName())
- .setIsGeneratedByPipe(true));
+ .setIsGeneratedByPipe(shouldMarkAsPipeRequest.get()));
case SetTTL:
return ((SetTTLPlan) plan).getTTL() == TTLCache.NULL_TTL
? configManager
@@ -664,7 +664,8 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
.getTTLManager()
.setTTL((SetTTLPlan) plan, shouldMarkAsPipeRequest.get());
case PipeCreateTableOrView:
- return executeIdempotentCreateTableOrView((PipeCreateTableOrViewPlan)
plan, queryId);
+ return executeIdempotentCreateTableOrView(
+ (PipeCreateTableOrViewPlan) plan, queryId,
shouldMarkAsPipeRequest.get());
case AddTableColumn:
return configManager
.getProcedureManager()
@@ -941,7 +942,10 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
}
private TSStatus executeIdempotentCreateTableOrView(
- final PipeCreateTableOrViewPlan plan, final String queryId) throws
ConsensusException {
+ final PipeCreateTableOrViewPlan plan,
+ final String queryId,
+ final boolean shouldMarkAsPipeRequest)
+ throws ConsensusException {
final String database = plan.getDatabase();
final TsTable table = plan.getTable();
final boolean isView = TreeViewSchema.isTreeViewTable(table);
@@ -957,8 +961,8 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
? ProcedureType.CREATE_TABLE_VIEW_PROCEDURE
: ProcedureType.CREATE_TABLE_PROCEDURE,
isView
- ? new CreateTableViewProcedure(database, table, true, true)
- : new CreateTableProcedure(database, table, true));
+ ? new CreateTableViewProcedure(database, table, true,
shouldMarkAsPipeRequest)
+ : new CreateTableProcedure(database, table,
shouldMarkAsPipeRequest));
// Note that the view and its column won't be auto created
// Skip it to avoid affecting the existing base table
if (!isView && result.getCode() ==
TSStatusCode.TABLE_ALREADY_EXISTS.getStatusCode()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 88b8f463a94..8e14571074b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -685,7 +685,8 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
}
return new TPipeTransferResp(
ClusterConfigTaskExecutor.getInstance()
- .alterLogicalViewByPipe((AlterLogicalViewNode)
req.getPlanNode()));
+ .alterLogicalViewByPipe(
+ (AlterLogicalViewNode) req.getPlanNode(),
shouldMarkAsPipeRequest.get()));
}
final Object statement =
PLAN_TO_STATEMENT_VISITOR.process(req.getPlanNode(), null);
return statement instanceof Statement
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 1a8105dc613..8770ed2873c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -3018,7 +3018,8 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
}
@Override
- public TSStatus alterLogicalViewByPipe(final AlterLogicalViewNode
alterLogicalViewNode) {
+ public TSStatus alterLogicalViewByPipe(
+ final AlterLogicalViewNode alterLogicalViewNode, final boolean
shouldMarkAsPipeRequest) {
final Map<PartialPath, ViewExpression> viewPathToSourceMap =
alterLogicalViewNode.getViewPathToSourceMap();
@@ -3037,7 +3038,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
new TAlterLogicalViewReq(
Coordinator.getInstance().createQueryId().getId(),
ByteBuffer.wrap(stream.toByteArray()))
- .setIsGeneratedByPipe(true);
+ .setIsGeneratedByPipe(shouldMarkAsPipeRequest);
TSStatus tsStatus;
try (final ConfigNodeClient client =
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
index 96471f21036..3250e18de14 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -250,7 +250,8 @@ public interface IConfigTaskExecutor {
SettableFuture<ConfigTaskResult> alterLogicalView(
AlterLogicalViewStatement alterLogicalViewStatement, MPPQueryContext
context);
- TSStatus alterLogicalViewByPipe(AlterLogicalViewNode alterLogicalViewNode);
+ TSStatus alterLogicalViewByPipe(
+ AlterLogicalViewNode alterLogicalViewNode, boolean
shouldMarkAsPipeRequest);
SettableFuture<ConfigTaskResult> getRegionId(GetRegionIdStatement
getRegionIdStatement);