This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch AlignedTsFileAPIBug in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a008b576aec03baa54debc408641680bec7be6e1
Author: JackieTien97 <[email protected]>
AuthorDate: Thu May 26 10:21:57 2022 +0800

    Fix aligned write bug in tsfile api
---
 .../write/chunk/AlignedChunkGroupWriterImpl.java | 85 +++++++++++++++-------
 1 file changed, 60 insertions(+), 25 deletions(-)

diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
index 79d9ed271e..9a43b55b7b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
@@ -38,11 +38,11 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
   private static final Logger LOG = LoggerFactory.getLogger(AlignedChunkGroupWriterImpl.class);
@@ -50,11 +50,9 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
   private final String deviceId;
 
   // measurementID -> ValueChunkWriter
-  private Map<String, ValueChunkWriter> valueChunkWriterMap = new LinkedHashMap<>();
+  private final Map<String, ValueChunkWriter> valueChunkWriterMap = new LinkedHashMap<>();
 
-  private TimeChunkWriter timeChunkWriter;
-
-  private Set<String> writenMeasurementSet = new HashSet<>();
+  private final TimeChunkWriter timeChunkWriter;
 
   private long lastTime = -1;
 
@@ -103,10 +101,16 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
 
   @Override
   public int write(long time, List<DataPoint> data) throws WriteProcessException, IOException {
-    checkIsHistoryData("", time);
-
+    checkIsHistoryData(time);
+    List<ValueChunkWriter> emptyValueChunkWriters = new ArrayList<>();
+    Set<String> existingMeasurements =
+        data.stream().map(DataPoint::getMeasurementId).collect(Collectors.toSet());
+    for (Map.Entry<String, ValueChunkWriter> entry : valueChunkWriterMap.entrySet()) {
+      if (!existingMeasurements.contains(entry.getKey())) {
+        emptyValueChunkWriters.add(entry.getValue());
+      }
+    }
     for (DataPoint point : data) {
-      writenMeasurementSet.add(point.getMeasurementId());
       boolean isNull = point.getValue() == null;
       ValueChunkWriter valueChunkWriter = valueChunkWriterMap.get(point.getMeasurementId());
       switch (point.getType()) {
@@ -133,7 +137,9 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
               String.format("Data type %s is not supported.", point.getType()));
       }
     }
-    writeEmptyDataInOneRow(time);
+    if (!emptyValueChunkWriters.isEmpty()) {
+      writeEmptyDataInOneRow(emptyValueChunkWriters);
+    }
     timeChunkWriter.write(time);
     lastTime = time;
     if (checkPageSizeAndMayOpenANewPage()) {
@@ -146,18 +152,25 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
   public int write(Tablet tablet) throws WriteProcessException, IOException {
     int pointCount = 0;
     List<MeasurementSchema> measurementSchemas = tablet.getSchemas();
+    List<ValueChunkWriter> emptyValueChunkWriters = new ArrayList<>();
+    Set<String> existingMeasurements =
+        measurementSchemas.stream()
+            .map(MeasurementSchema::getMeasurementId)
+            .collect(Collectors.toSet());
+    for (Map.Entry<String, ValueChunkWriter> entry : valueChunkWriterMap.entrySet()) {
+      if (!existingMeasurements.contains(entry.getKey())) {
+        emptyValueChunkWriters.add(entry.getValue());
+      }
+    }
     for (int row = 0; row < tablet.rowSize; row++) {
       long time = tablet.timestamps[row];
-      checkIsHistoryData("", time);
+      checkIsHistoryData(time);
       for (int columnIndex = 0; columnIndex < measurementSchemas.size(); columnIndex++) {
-        writenMeasurementSet.add(measurementSchemas.get(columnIndex).getMeasurementId());
-        boolean isNull = false;
+        boolean isNull =
+            tablet.bitMaps != null
+                && tablet.bitMaps[columnIndex] != null
+                && tablet.bitMaps[columnIndex].isMarked(row);
         // check isNull by bitMap in tablet
-        if (tablet.bitMaps != null
-            && tablet.bitMaps[columnIndex] != null
-            && tablet.bitMaps[columnIndex].isMarked(row)) {
-          isNull = true;
-        }
         ValueChunkWriter valueChunkWriter =
             valueChunkWriterMap.get(measurementSchemas.get(columnIndex).getMeasurementId());
         switch (measurementSchemas.get(columnIndex).getType()) {
@@ -186,7 +199,9 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
                     measurementSchemas.get(columnIndex).getType()));
         }
       }
-      writeEmptyDataInOneRow(time);
+      if (!emptyValueChunkWriters.isEmpty()) {
+        writeEmptyDataInOneRow(emptyValueChunkWriters);
+      }
       timeChunkWriter.write(time);
       lastTime = time;
       if (checkPageSizeAndMayOpenANewPage()) {
@@ -241,13 +256,33 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
     }
   }
 
-  private void writeEmptyDataInOneRow(long time) {
-    for (Map.Entry<String, ValueChunkWriter> entry : valueChunkWriterMap.entrySet()) {
-      if (!writenMeasurementSet.contains(entry.getKey())) {
-        entry.getValue().write(time, 0, true);
+  private void writeEmptyDataInOneRow(List<ValueChunkWriter> valueChunkWriterList) {
+    for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+      TSDataType dataType = valueChunkWriter.getDataType();
+      switch (dataType) {
+        case BOOLEAN:
+          valueChunkWriter.write(-1, false, true);
+          break;
+        case INT32:
+          valueChunkWriter.write(-1, 0, true);
+          break;
+        case INT64:
+          valueChunkWriter.write(-1, 0L, true);
+          break;
+        case FLOAT:
+          valueChunkWriter.write(-1, 0.0f, true);
+          break;
+        case DOUBLE:
+          valueChunkWriter.write(-1, 0.0d, true);
+          break;
+        case TEXT:
+          valueChunkWriter.write(-1, null, true);
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format("Data type %s is not supported.", dataType));
       }
     }
-    writenMeasurementSet.clear();
   }
 
   /**
@@ -280,13 +315,13 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
     }
   }
 
-  private void checkIsHistoryData(String measurementId, long time) throws WriteProcessException {
+  private void checkIsHistoryData(long time) throws WriteProcessException {
     if (time <= lastTime) {
       throw new WriteProcessException(
           "Not allowed to write out-of-order data in timeseries "
               + deviceId
               + TsFileConstant.PATH_SEPARATOR
-              + measurementId
+              + ""
               + ", time should later than "
               + lastTime);
     }
