This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/TableModelGrammar in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6fb4d2538e09cced852bdd74eda9be99f4fb4ff2 Author: JackieTien97 <[email protected]> AuthorDate: Tue Apr 16 15:21:51 2024 +0800 implement TableScanOperator --- .../source/relational/TableScanOperator.java | 369 +++++++++++++++++++++ .../execution/config/TableConfigTaskVisitor.java | 22 +- .../relational/planner/node/TableScanNode.java | 25 +- .../plan/relational/type/InternalTypeManager.java | 23 ++ .../dataregion/read/QueryDataSource.java | 9 + .../org/apache/iotdb/commons/path/AlignedPath.java | 4 + .../iotdb/tsfile/common/conf/TSFileConfig.java | 2 + .../iotdb/tsfile/file/metadata/IDeviceID.java | 53 +++ .../iotdb/tsfile/file/metadata/PlainDeviceID.java | 15 + .../tsfile/file/metadata/StringArrayDeviceID.java | 273 +++++++++++++++ .../org/apache/iotdb/tsfile/utils/WriteUtils.java | 36 ++ 11 files changed, 810 insertions(+), 21 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java new file mode 100644 index 00000000000..97ffe2b2282 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.source.relational; + +import org.apache.iotdb.commons.path.AlignedPath; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanUtil; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn; +import org.apache.iotdb.tsfile.read.common.block.column.Column; +import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; +import 
org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn; +import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; +import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; +import org.apache.iotdb.tsfile.utils.Binary; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType; + +/** + * Scans the given devices one after another with an {@link AlignedSeriesScanUtil} and emits + * TsBlocks whose value columns follow {@code columnSchemas}: ID columns are taken from the device + * id, ATTRIBUTE columns from the device entry's attribute values (both as run-length encoded + * constants), and MEASUREMENT columns from the scanned measurement data. + */ +public class TableScanOperator extends AbstractDataSourceOperator { + + private final List<Symbol> outputColumnNames; + + private final List<ColumnSchema> columnSchemas; + + // per output column: index into the device id segments after the table name (ID), into the + // attribute values (ATTRIBUTE), or into the measurement data block (MEASUREMENT) + private final int[] columnsIndexArray; + + private final int measurementColumnCount; + + private final List<DeviceEntry> deviceEntries; + + private final int deviceCount; + + private final Ordering scanOrder; + private final SeriesScanOptions seriesScanOptions; + + private final List<String> measurementColumnNames; + + private final List<IMeasurementSchema> measurementSchemas; + + private final List<TSDataType> measurementColumnTSDataTypes; + + private final TsBlockBuilder measurementDataBuilder; + // measurement data of the current device waiting to be turned into resultTsBlock by next() + private TsBlock measurementDataBlock; + + private QueryDataSource queryDataSource; + + private int currentDeviceIndex = 0; + + private int maxTsBlockLineNum = -1; + + public TableScanOperator( + OperatorContext context, + PlanNodeId sourceId, + List<Symbol> outputColumnNames, + List<ColumnSchema> columnSchemas, + int[] columnsIndexArray, + int measurementColumnCount, + List<DeviceEntry> deviceEntries, + Ordering scanOrder, + SeriesScanOptions seriesScanOptions, + List<String> measurementColumnNames, + List<IMeasurementSchema> measurementSchemas, + int maxTsBlockLineNum) { + this.sourceId = sourceId; + this.operatorContext = context; + this.outputColumnNames = outputColumnNames; +
this.columnSchemas = columnSchemas; + this.columnsIndexArray = columnsIndexArray; + this.measurementColumnCount = measurementColumnCount; + this.deviceEntries = deviceEntries; + this.deviceCount = deviceEntries.size(); + this.scanOrder = scanOrder; + this.seriesScanOptions = seriesScanOptions; + this.measurementColumnNames = measurementColumnNames; + this.measurementSchemas = measurementSchemas; + this.measurementColumnTSDataTypes = + measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList()); + this.currentDeviceIndex = 0; + + this.maxReturnSize = + Math.min( + maxReturnSize, + (1L + outputColumnNames.size()) + * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()); + this.maxTsBlockLineNum = maxTsBlockLineNum; + + // NOTE(review): assumes deviceEntries is non-empty; get(0) throws otherwise + this.seriesScanUtil = constructAlignedSeriesScanUtil(deviceEntries.get(currentDeviceIndex)); + + this.measurementDataBuilder = new TsBlockBuilder(this.measurementColumnTSDataTypes); + // resultTsBlockBuilder is (re)created and configured in initQueryDataSource(), so it must not + // be touched here (it may not exist yet). The line limit belongs on measurementDataBuilder, + // whose isFull() and getMaxTsBlockLineNumber() drive the batching in next()/appendToBuilder(). + this.measurementDataBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum); + } + + /** + * Reads data of the current device until the time slice is used up, the measurement builder is + * full, or a page was taken over as a whole, then attaches the ID and ATTRIBUTE columns. When the + * current device yields no more data, the scan advances to the next device. + */ + @Override + public TsBlock next() throws Exception { + if (retainedTsBlock != null) { + return getResultFromRetainedTsBlock(); + } + + try { + + // start stopwatch + long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); + long start = System.nanoTime(); + + // do-while guarantees at least one read attempt per call + do { + /* + * 1. consume page data firstly + * 2. consume chunk data secondly + * 3.
consume next file finally + */ + if (!readPageData() && !readChunkData() && !readFileData()) { + break; + } + + } while (System.nanoTime() - start < maxRuntime + && !measurementDataBuilder.isFull() + && measurementDataBlock == null); + + // the current device's data is fully consumed, move on to the next device + if (measurementDataBuilder.isEmpty() && measurementDataBlock == null) { + currentDeviceIndex++; + prepareForNextDevice(); + } + + } catch (IOException e) { + throw new RuntimeException("Error happened while scanning the file", e); + } + + // get all measurement column data and time column data + if (!measurementDataBuilder.isEmpty()) { + measurementDataBlock = measurementDataBuilder.build(); + measurementDataBuilder.reset(); + } + + // append id column and attribute column + if (!isEmpty(measurementDataBlock)) { + constructResultTsBlock(); + } + // the block now lives in resultTsBlock; clear it so the next call neither re-emits these rows + // nor mistakes the stale block for pending data and skips advancing to the next device + measurementDataBlock = null; + return checkTsBlockSizeAndGetResult(); + } + + // returns true as soon as a non-empty page of a remaining file has been appended + private boolean readFileData() throws IOException { + while (seriesScanUtil.hasNextFile()) { + if (readChunkData()) { + return true; + } + } + return false; + } + + // returns true as soon as a non-empty page of a remaining chunk has been appended + private boolean readChunkData() throws IOException { + while (seriesScanUtil.hasNextChunk()) { + if (readPageData()) { + return true; + } + } + return false; + } + + // returns true as soon as a non-empty page has been appended + private boolean readPageData() throws IOException { + while (seriesScanUtil.hasNextPage()) { + TsBlock tsBlock = seriesScanUtil.nextPage(); + if (!isEmpty(tsBlock)) { + appendToBuilder(tsBlock); + return true; + } + } + return false; + } + + // a page that alone fills a block is taken over without copying; otherwise its rows are copied + // into measurementDataBuilder + private void appendToBuilder(TsBlock tsBlock) { + int size = tsBlock.getPositionCount(); + if (measurementDataBuilder.isEmpty() + && tsBlock.getPositionCount() >= measurementDataBuilder.getMaxTsBlockLineNumber()) { + measurementDataBlock = tsBlock; + return; + } + TimeColumnBuilder timeColumnBuilder = measurementDataBuilder.getTimeColumnBuilder(); + TimeColumn timeColumn = tsBlock.getTimeColumn(); + for (int i = 0; i < size; i++) { + timeColumnBuilder.writeLong(timeColumn.getLong(i)); + measurementDataBuilder.declarePosition(); + } + for (int
columnIndex = 0, columnSize = tsBlock.getValueColumnCount(); + columnIndex < columnSize; + columnIndex++) { + appendOneColumn(columnIndex, tsBlock, size); + } + } + + private void appendOneColumn(int columnIndex, TsBlock tsBlock, int size) { + ColumnBuilder columnBuilder = measurementDataBuilder.getColumnBuilder(columnIndex); + Column column = tsBlock.getColumn(columnIndex); + if (column.mayHaveNull()) { + for (int i = 0; i < size; i++) { + if (column.isNull(i)) { + columnBuilder.appendNull(); + } else { + columnBuilder.write(column, i); + } + } + } else { + for (int i = 0; i < size; i++) { + columnBuilder.write(column, i); + } + } + } + + private boolean isEmpty(TsBlock tsBlock) { + return tsBlock == null || tsBlock.isEmpty(); + } + + // builds resultTsBlock from measurementDataBlock plus constant ID/ATTRIBUTE columns of the + // current device + private void constructResultTsBlock() { + int positionCount = measurementDataBlock.getPositionCount(); + DeviceEntry currentDeviceEntry = deviceEntries.get(currentDeviceIndex); + Column[] valueColumns = new Column[columnsIndexArray.length]; + for (int i = 0; i < columnsIndexArray.length; i++) { + switch (columnSchemas.get(i).getColumnCategory()) { + case ID: + // +1 to skip the table name segment + String idColumnValue = + (String) currentDeviceEntry.getDeviceID().segment(columnsIndexArray[i] + 1); + valueColumns[i] = getIdOrAttributeValueColumn(idColumnValue, positionCount); + break; + case ATTRIBUTE: + String attributeColumnValue = + currentDeviceEntry.getAttributeColumnValues().get(columnsIndexArray[i]); + valueColumns[i] = getIdOrAttributeValueColumn(attributeColumnValue, positionCount); + break; + case MEASUREMENT: + valueColumns[i] = measurementDataBlock.getColumn(columnsIndexArray[i]); + break; + default: + throw new IllegalArgumentException( + "Unexpected column category: " + columnSchemas.get(i).getColumnCategory()); + } + } + this.resultTsBlock = + new TsBlock(positionCount, measurementDataBlock.getTimeColumn(), valueColumns); + } + + private RunLengthEncodedColumn getIdOrAttributeValueColumn(String value, int
positionCount) { + if (value == null) { + return new RunLengthEncodedColumn( + new BinaryColumn(1, Optional.of(new boolean[] {true}), new Binary[] {null}), + positionCount); + } else { + return new RunLengthEncodedColumn( + new BinaryColumn( + 1, Optional.empty(), new Binary[] {new Binary(value, TSFileConfig.STRING_CHARSET)}), + positionCount); + } + } + + @Override + public boolean hasNext() throws Exception { + return currentDeviceIndex < deviceCount; + } + + @Override + public boolean isFinished() throws Exception { + return currentDeviceIndex >= deviceCount; + } + + @Override + public long calculateMaxPeekMemory() { + return (1L + outputColumnNames.size()) + * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() + * 3L; + } + + @Override + public long calculateMaxReturnSize() { + return maxReturnSize; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return calculateMaxPeekMemoryWithCounter(); + } + + @Override + protected List<TSDataType> getResultDataTypes() { + List<TSDataType> resultDataTypes = new ArrayList<>(columnSchemas.size()); + for (ColumnSchema columnSchema : columnSchemas) { + resultDataTypes.add(getTSDataType(columnSchema.getType())); + } + return resultDataTypes; + } + + @Override + public void initQueryDataSource(QueryDataSource dataSource) { + // kept so that prepareForNextDevice() can reset and reuse it for every following device + this.queryDataSource = dataSource; + this.seriesScanUtil.initQueryDataSource(dataSource); + this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes()); + this.resultTsBlockBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum); + } + + private void prepareForNextDevice() { + if (currentDeviceIndex < deviceCount) { + // construct AlignedSeriesScanUtil for next device + this.seriesScanUtil = constructAlignedSeriesScanUtil(deviceEntries.get(currentDeviceIndex)); + + // reset QueryDataSource + queryDataSource.reset(); + this.seriesScanUtil.initQueryDataSource(queryDataSource); + } + } + + private AlignedSeriesScanUtil constructAlignedSeriesScanUtil(DeviceEntry
deviceEntry) { + + // device path is "root" followed by every device id segment, table name included + String[] devicePath = new String[1 + deviceEntry.getDeviceID().segmentNum()]; + devicePath[0] = "root"; + for (int i = 1; i < devicePath.length; i++) { + devicePath[i] = (String) deviceEntry.getDeviceID().segment(i - 1); + } + AlignedPath alignedPath = new AlignedPath(new PartialPath(devicePath)); + + alignedPath.setMeasurementList(measurementColumnNames); + alignedPath.setSchemaList(measurementSchemas); + + return new AlignedSeriesScanUtil( + alignedPath, + scanOrder, + seriesScanOptions, + operatorContext.getInstanceContext(), + true, + measurementColumnTSDataTypes); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java index 3ac52823be0..2a82aca4c4f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java @@ -51,9 +51,8 @@ import org.apache.iotdb.db.relational.sql.tree.ShowTables; import org.apache.iotdb.db.relational.sql.tree.Use; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.read.common.type.Type; -import org.apache.iotdb.tsfile.read.common.type.TypeEnum; +import static org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType; import static org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignatureTranslator.toTypeSignature; import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; @@ -143,24 +142,7 @@ public class TableConfigTaskVisitor extends AstVisitor<IConfigTask, MPPQueryCont private TSDataType getDataType(DataType dataType) { try { - Type type =
metadata.getType(toTypeSignature(dataType)); - TypeEnum typeEnum = type.getTypeEnum(); - switch (typeEnum) { - case TEXT: - return TSDataType.TEXT; - case FLOAT: - return TSDataType.FLOAT; - case DOUBLE: - return TSDataType.DOUBLE; - case INT32: - return TSDataType.INT32; - case INT64: - return TSDataType.INT64; - case BOOLEAN: - return TSDataType.BOOLEAN; - default: - throw new IllegalArgumentException(); - } + return getTSDataType(metadata.getType(toTypeSignature(dataType))); } catch (TypeNotFoundException e) { throw new SemanticException(String.format("Unknown type: %s", dataType)); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java index 0f8e3290bfe..d6aa012f57e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java @@ -12,6 +12,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; public class TableScanNode extends PlanNode { @@ -59,7 +61,7 @@ public class TableScanNode extends PlanNode { @Override public List<String> getOutputColumnNames() { - return null; + return outputSymbols.stream().map(Symbol::getName).collect(Collectors.toList()); } @Override @@ -72,4 +74,25 @@ public class TableScanNode extends PlanNode { public List<Symbol> getOutputSymbols() { return outputSymbols; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + TableScanNode that = (TableScanNode) o; + return Objects.equals(qualifiedTableName,
that.qualifiedTableName) + && Objects.equals(outputSymbols, that.outputSymbols); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), qualifiedTableName, outputSymbols); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/InternalTypeManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/InternalTypeManager.java index 3b3cdf32bc3..eda52a408df 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/InternalTypeManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/InternalTypeManager.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.type; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.type.Type; import org.apache.iotdb.tsfile.read.common.type.TypeEnum; @@ -64,4 +65,26 @@ public class InternalTypeManager implements TypeManager { public Type getType(TypeId id) { throw new UnsupportedOperationException(); } + + /** + * Maps a relational {@link Type} to the TsFile {@link TSDataType} of the same name. + * + * @throws IllegalArgumentException if the type has no TsFile counterpart + */ + public static TSDataType getTSDataType(Type type) { + TypeEnum typeEnum = type.getTypeEnum(); + switch (typeEnum) { + case TEXT: + return TSDataType.TEXT; + case FLOAT: + return TSDataType.FLOAT; + case DOUBLE: + return TSDataType.DOUBLE; + case INT32: + return TSDataType.INT32; + case INT64: + return TSDataType.INT64; + case BOOLEAN: + return TSDataType.BOOLEAN; + case UNKNOWN: + return TSDataType.UNKNOWN; + default: + throw new IllegalArgumentException("Unsupported type: " + typeEnum); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java index 660fbea5861..be19f4be267 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java +++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java @@ -218,4 +218,13 @@ public class QueryDataSource { public void setSingleDevice(boolean singleDevice) { isSingleDevice = singleDevice; } + + /** + * Rewinds the seq and unseq cursors (index to -1, order time to 0, satisfied state to null) so + * they start over, e.g. when TableScanOperator moves on to its next device. + */ + public void reset() { + curSeqIndex = -1; + curSeqOrderTime = 0; + curSeqSatisfied = null; + curUnSeqIndex = -1; + curUnSeqOrderTime = 0; + curUnSeqSatisfied = null; + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java index 6cdda34c885..6e46ed21178 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java @@ -170,6 +170,10 @@ public class AlignedPath extends PartialPath { this.schemaList.addAll(schemas); } + /** Replaces the schema list; the given list is stored by reference, not copied. */ + public void setSchemaList(List<IMeasurementSchema> schemaList) { + this.schemaList = schemaList; + } + public void addMeasurement(MeasurementPath measurementPath) { if (measurementList == null) { measurementList = new ArrayList<>(); diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java index 8a257cadff9..43dd158db39 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java @@ -73,6 +73,8 @@ public class TSFileConfig implements Serializable { /** The primitive array capacity threshold. */ public static final int ARRAY_CAPACITY_THRESHOLD = 1000; + + /** + * Number of leading levels of a dot-separated device path that form the table name when the path + * is split into a segmented device id, e.g. "root.a.b" for "root.a.b.c.d". + */ + public static final int DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME = 3; /** Memory size threshold for flushing to disk, default value is 128MB.
*/ private int groupSizeInByte = 128 * 1024 * 1024; /** The memory size for each series writer to pack page, default value is 64KB. */ diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IDeviceID.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IDeviceID.java index 08aa018154a..72917941d88 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IDeviceID.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IDeviceID.java @@ -22,6 +22,10 @@ package org.apache.iotdb.tsfile.file.metadata; import org.apache.iotdb.tsfile.utils.Accountable; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -30,6 +34,8 @@ import java.nio.ByteBuffer; /** Device id interface. */ public interface IDeviceID extends Comparable<IDeviceID>, Accountable { + // NOTE(review): interface fields are implicitly public static final, so this logger is part of + // the interface's public surface. + Logger LOGGER = LoggerFactory.getLogger(IDeviceID.class); + int serialize(ByteBuffer byteBuffer); int serialize(OutputStream outputStream) throws IOException; @@ -38,6 +44,53 @@ public interface IDeviceID extends Comparable<IDeviceID>, Accountable { boolean isEmpty(); + /** + * @return the table name associated with the device. For a path-DeviceId, like "root.a.b.c.d", it + * is converted according to a fixed rule, like assuming the first three levels ("root.a.b") + * as the table name; for a tuple-deviceId, like "(table1, beijing, turbine)", it is the first + * element in the deviceId, namely "table1". + */ + String getTableName(); + + // NOTE(review): StringArrayDeviceID built from the string "root.a.b.c.d" holds the segments + // {"root.a.b", "c", "d"}, i.e. segmentNum() == 3, not 5 as stated below -- confirm intent. + /** + * @return how many segments this DeviceId consists of. For a path-DeviceId, like "root.a.b.c.d", + * it is 5; for a tuple-DeviceId, like "(table1, beijing, turbine)", it is 3. + */ + int segmentNum(); + + /** + * @param i the sequence number of the segment that should be returned.
+ * @return i-th segment in this DeviceId. + * @throws ArrayIndexOutOfBoundsException if i >= segmentNum(). + */ + Object segment(int i); + + /** + * Fallback size computation: serializes into a temporary in-memory buffer and measures it (the + * debug log marks this as inefficient, so implementations may override it). Returns -1 if + * serialization throws an IOException. + */ + default int serializedSize() { + LOGGER.debug( + "Using default inefficient implementation of serialized size by {}", this.getClass()); + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + serialize(baos); + return baos.size(); + } catch (IOException e) { + LOGGER.error("Failed to serialize device ID: {}", this, e); + return -1; + } + } + + /** Reads device ids back from their serialized form. */ + interface Deserializer { + IDeviceID deserializeFrom(ByteBuffer byteBuffer); + + IDeviceID deserializeFrom(InputStream inputStream) throws IOException; + + Deserializer DEFAULT_DESERIALIZER = StringArrayDeviceID.getDESERIALIZER(); + } + + /** Creates device ids from their string representation. */ + interface Factory { + IDeviceID create(String deviceIdString); + + Factory DEFAULT_FACTORY = StringArrayDeviceID.getFACTORY(); + } + static IDeviceID deserializeFrom(ByteBuffer byteBuffer) { // TODO return new PlainDeviceID(ReadWriteIOUtils.readVarIntString(byteBuffer)); diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/PlainDeviceID.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/PlainDeviceID.java index 1721dc1e1aa..8404c39dfed 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/PlainDeviceID.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/PlainDeviceID.java @@ -86,6 +86,21 @@ public class PlainDeviceID implements IDeviceID { return deviceID.isEmpty(); } + // the segment-based accessors below are not supported for plain (single-string) device ids + @Override + public String getTableName() { + throw new UnsupportedOperationException(); + } + + @Override + public int segmentNum() { + throw new UnsupportedOperationException(); + } + + @Override + public Object segment(int i) { + throw new UnsupportedOperationException(); + } + @Override public long ramBytesUsed() { return INSTANCE_SIZE + sizeOfCharArray(deviceID.length()); diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/StringArrayDeviceID.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/StringArrayDeviceID.java new file mode 100644 index 00000000000..3b125982644 --- /dev/null +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/StringArrayDeviceID.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.tsfile.file.metadata; + +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.common.constant.TsFileConstant; +import org.apache.iotdb.tsfile.exception.TsFileRuntimeException; +import org.apache.iotdb.tsfile.utils.RamUsageEstimator; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; +import org.apache.iotdb.tsfile.utils.WriteUtils; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Objects; + +/** + * Device id stored as an array of string segments. The first segment is the table name and the + * remaining segments are the id values. Ids built from a dot-separated path keep its first + * {@link TSFileConfig#DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME} levels together as the table name. + */ +public class StringArrayDeviceID implements IDeviceID { + + private static final Deserializer DESERIALIZER = + new Deserializer() { + @Override + public IDeviceID deserializeFrom(ByteBuffer byteBuffer) { + return deserialize(byteBuffer); + } + + @Override + public IDeviceID deserializeFrom(InputStream inputStream) throws IOException { + return deserialize(inputStream); + } + }; + + private static final Factory FACTORY = + new Factory() { + @Override + public IDeviceID create(String deviceIdString) { + return new StringArrayDeviceID(deviceIdString); + } + }; + + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(StringArrayDeviceID.class); + + // TODO: change to Object[] and rename to just ArrayDeviceID + // or we can just use a tuple like Relational DB. + private final String[] segments; + + /** Creates an id from the given segments; the array is used as-is, not copied. */ + public StringArrayDeviceID(String...
segments) { + this.segments = segments; + } + + /** + * Splits a dot-separated path into its table name and id segments, e.g. "root.a.b.c.d" -> + * {"root.a.b", "c", "d"}; shorter paths are handled as described in splitDeviceIdString. + */ + public StringArrayDeviceID(String deviceIdString) { + this.segments = splitDeviceIdString(deviceIdString); + } + + @SuppressWarnings("java:S125") // confusing comments with codes + private static String[] splitDeviceIdString(String deviceIdString) { + int lastSeparatorPos = -1; + int currPos = 0; + int segmentCnt = 1; + // split the string with '.', stop when finding enough segments to form a table name + // String.split is not used here to avoid unnecessary string copy + for (; + currPos < deviceIdString.length() + && segmentCnt < TSFileConfig.DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME + 1; + currPos++) { + if (deviceIdString.charAt(currPos) == TsFileConstant.PATH_SEPARATOR_CHAR) { + lastSeparatorPos = currPos; + segmentCnt++; + } + } + + String tableName; + String[] segments; + // assuming DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME = 3 + if (segmentCnt < TSFileConfig.DEFAULT_SEGMENT_NUM_FOR_TABLE_NAME + 1) { + // "root" -> {"", "root"} + // "root.a" -> {"root", "a"} + // "root.a.b" -> {"root.a", "b"} + tableName = segmentCnt == 1 ?
"" : deviceIdString.substring(0, lastSeparatorPos); + segments = new String[2]; + segments[0] = tableName; + segments[1] = deviceIdString.substring(lastSeparatorPos + 1); + } else { + // "root.a.b.c" -> {"root.a.b", "c"} + // "root.a.b.c.d" -> {"root.a.b", "c", "d"} + tableName = deviceIdString.substring(0, lastSeparatorPos); + String[] idSegments = + deviceIdString + .substring(lastSeparatorPos + 1) + .split(TsFileConstant.PATH_SEPARATER_NO_REGEX); + segments = new String[idSegments.length + 1]; + segments[0] = tableName; + System.arraycopy(idSegments, 0, segments, 1, idSegments.length); + } + + return segments; + } + + public static Deserializer getDESERIALIZER() { + return DESERIALIZER; + } + + public static Factory getFACTORY() { + return FACTORY; + } + + @Override + public int serialize(ByteBuffer byteBuffer) { + int cnt = 0; + cnt += ReadWriteIOUtils.write(segments.length, byteBuffer); + for (String segment : segments) { + cnt += ReadWriteIOUtils.write(segment, byteBuffer); + } + return cnt; + } + + @Override + public int serialize(OutputStream outputStream) throws IOException { + int cnt = 0; + cnt += ReadWriteIOUtils.write(segments.length, outputStream); + for (String segment : segments) { + cnt += ReadWriteIOUtils.write(segment, outputStream); + } + return cnt; + } + + public static StringArrayDeviceID deserialize(ByteBuffer byteBuffer) { + final int cnt = byteBuffer.getInt(); + // NOTE(review): unlike deserialize(InputStream), a count of 0 yields an empty id here rather + // than {""}; confirm which behaviour is intended. + String[] segments = new String[cnt]; + for (int i = 0; i < cnt; i++) { + final int stringSize = byteBuffer.getInt(); + byte[] stringBytes = new byte[stringSize]; + byteBuffer.get(stringBytes); + segments[i] = new String(stringBytes, TSFileConfig.STRING_CHARSET); + } + return new StringArrayDeviceID(segments); + } + + public static StringArrayDeviceID deserialize(InputStream stream) throws IOException { + final int cnt = ReadWriteIOUtils.readInt(stream); + if (cnt == 0) { + return new StringArrayDeviceID(new String[] {""}); + } + + String[] segments = new String[cnt]; + for (int i =
0; i < cnt; i++) { + final int stringSize = ReadWriteIOUtils.readInt(stream); + byte[] stringBytes = new byte[stringSize]; + // InputStream.read may legally return fewer bytes than requested, so keep reading until the + // buffer is full and only fail on a premature end of stream + int readCnt = 0; + while (readCnt < stringSize) { + final int n = stream.read(stringBytes, readCnt, stringSize - readCnt); + if (n < 0) { + break; + } + readCnt += n; + } + if (readCnt != stringSize) { + throw new IOException(String.format("Expected %d bytes but read %d", stringSize, readCnt)); + } + segments[i] = new String(stringBytes, TSFileConfig.STRING_CHARSET); + } + return new StringArrayDeviceID(segments); + } + + // NOTE(review): segments are concatenated without a separator, so e.g. {"a", "bc"} and + // {"ab", "c"} produce the same bytes. + @Override + public byte[] getBytes() { + ByteArrayOutputStream publicBAOS = new ByteArrayOutputStream(256); + for (String segment : segments) { + try { + publicBAOS.write(segment.getBytes(TSFileConfig.STRING_CHARSET)); + } catch (IOException e) { + throw new TsFileRuntimeException(e.getMessage()); + } + } + return publicBAOS.toByteArray(); + } + + @Override + public boolean isEmpty() { + return segments == null || segments.length == 0; + } + + @Override + public String getTableName() { + return segments[0]; + } + + @Override + public int segmentNum() { + return segments.length; + } + + @Override + public String segment(int i) { + return segments[i]; + } + + // segment-wise comparison (nulls first); a proper prefix sorts before the longer id + @Override + public int compareTo(IDeviceID o) { + int thisSegmentNum = segmentNum(); + int otherSegmentNum = o.segmentNum(); + for (int i = 0; i < thisSegmentNum; i++) { + if (i >= otherSegmentNum) { + // the other ID is a prefix of this one + return 1; + } + final int comp = + Objects.compare(this.segment(i), ((String) o.segment(i)), WriteUtils::compareStrings); + if (comp != 0) { + // the partial comparison has a result + return comp; + } + } + + if (thisSegmentNum < otherSegmentNum) { + // this ID is a prefix of the other one + return -1; + } + + // two ID equal + return 0; + } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + RamUsageEstimator.sizeOf(segments); + } + + // mirrors serialize(): a segment count followed by a length prefix and the bytes of each segment + @Override + public int serializedSize() { + int cnt = Integer.BYTES; + for (String segment : segments) { + cnt += Integer.BYTES; + cnt += segment.getBytes(TSFileConfig.STRING_CHARSET).length; + } + return
cnt; + } + + @Override + public String toString() { + return String.join(".", segments); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + StringArrayDeviceID deviceID = (StringArrayDeviceID) o; + return Objects.deepEquals(segments, deviceID.segments); + } + + @Override + public int hashCode() { + return Arrays.hashCode(segments); + } +} diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/WriteUtils.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/WriteUtils.java new file mode 100644 index 00000000000..7fb3bc7aab3 --- /dev/null +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/WriteUtils.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.tsfile.utils; + +/** Static helpers shared by TsFile write-side code. */ +public class WriteUtils { + + private WriteUtils() { + // utility class, not meant to be instantiated + } + + /** + * Null-safe string comparison: two nulls are equal, null sorts before any non-null string, and + * non-null strings use {@link String#compareTo}. + */ + public static int compareStrings(String a, String b) { + if (a == null && b == null) { + return 0; + } + if (a == null) { + return -1; + } + if (b == null) { + return 1; + } + return a.compareTo(b); + } +}
