This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8885df31144 fix PointPriorityReader calculation of aligned series null
value num (#11945)
8885df31144 is described below
commit 8885df31144105714f33e82136d071a2136b0180
Author: shuwenwei <[email protected]>
AuthorDate: Mon Jan 22 15:08:27 2024 +0800
fix PointPriorityReader calculation of aligned series null value num
(#11945)
---
.../execute/utils/reader/PointPriorityReader.java | 2 +-
.../FastInnerCompactionPerformerTest.java | 108 +++++++++++++++++++++
2 files changed, 109 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/reader/PointPriorityReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/reader/PointPriorityReader.java
index f600593e9b7..918d147bbe5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/reader/PointPriorityReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/reader/PointPriorityReader.java
@@ -88,8 +88,8 @@ public class PointPriorityReader {
pointElementsWithSameTimestamp.add(pointQueue.poll());
TsPrimitiveType[] currentValues = currentPoint.getValue().getVector();
- int nullValueNum = currentValues.length;
while (!pointQueue.isEmpty()) {
+ int nullValueNum = currentValues.length;
if (pointQueue.peek().timestamp > lastTime) {
// the smallest time of all pages is later than the last time, then break the loop
break;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastInnerCompactionPerformerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastInnerCompactionPerformerTest.java
index aafe6b52f56..35a79e81761 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastInnerCompactionPerformerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastInnerCompactionPerformerTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.compaction;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
@@ -30,14 +31,27 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.Inne
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.reader.IDataBlockReader;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.reader.SeriesDataBlockReader;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionFileGeneratorUtils;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
import
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -51,6 +65,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -1886,4 +1901,97 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
}
}
}
+
+ @Test
+ public void testMergeAlignedSeriesTimeValuePairFromDifferentFiles()
+ throws IOException, IllegalPathException {
+ TsFileResource resource1 = createEmptyFileAndResource(false);
+ try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource1)) {
+ writer.startChunkGroup("d1");
+ writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+ Arrays.asList("s1", "s2", "s3", "s4", "s5"),
+ new TimeRange[] {new TimeRange(10, 20)},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ Arrays.asList(true, true, true, false, false));
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ TsFileResource resource2 = createEmptyFileAndResource(false);
+ try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource2)) {
+ writer.startChunkGroup("d1");
+ writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+ Arrays.asList("s1", "s2", "s3", "s4", "s5"),
+ new TimeRange[] {new TimeRange(10, 20)},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ Arrays.asList(true, true, false, true, true));
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ TsFileResource resource3 = createEmptyFileAndResource(false);
+ try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource3)) {
+ writer.startChunkGroup("d1");
+ writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+ Arrays.asList("s1", "s2", "s3", "s4", "s5"),
+ new TimeRange[] {new TimeRange(10, 20)},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ Arrays.asList(false, true, true, true, true));
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ TsFileResource resource4 = createEmptyFileAndResource(false);
+ try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource4)) {
+ writer.startChunkGroup("d1");
+ writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+ Arrays.asList("s1", "s2", "s3", "s4", "s5"),
+ new TimeRange[] {new TimeRange(10, 20)},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ Arrays.asList(true, false, true, true, true));
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+
+ unseqResources.add(resource1);
+ unseqResources.add(resource2);
+ unseqResources.add(resource3);
+ unseqResources.add(resource4);
+
+ InnerSpaceCompactionTask task =
+ new InnerSpaceCompactionTask(
+ 0, tsFileManager, unseqResources, false, new FastCompactionPerformer(false), 0);
+ Assert.assertTrue(task.start());
+
+ TsFileResource targetResource = tsFileManager.getTsFileList(false).get(0);
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(targetResource.getTsFilePath())) {
+ List<AlignedChunkMetadata> chunkMetadataList =
+ reader.getAlignedChunkMetadata("root.testsg.d1");
+ for (AlignedChunkMetadata alignedChunkMetadata : chunkMetadataList) {
+ ChunkMetadata timeChunkMetadata =
+ (ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata();
+ Chunk timeChunk = reader.readMemChunk(timeChunkMetadata);
+ List<Chunk> valueChunks = new ArrayList<>();
+ for (IChunkMetadata chunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) {
+ Chunk valueChunk = reader.readMemChunk((ChunkMetadata) chunkMetadata);
+ valueChunks.add(valueChunk);
+ }
+ AlignedChunkReader alignedChunkReader =
+ new AlignedChunkReader(timeChunk, valueChunks, null);
+ while (alignedChunkReader.hasNextSatisfiedPage()) {
+ BatchData batchData = alignedChunkReader.nextPageData();
+ IPointReader pointReader = batchData.getBatchDataIterator();
+ while (pointReader.hasNextTimeValuePair()) {
+ TimeValuePair timeValuePair = pointReader.nextTimeValuePair();
+ for (Object value : timeValuePair.getValues()) {
+ if (value == null) {
+ Assert.fail();
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}