This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch insertTablet in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f39bc420b766758e842dbead0bf1584142f2793c Author: samperson1997 <[email protected]> AuthorDate: Sun Mar 14 16:57:54 2021 +0800 [IOTDB-1228] Refactor PlanExecutor.insertTablet method to support aligned timeseries --- .../org/apache/iotdb/db/metadata/MManager.java | 123 ++++++++++++++++----- .../org/apache/iotdb/db/metadata/MetaUtils.java | 3 +- .../iotdb/db/qp/physical/InsertTabletPlanTest.java | 51 +++++++++ .../write/schema/VectorMeasurementSchema.java | 5 +- 4 files changed, 150 insertions(+), 32 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java index 2bca513..6ddafbf 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java @@ -92,7 +92,6 @@ import org.apache.iotdb.db.rescon.PrimitiveArrayManager; import org.apache.iotdb.db.utils.RandomDeleteCache; import org.apache.iotdb.db.utils.SchemaUtils; import org.apache.iotdb.db.utils.TestOnly; -import org.apache.iotdb.db.utils.TypeInferenceUtils; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.exception.cache.CacheException; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; @@ -107,6 +106,34 @@ import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static java.util.stream.Collectors.toList; +import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; +import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; + /** * This class takes the responsibility of serialization of all the metadata info and persistent it * into files. This class contains all the interfaces to modify the metadata for delta system. All @@ -1928,9 +1955,10 @@ public class MManager { Pair<MNode, Template> deviceMNode = getDeviceNodeWithAutoCreate(deviceId); // 2. get schema of each measurement - // if do not has measurement + // if do not have measurement MeasurementMNode measurementMNode; - TSDataType dataType; + int loc = 0; + for (int i = 0; i < measurementList.length; i++) { try { MNode child = getMNode(deviceMNode.left, measurementList[i]); @@ -1943,27 +1971,70 @@ public class MManager { throw new PathNotExistException(deviceId + PATH_SEPARATOR + measurementList[i]); } else { // child is null or child is type of MNode - dataType = getTypeInLoc(plan, i); - // create it, may concurrent created by multiple thread - internalCreateTimeseries(deviceId.concatNode(measurementList[i]), dataType); - measurementMNode = (MeasurementMNode) deviceMNode.left.getChild(measurementList[i]); + // get dataType of plan, only support InsertRowPlan and InsertTabletPlan + if (plan instanceof InsertRowPlan) { + TSDataType dataType = plan.getDataTypes()[i]; + // create it, may concurrent created by multiple thread + internalCreateTimeseries(deviceId.concatNode(measurementList[i]), dataType); + measurementMNode = (MeasurementMNode) deviceMNode.left.getChild(measurementList[i]); + } else if (plan instanceof InsertTabletPlan) { + List<TSDataType> dataTypes = new ArrayList<>(); + List<String> measurements = + Arrays.asList(measurementList[i].replace("(", "").replace(")", "").split(",")); + if (measurements.size() == 1) { + internalCreateTimeseries( + deviceId.concatNode(measurementList[i]), plan.getDataTypes()[loc]); + measurementMNode = (MeasurementMNode) deviceMNode.left.getChild(measurementList[i]); + } else { + for (int j = 0; j < measurements.size(); j++) { + dataTypes.add(plan.getDataTypes()[loc]); + loc++; + } + internalAlignedCreateTimeseries(deviceId, measurements, dataTypes); + measurementMNode = (MeasurementMNode) deviceMNode.left.getChild(measurements.get(0)); + } + } else { + throw new MetadataException( + String.format( + "Only support insertRow and insertTablet, plan is [%s]", + plan.getOperatorType())); + } } } // check type is match + boolean mismatch = false; TSDataType insertDataType = null; if (plan instanceof InsertRowPlan) { if (!((InsertRowPlan) plan).isNeedInferType()) { - // only when InsertRowPlan's values is object[], we should check type - insertDataType = getTypeInLoc(plan, i); + // only when InsertRowPlan's values list is object[], we should check type + insertDataType = plan.getDataTypes()[i]; } else { insertDataType = measurementMNode.getSchema().getType(); } + mismatch = measurementMNode.getSchema().getType() != insertDataType; } else if (plan instanceof InsertTabletPlan) { - insertDataType = getTypeInLoc(plan, i); + int measurementSize = measurementList[i].split(",").length; + loc -= measurementSize; + if (measurementSize == 1) { + insertDataType = measurementMNode.getSchema().getType(); + mismatch = measurementMNode.getSchema().getType() != insertDataType; + } else { + for (int j = 0; j < measurementSize; j++) { + TSDataType dataTypeInNode = + measurementMNode.getSchema().getValueTSDataTypeList().get(j); + insertDataType = plan.getDataTypes()[loc]; + if (dataTypeInNode != insertDataType) { + mismatch = true; + insertDataType = measurementMNode.getSchema().getType(); + break; + } + loc++; + } + } } - if (measurementMNode.getSchema().getType() != insertDataType) { + if (mismatch) { logger.warn( "DataType mismatch, Insert measurement {} type {}, metadata tree type {}", measurementList[i], @@ -1985,7 +2056,6 @@ public class MManager { // set measurementName instead of alias measurementList[i] = measurementMNode.getName(); - } catch (MetadataException e) { logger.warn( "meet error when check {}.{}, message: {}", @@ -2008,7 +2078,7 @@ public class MManager { return deviceMNode.getChild(measurementName); } - /** create timeseries with ignore PathAlreadyExistException */ + /** create timeseries ignoring PathAlreadyExistException */ private void internalCreateTimeseries(PartialPath path, TSDataType dataType) throws MetadataException { createTimeseries( @@ -2019,21 +2089,20 @@ public class MManager { Collections.emptyMap()); } - /** get dataType of plan, in loc measurements only support InsertRowPlan and InsertTabletPlan */ - private TSDataType getTypeInLoc(InsertPlan plan, int loc) throws MetadataException { - TSDataType dataType; - if (plan instanceof InsertRowPlan) { - InsertRowPlan tPlan = (InsertRowPlan) plan; - dataType = - TypeInferenceUtils.getPredictedDataType(tPlan.getValues()[loc], tPlan.isNeedInferType()); - } else if (plan instanceof InsertTabletPlan) { - dataType = (plan).getDataTypes()[loc]; - } else { - throw new MetadataException( - String.format( - "Only support insert and insertTablet, plan is [%s]", plan.getOperatorType())); + /** create aligned timeseries ignoring PathAlreadyExistException */ + private void internalAlignedCreateTimeseries( + PartialPath devicePath, List<String> measurements, List<TSDataType> dataTypes) + throws MetadataException { + List<TSEncoding> encodings = new ArrayList<>(); + for (TSDataType dataType : dataTypes) { + encodings.add(getDefaultEncoding(dataType)); } - return dataType; + createAlignedTimeSeries( + devicePath, + measurements, + dataTypes, + encodings, + TSFileDescriptor.getInstance().getConfig().getCompressor()); } /** diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MetaUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/MetaUtils.java index a02f5be..fab9114 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MetaUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MetaUtils.java @@ -115,7 +115,8 @@ public class MetaUtils { */ public static List<String> getMeasurementsInPartialPath(PartialPath fullPath) { if (fullPath.getMeasurement().contains("(") && fullPath.getMeasurement().contains(",")) { - return (Arrays.asList(fullPath.getMeasurement().split("\\(")[1].split("\\)")[0].split(","))); + return (Arrays.asList( + fullPath.getMeasurement().replace("(", "").replace(")", "").split(","))); } else { return Arrays.asList(fullPath.getMeasurement()); } diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java index 52b5b64..1246eea 100644 --- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java +++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java @@ -106,4 +106,55 @@ public class InsertTabletPlanTest { Assert.assertEquals(6, record.getFields().size()); } } + + @Test + public void testInsertTabletPlanWithAlignedTimeseries() + throws QueryProcessException, MetadataException, InterruptedException, + QueryFilterOptimizationException, StorageEngineException, IOException { + long[] times = new long[] {110L, 111L, 112L, 113L}; + List<Integer> dataTypes = new ArrayList<>(); + dataTypes.add(TSDataType.DOUBLE.ordinal()); + dataTypes.add(TSDataType.FLOAT.ordinal()); + dataTypes.add(TSDataType.INT64.ordinal()); + dataTypes.add(TSDataType.INT32.ordinal()); + dataTypes.add(TSDataType.BOOLEAN.ordinal()); + dataTypes.add(TSDataType.TEXT.ordinal()); + + Object[] columns = new Object[6]; + columns[0] = new double[4]; + columns[1] = new float[4]; + columns[2] = new long[4]; + columns[3] = new int[4]; + columns[4] = new boolean[4]; + columns[5] = new Binary[4]; + + for (int r = 0; r < 4; r++) { + ((double[]) columns[0])[r] = 1.0; + ((float[]) columns[1])[r] = 2; + ((long[]) columns[2])[r] = 10000; + ((int[]) columns[3])[r] = 100; + ((boolean[]) columns[4])[r] = false; + ((Binary[]) columns[5])[r] = new Binary("hh" + r); + } + + InsertTabletPlan tabletPlan = + new InsertTabletPlan( + new PartialPath("root.isp.d1"), + new String[] {"(s1,s2,s3)", "(s4,s5)", "s6"}, + dataTypes); + tabletPlan.setTimes(times); + tabletPlan.setColumns(columns); + tabletPlan.setRowCount(times.length); + + PlanExecutor executor = new PlanExecutor(); + executor.insertTablet(tabletPlan); + + QueryPlan queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.isp.d1"); + QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT); + Assert.assertEquals(6, dataSet.getPaths().size()); + while (dataSet.hasNext()) { + RowRecord record = dataSet.next(); + Assert.assertEquals(6, record.getFields().size()); + } + } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java index 0411be9..def8386 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java @@ -318,10 +318,7 @@ public class VectorMeasurementSchema TSDataType.deserialize(types[i]).toString(), ",", TSEncoding.deserialize(encodings[i]).toString()); - sc.addTail("]"); - if (i != measurements.length - 1) { - sc.addTail(", "); - } + sc.addTail("],"); } sc.addTail(CompressionType.deserialize(compressor).toString()); return sc.toString();
