This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch createMultiTimeseries in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 979c2a61523ce46062105a500d89aa46d58d1c22 Author: samperson1997 <[email protected]> AuthorDate: Tue Mar 16 10:53:36 2021 +0800 [IOTDB-1235] Refactor createMultiTimeseries --- .../java/org/apache/iotdb/db/metadata/MTree.java | 2 +- .../apache/iotdb/db/qp/executor/PlanExecutor.java | 59 +++++++++++++++++----- .../qp/physical/sys/CreateMultiTimeSeriesPlan.java | 7 ++- .../org/apache/iotdb/db/service/TSServiceImpl.java | 10 ++-- .../db/qp/physical/PhysicalPlanSerializeTest.java | 26 ++++++++++ .../java/org/apache/iotdb/session/Session.java | 4 +- 6 files changed, 85 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java index 42bb0ac..389071f 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java @@ -1339,7 +1339,7 @@ public class MTree implements Serializable { tsRow[2] = schema.getValueTSDataTypeList().get(i).toString(); tsRow[3] = schema.getValueTSEncodingList().get(i).toString(); tsRow[4] = schema.getCompressor().toString(); - tsRow[5] = "0"; + tsRow[5] = "-1"; tsRow[6] = needLast ? String.valueOf(getLastTimeStamp((MeasurementMNode) node, queryContext)) diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java index b97343c..75a2369 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java @@ -44,6 +44,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.metadata.PathNotExistException; import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.metadata.MetaUtils; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.metadata.mnode.MNode; import org.apache.iotdb.db.metadata.mnode.MeasurementMNode; @@ -129,6 +130,7 @@ import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.Field; import org.apache.iotdb.tsfile.read.common.Path; @@ -1376,24 +1378,53 @@ public class PlanExecutor implements IPlanExecutor { private boolean createMultiTimeSeries(CreateMultiTimeSeriesPlan multiPlan) throws BatchProcessException { + int dataTypeIdx = 0; for (int i = 0; i < multiPlan.getPaths().size(); i++) { if (multiPlan.getResults().containsKey(i)) { continue; } - CreateTimeSeriesPlan plan = - new CreateTimeSeriesPlan( - multiPlan.getPaths().get(i), - multiPlan.getDataTypes().get(i), - multiPlan.getEncodings().get(i), - multiPlan.getCompressors().get(i), - multiPlan.getProps() == null ? null : multiPlan.getProps().get(i), - multiPlan.getTags() == null ? null : multiPlan.getTags().get(i), - multiPlan.getAttributes() == null ? null : multiPlan.getAttributes().get(i), - multiPlan.getAlias() == null ? null : multiPlan.getAlias().get(i)); - try { - createTimeSeries(plan); - } catch (QueryProcessException e) { - multiPlan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); + PartialPath path = multiPlan.getPaths().get(i); + String measurement = path.getMeasurement(); + if (measurement.contains("(") && measurement.contains(",")) { + PartialPath devicePath = path.getDevicePath(); + List<String> measurements = MetaUtils.getMeasurementsInPartialPath(path); + List<TSDataType> dataTypes = new ArrayList<>(); + List<TSEncoding> encodings = new ArrayList<>(); + for (int j = 0; j < measurements.size(); j++) { + dataTypes.add(multiPlan.getDataTypes().get(dataTypeIdx)); + encodings.add(multiPlan.getEncodings().get(dataTypeIdx)); + dataTypeIdx++; + } + CreateAlignedTimeSeriesPlan plan = + new CreateAlignedTimeSeriesPlan( + devicePath, + measurements, + dataTypes, + encodings, + multiPlan.getCompressors().get(i), + Collections.emptyList()); + try { + createAlignedTimeSeries(plan); + } catch (QueryProcessException e) { + multiPlan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); + } + } else { + CreateTimeSeriesPlan plan = + new CreateTimeSeriesPlan( + multiPlan.getPaths().get(i), + multiPlan.getDataTypes().get(i), + multiPlan.getEncodings().get(i), + multiPlan.getCompressors().get(i), + multiPlan.getProps() == null ? null : multiPlan.getProps().get(i), + multiPlan.getTags() == null ? null : multiPlan.getTags().get(i), + multiPlan.getAttributes() == null ? null : multiPlan.getAttributes().get(i), + multiPlan.getAlias() == null ? null : multiPlan.getAlias().get(i)); + dataTypeIdx++; + try { + createTimeSeries(plan); + } catch (QueryProcessException e) { + multiPlan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); + } } } if (!multiPlan.getResults().isEmpty()) { diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java index 9ebc54b..ad7284f 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java @@ -152,6 +152,7 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan { int type = PhysicalPlanType.CREATE_MULTI_TIMESERIES.ordinal(); stream.write(type); stream.writeInt(paths.size()); + stream.writeInt(dataTypes.size()); // size of datatypes, encodings for aligned timeseries for (PartialPath path : paths) { putString(stream, path.getFullPath()); @@ -209,6 +210,7 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan { int type = PhysicalPlanType.CREATE_MULTI_TIMESERIES.ordinal(); buffer.put((byte) type); buffer.putInt(paths.size()); + buffer.putInt(dataTypes.size()); // size of datatypes, encodings for aligned timeseries for (PartialPath path : paths) { putString(buffer, path.getFullPath()); @@ -264,16 +266,17 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan { @Override public void deserialize(ByteBuffer buffer) throws IllegalPathException { int totalSize = buffer.getInt(); + int dataTypeSize = buffer.getInt(); paths = new ArrayList<>(totalSize); for (int i = 0; i < totalSize; i++) { paths.add(new PartialPath(readString(buffer))); } dataTypes = new ArrayList<>(totalSize); - for (int i = 0; i < totalSize; i++) { + for (int i = 0; i < dataTypeSize; i++) { dataTypes.add(TSDataType.values()[buffer.get()]); } encodings = new ArrayList<>(totalSize); - for (int i = 0; i < totalSize; i++) { + for (int i = 0; i < dataTypeSize; i++) { encodings.add(TSEncoding.values()[buffer.get()]); } compressors = new ArrayList<>(totalSize); diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 81a95e7..02f0621 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -1655,8 +1655,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { CreateMultiTimeSeriesPlan multiPlan = new CreateMultiTimeSeriesPlan(); List<PartialPath> paths = new ArrayList<>(req.paths.size()); - List<TSDataType> dataTypes = new ArrayList<>(req.paths.size()); - List<TSEncoding> encodings = new ArrayList<>(req.paths.size()); + List<TSDataType> dataTypes = new ArrayList<>(req.dataTypes.size()); + List<TSEncoding> encodings = new ArrayList<>(req.dataTypes.size()); List<CompressionType> compressors = new ArrayList<>(req.paths.size()); List<String> alias = null; if (req.measurementAliasList != null) { @@ -1687,8 +1687,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } paths.add(new PartialPath(req.paths.get(i))); - dataTypes.add(TSDataType.values()[req.dataTypes.get(i)]); - encodings.add(TSEncoding.values()[req.encodings.get(i)]); compressors.add(CompressionType.values()[req.compressors.get(i)]); if (alias != null) { alias.add(req.measurementAliasList.get(i)); @@ -1703,6 +1701,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { attributes.add(req.attributesList.get(i)); } } + for (int i = 0; i < req.dataTypes.size(); i++) { + dataTypes.add(TSDataType.values()[req.dataTypes.get(i)]); + encodings.add(TSEncoding.values()[req.encodings.get(i)]); + } multiPlan.setPaths(paths); multiPlan.setDataTypes(dataTypes); diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanSerializeTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanSerializeTest.java index 9a3eee3..e4f570e 100644 --- a/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanSerializeTest.java +++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanSerializeTest.java @@ -255,6 +255,32 @@ public class PhysicalPlanSerializeTest { } @Test + public void createMuSerializeTest3() throws IOException, IllegalPathException { + // same as: + // create timeseries root.sg.d1.s0 with datatype=DOUBLE, encoding=GORILLA, compression=SNAPPY + // create aligned timeseries root.sg.d1.(s1 INT64, s2 DOUBLE, s3 INT64) + // with encoding=(GORILLA, GORILLA, GORILLA), compression=SNAPPY + CreateMultiTimeSeriesPlan plan = new CreateMultiTimeSeriesPlan(); + plan.setPaths( + Arrays.asList(new PartialPath("root.sg.d1.s0"), new PartialPath("root.sg.d1.(s1,s2,s3)"))); + plan.setDataTypes( + Arrays.asList(TSDataType.DOUBLE, TSDataType.INT64, TSDataType.DOUBLE, TSDataType.INT64)); + plan.setEncodings( + Arrays.asList( + TSEncoding.GORILLA, TSEncoding.GORILLA, TSEncoding.GORILLA, TSEncoding.GORILLA)); + plan.setCompressors(Arrays.asList(CompressionType.SNAPPY, CompressionType.SNAPPY)); + plan.setProps(null); + plan.setTags(null); + plan.setAttributes(null); + plan.setAlias(null); + + PhysicalPlan result = testTwoSerializeMethodAndDeserialize(plan); + + Assert.assertEquals(OperatorType.CREATE_MULTI_TIMESERIES, result.getOperatorType()); + Assert.assertEquals(plan, result); + } + + @Test public void AlterTimeSeriesPlanSerializeTest() throws IOException, IllegalPathException { AlterTimeSeriesPlan alterTimeSeriesPlan = new AlterTimeSeriesPlan( diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java index 9167d0a..b829cd0 100644 --- a/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/session/src/main/java/org/apache/iotdb/session/Session.java @@ -438,13 +438,13 @@ public class Session { request.setPaths(paths); - List<Integer> dataTypeOrdinals = new ArrayList<>(paths.size()); + List<Integer> dataTypeOrdinals = new ArrayList<>(dataTypes.size()); for (TSDataType dataType : dataTypes) { dataTypeOrdinals.add(dataType.ordinal()); } request.setDataTypes(dataTypeOrdinals); - List<Integer> encodingOrdinals = new ArrayList<>(paths.size()); + List<Integer> encodingOrdinals = new ArrayList<>(dataTypes.size()); for (TSEncoding encoding : encodings) { encodingOrdinals.add(encoding.ordinal()); }
