This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8885df31144 fix PointPriorityReader calculation of aligned series null value num (#11945)
8885df31144 is described below

commit 8885df31144105714f33e82136d071a2136b0180
Author: shuwenwei <[email protected]>
AuthorDate: Mon Jan 22 15:08:27 2024 +0800

    fix PointPriorityReader calculation of aligned series null value num (#11945)
---
 .../execute/utils/reader/PointPriorityReader.java  |   2 +-
 .../FastInnerCompactionPerformerTest.java          | 108 +++++++++++++++++++++
 2 files changed, 109 insertions(+), 1 deletion(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/reader/PointPriorityReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/reader/PointPriorityReader.java
index f600593e9b7..918d147bbe5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/reader/PointPriorityReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/reader/PointPriorityReader.java
@@ -88,8 +88,8 @@ public class PointPriorityReader {
     pointElementsWithSameTimestamp.add(pointQueue.poll());
 
     TsPrimitiveType[] currentValues = currentPoint.getValue().getVector();
-    int nullValueNum = currentValues.length;
     while (!pointQueue.isEmpty()) {
+      int nullValueNum = currentValues.length;
       if (pointQueue.peek().timestamp > lastTime) {
         // the smallest time of all pages is later than the last time, then break the loop
         break;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastInnerCompactionPerformerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastInnerCompactionPerformerTest.java
index aafe6b52f56..35a79e81761 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastInnerCompactionPerformerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastInnerCompactionPerformerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.storageengine.dataregion.compaction;
 
+import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.MeasurementPath;
@@ -30,14 +31,27 @@ import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.Inne
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.reader.IDataBlockReader;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.reader.SeriesDataBlockReader;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionFileGeneratorUtils;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
 import 
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -51,6 +65,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -1886,4 +1901,97 @@ public class FastInnerCompactionPerformerTest extends 
AbstractCompactionTest {
       }
     }
   }
+
+  @Test
+  public void testMergeAlignedSeriesTimeValuePairFromDifferentFiles()
+      throws IOException, IllegalPathException {
+    TsFileResource resource1 = createEmptyFileAndResource(false);
+    try (CompactionTestFileWriter writer = new 
CompactionTestFileWriter(resource1)) {
+      writer.startChunkGroup("d1");
+      writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+          Arrays.asList("s1", "s2", "s3", "s4", "s5"),
+          new TimeRange[] {new TimeRange(10, 20)},
+          TSEncoding.PLAIN,
+          CompressionType.LZ4,
+          Arrays.asList(true, true, true, false, false));
+      writer.endChunkGroup();
+      writer.endFile();
+    }
+    TsFileResource resource2 = createEmptyFileAndResource(false);
+    try (CompactionTestFileWriter writer = new 
CompactionTestFileWriter(resource2)) {
+      writer.startChunkGroup("d1");
+      writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+          Arrays.asList("s1", "s2", "s3", "s4", "s5"),
+          new TimeRange[] {new TimeRange(10, 20)},
+          TSEncoding.PLAIN,
+          CompressionType.LZ4,
+          Arrays.asList(true, true, false, true, true));
+      writer.endChunkGroup();
+      writer.endFile();
+    }
+    TsFileResource resource3 = createEmptyFileAndResource(false);
+    try (CompactionTestFileWriter writer = new 
CompactionTestFileWriter(resource3)) {
+      writer.startChunkGroup("d1");
+      writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+          Arrays.asList("s1", "s2", "s3", "s4", "s5"),
+          new TimeRange[] {new TimeRange(10, 20)},
+          TSEncoding.PLAIN,
+          CompressionType.LZ4,
+          Arrays.asList(false, true, true, true, true));
+      writer.endChunkGroup();
+      writer.endFile();
+    }
+    TsFileResource resource4 = createEmptyFileAndResource(false);
+    try (CompactionTestFileWriter writer = new 
CompactionTestFileWriter(resource4)) {
+      writer.startChunkGroup("d1");
+      writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+          Arrays.asList("s1", "s2", "s3", "s4", "s5"),
+          new TimeRange[] {new TimeRange(10, 20)},
+          TSEncoding.PLAIN,
+          CompressionType.LZ4,
+          Arrays.asList(true, false, true, true, true));
+      writer.endChunkGroup();
+      writer.endFile();
+    }
+
+    unseqResources.add(resource1);
+    unseqResources.add(resource2);
+    unseqResources.add(resource3);
+    unseqResources.add(resource4);
+
+    InnerSpaceCompactionTask task =
+        new InnerSpaceCompactionTask(
+            0, tsFileManager, unseqResources, false, new 
FastCompactionPerformer(false), 0);
+    Assert.assertTrue(task.start());
+
+    TsFileResource targetResource = tsFileManager.getTsFileList(false).get(0);
+    try (TsFileSequenceReader reader = new 
TsFileSequenceReader(targetResource.getTsFilePath())) {
+      List<AlignedChunkMetadata> chunkMetadataList =
+          reader.getAlignedChunkMetadata("root.testsg.d1");
+      for (AlignedChunkMetadata alignedChunkMetadata : chunkMetadataList) {
+        ChunkMetadata timeChunkMetadata =
+            (ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata();
+        Chunk timeChunk = reader.readMemChunk(timeChunkMetadata);
+        List<Chunk> valueChunks = new ArrayList<>();
+        for (IChunkMetadata chunkMetadata : 
alignedChunkMetadata.getValueChunkMetadataList()) {
+          Chunk valueChunk = reader.readMemChunk((ChunkMetadata) 
chunkMetadata);
+          valueChunks.add(valueChunk);
+        }
+        AlignedChunkReader alignedChunkReader =
+            new AlignedChunkReader(timeChunk, valueChunks, null);
+        while (alignedChunkReader.hasNextSatisfiedPage()) {
+          BatchData batchData = alignedChunkReader.nextPageData();
+          IPointReader pointReader = batchData.getBatchDataIterator();
+          while (pointReader.hasNextTimeValuePair()) {
+            TimeValuePair timeValuePair = pointReader.nextTimeValuePair();
+            for (Object value : timeValuePair.getValues()) {
+              if (value == null) {
+                Assert.fail();
+              }
+            }
+          }
+        }
+      }
+    }
+  }
 }

Reply via email to