This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 40c32518da0 Pipe Schema: Receiver Agent: config plan node to statement transformation logic (#11689)
40c32518da0 is described below

commit 40c32518da0542552d9a1b5e825f6efded3575db
Author: Caideyipi <[email protected]>
AuthorDate: Sat Dec 16 16:23:14 2023 +0800

    Pipe Schema: Receiver Agent: config plan node to statement transformation logic (#11689)
---
 .../consensus/request/ConfigPhysicalPlan.java      |   4 +
 .../consensus/request/ConfigPhysicalPlanType.java  |   4 +-
 .../request/write/pipe/PipeEnrichedPlan.java       |  79 +++++++++
 .../iotdb/confignode/manager/ConfigManager.java    |   7 +
 .../apache/iotdb/confignode/manager/IManager.java  |   8 +
 .../persistence/executor/ConfigPlanExecutor.java   |   3 +
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  30 +---
 .../pipe/receiver/PipePlanToStatementVisitor.java  | 186 +++++++++++++++++++++
 .../iotdb/db/protocol/client/ConfigNodeClient.java |  30 +---
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |   2 +-
 .../config/executor/ClusterConfigTaskExecutor.java |  82 ++++++++-
 .../config/executor/IConfigTaskExecutor.java       |   8 +
 .../plan/planner/LogicalPlanVisitor.java           |   4 +-
 .../metadata/view/CreateLogicalViewStatement.java  |  10 +-
 .../commons/utils/ThriftConfigNodeSerDeUtils.java  |   3 +
 .../src/main/thrift/confignode.thrift              |  30 +---
 16 files changed, 409 insertions(+), 81 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index e4364dc63b4..ef853cfd9d7 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -74,6 +74,7 @@ import 
org.apache.iotdb.confignode.consensus.request.write.function.DropFunction
 import 
org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.pipe.PipeEnrichedPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan;
@@ -419,6 +420,9 @@ public abstract class ConfigPhysicalPlan implements 
IConsensusRequest {
         case PipeHandleMetaChange:
           plan = new PipeHandleMetaChangePlan();
           break;
+        case PipeEnriched:
+          plan = new PipeEnrichedPlan();
+          break;
         case GetRegionId:
           plan = new GetRegionIdPlan();
           break;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index eae4bd44668..aaadd5aa55c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -194,7 +194,6 @@ public enum ConfigPhysicalPlanType {
 
   /** Pipe Task. */
   CreatePipeV2((short) 1500),
-  /** START PIPE & STOP PIPE. */
   SetPipeStatusV2((short) 1501),
   DropPipeV2((short) 1502),
   ShowPipeV2((short) 1503),
@@ -202,6 +201,9 @@ public enum ConfigPhysicalPlanType {
   /** Pipe Runtime. */
   PipeHandleLeaderChange((short) 1600),
   PipeHandleMetaChange((short) 1601),
+
+  /** Pipe PayLoad. */
+  PipeEnriched((short) 1700),
   ;
 
   private final short planType;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/PipeEnrichedPlan.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/PipeEnrichedPlan.java
new file mode 100644
index 00000000000..e1e7221a5de
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/PipeEnrichedPlan.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write.pipe;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class PipeEnrichedPlan extends ConfigPhysicalPlan {
+
+  private ConfigPhysicalPlan innerPlan;
+
+  public PipeEnrichedPlan() {
+    super(ConfigPhysicalPlanType.PipeEnriched);
+  }
+
+  public PipeEnrichedPlan(ConfigPhysicalPlan innerPlan) {
+    super(ConfigPhysicalPlanType.PipeEnriched);
+    this.innerPlan = innerPlan;
+  }
+
+  public ConfigPhysicalPlan getInnerPlan() {
+    return innerPlan;
+  }
+
+  @Override
+  protected void serializeImpl(DataOutputStream stream) throws IOException {
+    stream.writeShort(getType().getPlanType());
+    stream.write(innerPlan.serializeToByteBuffer().array());
+  }
+
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    innerPlan = ConfigPhysicalPlan.Factory.create(buffer);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    PipeEnrichedPlan that = (PipeEnrichedPlan) obj;
+    return innerPlan.equals(that.innerPlan);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(innerPlan);
+  }
+
+  @Override
+  public String toString() {
+    return "PipeEnrichedPlan{" + "innerPlan='" + innerPlan + "'}";
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index cdd43c7822d..485e8b3373b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -1724,6 +1724,13 @@ public class ConfigManager implements IManager {
         : new TGetAllPipeInfoResp(status, Collections.emptyList());
   }
 
+  @Override
+  public TSStatus executeSyncCommand(ByteBuffer configPhysicalPlanBinary) {
+    TSStatus status = confirmLeader();
+    // TODO: determine whether to use procedure based on plan type
+    return status;
+  }
+
   @Override
   public TGetRegionIdResp getRegionId(TGetRegionIdReq req) {
     TSStatus status = confirmLeader();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 8656d9d411d..920e1fe4714 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -106,6 +106,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
 import org.apache.iotdb.consensus.common.DataSet;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 
 /**
@@ -590,6 +591,13 @@ public interface IManager {
    */
   TGetAllPipeInfoResp getAllPipeInfo();
 
+  /**
+   * Execute the config plan received from pipe.
+   *
+   * @return The result of the command execution.
+   */
+  TSStatus executeSyncCommand(ByteBuffer configPhysicalPlanBinary);
+
   /**
    * Get RegionId. used for Show cluster slots information in
    * docs/zh/UserGuide/Cluster/Cluster-Maintenance.md.
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 53759f6cc43..3a9994848c3 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -71,6 +71,7 @@ import 
org.apache.iotdb.confignode.consensus.request.write.function.DropFunction
 import 
org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.pipe.PipeEnrichedPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan;
@@ -446,6 +447,8 @@ public class ConfigPlanExecutor {
         return quotaInfo.setSpaceQuota((SetSpaceQuotaPlan) physicalPlan);
       case setThrottleQuota:
         return quotaInfo.setThrottleQuota((SetThrottleQuotaPlan) physicalPlan);
+      case PipeEnriched:
+        return executeNonQueryPlan(((PipeEnrichedPlan) 
physicalPlan).getInnerPlan());
       default:
         throw new UnknownPhysicalPlanTypeException(physicalPlan.getType());
     }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 717c7abe983..7c03d7e7ce7 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -105,7 +105,6 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
-import org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
@@ -118,8 +117,6 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
-import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
-import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
@@ -132,7 +129,6 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
@@ -917,27 +913,6 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
     return configManager.alterLogicalView(req);
   }
 
-  @Override
-  @Deprecated
-  public TSStatus createPipeSink(TPipeSinkInfo req) {
-    // To be deleted
-    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-  }
-
-  @Override
-  @Deprecated
-  public TSStatus dropPipeSink(TDropPipeSinkReq req) {
-    // To be deleted
-    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-  }
-
-  @Override
-  @Deprecated
-  public TGetPipeSinkResp getPipeSink(TGetPipeSinkReq req) {
-    // To be deleted
-    return new TGetPipeSinkResp();
-  }
-
   @Override
   public TSStatus createPipe(TCreatePipeReq req) {
     return configManager.createPipe(req);
@@ -968,6 +943,11 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
     return configManager.getAllPipeInfo();
   }
 
+  @Override
+  public TSStatus executeSyncCommand(ByteBuffer configPhysicalPlanBinary) {
+    return configManager.executeSyncCommand(configPhysicalPlanBinary);
+  }
+
   @Override
   public TGetRegionIdResp getRegionId(TGetRegionIdReq req) {
     if (req.isSetTimeStamp() && req.getType() != 
TConsensusGroupType.DataRegion) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipePlanToStatementVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipePlanToStatementVisitor.java
new file mode 100644
index 00000000000..19b064643ca
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipePlanToStatementVisitor.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ActivateTemplateNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.BatchActivateTemplateNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalBatchActivateTemplateNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateMultiTimeSeriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.MeasurementGroup;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.DeleteDataStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateTimeSeriesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.AlterTimeSeriesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateMultiTimeSeriesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateTimeSeriesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ActivateTemplateStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogicalViewStatement;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class PipePlanToStatementVisitor extends PlanVisitor<Statement, Void> {
+
+  @Override
+  public Statement visitPlan(PlanNode node, Void context) {
+    throw new UnsupportedOperationException(
+        "PipePlanToStatementVisitor does not support visiting general plan.");
+  }
+
+  @Override
+  public CreateTimeSeriesStatement visitCreateTimeSeries(CreateTimeSeriesNode 
node, Void context) {
+    CreateTimeSeriesStatement statement = new CreateTimeSeriesStatement();
+    statement.setPath(node.getPath());
+    statement.setDataType(node.getDataType());
+    statement.setEncoding(node.getEncoding());
+    statement.setCompressor(node.getCompressor());
+    statement.setProps(node.getProps());
+    statement.setAttributes(node.getAttributes());
+    statement.setAlias(node.getAlias());
+    statement.setTags(node.getTags());
+    return statement;
+  }
+
+  @Override
+  public CreateMultiTimeSeriesStatement visitCreateMultiTimeSeries(
+      CreateMultiTimeSeriesNode node, Void context) {
+    CreateMultiTimeSeriesStatement statement = new 
CreateMultiTimeSeriesStatement();
+
+    List<PartialPath> paths = new ArrayList<>();
+    List<TSDataType> dataTypes = new ArrayList<>();
+    List<TSEncoding> encodings = new ArrayList<>();
+    List<CompressionType> compressors = new ArrayList<>();
+    List<Map<String, String>> propsList = new ArrayList<>();
+    List<String> aliasList = new ArrayList<>();
+    List<Map<String, String>> tagsList = new ArrayList<>();
+    List<Map<String, String>> attributesList = new ArrayList<>();
+
+    for (Map.Entry<PartialPath, MeasurementGroup> path2Group :
+        node.getMeasurementGroupMap().entrySet()) {
+      MeasurementGroup group = path2Group.getValue();
+      dataTypes.addAll(group.getDataTypes());
+      encodings.addAll(group.getEncodings());
+      compressors.addAll(group.getCompressors());
+      propsList.addAll(group.getPropsList());
+      aliasList.addAll(group.getAliasList());
+      tagsList.addAll(group.getTagsList());
+      attributesList.addAll(group.getAttributesList());
+      for (int i = 0; i < group.getAttributesList().size(); ++i) {
+        paths.add(path2Group.getKey());
+      }
+    }
+
+    statement.setPaths(paths);
+    statement.setDataTypes(dataTypes);
+    statement.setEncodings(encodings);
+    statement.setCompressors(compressors);
+    statement.setPropsList(propsList);
+    statement.setAliasList(aliasList);
+    statement.setTagsList(tagsList);
+    statement.setAttributesList(attributesList);
+    return statement;
+  }
+
+  @Override
+  public AlterTimeSeriesStatement visitAlterTimeSeries(AlterTimeSeriesNode 
node, Void context) {
+    AlterTimeSeriesStatement statement = new AlterTimeSeriesStatement();
+    statement.setAlterMap(node.getAlterMap());
+    statement.setAlterType(node.getAlterType());
+    statement.setAttributesMap(node.getAttributesMap());
+    statement.setAlias(node.getAlias());
+    statement.setTagsMap(node.getTagsMap());
+    statement.setPath(node.getPath());
+    return statement;
+  }
+
+  @Override
+  public InternalCreateTimeSeriesStatement visitInternalCreateTimeSeries(
+      InternalCreateTimeSeriesNode node, Void context) {
+    return new InternalCreateTimeSeriesStatement(
+        node.getDevicePath(),
+        node.getMeasurementGroup().getMeasurements(),
+        node.getMeasurementGroup().getDataTypes(),
+        node.getMeasurementGroup().getEncodings(),
+        node.getMeasurementGroup().getCompressors(),
+        node.isAligned());
+  }
+
+  @Override
+  public ActivateTemplateStatement visitActivateTemplate(ActivateTemplateNode 
node, Void context) {
+    ActivateTemplateStatement statement = new ActivateTemplateStatement();
+    statement.setPath(node.getActivatePath());
+    return statement;
+  }
+
+  @Override
+  public BatchActivateTemplateStatement visitInternalBatchActivateTemplate(
+      InternalBatchActivateTemplateNode node, Void context) {
+    return new BatchActivateTemplateStatement(
+        new ArrayList<>(node.getTemplateActivationMap().keySet()));
+  }
+
+  @Override
+  public InternalCreateMultiTimeSeriesStatement 
visitInternalCreateMultiTimeSeries(
+      InternalCreateMultiTimeSeriesNode node, Void context) {
+    return new InternalCreateMultiTimeSeriesStatement(node.getDeviceMap());
+  }
+
+  @Override
+  public BatchActivateTemplateStatement visitBatchActivateTemplate(
+      BatchActivateTemplateNode node, Void context) {
+    return new BatchActivateTemplateStatement(
+        new ArrayList<>(node.getTemplateActivationMap().keySet()));
+  }
+
+  @Override
+  public CreateLogicalViewStatement visitCreateLogicalView(
+      CreateLogicalViewNode node, Void context) {
+    CreateLogicalViewStatement statement = new CreateLogicalViewStatement();
+    statement.setTargetFullPaths(node.getViewPathList());
+    statement.setViewExpressions(new 
ArrayList<>(node.getViewPathToSourceExpressionMap().values()));
+    return statement;
+  }
+
+  // We do not support AlterLogicalViewNode parsing and use direct rpc instead
+
+  @Override
+  public DeleteDataStatement visitDeleteData(DeleteDataNode node, Void 
context) {
+    DeleteDataStatement statement = new DeleteDataStatement();
+    statement.setDeleteEndTime(node.getDeleteEndTime());
+    statement.setDeleteStartTime(node.getDeleteStartTime());
+    statement.setPathList(node.getPathList());
+    return statement;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 8c3382f9ddb..a661171d173 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -73,7 +73,6 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
-import org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
@@ -86,8 +85,6 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
-import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
-import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
@@ -100,7 +97,6 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
@@ -140,6 +136,7 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -854,24 +851,6 @@ public class ConfigNodeClient implements 
IConfigNodeRPCService.Iface, ThriftClie
         () -> client.alterLogicalView(req), status -> 
!updateConfigNodeLeader(status));
   }
 
-  @Override
-  public TSStatus createPipeSink(TPipeSinkInfo req) throws TException {
-    return executeRemoteCallWithRetry(
-        () -> client.createPipeSink(req), status -> 
!updateConfigNodeLeader(status));
-  }
-
-  @Override
-  public TSStatus dropPipeSink(TDropPipeSinkReq req) throws TException {
-    return executeRemoteCallWithRetry(
-        () -> client.dropPipeSink(req), status -> 
!updateConfigNodeLeader(status));
-  }
-
-  @Override
-  public TGetPipeSinkResp getPipeSink(TGetPipeSinkReq req) throws TException {
-    return executeRemoteCallWithRetry(
-        () -> client.getPipeSink(req), resp -> 
!updateConfigNodeLeader(resp.status));
-  }
-
   @Override
   public TSStatus createPipe(TCreatePipeReq req) throws TException {
     return executeRemoteCallWithRetry(
@@ -908,6 +887,13 @@ public class ConfigNodeClient implements 
IConfigNodeRPCService.Iface, ThriftClie
         () -> client.getAllPipeInfo(), resp -> 
!updateConfigNodeLeader(resp.status));
   }
 
+  @Override
+  public TSStatus executeSyncCommand(ByteBuffer configPhysicalPlanBinary) 
throws TException {
+    return executeRemoteCallWithRetry(
+        () -> client.executeSyncCommand(configPhysicalPlanBinary),
+        status -> !updateConfigNodeLeader(status));
+  }
+
   @Override
   public TGetRegionIdResp getRegionId(TGetRegionIdReq req) throws TException {
     return executeRemoteCallWithRetry(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 6ddf58895ac..f6e9fe453e9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -3400,7 +3400,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     context.setQueryType(QueryType.WRITE);
     analysis.setStatement(createLogicalViewStatement);
 
-    if (createLogicalViewStatement.getViewExpression() == null) {
+    if (createLogicalViewStatement.getViewExpressions() == null) {
       // analyze query in statement
       QueryStatement queryStatement = 
createLogicalViewStatement.getQueryStatement();
       if (queryStatement != null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 2cf0ab7ea8d..4cdefa63146 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -138,6 +138,7 @@ import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowSpace
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowThrottleQuotaTask;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import 
org.apache.iotdb.db.queryengine.plan.expression.visitor.TransformToViewExpressionVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.AlterLogicalViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.CountDatabaseStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.CountTimeSlotListStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateContinuousQueryStatement;
@@ -1678,6 +1679,24 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
     return future;
   }
 
+  @Override
+  public SettableFuture<ConfigTaskResult> executeSyncCommand(ByteBuffer 
configPhysicalPlanBinary) {
+    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+    try (ConfigNodeClient configNodeClient =
+        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      TSStatus tsStatus = 
configNodeClient.executeSyncCommand(configPhysicalPlanBinary);
+      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
+        LOGGER.warn("Failed to executeSyncCommand, status is {}.", tsStatus);
+        future.setException(new IoTDBException(tsStatus.message, 
tsStatus.code));
+      } else {
+        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+      }
+    } catch (Exception e) {
+      future.setException(e);
+    }
+    return future;
+  }
+
   @Override
   public SettableFuture<ConfigTaskResult> deleteTimeSeries(
       String queryId, DeleteTimeSeriesStatement deleteTimeSeriesStatement) {
@@ -1792,7 +1811,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
     CreateLogicalViewStatement createLogicalViewStatement = new 
CreateLogicalViewStatement();
     createLogicalViewStatement.setTargetFullPaths(
         Collections.singletonList(renameLogicalViewStatement.getNewName()));
-    createLogicalViewStatement.setViewExpression(viewExpression);
+    
createLogicalViewStatement.setViewExpressions(Collections.singletonList(viewExpression));
     ExecutionResult executionResult =
         Coordinator.getInstance()
             .execute(
@@ -1925,6 +1944,67 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
     }
   }
 
+  /**
+   * Serializes the view-path-to-source map of the given node into a {@code TAlterLogicalViewReq}
+   * and submits it to the ConfigNode, retrying for as long as the call times out or the returned
+   * status is {@code OVERLAP_WITH_EXISTING_TASK}.
+   *
+   * @param alterLogicalViewNode node whose {@code getViewPathToSourceMap()} supplies the views
+   * @param context query context; its query id is placed in the request
+   * @return a future completed with a {@code SUCCESS_STATUS} result on success; otherwise failed
+   *     with a {@code BatchProcessException} (for {@code MULTIPLE_ERROR}), an {@code
+   *     IoTDBException}, or the client/transport exception that was raised
+   */
+  @Override
+  public SettableFuture<ConfigTaskResult> alterLogicalViewByPipe(
+      AlterLogicalViewNode alterLogicalViewNode, MPPQueryContext context) {
+    final SettableFuture<ConfigTaskResult> result = SettableFuture.create();
+    final Map<PartialPath, ViewExpression> viewToSource =
+        alterLogicalViewNode.getViewPathToSourceMap();
+
+    // Payload layout: entry count, then (view path, source expression) for every entry.
+    final ByteArrayOutputStream payload = new ByteArrayOutputStream();
+    try {
+      ReadWriteIOUtils.write(viewToSource.size(), payload);
+      for (Map.Entry<PartialPath, ViewExpression> viewAndSource : viewToSource.entrySet()) {
+        viewAndSource.getKey().serialize(payload);
+        ViewExpression.serialize(viewAndSource.getValue(), payload);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    final String queryId = context.getQueryId().getId();
+    final TAlterLogicalViewReq request =
+        new TAlterLogicalViewReq(queryId, ByteBuffer.wrap(payload.toByteArray()));
+
+    try (ConfigNodeClient configNodeClient =
+        CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      TSStatus status;
+      do {
+        try {
+          status = configNodeClient.alterLogicalView(request);
+        } catch (TTransportException e) {
+          final boolean timedOut =
+              e.getType() == TTransportException.TIMED_OUT
+                  || e.getCause() instanceof SocketTimeoutException;
+          if (!timedOut) {
+            throw e;
+          }
+          // A timeout is treated like "task still running", so the loop asks again.
+          status = RpcUtils.getStatus(TSStatusCode.OVERLAP_WITH_EXISTING_TASK);
+        }
+        // Loop until the status is something other than OVERLAP_WITH_EXISTING_TASK.
+      } while (status.getCode() == TSStatusCode.OVERLAP_WITH_EXISTING_TASK.getStatusCode());
+
+      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        result.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+      } else {
+        LOGGER.warn(
+            "Failed to execute alter view {} by pipe, status is {}.", viewToSource, status);
+        final Throwable failure =
+            status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()
+                ? new BatchProcessException(status.subStatus.toArray(new TSStatus[0]))
+                : new IoTDBException(status.getMessage(), status.getCode());
+        result.setException(failure);
+      }
+    } catch (ClientManagerException | TException e) {
+      result.setException(e);
+    }
+    return result;
+  }
+
   @Override
   public SettableFuture<ConfigTaskResult> getRegionId(GetRegionIdStatement 
getRegionIdStatement) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
index 89f4e0c5e12..cc5591c6a95 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
 import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.AlterLogicalViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.CountDatabaseStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.CountTimeSlotListStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateContinuousQueryStatement;
@@ -69,6 +70,8 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.ShowThrottleQuot
 
 import com.google.common.util.concurrent.SettableFuture;
 
+import java.nio.ByteBuffer;
+
 public interface IConfigTaskExecutor {
 
   SettableFuture<ConfigTaskResult> setDatabase(DatabaseSchemaStatement 
databaseSchemaStatement);
@@ -164,6 +167,8 @@ public interface IConfigTaskExecutor {
 
   SettableFuture<ConfigTaskResult> showPipes(ShowPipesStatement 
showPipesStatement);
 
+  SettableFuture<ConfigTaskResult> executeSyncCommand(ByteBuffer 
configPhysicalPlanBinary);
+
   SettableFuture<ConfigTaskResult> deleteTimeSeries(
       String queryId, DeleteTimeSeriesStatement deleteTimeSeriesStatement);
 
@@ -176,6 +181,9 @@ public interface IConfigTaskExecutor {
   SettableFuture<ConfigTaskResult> alterLogicalView(
       AlterLogicalViewStatement alterLogicalViewStatement, MPPQueryContext 
context);
 
+  SettableFuture<ConfigTaskResult> alterLogicalViewByPipe(
+      AlterLogicalViewNode alterLogicalViewNode, MPPQueryContext context);
+
   SettableFuture<ConfigTaskResult> getRegionId(GetRegionIdStatement 
getRegionIdStatement);
 
   SettableFuture<ConfigTaskResult> getSeriesSlotList(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index e55521ea107..1ebb78ef36f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -944,7 +944,7 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
   public PlanNode visitCreateLogicalView(
       CreateLogicalViewStatement createLogicalViewStatement, MPPQueryContext 
context) {
     List<ViewExpression> viewExpressionList = new ArrayList<>();
-    if (createLogicalViewStatement.getViewExpression() == null) {
+    if (createLogicalViewStatement.getViewExpressions() == null) {
       // Transform all Expressions into ViewExpressions.
       TransformToViewExpressionVisitor transformToViewExpressionVisitor =
           new TransformToViewExpressionVisitor();
@@ -953,7 +953,7 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
         
viewExpressionList.add(transformToViewExpressionVisitor.process(expression, 
null));
       }
     } else {
-      viewExpressionList.add(createLogicalViewStatement.getViewExpression());
+      viewExpressionList = createLogicalViewStatement.getViewExpressions();
     }
 
     return new CreateLogicalViewNode(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/view/CreateLogicalViewStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/view/CreateLogicalViewStatement.java
index 5a3a28cb4c4..f6cbb097260 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/view/CreateLogicalViewStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/view/CreateLogicalViewStatement.java
@@ -56,7 +56,7 @@ public class CreateLogicalViewStatement extends Statement {
   private QueryStatement queryStatement;
 
   // if not null, all related check and generation will be skipped
-  private ViewExpression viewExpression;
+  private List<ViewExpression> viewExpressions;
 
   public CreateLogicalViewStatement() {
     super();
@@ -129,8 +129,8 @@ public class CreateLogicalViewStatement extends Statement {
     return this.queryStatement;
   }
 
-  public ViewExpression getViewExpression() {
-    return viewExpression;
+  public List<ViewExpression> getViewExpressions() {
+    return viewExpressions;
   }
 
   // set source paths
@@ -194,8 +194,8 @@ public class CreateLogicalViewStatement extends Statement {
     this.targetPaths.generateFullPathsFromPathsGroup();
   }
 
-  public void setViewExpression(ViewExpression viewExpression) {
-    this.viewExpression = viewExpression;
+  public void setViewExpressions(List<ViewExpression> viewExpressions) {
+    this.viewExpressions = viewExpressions;
   }
 
   public void setTargetIntoItem(IntoItem intoItem) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
index 8b2f3a3c25e..5d495b46a81 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.commons.utils;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
@@ -132,6 +133,7 @@ public class ThriftConfigNodeSerDeUtils {
     return configNodeLocation;
   }
 
+  // Deprecated, restored for compatibility
   public static void serializeTPipeSinkInfo(TPipeSinkInfo pipeSinkInfo, 
DataOutputStream stream) {
     try {
       pipeSinkInfo.write(generateWriteProtocol(stream));
@@ -140,6 +142,7 @@ public class ThriftConfigNodeSerDeUtils {
     }
   }
 
+  // Deprecated, restored for compatibility
   public static TPipeSinkInfo deserializeTPipeSinkInfo(ByteBuffer buffer) {
     TPipeSinkInfo pipeSinkInfo = new TPipeSinkInfo();
     try {
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift 
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index a2db9037355..4e9257a469d 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -697,25 +697,13 @@ struct TCreatePipeReq {
     4: required map<string, string> connectorAttributes
 }
 
+// Deprecated, restored for compatibility: the createPipeSink / dropPipeSink / getPipeSink RPCs
+// are removed from IConfigNodeRPCService, but ThriftConfigNodeSerDeUtils still (de)serializes
+// this struct.
 struct TPipeSinkInfo {
   1: required string pipeSinkName
   2: required string pipeSinkType
   3: optional map<string, string> attributes
 }
 
-struct TDropPipeSinkReq {
-  1: required string pipeSinkName
-}
-
-struct TGetPipeSinkReq {
-  1: optional string pipeSinkName
-}
-
-struct TGetPipeSinkResp {
-  1: required common.TSStatus status
-  2: required list<TPipeSinkInfo> pipeSinkInfoList
-}
-
 struct TShowPipeReq {
   1: optional string pipeName
   2: optional bool whereClause
@@ -1326,15 +1314,6 @@ service IConfigNodeRPCService {
   // Sync
   // ======================================================
 
-  /** Create PipeSink */
-  common.TSStatus createPipeSink(TPipeSinkInfo req)
-
-  /** Drop PipeSink */
-  common.TSStatus dropPipeSink(TDropPipeSinkReq req)
-
-  /** Get PipeSink by name, if name is empty, get all PipeSink */
-  TGetPipeSinkResp getPipeSink(TGetPipeSinkReq req)
-
   /** Create Pipe */
   common.TSStatus createPipe(TCreatePipeReq req)
 
@@ -1350,8 +1329,11 @@ service IConfigNodeRPCService {
   /** Show Pipe by name, if name is empty, show all Pipe */
   TShowPipeResp showPipe(TShowPipeReq req)
 
-  /* Get all pipe information. It is used for DataNode registration and 
restart*/
-  TGetAllPipeInfoResp getAllPipeInfo();
+  /** Get all pipe information. It is used for DataNode registration and restart. */
+  TGetAllPipeInfoResp getAllPipeInfo()
+
+  /** Execute schema language from external pipes */
+  common.TSStatus executeSyncCommand(binary configPhysicalPlanBinary)
 
   // ======================================================
   // TestTools


Reply via email to