This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 40c32518da0 Pipe Schema: Receiver Agent: config plan node to statement
transformation logic (#11689)
40c32518da0 is described below
commit 40c32518da0542552d9a1b5e825f6efded3575db
Author: Caideyipi <[email protected]>
AuthorDate: Sat Dec 16 16:23:14 2023 +0800
Pipe Schema: Receiver Agent: config plan node to statement transformation
logic (#11689)
---
.../consensus/request/ConfigPhysicalPlan.java | 4 +
.../consensus/request/ConfigPhysicalPlanType.java | 4 +-
.../request/write/pipe/PipeEnrichedPlan.java | 79 +++++++++
.../iotdb/confignode/manager/ConfigManager.java | 7 +
.../apache/iotdb/confignode/manager/IManager.java | 8 +
.../persistence/executor/ConfigPlanExecutor.java | 3 +
.../thrift/ConfigNodeRPCServiceProcessor.java | 30 +---
.../pipe/receiver/PipePlanToStatementVisitor.java | 186 +++++++++++++++++++++
.../iotdb/db/protocol/client/ConfigNodeClient.java | 30 +---
.../queryengine/plan/analyze/AnalyzeVisitor.java | 2 +-
.../config/executor/ClusterConfigTaskExecutor.java | 82 ++++++++-
.../config/executor/IConfigTaskExecutor.java | 8 +
.../plan/planner/LogicalPlanVisitor.java | 4 +-
.../metadata/view/CreateLogicalViewStatement.java | 10 +-
.../commons/utils/ThriftConfigNodeSerDeUtils.java | 3 +
.../src/main/thrift/confignode.thrift | 30 +---
16 files changed, 409 insertions(+), 81 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index e4364dc63b4..ef853cfd9d7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -74,6 +74,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.function.DropFunction
import
org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.PipeEnrichedPlan;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan;
@@ -419,6 +420,9 @@ public abstract class ConfigPhysicalPlan implements
IConsensusRequest {
case PipeHandleMetaChange:
plan = new PipeHandleMetaChangePlan();
break;
+ case PipeEnriched:
+ plan = new PipeEnrichedPlan();
+ break;
case GetRegionId:
plan = new GetRegionIdPlan();
break;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index eae4bd44668..aaadd5aa55c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -194,7 +194,6 @@ public enum ConfigPhysicalPlanType {
/** Pipe Task. */
CreatePipeV2((short) 1500),
- /** START PIPE & STOP PIPE. */
SetPipeStatusV2((short) 1501),
DropPipeV2((short) 1502),
ShowPipeV2((short) 1503),
@@ -202,6 +201,9 @@ public enum ConfigPhysicalPlanType {
/** Pipe Runtime. */
PipeHandleLeaderChange((short) 1600),
PipeHandleMetaChange((short) 1601),
+
+ /** Pipe PayLoad. */
+ PipeEnriched((short) 1700),
;
private final short planType;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/PipeEnrichedPlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/PipeEnrichedPlan.java
new file mode 100644
index 00000000000..e1e7221a5de
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/PipeEnrichedPlan.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write.pipe;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * A {@link ConfigPhysicalPlan} of type {@link ConfigPhysicalPlanType#PipeEnriched} that wraps
+ * another plan (the "inner plan").
+ *
+ * <p>The serialized form is this plan's type (a short) followed by the serialized inner plan.
+ */
+public class PipeEnrichedPlan extends ConfigPhysicalPlan {
+
+  /** The wrapped plan; {@code null} until set by the constructor or by deserialization. */
+  private ConfigPhysicalPlan innerPlan;
+
+  /** Creates an empty plan whose inner plan is filled in by {@link #deserializeImpl}. */
+  public PipeEnrichedPlan() {
+    super(ConfigPhysicalPlanType.PipeEnriched);
+  }
+
+  /**
+   * Creates a plan wrapping the given inner plan.
+   *
+   * @param innerPlan the plan to wrap
+   */
+  public PipeEnrichedPlan(ConfigPhysicalPlan innerPlan) {
+    super(ConfigPhysicalPlanType.PipeEnriched);
+    this.innerPlan = innerPlan;
+  }
+
+  /**
+   * Returns the wrapped plan.
+   *
+   * @return the inner plan, or {@code null} if none has been set yet
+   */
+  public ConfigPhysicalPlan getInnerPlan() {
+    return innerPlan;
+  }
+
+  /**
+   * Writes this plan's type followed by the serialized inner plan.
+   *
+   * @param stream the stream to write to
+   * @throws IOException if writing to {@code stream} fails
+   */
+  @Override
+  protected void serializeImpl(DataOutputStream stream) throws IOException {
+    stream.writeShort(getType().getPlanType());
+    ByteBuffer serializedInnerPlan = innerPlan.serializeToByteBuffer();
+    // ByteBuffer#array() exposes the whole backing array, which may be larger than the
+    // serialized content; write only the readable region [position, limit).
+    stream.write(
+        serializedInnerPlan.array(),
+        serializedInnerPlan.arrayOffset() + serializedInnerPlan.position(),
+        serializedInnerPlan.remaining());
+  }
+
+  /**
+   * Restores the inner plan from the buffer through {@link ConfigPhysicalPlan.Factory#create}.
+   *
+   * @param buffer the buffer positioned at the serialized inner plan
+   * @throws IOException if the inner plan cannot be created from {@code buffer}
+   */
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    innerPlan = ConfigPhysicalPlan.Factory.create(buffer);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    PipeEnrichedPlan that = (PipeEnrichedPlan) obj;
+    // Null-safe: the no-arg constructor leaves innerPlan unset, and hashCode() already
+    // tolerates that state.
+    return Objects.equals(innerPlan, that.innerPlan);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(innerPlan);
+  }
+
+  @Override
+  public String toString() {
+    return "PipeEnrichedPlan{" + "innerPlan='" + innerPlan + "'}";
+  }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index cdd43c7822d..485e8b3373b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -1724,6 +1724,13 @@ public class ConfigManager implements IManager {
: new TGetAllPipeInfoResp(status, Collections.emptyList());
}
+ /**
+ * Entry point for a config physical plan received in binary form.
+ *
+ * <p>For now this only returns the status produced by {@code confirmLeader()}; the
+ * {@code configPhysicalPlanBinary} argument is not read yet (see the TODO below).
+ */
+ @Override
+ public TSStatus executeSyncCommand(ByteBuffer configPhysicalPlanBinary) {
+ TSStatus status = confirmLeader();
+ // TODO: determine whether to use procedure based on plan type
+ return status;
+ }
+
@Override
public TGetRegionIdResp getRegionId(TGetRegionIdReq req) {
TSStatus status = confirmLeader();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 8656d9d411d..920e1fe4714 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -106,6 +106,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
import org.apache.iotdb.consensus.common.DataSet;
+import java.nio.ByteBuffer;
import java.util.List;
/**
@@ -590,6 +591,13 @@ public interface IManager {
*/
TGetAllPipeInfoResp getAllPipeInfo();
+ /**
+ * Execute the config plan received from pipe.
+ *
+ * @param configPhysicalPlanBinary the config physical plan in its binary form
+ * @return The result of the command execution.
+ */
+ TSStatus executeSyncCommand(ByteBuffer configPhysicalPlanBinary);
+
/**
* Get RegionId. used for Show cluster slots information in
* docs/zh/UserGuide/Cluster/Cluster-Maintenance.md.
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 53759f6cc43..3a9994848c3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -71,6 +71,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.function.DropFunction
import
org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.PipeEnrichedPlan;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan;
@@ -446,6 +447,8 @@ public class ConfigPlanExecutor {
return quotaInfo.setSpaceQuota((SetSpaceQuotaPlan) physicalPlan);
case setThrottleQuota:
return quotaInfo.setThrottleQuota((SetThrottleQuotaPlan) physicalPlan);
+ case PipeEnriched:
+ return executeNonQueryPlan(((PipeEnrichedPlan)
physicalPlan).getInnerPlan());
default:
throw new UnknownPhysicalPlanTypeException(physicalPlan.getType());
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 717c7abe983..7c03d7e7ce7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -105,7 +105,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
-import org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
@@ -118,8 +117,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
-import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
-import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
@@ -132,7 +129,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
@@ -917,27 +913,6 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
return configManager.alterLogicalView(req);
}
- @Override
- @Deprecated
- public TSStatus createPipeSink(TPipeSinkInfo req) {
- // To be deleted
- return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- }
-
- @Override
- @Deprecated
- public TSStatus dropPipeSink(TDropPipeSinkReq req) {
- // To be deleted
- return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- }
-
- @Override
- @Deprecated
- public TGetPipeSinkResp getPipeSink(TGetPipeSinkReq req) {
- // To be deleted
- return new TGetPipeSinkResp();
- }
-
@Override
public TSStatus createPipe(TCreatePipeReq req) {
return configManager.createPipe(req);
@@ -968,6 +943,11 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
return configManager.getAllPipeInfo();
}
+ /** Forwards the binary config physical plan to {@code configManager} and returns its status. */
+ @Override
+ public TSStatus executeSyncCommand(ByteBuffer configPhysicalPlanBinary) {
+ return configManager.executeSyncCommand(configPhysicalPlanBinary);
+ }
+
@Override
public TGetRegionIdResp getRegionId(TGetRegionIdReq req) {
if (req.isSetTimeStamp() && req.getType() !=
TConsensusGroupType.DataRegion) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipePlanToStatementVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipePlanToStatementVisitor.java
new file mode 100644
index 00000000000..19b064643ca
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipePlanToStatementVisitor.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ActivateTemplateNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.BatchActivateTemplateNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalBatchActivateTemplateNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateMultiTimeSeriesNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.MeasurementGroup;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.DeleteDataStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateTimeSeriesStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.AlterTimeSeriesStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateMultiTimeSeriesStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateTimeSeriesStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ActivateTemplateStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogicalViewStatement;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link PlanVisitor} that rebuilds a {@link Statement} from a write-side {@link PlanNode}.
+ *
+ * <p>Each supported node type has its own {@code visitXxx} method that copies the node's fields
+ * into the matching statement; {@link #visitPlan} rejects whatever it is asked to handle.
+ */
+public class PipePlanToStatementVisitor extends PlanVisitor<Statement, Void> {
+
+  /**
+   * Rejects the node.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public Statement visitPlan(PlanNode node, Void context) {
+    throw new UnsupportedOperationException(
+        "PipePlanToStatementVisitor does not support visiting general plan.");
+  }
+
+  /** Copies every field of the node into a new {@link CreateTimeSeriesStatement}. */
+  @Override
+  public CreateTimeSeriesStatement visitCreateTimeSeries(CreateTimeSeriesNode node, Void context) {
+    CreateTimeSeriesStatement statement = new CreateTimeSeriesStatement();
+    statement.setPath(node.getPath());
+    statement.setDataType(node.getDataType());
+    statement.setEncoding(node.getEncoding());
+    statement.setCompressor(node.getCompressor());
+    statement.setProps(node.getProps());
+    statement.setAttributes(node.getAttributes());
+    statement.setAlias(node.getAlias());
+    statement.setTags(node.getTags());
+    return statement;
+  }
+
+  /**
+   * Flattens the node's measurement groups into the parallel lists of a
+   * {@link CreateMultiTimeSeriesStatement}.
+   *
+   * <p>The keys of {@code getMeasurementGroupMap()} are treated as device paths, so the path of
+   * each series is the key extended by the measurement name. Adding the bare key once per
+   * measurement would produce the same path repeatedly instead of one path per series.
+   */
+  @Override
+  public CreateMultiTimeSeriesStatement visitCreateMultiTimeSeries(
+      CreateMultiTimeSeriesNode node, Void context) {
+    CreateMultiTimeSeriesStatement statement = new CreateMultiTimeSeriesStatement();
+
+    List<PartialPath> paths = new ArrayList<>();
+    List<TSDataType> dataTypes = new ArrayList<>();
+    List<TSEncoding> encodings = new ArrayList<>();
+    List<CompressionType> compressors = new ArrayList<>();
+    List<Map<String, String>> propsList = new ArrayList<>();
+    List<String> aliasList = new ArrayList<>();
+    List<Map<String, String>> tagsList = new ArrayList<>();
+    List<Map<String, String>> attributesList = new ArrayList<>();
+
+    for (Map.Entry<PartialPath, MeasurementGroup> path2Group :
+        node.getMeasurementGroupMap().entrySet()) {
+      PartialPath devicePath = path2Group.getKey();
+      MeasurementGroup group = path2Group.getValue();
+      // NOTE(review): addAll throws NullPointerException when handed null; confirm that
+      // MeasurementGroup never returns null for its optional lists (props/alias/tags/attributes).
+      dataTypes.addAll(group.getDataTypes());
+      encodings.addAll(group.getEncodings());
+      compressors.addAll(group.getCompressors());
+      propsList.addAll(group.getPropsList());
+      aliasList.addAll(group.getAliasList());
+      tagsList.addAll(group.getTagsList());
+      attributesList.addAll(group.getAttributesList());
+      // One full path per measurement, in the same order as the per-measurement lists above.
+      for (String measurement : group.getMeasurements()) {
+        paths.add(devicePath.concatNode(measurement));
+      }
+    }
+
+    statement.setPaths(paths);
+    statement.setDataTypes(dataTypes);
+    statement.setEncodings(encodings);
+    statement.setCompressors(compressors);
+    statement.setPropsList(propsList);
+    statement.setAliasList(aliasList);
+    statement.setTagsList(tagsList);
+    statement.setAttributesList(attributesList);
+    return statement;
+  }
+
+  /** Copies every field of the node into a new {@link AlterTimeSeriesStatement}. */
+  @Override
+  public AlterTimeSeriesStatement visitAlterTimeSeries(AlterTimeSeriesNode node, Void context) {
+    AlterTimeSeriesStatement statement = new AlterTimeSeriesStatement();
+    statement.setAlterMap(node.getAlterMap());
+    statement.setAlterType(node.getAlterType());
+    statement.setAttributesMap(node.getAttributesMap());
+    statement.setAlias(node.getAlias());
+    statement.setTagsMap(node.getTagsMap());
+    statement.setPath(node.getPath());
+    return statement;
+  }
+
+  /**
+   * Builds an {@link InternalCreateTimeSeriesStatement} from the node's device path, its
+   * measurement group (measurements, data types, encodings, compressors) and aligned flag.
+   */
+  @Override
+  public InternalCreateTimeSeriesStatement visitInternalCreateTimeSeries(
+      InternalCreateTimeSeriesNode node, Void context) {
+    return new InternalCreateTimeSeriesStatement(
+        node.getDevicePath(),
+        node.getMeasurementGroup().getMeasurements(),
+        node.getMeasurementGroup().getDataTypes(),
+        node.getMeasurementGroup().getEncodings(),
+        node.getMeasurementGroup().getCompressors(),
+        node.isAligned());
+  }
+
+  /** Builds an {@link ActivateTemplateStatement} for the node's activate path. */
+  @Override
+  public ActivateTemplateStatement visitActivateTemplate(ActivateTemplateNode node, Void context) {
+    ActivateTemplateStatement statement = new ActivateTemplateStatement();
+    statement.setPath(node.getActivatePath());
+    return statement;
+  }
+
+  /**
+   * Builds a {@link BatchActivateTemplateStatement} from the keys of the node's template
+   * activation map.
+   */
+  @Override
+  public BatchActivateTemplateStatement visitInternalBatchActivateTemplate(
+      InternalBatchActivateTemplateNode node, Void context) {
+    return new BatchActivateTemplateStatement(
+        new ArrayList<>(node.getTemplateActivationMap().keySet()));
+  }
+
+  /** Builds an {@link InternalCreateMultiTimeSeriesStatement} from the node's device map. */
+  @Override
+  public InternalCreateMultiTimeSeriesStatement visitInternalCreateMultiTimeSeries(
+      InternalCreateMultiTimeSeriesNode node, Void context) {
+    return new InternalCreateMultiTimeSeriesStatement(node.getDeviceMap());
+  }
+
+  /**
+   * Builds a {@link BatchActivateTemplateStatement} from the keys of the node's template
+   * activation map.
+   */
+  @Override
+  public BatchActivateTemplateStatement visitBatchActivateTemplate(
+      BatchActivateTemplateNode node, Void context) {
+    return new BatchActivateTemplateStatement(
+        new ArrayList<>(node.getTemplateActivationMap().keySet()));
+  }
+
+  /**
+   * Builds a {@link CreateLogicalViewStatement} whose targets are the node's view paths and
+   * whose view expressions are the values of the node's view-path-to-source-expression map.
+   */
+  @Override
+  public CreateLogicalViewStatement visitCreateLogicalView(
+      CreateLogicalViewNode node, Void context) {
+    CreateLogicalViewStatement statement = new CreateLogicalViewStatement();
+    statement.setTargetFullPaths(node.getViewPathList());
+    statement.setViewExpressions(new ArrayList<>(node.getViewPathToSourceExpressionMap().values()));
+    return statement;
+  }
+
+  // We do not support AlterLogicalViewNode parsing and use direct rpc instead
+
+  /** Copies the delete time range and path list of the node into a new statement. */
+  @Override
+  public DeleteDataStatement visitDeleteData(DeleteDataNode node, Void context) {
+    DeleteDataStatement statement = new DeleteDataStatement();
+    statement.setDeleteEndTime(node.getDeleteEndTime());
+    statement.setDeleteStartTime(node.getDeleteStartTime());
+    statement.setPathList(node.getPathList());
+    return statement;
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 8c3382f9ddb..a661171d173 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -73,7 +73,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
-import org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
@@ -86,8 +85,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
-import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
-import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
@@ -100,7 +97,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
@@ -140,6 +136,7 @@ import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -854,24 +851,6 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
() -> client.alterLogicalView(req), status ->
!updateConfigNodeLeader(status));
}
- @Override
- public TSStatus createPipeSink(TPipeSinkInfo req) throws TException {
- return executeRemoteCallWithRetry(
- () -> client.createPipeSink(req), status ->
!updateConfigNodeLeader(status));
- }
-
- @Override
- public TSStatus dropPipeSink(TDropPipeSinkReq req) throws TException {
- return executeRemoteCallWithRetry(
- () -> client.dropPipeSink(req), status ->
!updateConfigNodeLeader(status));
- }
-
- @Override
- public TGetPipeSinkResp getPipeSink(TGetPipeSinkReq req) throws TException {
- return executeRemoteCallWithRetry(
- () -> client.getPipeSink(req), resp ->
!updateConfigNodeLeader(resp.status));
- }
-
@Override
public TSStatus createPipe(TCreatePipeReq req) throws TException {
return executeRemoteCallWithRetry(
@@ -908,6 +887,13 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
() -> client.getAllPipeInfo(), resp ->
!updateConfigNodeLeader(resp.status));
}
+ /**
+ * Sends the binary config physical plan through {@code client.executeSyncCommand}, wrapped in
+ * {@code executeRemoteCallWithRetry} with a predicate built on {@code updateConfigNodeLeader}.
+ */
+ @Override
+ public TSStatus executeSyncCommand(ByteBuffer configPhysicalPlanBinary)
throws TException {
+ return executeRemoteCallWithRetry(
+ () -> client.executeSyncCommand(configPhysicalPlanBinary),
+ status -> !updateConfigNodeLeader(status));
+ }
+
@Override
public TGetRegionIdResp getRegionId(TGetRegionIdReq req) throws TException {
return executeRemoteCallWithRetry(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 6ddf58895ac..f6e9fe453e9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -3400,7 +3400,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
context.setQueryType(QueryType.WRITE);
analysis.setStatement(createLogicalViewStatement);
- if (createLogicalViewStatement.getViewExpression() == null) {
+ if (createLogicalViewStatement.getViewExpressions() == null) {
// analyze query in statement
QueryStatement queryStatement =
createLogicalViewStatement.getQueryStatement();
if (queryStatement != null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 2cf0ab7ea8d..4cdefa63146 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -138,6 +138,7 @@ import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowSpace
import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowThrottleQuotaTask;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import
org.apache.iotdb.db.queryengine.plan.expression.visitor.TransformToViewExpressionVisitor;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.AlterLogicalViewNode;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.CountDatabaseStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.CountTimeSlotListStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateContinuousQueryStatement;
@@ -1678,6 +1679,24 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
return future;
}
+  /**
+   * Sends the binary config physical plan to the ConfigNode and reports the outcome through the
+   * returned future: a successful status completes it normally, any other status or any
+   * exception completes it exceptionally.
+   */
+  @Override
+  public SettableFuture<ConfigTaskResult> executeSyncCommand(ByteBuffer configPhysicalPlanBinary) {
+    final SettableFuture<ConfigTaskResult> result = SettableFuture.create();
+    try (ConfigNodeClient client =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      final TSStatus status = client.executeSyncCommand(configPhysicalPlanBinary);
+      final boolean succeeded = status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+      if (succeeded) {
+        result.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+      } else {
+        LOGGER.warn("Failed to executeSyncCommand, status is {}.", status);
+        result.setException(new IoTDBException(status.getMessage(), status.getCode()));
+      }
+    } catch (Exception e) {
+      result.setException(e);
+    }
+    return result;
+  }
+
@Override
public SettableFuture<ConfigTaskResult> deleteTimeSeries(
String queryId, DeleteTimeSeriesStatement deleteTimeSeriesStatement) {
@@ -1792,7 +1811,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
CreateLogicalViewStatement createLogicalViewStatement = new
CreateLogicalViewStatement();
createLogicalViewStatement.setTargetFullPaths(
Collections.singletonList(renameLogicalViewStatement.getNewName()));
- createLogicalViewStatement.setViewExpression(viewExpression);
+
createLogicalViewStatement.setViewExpressions(Collections.singletonList(viewExpression));
ExecutionResult executionResult =
Coordinator.getInstance()
.execute(
@@ -1925,6 +1944,67 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
}
}
+  /**
+   * Sends the view alterations carried by {@code alterLogicalViewNode} to the ConfigNode.
+   *
+   * <p>The request payload is the number of entries followed by each view path and its source
+   * expression. The call is repeated for as long as the status is
+   * {@code OVERLAP_WITH_EXISTING_TASK}; a transport timeout is treated as that status.
+   *
+   * @param alterLogicalViewNode supplies the view-path-to-source map
+   * @param context supplies the query id placed in the request
+   * @return a future completed normally on success, exceptionally otherwise
+   * @throws RuntimeException if writing the payload fails with an {@link IOException}
+   */
+  @Override
+  public SettableFuture<ConfigTaskResult> alterLogicalViewByPipe(
+      AlterLogicalViewNode alterLogicalViewNode, MPPQueryContext context) {
+    final SettableFuture<ConfigTaskResult> result = SettableFuture.create();
+    final Map<PartialPath, ViewExpression> sourceByViewPath =
+        alterLogicalViewNode.getViewPathToSourceMap();
+
+    final ByteArrayOutputStream payload = new ByteArrayOutputStream();
+    try {
+      ReadWriteIOUtils.write(sourceByViewPath.size(), payload);
+      for (Map.Entry<PartialPath, ViewExpression> viewAndSource : sourceByViewPath.entrySet()) {
+        viewAndSource.getKey().serialize(payload);
+        ViewExpression.serialize(viewAndSource.getValue(), payload);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    final TAlterLogicalViewReq request =
+        new TAlterLogicalViewReq(
+            context.getQueryId().getId(), ByteBuffer.wrap(payload.toByteArray()));
+
+    try (ConfigNodeClient configNodeClient =
+        CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(
+            ConfigNodeInfo.CONFIG_REGION_ID)) {
+      TSStatus status;
+      do {
+        try {
+          status = configNodeClient.alterLogicalView(request);
+        } catch (TTransportException e) {
+          final boolean timedOut =
+              e.getType() == TTransportException.TIMED_OUT
+                  || e.getCause() instanceof SocketTimeoutException;
+          if (!timedOut) {
+            throw e;
+          }
+          // A timeout is mainly caused by slow execution, so keep polling.
+          status = RpcUtils.getStatus(TSStatusCode.OVERLAP_WITH_EXISTING_TASK);
+        }
+        // Loop until the task is no longer reported as overlapping with an existing one.
+      } while (status.getCode() == TSStatusCode.OVERLAP_WITH_EXISTING_TASK.getStatusCode());
+
+      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        result.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+        return result;
+      }
+
+      LOGGER.warn(
+          "Failed to execute alter view {} by pipe, status is {}.", sourceByViewPath, status);
+      if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+        result.setException(new BatchProcessException(status.subStatus.toArray(new TSStatus[0])));
+      } else {
+        result.setException(new IoTDBException(status.getMessage(), status.getCode()));
+      }
+      return result;
+    } catch (ClientManagerException | TException e) {
+      result.setException(e);
+      return result;
+    }
+  }
+
@Override
public SettableFuture<ConfigTaskResult> getRegionId(GetRegionIdStatement
getRegionIdStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
index 89f4e0c5e12..cc5591c6a95 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.AlterLogicalViewNode;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.CountDatabaseStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.CountTimeSlotListStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateContinuousQueryStatement;
@@ -69,6 +70,8 @@ import
org.apache.iotdb.db.queryengine.plan.statement.sys.quota.ShowThrottleQuot
import com.google.common.util.concurrent.SettableFuture;
+import java.nio.ByteBuffer;
+
public interface IConfigTaskExecutor {
SettableFuture<ConfigTaskResult> setDatabase(DatabaseSchemaStatement
databaseSchemaStatement);
@@ -164,6 +167,8 @@ public interface IConfigTaskExecutor {
SettableFuture<ConfigTaskResult> showPipes(ShowPipesStatement
showPipesStatement);
+ SettableFuture<ConfigTaskResult> executeSyncCommand(ByteBuffer
configPhysicalPlanBinary);
+
SettableFuture<ConfigTaskResult> deleteTimeSeries(
String queryId, DeleteTimeSeriesStatement deleteTimeSeriesStatement);
@@ -176,6 +181,9 @@ public interface IConfigTaskExecutor {
SettableFuture<ConfigTaskResult> alterLogicalView(
AlterLogicalViewStatement alterLogicalViewStatement, MPPQueryContext
context);
+ SettableFuture<ConfigTaskResult> alterLogicalViewByPipe(
+ AlterLogicalViewNode alterLogicalViewNode, MPPQueryContext context);
+
SettableFuture<ConfigTaskResult> getRegionId(GetRegionIdStatement
getRegionIdStatement);
SettableFuture<ConfigTaskResult> getSeriesSlotList(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index e55521ea107..1ebb78ef36f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -944,7 +944,7 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
public PlanNode visitCreateLogicalView(
CreateLogicalViewStatement createLogicalViewStatement, MPPQueryContext
context) {
List<ViewExpression> viewExpressionList = new ArrayList<>();
- if (createLogicalViewStatement.getViewExpression() == null) {
+ if (createLogicalViewStatement.getViewExpressions() == null) {
// Transform all Expressions into ViewExpressions.
TransformToViewExpressionVisitor transformToViewExpressionVisitor =
new TransformToViewExpressionVisitor();
@@ -953,7 +953,7 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
viewExpressionList.add(transformToViewExpressionVisitor.process(expression,
null));
}
} else {
- viewExpressionList.add(createLogicalViewStatement.getViewExpression());
+ viewExpressionList = createLogicalViewStatement.getViewExpressions();
}
return new CreateLogicalViewNode(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/view/CreateLogicalViewStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/view/CreateLogicalViewStatement.java
index 5a3a28cb4c4..f6cbb097260 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/view/CreateLogicalViewStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/view/CreateLogicalViewStatement.java
@@ -56,7 +56,7 @@ public class CreateLogicalViewStatement extends Statement {
private QueryStatement queryStatement;
// if not null, all related check and generation will be skipped
- private ViewExpression viewExpression;
+ private List<ViewExpression> viewExpressions;
public CreateLogicalViewStatement() {
super();
@@ -129,8 +129,8 @@ public class CreateLogicalViewStatement extends Statement {
return this.queryStatement;
}
- public ViewExpression getViewExpression() {
- return viewExpression;
+ public List<ViewExpression> getViewExpressions() {
+ return viewExpressions;
}
// set source paths
@@ -194,8 +194,8 @@ public class CreateLogicalViewStatement extends Statement {
this.targetPaths.generateFullPathsFromPathsGroup();
}
- public void setViewExpression(ViewExpression viewExpression) {
- this.viewExpression = viewExpression;
+ public void setViewExpressions(List<ViewExpression> viewExpressions) {
+ this.viewExpressions = viewExpressions;
}
public void setTargetIntoItem(IntoItem intoItem) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
index 8b2f3a3c25e..5d495b46a81 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.commons.utils;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
@@ -132,6 +133,7 @@ public class ThriftConfigNodeSerDeUtils {
return configNodeLocation;
}
+ // Deprecated, restored for compatibility
public static void serializeTPipeSinkInfo(TPipeSinkInfo pipeSinkInfo,
DataOutputStream stream) {
try {
pipeSinkInfo.write(generateWriteProtocol(stream));
@@ -140,6 +142,7 @@ public class ThriftConfigNodeSerDeUtils {
}
}
+ // Deprecated, restored for compatibility
public static TPipeSinkInfo deserializeTPipeSinkInfo(ByteBuffer buffer) {
TPipeSinkInfo pipeSinkInfo = new TPipeSinkInfo();
try {
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index a2db9037355..4e9257a469d 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -697,25 +697,13 @@ struct TCreatePipeReq {
4: required map<string, string> connectorAttributes
}
+// Deprecated, restored for compatibility
struct TPipeSinkInfo {
1: required string pipeSinkName
2: required string pipeSinkType
3: optional map<string, string> attributes
}
-struct TDropPipeSinkReq {
- 1: required string pipeSinkName
-}
-
-struct TGetPipeSinkReq {
- 1: optional string pipeSinkName
-}
-
-struct TGetPipeSinkResp {
- 1: required common.TSStatus status
- 2: required list<TPipeSinkInfo> pipeSinkInfoList
-}
-
struct TShowPipeReq {
1: optional string pipeName
2: optional bool whereClause
@@ -1326,15 +1314,6 @@ service IConfigNodeRPCService {
// Sync
// ======================================================
- /** Create PipeSink */
- common.TSStatus createPipeSink(TPipeSinkInfo req)
-
- /** Drop PipeSink */
- common.TSStatus dropPipeSink(TDropPipeSinkReq req)
-
- /** Get PipeSink by name, if name is empty, get all PipeSink */
- TGetPipeSinkResp getPipeSink(TGetPipeSinkReq req)
-
/** Create Pipe */
common.TSStatus createPipe(TCreatePipeReq req)
@@ -1350,8 +1329,11 @@ service IConfigNodeRPCService {
/** Show Pipe by name, if name is empty, show all Pipe */
TShowPipeResp showPipe(TShowPipeReq req)
- /* Get all pipe information. It is used for DataNode registration and
restart*/
- TGetAllPipeInfoResp getAllPipeInfo();
+ /** Get all pipe information. It is used for DataNode registration and
restart. */
+ TGetAllPipeInfoResp getAllPipeInfo()
+
+ /** Execute schema language from external pipes */
+ common.TSStatus executeSyncCommand(binary configPhysicalPlanBinary)
// ======================================================
// TestTools