This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b0e48f2f42 Pipe: Fixed the shouldMarkAsPipeRequest for CreateTable 
and AlterLogicalView Sync (#16619)
9b0e48f2f42 is described below

commit 9b0e48f2f427b6e44e026c3855dd35e6be7d29b8
Author: Caideyipi <[email protected]>
AuthorDate: Thu Oct 23 14:14:42 2025 +0800

    Pipe: Fixed the shouldMarkAsPipeRequest for CreateTable and 
AlterLogicalView Sync (#16619)
    
    * unwebbed
    
    * fix
---
 .../pipe/receiver/protocol/IoTDBConfigNodeReceiver.java    | 14 +++++++++-----
 .../receiver/protocol/thrift/IoTDBDataNodeReceiver.java    |  3 ++-
 .../config/executor/ClusterConfigTaskExecutor.java         |  5 +++--
 .../execution/config/executor/IConfigTaskExecutor.java     |  3 ++-
 4 files changed, 16 insertions(+), 9 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index fb864d449bd..e4ccb60332c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -654,7 +654,7 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
       case DeleteTriggerInTable:
         return configManager.dropTrigger(
             new TDropTriggerReq(((DeleteTriggerInTablePlan) 
plan).getTriggerName())
-                .setIsGeneratedByPipe(true));
+                .setIsGeneratedByPipe(shouldMarkAsPipeRequest.get()));
       case SetTTL:
         return ((SetTTLPlan) plan).getTTL() == TTLCache.NULL_TTL
             ? configManager
@@ -664,7 +664,8 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                 .getTTLManager()
                 .setTTL((SetTTLPlan) plan, shouldMarkAsPipeRequest.get());
       case PipeCreateTableOrView:
-        return executeIdempotentCreateTableOrView((PipeCreateTableOrViewPlan) 
plan, queryId);
+        return executeIdempotentCreateTableOrView(
+            (PipeCreateTableOrViewPlan) plan, queryId, 
shouldMarkAsPipeRequest.get());
       case AddTableColumn:
         return configManager
             .getProcedureManager()
@@ -941,7 +942,10 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
   }
 
   private TSStatus executeIdempotentCreateTableOrView(
-      final PipeCreateTableOrViewPlan plan, final String queryId) throws 
ConsensusException {
+      final PipeCreateTableOrViewPlan plan,
+      final String queryId,
+      final boolean shouldMarkAsPipeRequest)
+      throws ConsensusException {
     final String database = plan.getDatabase();
     final TsTable table = plan.getTable();
     final boolean isView = TreeViewSchema.isTreeViewTable(table);
@@ -957,8 +961,8 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                     ? ProcedureType.CREATE_TABLE_VIEW_PROCEDURE
                     : ProcedureType.CREATE_TABLE_PROCEDURE,
                 isView
-                    ? new CreateTableViewProcedure(database, table, true, true)
-                    : new CreateTableProcedure(database, table, true));
+                    ? new CreateTableViewProcedure(database, table, true, 
shouldMarkAsPipeRequest)
+                    : new CreateTableProcedure(database, table, 
shouldMarkAsPipeRequest));
     // Note that the view and its column won't be auto created
     // Skip it to avoid affecting the existing base table
     if (!isView && result.getCode() == 
TSStatusCode.TABLE_ALREADY_EXISTS.getStatusCode()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 88b8f463a94..8e14571074b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -685,7 +685,8 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
       }
       return new TPipeTransferResp(
           ClusterConfigTaskExecutor.getInstance()
-              .alterLogicalViewByPipe((AlterLogicalViewNode) 
req.getPlanNode()));
+              .alterLogicalViewByPipe(
+                  (AlterLogicalViewNode) req.getPlanNode(), 
shouldMarkAsPipeRequest.get()));
     }
     final Object statement = 
PLAN_TO_STATEMENT_VISITOR.process(req.getPlanNode(), null);
     return statement instanceof Statement
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 1a8105dc613..8770ed2873c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -3018,7 +3018,8 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
   }
 
   @Override
-  public TSStatus alterLogicalViewByPipe(final AlterLogicalViewNode 
alterLogicalViewNode) {
+  public TSStatus alterLogicalViewByPipe(
+      final AlterLogicalViewNode alterLogicalViewNode, final boolean 
shouldMarkAsPipeRequest) {
     final Map<PartialPath, ViewExpression> viewPathToSourceMap =
         alterLogicalViewNode.getViewPathToSourceMap();
 
@@ -3037,7 +3038,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
         new TAlterLogicalViewReq(
                 Coordinator.getInstance().createQueryId().getId(),
                 ByteBuffer.wrap(stream.toByteArray()))
-            .setIsGeneratedByPipe(true);
+            .setIsGeneratedByPipe(shouldMarkAsPipeRequest);
     TSStatus tsStatus;
     try (final ConfigNodeClient client =
         
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
 {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
index 96471f21036..3250e18de14 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -250,7 +250,8 @@ public interface IConfigTaskExecutor {
   SettableFuture<ConfigTaskResult> alterLogicalView(
       AlterLogicalViewStatement alterLogicalViewStatement, MPPQueryContext 
context);
 
-  TSStatus alterLogicalViewByPipe(AlterLogicalViewNode alterLogicalViewNode);
+  TSStatus alterLogicalViewByPipe(
+      AlterLogicalViewNode alterLogicalViewNode, boolean 
shouldMarkAsPipeRequest);
 
   SettableFuture<ConfigTaskResult> getRegionId(GetRegionIdStatement 
getRegionIdStatement);
 

Reply via email to