This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch improvedAlign_for_expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 29325681d2b477fb6b23c5f6d26b3e2008c24177 Author: JackieTien97 <[email protected]> AuthorDate: Thu May 26 10:04:47 2022 +0800 improve aligned tsfile write api --- .../write/chunk/AlignedChunkGroupWriterImpl.java | 220 ++++++++++++++++----- .../iotdb/tsfile/write/chunk/TimeChunkWriter.java | 4 + .../iotdb/tsfile/write/chunk/ValueChunkWriter.java | 4 + 3 files changed, 184 insertions(+), 44 deletions(-) diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java index 79d9ed271e..20e0e50c53 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java @@ -43,6 +43,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter { private static final Logger LOG = LoggerFactory.getLogger(AlignedChunkGroupWriterImpl.class); @@ -144,57 +145,188 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter { @Override public int write(Tablet tablet) throws WriteProcessException, IOException { - int pointCount = 0; - List<MeasurementSchema> measurementSchemas = tablet.getSchemas(); + // write time for (int row = 0; row < tablet.rowSize; row++) { long time = tablet.timestamps[row]; checkIsHistoryData("", time); - for (int columnIndex = 0; columnIndex < measurementSchemas.size(); columnIndex++) { - writenMeasurementSet.add(measurementSchemas.get(columnIndex).getMeasurementId()); - boolean isNull = false; - // check isNull by bitMap in tablet - if (tablet.bitMaps != null - && tablet.bitMaps[columnIndex] != null - && tablet.bitMaps[columnIndex].isMarked(row)) { - isNull = true; - } - ValueChunkWriter valueChunkWriter = - 
valueChunkWriterMap.get(measurementSchemas.get(columnIndex).getMeasurementId()); - switch (measurementSchemas.get(columnIndex).getType()) { - case BOOLEAN: - valueChunkWriter.write(time, ((boolean[]) tablet.values[columnIndex])[row], isNull); - break; - case INT32: - valueChunkWriter.write(time, ((int[]) tablet.values[columnIndex])[row], isNull); - break; - case INT64: - valueChunkWriter.write(time, ((long[]) tablet.values[columnIndex])[row], isNull); - break; - case FLOAT: - valueChunkWriter.write(time, ((float[]) tablet.values[columnIndex])[row], isNull); - break; - case DOUBLE: - valueChunkWriter.write(time, ((double[]) tablet.values[columnIndex])[row], isNull); - break; - case TEXT: - valueChunkWriter.write(time, ((Binary[]) tablet.values[columnIndex])[row], isNull); - break; - default: - throw new UnSupportedDataTypeException( - String.format( - "Data type %s is not supported.", - measurementSchemas.get(columnIndex).getType())); - } - } - writeEmptyDataInOneRow(time); timeChunkWriter.write(time); lastTime = time; - if (checkPageSizeAndMayOpenANewPage()) { - writePageToPageBuffer(); + if (timeChunkWriter.needANewPage()) { + timeChunkWriter.writePageToPageBuffer(); } - pointCount++; } - return pointCount; + + List<MeasurementSchema> measurementSchemas = tablet.getSchemas(); + // write existed values + for (int columnIndex = 0; columnIndex < measurementSchemas.size(); columnIndex++) { + ValueChunkWriter valueChunkWriter = + valueChunkWriterMap.get(measurementSchemas.get(columnIndex).getMeasurementId()); + TSDataType dataType = measurementSchemas.get(columnIndex).getType(); + switch (dataType) { + case BOOLEAN: + boolean[] booleanValues = (boolean[]) tablet.values[columnIndex]; + for (int row = 0; row < tablet.rowSize; row++) { + long time = tablet.timestamps[row]; + boolean isNull = + tablet.bitMaps != null + && tablet.bitMaps[columnIndex] != null + && tablet.bitMaps[columnIndex].isMarked(row); + // check isNull by bitMap in tablet + valueChunkWriter.write(time, 
booleanValues[row], isNull); + if (valueChunkWriter.needANewPage()) { + valueChunkWriter.writePageToPageBuffer(); + } + } + break; + case INT32: + int[] intValues = (int[]) tablet.values[columnIndex]; + for (int row = 0; row < tablet.rowSize; row++) { + long time = tablet.timestamps[row]; + boolean isNull = + tablet.bitMaps != null + && tablet.bitMaps[columnIndex] != null + && tablet.bitMaps[columnIndex].isMarked(row); + // check isNull by bitMap in tablet + valueChunkWriter.write(time, intValues[row], isNull); + if (valueChunkWriter.needANewPage()) { + valueChunkWriter.writePageToPageBuffer(); + } + } + break; + case INT64: + long[] longValues = (long[]) tablet.values[columnIndex]; + for (int row = 0; row < tablet.rowSize; row++) { + long time = tablet.timestamps[row]; + boolean isNull = + tablet.bitMaps != null + && tablet.bitMaps[columnIndex] != null + && tablet.bitMaps[columnIndex].isMarked(row); + // check isNull by bitMap in tablet + valueChunkWriter.write(time, longValues[row], isNull); + if (valueChunkWriter.needANewPage()) { + valueChunkWriter.writePageToPageBuffer(); + } + } + break; + case FLOAT: + float[] floatValues = (float[]) tablet.values[columnIndex]; + for (int row = 0; row < tablet.rowSize; row++) { + long time = tablet.timestamps[row]; + boolean isNull = + tablet.bitMaps != null + && tablet.bitMaps[columnIndex] != null + && tablet.bitMaps[columnIndex].isMarked(row); + // check isNull by bitMap in tablet + valueChunkWriter.write(time, floatValues[row], isNull); + if (valueChunkWriter.needANewPage()) { + valueChunkWriter.writePageToPageBuffer(); + } + } + break; + case DOUBLE: + double[] doubleValues = (double[]) tablet.values[columnIndex]; + for (int row = 0; row < tablet.rowSize; row++) { + long time = tablet.timestamps[row]; + boolean isNull = + tablet.bitMaps != null + && tablet.bitMaps[columnIndex] != null + && tablet.bitMaps[columnIndex].isMarked(row); + // check isNull by bitMap in tablet + valueChunkWriter.write(time, doubleValues[row], 
isNull); + if (valueChunkWriter.needANewPage()) { + valueChunkWriter.writePageToPageBuffer(); + } + } + break; + case TEXT: + Binary[] binaryValues = (Binary[]) tablet.values[columnIndex]; + for (int row = 0; row < tablet.rowSize; row++) { + long time = tablet.timestamps[row]; + boolean isNull = + tablet.bitMaps != null + && tablet.bitMaps[columnIndex] != null + && tablet.bitMaps[columnIndex].isMarked(row); + // check isNull by bitMap in tablet + valueChunkWriter.write(time, binaryValues[row], isNull); + if (valueChunkWriter.needANewPage()) { + valueChunkWriter.writePageToPageBuffer(); + } + } + break; + default: + throw new UnSupportedDataTypeException( + String.format("Data type %s is not supported.", dataType)); + } + } + + if (measurementSchemas.size() != valueChunkWriterMap.size()) { + Set<String> existingMeasurements = + measurementSchemas.stream() + .map(MeasurementSchema::getMeasurementId) + .collect(Collectors.toSet()); + // write non-existed values + for (Map.Entry<String, ValueChunkWriter> entry : valueChunkWriterMap.entrySet()) { + if (!existingMeasurements.contains(entry.getKey())) { + ValueChunkWriter valueChunkWriter = entry.getValue(); + TSDataType dataType = valueChunkWriter.getDataType(); + switch (dataType) { + case BOOLEAN: + for (int row = 0; row < tablet.rowSize; row++) { + valueChunkWriter.write(-1, false, true); + if (valueChunkWriter.needANewPage()) { + valueChunkWriter.writePageToPageBuffer(); + } + } + break; + case INT32: + for (int row = 0; row < tablet.rowSize; row++) { + valueChunkWriter.write(-1, 0, true); + if (valueChunkWriter.needANewPage()) { + valueChunkWriter.writePageToPageBuffer(); + } + } + break; + case INT64: + for (int row = 0; row < tablet.rowSize; row++) { + valueChunkWriter.write(-1, 0L, true); + if (valueChunkWriter.needANewPage()) { + valueChunkWriter.writePageToPageBuffer(); + } + } + break; + case FLOAT: + for (int row = 0; row < tablet.rowSize; row++) { + valueChunkWriter.write(-1, 0.0f, true); + if 
(valueChunkWriter.needANewPage()) { + valueChunkWriter.writePageToPageBuffer(); + } + } + break; + case DOUBLE: + for (int row = 0; row < tablet.rowSize; row++) { + valueChunkWriter.write(-1, 0.0, true); + if (valueChunkWriter.needANewPage()) { + valueChunkWriter.writePageToPageBuffer(); + } + } + break; + case TEXT: + for (int row = 0; row < tablet.rowSize; row++) { + valueChunkWriter.write(-1, null, true); + if (valueChunkWriter.needANewPage()) { + valueChunkWriter.writePageToPageBuffer(); + } + } + break; + default: + throw new UnSupportedDataTypeException( + String.format("Data type %s is not supported.", dataType)); + } + } + } + } + + return tablet.rowSize; } @Override diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java index b96f1a09ec..bd37840d79 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java @@ -136,6 +136,10 @@ public class TimeChunkWriter { return false; } + public boolean needANewPage() { + return pageWriter.getPointNumber() >= maxNumberOfPointsInPage; + } + public void writePageToPageBuffer() { try { if (numOfPages == 0) { // record the firstPageStatistics diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java index 3ece00b403..7a143c9f95 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java @@ -252,6 +252,10 @@ public class ValueChunkWriter { return false; } + public boolean needANewPage() { + return pageWriter.getSize() >= maxNumberOfPointsInPage; + } + public void sealCurrentPage() { // if the page contains no points, we still need to serialize it if (pageWriter != null 
&& pageWriter.getSize() != 0) {
