This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch multitimeseries in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 79ef4a6e4ae5c4e8a7de82cc5b6a1c2318958510 Author: HTHou <[email protected]> AuthorDate: Fri May 13 15:54:54 2022 +0800 Support CreateMultiTimeseries in new cluster --- .../metadata/visitor/SchemaExecutionVisitor.java | 50 +++ .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 15 + .../db/mpp/plan/parser/StatementGenerator.java | 32 ++ .../iotdb/db/mpp/plan/planner/LogicalPlanner.java | 16 + .../mpp/plan/planner/plan/node/PlanNodeType.java | 6 +- .../db/mpp/plan/planner/plan/node/PlanVisitor.java | 5 + .../metedata/write/CreateMultiTimeSeriesNode.java | 409 +++++++++++++++++++++ .../db/mpp/plan/statement/StatementVisitor.java | 7 + .../metadata/CreateMultiTimeSeriesStatement.java | 160 ++++++++ .../thrift/impl/DataNodeTSIServiceImpl.java | 43 ++- 10 files changed, 740 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java index 657f737b3e..6561eb6dee 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java @@ -27,9 +27,11 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan; +import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -39,6 +41,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Arrays; /** Schema write PlanNode visitor */ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> { @@ -69,6 +72,39 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully"); } + @Override + public TSStatus visitCreateMultiTimeSeries( + CreateMultiTimeSeriesNode node, ISchemaRegion schemaRegion) { + CreateMultiTimeSeriesPlan multiPlan = + (CreateMultiTimeSeriesPlan) + node.accept(new PhysicalPlanTransformer(), new TransformerContext()); + for (int i = 0; i < multiPlan.getPaths().size(); i++) { + if (multiPlan.getResults().containsKey(i) || multiPlan.isExecuted(i)) { + continue; + } + CreateTimeSeriesPlan plan = + new CreateTimeSeriesPlan( + multiPlan.getPaths().get(i), + multiPlan.getDataTypes().get(i), + multiPlan.getEncodings().get(i), + multiPlan.getCompressors().get(i), + multiPlan.getProps() == null ? null : multiPlan.getProps().get(i), + multiPlan.getTags() == null ? null : multiPlan.getTags().get(i), + multiPlan.getAttributes() == null ? null : multiPlan.getAttributes().get(i), + multiPlan.getAlias() == null ? null : multiPlan.getAlias().get(i)); + try { + schemaRegion.createTimeseries(plan, -1); + } catch (MetadataException e) { + logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e); + multiPlan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); + } + } + if (!multiPlan.getResults().isEmpty()) { + return RpcUtils.getStatus(Arrays.asList(multiPlan.getFailingStatus())); + } + return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully"); + } + @Override public TSStatus visitAlterTimeSeries(AlterTimeSeriesNode node, ISchemaRegion schemaRegion) { try { @@ -143,6 +179,20 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> node.getTagsList(), node.getAttributesList()); } + + public PhysicalPlan visitCreateMultiTimeSeries( + CreateMultiTimeSeriesNode node, TransformerContext context) { + + CreateMultiTimeSeriesPlan multiPlan = new CreateMultiTimeSeriesPlan(); + multiPlan.setPaths(node.getPaths()); + multiPlan.setDataTypes(node.getDataTypes()); + multiPlan.setEncodings(node.getEncodings()); + multiPlan.setCompressors(node.getCompressors()); + multiPlan.setAlias(node.getAliasList()); + multiPlan.setTags(node.getTagsList()); + multiPlan.setAttributes(node.getAttributesList()); + return multiPlan; + } } private static class TransformerContext {} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java index f52ba6fd77..5c698d1ff0 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java @@ -58,6 +58,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CountLevelTimeSeriesState import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement; +import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement; @@ -854,6 +855,20 @@ public class Analyzer { return analysis; } + @Override + public Analysis visitCreateMultiTimeseries( + CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, MPPQueryContext context) { + context.setQueryType(QueryType.WRITE); + Analysis analysis = new Analysis(); + analysis.setStatement(createMultiTimeSeriesStatement); + + SchemaPartition schemaPartitionInfo = + partitionFetcher.getOrCreateSchemaPartition( + new PathPatternTree(createMultiTimeSeriesStatement.getPaths())); + analysis.setSchemaPartitionInfo(schemaPartitionInfo); + return analysis; + } + @Override public Analysis visitAlterTimeseries( AlterTimeSeriesStatement alterTimeSeriesStatement, MPPQueryContext context) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java index 51726de6ca..a473299bb0 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement; +import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement; import org.apache.iotdb.db.qp.sql.IoTDBSqlParser; @@ -45,6 +46,7 @@ import org.apache.iotdb.db.qp.strategy.SQLParseError; import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand; import org.apache.iotdb.db.utils.QueryDataSetUtils; import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq; +import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq; import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq; @@ -373,6 +375,36 @@ public class StatementGenerator { return statement; } + public static Statement createStatement(TSCreateMultiTimeseriesReq req) + throws IllegalPathException { + // construct create multi timeseries statement + CreateMultiTimeSeriesStatement statement = new CreateMultiTimeSeriesStatement(); + List<PartialPath> paths = new ArrayList<>(); + for (String path : req.paths) { + paths.add(new PartialPath(path)); + } + List<TSDataType> dataTypes = new ArrayList<>(); + for (int dataType : req.dataTypes) { + dataTypes.add(TSDataType.values()[dataType]); + } + List<TSEncoding> encodings = new ArrayList<>(); + for (int encoding : req.encodings) { + encodings.add(TSEncoding.values()[encoding]); + } + List<CompressionType> compressors = new ArrayList<>(); + for (int compressor : req.compressors) { + compressors.add(CompressionType.values()[compressor]); + } + statement.setPaths(paths); + statement.setDataTypes(dataTypes); + statement.setEncodings(encodings); + statement.setCompressors(compressors); + statement.setTagsList(req.tagsList); + statement.setAttributesList(req.attributesList); + statement.setAliasList(req.measurementAliasList); + return statement; + } + private static Statement invokeParser(String sql, ZoneId zoneId) { ASTVisitor astVisitor = new ASTVisitor(); astVisitor.setZoneId(zoneId); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java index befcdb71f4..97dfd8344e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode; @@ -45,6 +46,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CountDevicesStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.CountLevelTimeSeriesStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement; +import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement; @@ -281,6 +283,20 @@ public class LogicalPlanner { createAlignedTimeSeriesStatement.getAttributesList()); } + @Override + public PlanNode visitCreateMultiTimeseries( + CreateMultiTimeSeriesStatement createAlignedTimeSeriesStatement, MPPQueryContext context) { + return new CreateMultiTimeSeriesNode( + context.getQueryId().genPlanNodeId(), + createAlignedTimeSeriesStatement.getPaths(), + createAlignedTimeSeriesStatement.getDataTypes(), + createAlignedTimeSeriesStatement.getEncodings(), + createAlignedTimeSeriesStatement.getCompressors(), + createAlignedTimeSeriesStatement.getAliasList(), + createAlignedTimeSeriesStatement.getTagsList(), + createAlignedTimeSeriesStatement.getAttributesList()); + } + @Override public PlanNode visitAlterTimeseries( AlterTimeSeriesStatement alterTimeSeriesStatement, MPPQueryContext context) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java index 5662d0ca54..339445a86e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java @@ -30,6 +30,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCo import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode; @@ -98,7 +99,8 @@ public enum PlanNodeType { ALIGNED_SERIES_AGGREGATE_SCAN((short) 34), DEVICE_MERGE((short) 35), SCHEMA_FETCH_MERGE((short) 36), - TRANSFORM((short) 37); + TRANSFORM((short) 37), + CREATE_MULTI_TIME_SERIES((short) 38); private final short nodeType; @@ -198,6 +200,8 @@ public enum PlanNodeType { return SchemaFetchMergeNode.deserialize(buffer); case 37: return TransformNode.deserialize(buffer); + case 38: + return CreateMultiTimeSeriesNode.deserialize(buffer); default: throw new IllegalArgumentException("Invalid node type: " + nodeType); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java index 8a3d3ec5be..c681a60a51 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java @@ -30,6 +30,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCo import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode; @@ -184,6 +185,10 @@ public abstract class PlanVisitor<R, C> { return visitPlan(node, context); } + public R visitCreateMultiTimeSeries(CreateMultiTimeSeriesNode node, C context) { + return visitPlan(node, context); + } + public R visitAlterTimeSeries(AlterTimeSeriesNode node, C context) { return visitPlan(node, context); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java new file mode 100644 index 0000000000..8ba1167b76 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write; + +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.mpp.plan.analyze.Analysis; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode; +import org.apache.iotdb.tsfile.exception.NotImplementedException; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class CreateMultiTimeSeriesNode extends WritePlanNode { + private List<PartialPath> paths = new ArrayList<>();; + private List<TSDataType> dataTypes = new ArrayList<>(); + private List<TSEncoding> encodings = new ArrayList<>(); + private List<CompressionType> compressors = new ArrayList<>(); + private List<String> aliasList; + private List<Map<String, String>> tagsList; + private List<Map<String, String>> attributesList; + private List<Long> tagOffsets; + private TRegionReplicaSet regionReplicaSet; + + public CreateMultiTimeSeriesNode(PlanNodeId id) { + super(id); + } + + public CreateMultiTimeSeriesNode( + PlanNodeId id, + List<PartialPath> paths, + List<TSDataType> dataTypes, + List<TSEncoding> encodings, + List<CompressionType> compressors, + List<String> aliasList, + List<Map<String, String>> tagsList, + List<Map<String, String>> attributesList) { + super(id); + this.paths = paths; + this.dataTypes = dataTypes; + this.encodings = encodings; + this.compressors = compressors; + this.aliasList = aliasList; + this.tagsList = tagsList; + this.attributesList = attributesList; + } + + public List<PartialPath> getPaths() { + return paths; + } + + public void setPaths(List<PartialPath> paths) { + this.paths = paths; + } + + public List<TSDataType> getDataTypes() { + return dataTypes; + } + + public void setDataTypes(List<TSDataType> dataTypes) { + this.dataTypes = dataTypes; + } + + public List<TSEncoding> getEncodings() { + return encodings; + } + + public void setEncodings(List<TSEncoding> encodings) { + this.encodings = encodings; + } + + public List<CompressionType> getCompressors() { + return compressors; + } + + public void setCompressors(List<CompressionType> compressors) { + this.compressors = compressors; + } + + public List<String> getAliasList() { + return aliasList; + } + + public void setAliasList(List<String> aliasList) { + this.aliasList = aliasList; + } + + public List<Map<String, String>> getTagsList() { + return tagsList; + } + + public void setTagsList(List<Map<String, String>> tagsList) { + this.tagsList = tagsList; + } + + public List<Map<String, String>> getAttributesList() { + return attributesList; + } + + public void setAttributesList(List<Map<String, String>> attributesList) { + this.attributesList = attributesList; + } + + public List<Long> getTagOffsets() { + return tagOffsets; + } + + public void setTagOffsets(List<Long> tagOffsets) { + this.tagOffsets = tagOffsets; + } + + public void addTimeSeries( + PartialPath path, + TSDataType dataType, + TSEncoding encoding, + CompressionType compressor, + String alias, + Map<String, String> tags, + Map<String, String> attributes) { + this.paths.add(path); + this.dataTypes.add(dataType); + this.encodings.add(encoding); + this.compressors.add(compressor); + if (alias != null) { + if (this.aliasList == null) { + aliasList = new ArrayList<>(); + } + aliasList.add(alias); + } + if (tags != null) { + if (this.tagsList == null) { + tagsList = new ArrayList<>(); + } + tagsList.add(tags); + } + if (attributes != null) { + if (this.attributesList == null) { + attributesList = new ArrayList<>(); + } + attributesList.add(attributes); + } + } + + @Override + public List<PlanNode> getChildren() { + return new ArrayList<>(); + } + + @Override + public void addChild(PlanNode child) {} + + @Override + public PlanNode clone() { + throw new NotImplementedException("Clone of CreateMultiTimeSeriesNode is not implemented"); + } + + @Override + public int allowedChildCount() { + return NO_CHILD_ALLOWED; + } + + @Override + public List<String> getOutputColumnNames() { + return null; + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C schemaRegion) { + return visitor.visitCreateMultiTimeSeries(this, schemaRegion); + } + + public static CreateMultiTimeSeriesNode deserialize(ByteBuffer byteBuffer) { + String id; + List<PartialPath> paths; + List<TSDataType> dataTypes; + List<TSEncoding> encodings; + List<CompressionType> compressors; + List<String> aliasList = null; + List<Map<String, String>> tagsList = null; + List<Map<String, String>> attributesList = null; + + int size = byteBuffer.getInt(); + paths = new ArrayList<>(); + for (int i = 0; i < size; i++) { + try { + paths.add(new PartialPath(ReadWriteIOUtils.readString(byteBuffer))); + } catch (IllegalPathException e) { + throw new IllegalArgumentException("Can not deserialize CreateMultiTimeSeriesNode", e); + } + } + + dataTypes = new ArrayList<>(); + for (int i = 0; i < size; i++) { + dataTypes.add(TSDataType.values()[byteBuffer.get()]); + } + + encodings = new ArrayList<>(); + for (int i = 0; i < size; i++) { + encodings.add(TSEncoding.values()[byteBuffer.get()]); + } + + compressors = new ArrayList<>(); + for (int i = 0; i < size; i++) { + compressors.add(CompressionType.values()[byteBuffer.get()]); + } + + byte label = byteBuffer.get(); + if (label >= 0) { + aliasList = new ArrayList<>(); + if (label == 1) { + for (int i = 0; i < size; i++) { + aliasList.add(ReadWriteIOUtils.readString(byteBuffer)); + } + } + } + + label = byteBuffer.get(); + if (label >= 0) { + tagsList = new ArrayList<>(); + if (label == 1) { + for (int i = 0; i < size; i++) { + tagsList.add(ReadWriteIOUtils.readMap(byteBuffer)); + } + } + } + + label = byteBuffer.get(); + if (label >= 0) { + attributesList = new ArrayList<>(); + if (label == 1) { + for (int i = 0; i < size; i++) { + attributesList.add(ReadWriteIOUtils.readMap(byteBuffer)); + } + } + } + + id = ReadWriteIOUtils.readString(byteBuffer); + + return new CreateMultiTimeSeriesNode( + new PlanNodeId(id), + paths, + dataTypes, + encodings, + compressors, + aliasList, + tagsList, + attributesList); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CreateMultiTimeSeriesNode that = (CreateMultiTimeSeriesNode) o; + return this.getPlanNodeId().equals(that.getPlanNodeId()) + && Objects.equals(paths, that.paths) + && Objects.equals(dataTypes, that.dataTypes) + && Objects.equals(encodings, that.encodings) + && Objects.equals(compressors, that.compressors) + && Objects.equals(tagOffsets, that.tagOffsets) + && Objects.equals(aliasList, that.aliasList) + && Objects.equals(tagsList, that.tagsList) + && Objects.equals(attributesList, that.attributesList); + } + + @Override + protected void serializeAttributes(ByteBuffer byteBuffer) { + PlanNodeType.CREATE_MULTI_TIME_SERIES.serialize(byteBuffer); + + // paths + byteBuffer.putInt(paths.size()); + for (PartialPath path : paths) { + ReadWriteIOUtils.write(path.getFullPath(), byteBuffer); + } + + // dataTypes + for (TSDataType dataType : dataTypes) { + byteBuffer.put((byte) dataType.ordinal()); + } + + // encodings + for (TSEncoding encoding : encodings) { + byteBuffer.put((byte) encoding.ordinal()); + } + + // compressors + for (CompressionType compressor : compressors) { + byteBuffer.put((byte) compressor.ordinal()); + } + + // alias + if (aliasList == null) { + byteBuffer.put((byte) -1); + } else if (aliasList.isEmpty()) { + byteBuffer.put((byte) 0); + } else { + byteBuffer.put((byte) 1); + for (String alias : aliasList) { + ReadWriteIOUtils.write(alias, byteBuffer); + } + } + + // tags + if (tagsList == null) { + byteBuffer.put((byte) -1); + } else if (tagsList.isEmpty()) { + byteBuffer.put((byte) 0); + } else { + byteBuffer.put((byte) 1); + for (Map<String, String> tags : tagsList) { + ReadWriteIOUtils.write(tags, byteBuffer); + } + } + + // attributes + if (attributesList == null) { + byteBuffer.put((byte) -1); + } else if (attributesList.isEmpty()) { + byteBuffer.put((byte) 0); + } else { + byteBuffer.put((byte) 1); + for (Map<String, String> attributes : attributesList) { + ReadWriteIOUtils.write(attributes, byteBuffer); + } + } + } + + public int hashCode() { + return Objects.hash( + this.getPlanNodeId(), + paths, + dataTypes, + encodings, + compressors, + tagOffsets, + aliasList, + tagsList, + attributesList); + } + + @Override + public TRegionReplicaSet getRegionReplicaSet() { + return regionReplicaSet; + } + + public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) { + this.regionReplicaSet = regionReplicaSet; + } + + @Override + public List<WritePlanNode> splitByPartition(Analysis analysis) { + Map<TRegionReplicaSet, CreateMultiTimeSeriesNode> splitMap = new HashMap<>(); + for (int i = 0; i < paths.size(); i++) { + TRegionReplicaSet regionReplicaSet = + analysis.getSchemaPartitionInfo().getSchemaRegionReplicaSet(paths.get(i).getDevice()); + CreateMultiTimeSeriesNode tmpNode; + if (splitMap.containsKey(regionReplicaSet)) { + tmpNode = splitMap.get(regionReplicaSet); + } else { + tmpNode = new CreateMultiTimeSeriesNode(this.getPlanNodeId()); + tmpNode.setRegionReplicaSet(regionReplicaSet); + splitMap.put(regionReplicaSet, tmpNode); + } + tmpNode.addTimeSeries( + paths.get(i), + dataTypes.get(i), + encodings.get(i), + compressors.get(i), + aliasList == null ? null : aliasList.get(i), + attributesList == null ? null : tagsList.get(i), + attributesList == null ? null : attributesList.get(i)); + } + return new ArrayList<>(splitMap.values()); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java index 59f16004b3..59d11c3c9a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java @@ -32,6 +32,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CountLevelTimeSeriesState import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement; +import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement; @@ -77,6 +78,12 @@ public abstract class StatementVisitor<R, C> { return visitStatement(createAlignedTimeSeriesStatement, context); } + // Create Multi Timeseries + public R visitCreateMultiTimeseries( + CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, C context) { + return visitStatement(createMultiTimeSeriesStatement, context); + } + // Alter Timeseries public R visitAlterTimeseries(AlterTimeSeriesStatement alterTimeSeriesStatement, C context) { return visitStatement(alterTimeSeriesStatement, context); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateMultiTimeSeriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateMultiTimeSeriesStatement.java new file mode 100644 index 0000000000..341e4d12a1 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateMultiTimeSeriesStatement.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.plan.statement.metadata; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.mpp.plan.constant.StatementType; +import org.apache.iotdb.db.mpp.plan.statement.Statement; +import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** CREATE MULTI TIMESERIES statement. */ +public class CreateMultiTimeSeriesStatement extends Statement { + + private static final Logger logger = + LoggerFactory.getLogger(CreateMultiTimeSeriesStatement.class); + + private List<PartialPath> paths; + private List<TSDataType> dataTypes = new ArrayList<>(); + private List<TSEncoding> encodings = new ArrayList<>(); + private List<CompressionType> compressors = new ArrayList<>(); + private List<String> aliasList = new ArrayList<>(); + private List<Map<String, String>> tagsList = new ArrayList<>(); + private List<Map<String, String>> attributesList = new ArrayList<>(); + private List<Long> tagOffsets = null; + + public CreateMultiTimeSeriesStatement() { + super(); + statementType = StatementType.CREATE_MULTI_TIMESERIES; + } + + @Override + public List<PartialPath> getPaths() { + return paths; + } + + public void setPaths(List<PartialPath> paths) { + this.paths = paths; + } + + public List<TSDataType> getDataTypes() { + return dataTypes; + } + + public void setDataTypes(List<TSDataType> dataTypes) { + this.dataTypes = dataTypes; + } + + public void addDataType(TSDataType dataType) { + this.dataTypes.add(dataType); + } + + public List<TSEncoding> getEncodings() { + return encodings; + } + + public void setEncodings(List<TSEncoding> encodings) { + this.encodings = encodings; + } + + public void addEncoding(TSEncoding encoding) { + this.encodings.add(encoding); + } + + public List<CompressionType> getCompressors() { + return compressors; + } + + public void setCompressors(List<CompressionType> compressors) { + this.compressors = compressors; + } + + public void addCompressor(CompressionType compression) { + this.compressors.add(compression); + } + + public List<String> getAliasList() { + return aliasList; + } + + public void setAliasList(List<String> aliasList) { + this.aliasList = aliasList; + } + + public void addAliasList(String alias) { + this.aliasList.add(alias); + } + + public List<Map<String, String>> getTagsList() { + return tagsList; + } + + public void setTagsList(List<Map<String, String>> tagsList) { + this.tagsList = tagsList; + } + + public void addTagsList(Map<String, String> tags) { + this.tagsList.add(tags); + } + + public List<Map<String, String>> getAttributesList() { + return attributesList; + } + + public void setAttributesList(List<Map<String, String>> attributesList) { + this.attributesList = attributesList; + } + + public void addAttributesList(Map<String, String> attributes) { + this.attributesList.add(attributes); + } + + public List<Long> getTagOffsets() { + if (tagOffsets == null) { + tagOffsets = new ArrayList<>(); + for (int i = 0; i < paths.size(); i++) { + tagOffsets.add(Long.parseLong("-1")); + } + } + return tagOffsets; + } + + public void setTagOffsets(List<Long> tagOffsets) { + this.tagOffsets = tagOffsets; + } + + public void addTagOffsets(Long tagsOffset) { + this.tagOffsets.add(tagsOffset); + } + + @Override + public <R, C> R accept(StatementVisitor<R, C> visitor, C context) { + return visitor.visitCreateMultiTimeseries(this, context); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java index c9635fced0..f7f863a33c 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java @@ -43,6 +43,7 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatemen import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement; +import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement; import org.apache.iotdb.db.query.control.SessionManager; @@ -367,7 +368,7 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler { req.getMeasurements()); } - // Step 1: transfer from CreateAlignedTimeSeriesStatement to Statement + // Step 1: transfer from CreateAlignedTimeSeriesReq to Statement CreateAlignedTimeSeriesStatement statement = (CreateAlignedTimeSeriesStatement) StatementGenerator.createStatement(req); @@ -397,7 +398,45 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler { @Override public TSStatus createMultiTimeseries(TSCreateMultiTimeseriesReq req) { - throw new UnsupportedOperationException(); + try { + if (!SESSION_MANAGER.checkLogin(req.getSessionId())) { + return getNotLoggedInStatus(); + } + + if (AUDIT_LOGGER.isDebugEnabled()) { + AUDIT_LOGGER.debug( + "Session-{} create {} timeseries, the first is {}", + SESSION_MANAGER.getCurrSessionId(), + req.getPaths().size(), + req.getPaths().get(0)); + } + + // Step 1: transfer from CreateMultiTimeSeriesReq to Statement + CreateMultiTimeSeriesStatement statement = + (CreateMultiTimeSeriesStatement) StatementGenerator.createStatement(req); + + // permission check + TSStatus status = AuthorityChecker.checkAuthority(statement, req.sessionId); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; + } + + // Step 2: call the coordinator + long queryId = SESSION_MANAGER.requestQueryId(false); + ExecutionResult result = + COORDINATOR.execute( + statement, + new QueryId(String.valueOf(queryId)), + SESSION_MANAGER.getSessionInfo(req.sessionId), + "", + PARTITION_FETCHER, + SCHEMA_FETCHER); + + return result.status; + } catch (Exception e) { + return onNPEOrUnexpectedException( + e, OperationType.CREATE_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR); + } } @Override
