This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch NonAlignedTablet in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 448ed1ceeaa9d6ef8171c488e2d793afb2f30953 Author: JackieTien97 <[email protected]> AuthorDate: Tue Mar 1 20:42:15 2022 +0800 Support NonAlignedTablet --- .../apache/iotdb/tsfile/TsFileWriteWithTablet.java | 32 ++- .../apache/iotdb/tsfile/write/TsFileWriter.java | 28 +++ .../chunk/NonAlignedChunkGroupWriterImpl.java | 72 ++++++ .../tsfile/write/record/NonAlignedTablet.java | 280 +++++++++++++++++++++ 4 files changed, 411 insertions(+), 1 deletion(-) diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithTablet.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithTablet.java index c454e0c..2bb19d8 100644 --- a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithTablet.java +++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithTablet.java @@ -26,6 +26,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.write.TsFileWriter; +import org.apache.iotdb.tsfile.write.record.NonAlignedTablet; import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; @@ -69,7 +70,7 @@ public class TsFileWriteWithTablet { writeMeasurementScheams.add(measurementSchemas.get(0)); writeMeasurementScheams.add(measurementSchemas.get(1)); writeMeasurementScheams.add(measurementSchemas.get(2)); - writeWithTablet(tsFileWriter, DEVICE_1, writeMeasurementScheams, 10000, 0, 0); + writeWithNonAlignedTablet(tsFileWriter, DEVICE_1, writeMeasurementScheams, 10000, 0, 0); } } catch (Exception e) { logger.error("meet error in TsFileWrite with tablet", e); @@ -108,4 +109,33 @@ public class TsFileWriteWithTablet { tablet.reset(); } } + + /** Example writer for NonAlignedTablet: for each of rowNum rows it adds one value per schema, giving every value its own timestamp (startTime is incremented after each value). 
It writes the tablet and resets it whenever its fullest column reaches getMaxRowNumber(), then writes any remaining rows at the end. startValue is incremented once per row but never read. NOTE(review): every value is a Binary, and NonAlignedTablet.addValue casts the value to the schema type, so this presumably only works when all schemas are TEXT; confirm. */ private static void writeWithNonAlignedTablet( + TsFileWriter tsFileWriter, + String deviceId, + List<MeasurementSchema> schemas, + long rowNum, + long startTime, + long startValue) + throws
IOException, WriteProcessException { + NonAlignedTablet tablet = new NonAlignedTablet(deviceId, schemas); + + long sensorNum = schemas.size(); + for (long r = 0; r < rowNum; r++, startValue++) { + for (int i = 0; i < sensorNum; i++) { + tablet.addValue( + schemas.get(i).getMeasurementId(), startTime++, new Binary("testString.........")); + } + // write + if (tablet.maxRowSize == tablet.getMaxRowNumber()) { + tsFileWriter.write(tablet); + tablet.reset(); + } + } + // write + if (tablet.maxRowSize != 0) { + tsFileWriter.write(tablet); + tablet.reset(); + } + } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java index f2ced03..5e8cdfb 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java @@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.utils.MeasurementGroup; import org.apache.iotdb.tsfile.write.chunk.AlignedChunkGroupWriterImpl; import org.apache.iotdb.tsfile.write.chunk.IChunkGroupWriter; import org.apache.iotdb.tsfile.write.chunk.NonAlignedChunkGroupWriterImpl; +import org.apache.iotdb.tsfile.write.record.NonAlignedTablet; import org.apache.iotdb.tsfile.write.record.TSRecord; import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; @@ -380,6 +381,24 @@ public class TsFileWriter implements AutoCloseable { } } + /** Makes sure a ChunkGroupWriter exists for tablet.deviceId; tryToInitialGroupWriter is called with false, presumably meaning non-aligned. It then checks the tablet schemas with checkIsAllMeasurementsInGroup, against either the series schema registered for the device or, when exactly one schema template exists, that template, and registers them via tryToAddSeriesWriter. Throws NoMeasurementException when neither source applies. */ private void checkIsTimeseriesExist(NonAlignedTablet tablet) throws WriteProcessException { + IChunkGroupWriter groupWriter = tryToInitialGroupWriter(tablet.deviceId, false); + + Path devicePath = new Path(tablet.deviceId); + List<MeasurementSchema> schemas = tablet.getSchemas(); + if (schema.containsDevice(devicePath)) { + checkIsAllMeasurementsInGroup(schema.getSeriesSchema(devicePath), schemas, false); + groupWriter.tryToAddSeriesWriter(schemas); + } else if (schema.getSchemaTemplates() != null
&& schema.getSchemaTemplates().size() == 1) { + MeasurementGroup measurementGroup = + schema.getSchemaTemplates().entrySet().iterator().next().getValue(); + checkIsAllMeasurementsInGroup(measurementGroup, schemas, false); + groupWriter.tryToAddSeriesWriter(schemas); + } else { + throw new NoMeasurementException("input devicePath is invalid: " + devicePath); + } + } + /** * If it's aligned, then all measurementSchemas should be contained in the measurementGroup, or it * will throw exception. If it's nonAligned, then remove the measurementSchema that is not @@ -513,6 +532,15 @@ public class TsFileWriter implements AutoCloseable { return checkMemorySizeAndMayFlushChunks(); } + /** Writes a NonAlignedTablet: ensures its series exist, adds the count returned by NonAlignedChunkGroupWriterImpl.write (the largest per-column row count) to recordCount, and returns checkMemorySizeAndMayFlushChunks(). NOTE(review): the cast assumes the group writer registered for tablet.deviceId is non-aligned; confirm the behaviour when the device was previously written with writeAligned. */ public boolean write(NonAlignedTablet tablet) throws IOException, WriteProcessException { + // make sure the ChunkGroupWriter for this Tablet exist + checkIsTimeseriesExist(tablet); + // get corresponding ChunkGroupWriter and write this Tablet + recordCount += + ((NonAlignedChunkGroupWriterImpl) groupWriters.get(tablet.deviceId)).write(tablet); + return checkMemorySizeAndMayFlushChunks(); + } + public boolean writeAligned(Tablet tablet) throws IOException, WriteProcessException { // make sure the ChunkGroupWriter for this Tablet exist checkIsTimeseriesExist(tablet, true); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java index 8b6038e..9d65318 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java @@ -22,6 +22,7 @@ import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.exception.write.WriteProcessException; import org.apache.iotdb.tsfile.utils.Binary; +import
org.apache.iotdb.tsfile.write.record.NonAlignedTablet; import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; @@ -129,6 +130,77 @@ public class NonAlignedChunkGroupWriterImpl implements IChunkGroupWriter { return pointCount; } + /** Writes the first rowSize[column] points of every tablet column to that measurement's chunk writer, calling checkIsHistoryData before each point and updating lastTimeMap after it. Returns the largest per-column row count, not the total number of points written. Throws UnSupportedDataTypeException for a column type other than INT32, INT64, FLOAT, DOUBLE, BOOLEAN or TEXT. */ public int write(NonAlignedTablet tablet) throws WriteProcessException { + int pointCount = 0; + List<MeasurementSchema> timeseries = tablet.getSchemas(); + for (int column = 0; column < timeseries.size(); column++) { + String measurementId = timeseries.get(column).getMeasurementId(); + int rowSize = tablet.rowSize[column]; + pointCount = Math.max(pointCount, rowSize); + long[] timestamps = tablet.timestamps[column]; + switch (timeseries.get(column).getType()) { + case INT32: + int[] intValues = (int[]) tablet.values[column]; + for (int row = 0; row < rowSize; row++) { + long time = timestamps[row]; + checkIsHistoryData(measurementId, time); + chunkWriters.get(measurementId).write(time, intValues[row]); + lastTimeMap.put(measurementId, time); + } + break; + case INT64: + long[] longValues = (long[]) tablet.values[column]; + for (int row = 0; row < rowSize; row++) { + long time = timestamps[row]; + checkIsHistoryData(measurementId, time); + chunkWriters.get(measurementId).write(time, longValues[row]); + lastTimeMap.put(measurementId, time); + } + break; + case FLOAT: + float[] floatValues = (float[]) tablet.values[column]; + for (int row = 0; row < rowSize; row++) { + long time = timestamps[row]; + checkIsHistoryData(measurementId, time); + chunkWriters.get(measurementId).write(time, floatValues[row]); + lastTimeMap.put(measurementId, time); + } + break; + case DOUBLE: + double[] doubleValues = (double[]) tablet.values[column]; + for (int row = 0; row < rowSize; row++) { + long time = timestamps[row]; + checkIsHistoryData(measurementId, time); + chunkWriters.get(measurementId).write(time, doubleValues[row]); +
lastTimeMap.put(measurementId, time); + } + break; + case BOOLEAN: + boolean[] booleanValues = (boolean[]) tablet.values[column]; + for (int row = 0; row < rowSize; row++) { + long time = timestamps[row]; + checkIsHistoryData(measurementId, time); + chunkWriters.get(measurementId).write(time, booleanValues[row]); + lastTimeMap.put(measurementId, time); + } + break; + case TEXT: + Binary[] binaryValues = (Binary[]) tablet.values[column]; + for (int row = 0; row < rowSize; row++) { + long time = timestamps[row]; + checkIsHistoryData(measurementId, time); + chunkWriters.get(measurementId).write(time, binaryValues[row]); + lastTimeMap.put(measurementId, time); + } + break; + default: + throw new UnSupportedDataTypeException( + String.format("Data type %s is not supported.", timeseries.get(column).getType())); + } + } + return pointCount; + } + @Override public long flushToFileWriter(TsFileIOWriter fileWriter) throws IOException { LOG.debug("start flush device id:{}", deviceId); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/NonAlignedTablet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/NonAlignedTablet.java new file mode 100644 index 0000000..f0be58f --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/NonAlignedTablet.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.record;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A tablet for one device whose measurements are filled independently. Every measurement owns its
+ * own timestamp column ({@code timestamps[column]}) and its own row count ({@code
+ * rowSize[column]}), so different measurements may hold different numbers of points at different
+ * timestamps. All columns share the same capacity, {@link #getMaxRowNumber()}.
+ */
+public class NonAlignedTablet {
+
+  private static final int DEFAULT_SIZE = 1024;
+  private static final String NOT_SUPPORT_DATATYPE = "Data type %s is not supported.";
+
+  /** deviceId of this tablet */
+  public String deviceId;
+
+  /** the measurement schemas of this tablet; schema i describes column i */
+  private List<MeasurementSchema> schemas;
+
+  /** measurementId -> index of its schema in {@link #schemas}, which is also its column index */
+  private final Map<String, Integer> measurementIndex;
+
+  /** timestamps[column][row]: one timestamp column per measurement */
+  public long[][] timestamps;
+  /** each object is a primitive type array, which represents values of one measurement */
+  public Object[] values;
+
+  /** the number of rows currently filled in each column */
+  public int[] rowSize;
+  /** the largest entry of {@link #rowSize}, i.e. the row count of the fullest column */
+  public int maxRowSize;
+  /** the capacity of every column of this tablet */
+  private final int maxRowNumber;
+
+  /**
+   * Return a tablet with default specified row number. This is the standard constructor (all Tablet
+   * should be the same size).
+   *
+   * @param deviceId the name of the device specified to be written in
+   * @param schemas the list of measurement schemas for creating the tablet, only measurementId and
+   *     type take effects
+   */
+  public NonAlignedTablet(String deviceId, List<MeasurementSchema> schemas) {
+    this(deviceId, schemas, DEFAULT_SIZE);
+  }
+
+  /**
+   * Return a tablet with the specified number of rows (maxBatchSize). Only call this constructor
+   * directly for testing purposes. Tablet should normally always be default size.
+   *
+   * @param deviceId the name of the device specified to be written in
+   * @param schemas the list of measurement schemas for creating the row batch, only measurementId
+   *     and type take effects; the list is copied
+   * @param maxRowNumber the capacity of every column of this tablet
+   * @throws UnSupportedDataTypeException if a schema type is not INT32, INT64, FLOAT, DOUBLE,
+   *     BOOLEAN or TEXT
+   */
+  public NonAlignedTablet(String deviceId, List<MeasurementSchema> schemas, int maxRowNumber) {
+    this.deviceId = deviceId;
+    this.schemas = new ArrayList<>(schemas);
+    this.maxRowNumber = maxRowNumber;
+    measurementIndex = new HashMap<>();
+
+    int indexInSchema = 0;
+    for (MeasurementSchema schema : schemas) {
+      if (schema.getType() == TSDataType.VECTOR) {
+        // NOTE(review): createColumns() throws UnSupportedDataTypeException for VECTOR, so a
+        // tablet containing a VECTOR schema cannot currently be constructed.
+        for (String measurementId : schema.getSubMeasurementsList()) {
+          measurementIndex.put(measurementId, indexInSchema);
+        }
+      } else {
+        measurementIndex.put(schema.getMeasurementId(), indexInSchema);
+      }
+      indexInSchema++;
+    }
+
+    createColumns();
+
+    reset();
+  }
+
+  public void setDeviceId(String deviceId) {
+    this.deviceId = deviceId;
+  }
+
+  /**
+   * Replaces the schema list. The measurement index and the timestamp/value/rowSize columns are not
+   * rebuilt, so the new list must describe the same measurements, in the same order and with the
+   * same types, as the list this tablet was constructed with.
+   */
+  public void setSchemas(List<MeasurementSchema> schemas) {
+    this.schemas = schemas;
+  }
+
+  /**
+   * Appends one point to the column of {@code measurementId}. A {@code null} value is ignored.
+   *
+   * @param measurementId a measurement of this tablet
+   * @param time the timestamp of the point
+   * @param value the value; its boxed type must match the measurement's data type ({@link Binary}
+   *     for TEXT, {@link Float} for FLOAT, {@link Integer} for INT32, and so on)
+   * @throws IllegalArgumentException if {@code measurementId} is not part of this tablet
+   * @throws IllegalStateException if that column already holds {@link #getMaxRowNumber()} points;
+   *     write the tablet out and {@link #reset()} it first
+   * @throws ClassCastException if {@code value} does not match the measurement's data type
+   */
+  public void addValue(String measurementId, long time, Object value) {
+    if (value == null) {
+      return;
+    }
+    Integer boxedIndex = measurementIndex.get(measurementId);
+    if (boxedIndex == null) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Measurement %s does not belong to the tablet of device %s", measurementId, deviceId));
+    }
+    int indexOfSchema = boxedIndex;
+    int rowIndex = rowSize[indexOfSchema];
+    if (rowIndex >= maxRowNumber) {
+      throw new IllegalStateException(
+          String.format(
+              "Column of measurement %s is full (%d rows); write and reset the tablet first",
+              measurementId, maxRowNumber));
+    }
+    addValueOfDataType(schemas.get(indexOfSchema).getType(), rowIndex, indexOfSchema, time, value);
+    // Count the row only after it has been stored, so a failed cast leaves the column unchanged
+    // instead of exposing a half-written row to the chunk writer.
+    rowSize[indexOfSchema] = rowIndex + 1;
+    maxRowSize = Math.max(maxRowSize, rowIndex + 1);
+  }
+
+  /**
+   * Stores {@code time} and {@code value} at {@code rowIndex} of column {@code indexOfSchema}.
+   * {@code value} is never null here because {@link #addValue} drops null values first.
+   */
+  private void addValueOfDataType(
+      TSDataType dataType, int rowIndex, int indexOfSchema, long time, Object value) {
+    timestamps[indexOfSchema][rowIndex] = time;
+    switch (dataType) {
+      case TEXT:
+        ((Binary[]) values[indexOfSchema])[rowIndex] = (Binary) value;
+        break;
+      case FLOAT:
+        ((float[]) values[indexOfSchema])[rowIndex] = (float) value;
+        break;
+      case INT32:
+        ((int[]) values[indexOfSchema])[rowIndex] = (int) value;
+        break;
+      case INT64:
+        ((long[]) values[indexOfSchema])[rowIndex] = (long) value;
+        break;
+      case DOUBLE:
+        ((double[]) values[indexOfSchema])[rowIndex] = (double) value;
+        break;
+      case BOOLEAN:
+        ((boolean[]) values[indexOfSchema])[rowIndex] = (boolean) value;
+        break;
+      default:
+        throw new UnSupportedDataTypeException(String.format(NOT_SUPPORT_DATATYPE, dataType));
+    }
+  }
+
+  public List<MeasurementSchema> getSchemas() {
+    return schemas;
+  }
+
+  /** Return the maximum number of rows for this tablet */
+  public int getMaxRowNumber() {
+    return maxRowNumber;
+  }
+
+  /**
+   * Empties the tablet by zeroing every column's row count and {@link #maxRowSize}. The column
+   * arrays are reused, not cleared.
+   */
+  public void reset() {
+    maxRowSize = 0;
+    if (rowSize == null) {
+      rowSize = new int[schemas.size()];
+    } else {
+      Arrays.fill(rowSize, 0);
+    }
+  }
+
+  /** Allocates one timestamp column and one typed value column of maxRowNumber rows per schema. */
+  private void createColumns() {
+    int columnCount = schemas.size();
+    timestamps = new long[columnCount][maxRowNumber];
+    values = new Object[columnCount];
+    for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
+      values[columnIndex] = createValueColumnOfDataType(schemas.get(columnIndex).getType());
+    }
+  }
+
+  /** Returns an empty primitive (or Binary) array of maxRowNumber slots for {@code dataType}. */
+  private Object createValueColumnOfDataType(TSDataType dataType) {
+    switch (dataType) {
+      case INT32:
+        return new int[maxRowNumber];
+      case INT64:
+        return new long[maxRowNumber];
+      case FLOAT:
+        return new float[maxRowNumber];
+      case DOUBLE:
+        return new double[maxRowNumber];
+      case BOOLEAN:
+        return new boolean[maxRowNumber];
+      case TEXT:
+        return new Binary[maxRowNumber];
+      default:
+        throw new UnSupportedDataTypeException(String.format(NOT_SUPPORT_DATATYPE, dataType));
+    }
+  }
+
+  // TODO: size-estimation helpers (getTimeBytesSize / getTotalValueOccupation) are not yet
+  // implemented; they must be based on the per-column rowSize[] rather than a single row count.
+}
