This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new 37adb3645e8 Pipe: Update TsFileInsertionScanDataContainer to support partially sequential aligned chunks (#13168) (#13233)
37adb3645e8 is described below

commit 37adb3645e8dab1edc252ca1f8b7977473610470
Author: Caideyipi <[email protected]>
AuthorDate: Tue Aug 20 14:35:13 2024 +0800

    Pipe: Update TsFileInsertionScanDataContainer to support partially sequential aligned chunks (#13168) (#13233)
---
 .../scan/TsFileInsertionScanDataContainer.java     | 107 ++--
 .../event/TsFileInsertionDataContainerTest.java    | 553 +++++----------------
 2 files changed, 200 insertions(+), 460 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index a841986064e..e2995f61fdc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -52,8 +52,10 @@ import java.io.File;
 import java.io.IOException;
 import java.time.LocalDate;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 
@@ -71,6 +73,12 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
   private boolean currentIsAligned;
   private final List<MeasurementSchema> currentMeasurements = new 
ArrayList<>();
 
+  // Cached time chunk
+  private final List<Chunk> timeChunkList = new ArrayList<>();
+  private final Map<String, Integer> measurementIndexMap = new HashMap<>();
+  private int lastIndex = -1;
+  private ChunkHeader firstChunkHeader4NextSequentialValueChunks;
+
   private byte lastMarker = Byte.MIN_VALUE;
 
   public TsFileInsertionScanDataContainer(
@@ -262,7 +270,6 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
 
   private void moveToNextChunkReader() throws IOException, 
IllegalStateException {
     ChunkHeader chunkHeader;
-    Chunk timeChunk = null;
     final List<Chunk> valueChunkList = new ArrayList<>();
     currentMeasurements.clear();
 
@@ -280,16 +287,8 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
         case MetaMarker.TIME_CHUNK_HEADER:
         case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
         case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
-          if (Objects.nonNull(timeChunk) && !currentMeasurements.isEmpty()) {
-            chunkReader =
-                isMultiPage
-                    ? new AlignedChunkReader(timeChunk, valueChunkList, filter)
-                    : new AlignedSinglePageWholeChunkReader(timeChunk, 
valueChunkList);
-            currentIsAligned = true;
-            lastMarker = marker;
-            return;
-          }
-
+          // Notice that the data in one chunk group is either aligned or 
non-aligned
+          // There is no need to consider non-aligned chunks when there are 
value chunks
           isMultiPage = marker == MetaMarker.CHUNK_HEADER || marker == 
MetaMarker.TIME_CHUNK_HEADER;
 
           chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
@@ -302,9 +301,9 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
 
           if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
               == TsFileConstant.TIME_COLUMN_MASK) {
-            timeChunk =
+            timeChunkList.add(
                 new Chunk(
-                    chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize()));
+                    chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize())));
             break;
           }
 
@@ -331,39 +330,61 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
           return;
         case MetaMarker.VALUE_CHUNK_HEADER:
         case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
-          chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
+          if (Objects.isNull(firstChunkHeader4NextSequentialValueChunks)) {
+            chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
+
+            if (Objects.isNull(currentDevice)
+                || !pattern.matchesMeasurement(currentDevice, 
chunkHeader.getMeasurementID())) {
+              tsFileSequenceReader.position(
+                  tsFileSequenceReader.position() + chunkHeader.getDataSize());
+              break;
+            }
 
-          if (Objects.isNull(currentDevice)
-              || !pattern.matchesMeasurement(currentDevice, 
chunkHeader.getMeasurementID())) {
-            tsFileSequenceReader.position(
-                tsFileSequenceReader.position() + chunkHeader.getDataSize());
-            break;
-          }
+            // Increase value index
+            final int valueIndex =
+                measurementIndexMap.compute(
+                    chunkHeader.getMeasurementID(),
+                    (measurement, index) -> Objects.nonNull(index) ? index + 1 
: 0);
 
-          // Do not record empty chunk
-          if (chunkHeader.getDataSize() > 0) {
-            valueChunkList.add(
-                new Chunk(
-                    chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize())));
-            currentMeasurements.add(
-                new MeasurementSchema(chunkHeader.getMeasurementID(), 
chunkHeader.getDataType()));
+            // Emit when encountered non-sequential value chunk
+            // Do not record or end current value chunks when there are empty 
chunks
+            if (chunkHeader.getDataSize() == 0) {
+              break;
+            }
+            boolean needReturn = false;
+            if (lastIndex >= 0 && valueIndex != lastIndex) {
+              needReturn = recordAlignedChunk(valueChunkList, marker);
+            }
+            lastIndex = valueIndex;
+            if (needReturn) {
+              firstChunkHeader4NextSequentialValueChunks = chunkHeader;
+              return;
+            }
+          } else {
+            chunkHeader = firstChunkHeader4NextSequentialValueChunks;
+            firstChunkHeader4NextSequentialValueChunks = null;
           }
+
+          valueChunkList.add(
+              new Chunk(
+                  chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize())));
+          currentMeasurements.add(
+              new MeasurementSchema(chunkHeader.getMeasurementID(), 
chunkHeader.getDataType()));
           break;
         case MetaMarker.CHUNK_GROUP_HEADER:
           // Return before "currentDevice" changes
-          if (Objects.nonNull(timeChunk) && !currentMeasurements.isEmpty()) {
-            chunkReader =
-                isMultiPage
-                    ? new AlignedChunkReader(timeChunk, valueChunkList, filter)
-                    : new AlignedSinglePageWholeChunkReader(timeChunk, 
valueChunkList);
-            currentIsAligned = true;
-            lastMarker = marker;
+          if (recordAlignedChunk(valueChunkList, marker)) {
             return;
           }
           // TODO: Replace it by IDeviceID
           final String deviceID =
               ((PlainDeviceID) 
tsFileSequenceReader.readChunkGroupHeader().getDeviceID())
                   .toStringID();
+          // Clear because the cached data will never be used in the next 
chunk group
+          lastIndex = -1;
+          timeChunkList.clear();
+          measurementIndexMap.clear();
+
           currentDevice = pattern.mayOverlapWithDevice(deviceID) ? deviceID : 
null;
           break;
         case MetaMarker.OPERATION_INDEX_RANGE:
@@ -375,14 +396,22 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
     }
 
     lastMarker = marker;
-    if (Objects.nonNull(timeChunk) && !currentMeasurements.isEmpty()) {
+    if (!recordAlignedChunk(valueChunkList, marker)) {
+      chunkReader = null;
+    }
+  }
+
+  private boolean recordAlignedChunk(final List<Chunk> valueChunkList, final 
byte marker)
+      throws IOException {
+    if (!valueChunkList.isEmpty()) {
       chunkReader =
           isMultiPage
-              ? new AlignedChunkReader(timeChunk, valueChunkList, filter)
-              : new AlignedSinglePageWholeChunkReader(timeChunk, 
valueChunkList);
+              ? new AlignedChunkReader(timeChunkList.get(lastIndex), 
valueChunkList, filter)
+              : new 
AlignedSinglePageWholeChunkReader(timeChunkList.get(lastIndex), valueChunkList);
       currentIsAligned = true;
-    } else {
-      chunkReader = null;
+      lastMarker = marker;
+      return true;
     }
+    return false;
   }
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index 74d500006f8..8e6ecbd80b7 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -26,10 +26,17 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import org.apache.iotdb.pipe.api.access.Row;
 
 import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.read.TsFileSequenceReader;
 import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.read.common.TimeRange;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.TsFileGeneratorUtils;
 import org.junit.After;
@@ -40,7 +47,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -59,6 +68,7 @@ public class TsFileInsertionDataContainerTest {
 
   private File alignedTsFile;
   private File nonalignedTsFile;
+  private TsFileResource resource;
 
   @After
   public void tearDown() throws Exception {
@@ -68,6 +78,9 @@ public class TsFileInsertionDataContainerTest {
     if (nonalignedTsFile != null) {
       nonalignedTsFile.delete();
     }
+    if (Objects.nonNull(resource)) {
+      resource.remove();
+    }
   }
 
   @Test
@@ -85,6 +98,10 @@ public class TsFileInsertionDataContainerTest {
   }
 
   public void testToTabletInsertionEvents(final boolean isQuery) throws 
Exception {
+    // Test empty chunk
+    testMixedTsFileWithEmptyChunk(isQuery);
+
+    // Test the combinations of pipe and tsFile settings
     final Set<Integer> deviceNumbers = new HashSet<>();
     deviceNumbers.add(1);
     deviceNumbers.add(2);
@@ -329,120 +346,20 @@ public class TsFileInsertionDataContainerTest {
         break;
     }
 
-    try (final TsFileInsertionDataContainer alignedContainer =
-            isQuery
-                ? new TsFileInsertionQueryDataContainer(
-                    alignedTsFile, rootPattern, startTime, endTime)
-                : new TsFileInsertionScanDataContainer(
-                    alignedTsFile, rootPattern, startTime, endTime, null, 
null);
-        final TsFileInsertionDataContainer nonalignedContainer =
-            isQuery
-                ? new TsFileInsertionQueryDataContainer(
-                    nonalignedTsFile, rootPattern, startTime, endTime)
-                : new TsFileInsertionScanDataContainer(
-                    nonalignedTsFile, rootPattern, startTime, endTime, null, 
null)) {
-      final AtomicInteger count1 = new AtomicInteger(0);
-      final AtomicInteger count2 = new AtomicInteger(0);
-      final AtomicInteger count3 = new AtomicInteger(0);
-
-      alignedContainer
-          .toTabletInsertionEvents()
-          .forEach(
-              event ->
-                  event
-                      .processRowByRow(
-                          (row, collector) -> {
-                            try {
-                              collector.collectRow(row);
-                              Assert.assertEquals(measurementNumber, 
row.size());
-                              count1.incrementAndGet();
-                            } catch (IOException e) {
-                              throw new RuntimeException(e);
-                            }
-                          })
-                      .forEach(
-                          tabletInsertionEvent1 ->
-                              tabletInsertionEvent1
-                                  .processRowByRow(
-                                      (row, collector) -> {
-                                        try {
-                                          collector.collectRow(row);
-                                          
Assert.assertEquals(measurementNumber, row.size());
-                                          count2.incrementAndGet();
-                                        } catch (IOException e) {
-                                          throw new RuntimeException(e);
-                                        }
-                                      })
-                                  .forEach(
-                                      tabletInsertionEvent2 ->
-                                          tabletInsertionEvent2.processTablet(
-                                              (tablet, rowCollector) ->
-                                                  new 
PipeRawTabletInsertionEvent(tablet, false)
-                                                      .processRowByRow(
-                                                          (row, collector) -> {
-                                                            try {
-                                                              
rowCollector.collectRow(row);
-                                                              
Assert.assertEquals(
-                                                                  
measurementNumber, row.size());
-                                                              
count3.incrementAndGet();
-                                                            } catch 
(IOException e) {
-                                                              throw new 
RuntimeException(e);
-                                                            }
-                                                          })))));
-
-      Assert.assertEquals(count1.getAndSet(0), deviceNumber * 
expectedRowNumber);
-      Assert.assertEquals(count2.getAndSet(0), deviceNumber * 
expectedRowNumber);
-      Assert.assertEquals(count3.getAndSet(0), deviceNumber * 
expectedRowNumber);
-
-      nonalignedContainer
-          .toTabletInsertionEvents()
-          .forEach(
-              event ->
-                  event
-                      .processTablet(
-                          (tablet, rowCollector) ->
-                              new PipeRawTabletInsertionEvent(tablet, false)
-                                  .processRowByRow(
-                                      (row, collector) -> {
-                                        try {
-                                          rowCollector.collectRow(row);
-                                          count1.addAndGet(row.size());
-                                        } catch (IOException e) {
-                                          throw new RuntimeException(e);
-                                        }
-                                      }))
-                      .forEach(
-                          tabletInsertionEvent1 ->
-                              tabletInsertionEvent1
-                                  .processRowByRow(
-                                      (row, collector) -> {
-                                        try {
-                                          collector.collectRow(row);
-                                          count2.addAndGet(row.size());
-                                        } catch (IOException e) {
-                                          throw new RuntimeException(e);
-                                        }
-                                      })
-                                  .forEach(
-                                      tabletInsertionEvent2 ->
-                                          
tabletInsertionEvent2.processRowByRow(
-                                              (row, collector) -> {
-                                                try {
-                                                  collector.collectRow(row);
-                                                  count3.addAndGet(row.size());
-                                                } catch (IOException e) {
-                                                  throw new 
RuntimeException(e);
-                                                }
-                                              }))));
-
-      // Calculate points in non-aligned tablets
-      Assert.assertEquals(deviceNumber * expectedRowNumber * 
measurementNumber, count1.get());
-      Assert.assertEquals(deviceNumber * expectedRowNumber * 
measurementNumber, count2.get());
-      Assert.assertEquals(deviceNumber * expectedRowNumber * 
measurementNumber, count3.get());
-    } catch (final Exception e) {
-      e.printStackTrace();
-      fail(e.getMessage());
-    }
+    testTsFilePointNum(
+        alignedTsFile,
+        rootPattern,
+        startTime,
+        endTime,
+        isQuery,
+        deviceNumber * expectedRowNumber * measurementNumber);
+    testTsFilePointNum(
+        nonalignedTsFile,
+        rootPattern,
+        startTime,
+        endTime,
+        isQuery,
+        deviceNumber * expectedRowNumber * measurementNumber);
 
     final AtomicReference<String> oneDeviceInAlignedTsFile = new 
AtomicReference<>();
     final AtomicReference<String> oneMeasurementInAlignedTsFile = new 
AtomicReference<>();
@@ -493,120 +410,20 @@ public class TsFileInsertionDataContainerTest {
         break;
     }
 
-    try (final TsFileInsertionDataContainer alignedContainer =
-            isQuery
-                ? new TsFileInsertionQueryDataContainer(
-                    alignedTsFile, oneAlignedDevicePattern, startTime, endTime)
-                : new TsFileInsertionScanDataContainer(
-                    alignedTsFile, oneAlignedDevicePattern, startTime, 
endTime, null, null);
-        final TsFileInsertionDataContainer nonalignedContainer =
-            isQuery
-                ? new TsFileInsertionQueryDataContainer(
-                    nonalignedTsFile, oneNonAlignedDevicePattern, startTime, 
endTime)
-                : new TsFileInsertionScanDataContainer(
-                    nonalignedTsFile, oneNonAlignedDevicePattern, startTime, 
endTime, null, null)) {
-      final AtomicInteger count1 = new AtomicInteger(0);
-      final AtomicInteger count2 = new AtomicInteger(0);
-      final AtomicInteger count3 = new AtomicInteger(0);
-
-      alignedContainer
-          .toTabletInsertionEvents()
-          .forEach(
-              event ->
-                  event
-                      .processRowByRow(
-                          (row, collector) -> {
-                            try {
-                              collector.collectRow(row);
-                              Assert.assertEquals(measurementNumber, 
row.size());
-                              count1.incrementAndGet();
-                            } catch (IOException e) {
-                              throw new RuntimeException(e);
-                            }
-                          })
-                      .forEach(
-                          tabletInsertionEvent1 ->
-                              tabletInsertionEvent1
-                                  .processRowByRow(
-                                      (row, collector) -> {
-                                        try {
-                                          collector.collectRow(row);
-                                          
Assert.assertEquals(measurementNumber, row.size());
-                                          count2.incrementAndGet();
-                                        } catch (IOException e) {
-                                          throw new RuntimeException(e);
-                                        }
-                                      })
-                                  .forEach(
-                                      tabletInsertionEvent2 ->
-                                          tabletInsertionEvent2.processTablet(
-                                              (tablet, rowCollector) ->
-                                                  new 
PipeRawTabletInsertionEvent(tablet, false)
-                                                      .processRowByRow(
-                                                          (row, collector) -> {
-                                                            try {
-                                                              
rowCollector.collectRow(row);
-                                                              
Assert.assertEquals(
-                                                                  
measurementNumber, row.size());
-                                                              
count3.incrementAndGet();
-                                                            } catch 
(IOException e) {
-                                                              throw new 
RuntimeException(e);
-                                                            }
-                                                          })))));
-
-      Assert.assertEquals(expectedRowNumber, count1.getAndSet(0));
-      Assert.assertEquals(expectedRowNumber, count2.getAndSet(0));
-      Assert.assertEquals(expectedRowNumber, count3.getAndSet(0));
-
-      nonalignedContainer
-          .toTabletInsertionEvents()
-          .forEach(
-              event ->
-                  event
-                      .processTablet(
-                          (tablet, rowCollector) ->
-                              new PipeRawTabletInsertionEvent(tablet, false)
-                                  .processRowByRow(
-                                      (row, collector) -> {
-                                        try {
-                                          rowCollector.collectRow(row);
-                                          count1.addAndGet(row.size());
-                                        } catch (IOException e) {
-                                          throw new RuntimeException(e);
-                                        }
-                                      }))
-                      .forEach(
-                          tabletInsertionEvent1 ->
-                              tabletInsertionEvent1
-                                  .processRowByRow(
-                                      (row, collector) -> {
-                                        try {
-                                          collector.collectRow(row);
-                                          count2.addAndGet(row.size());
-                                        } catch (IOException e) {
-                                          throw new RuntimeException(e);
-                                        }
-                                      })
-                                  .forEach(
-                                      tabletInsertionEvent2 ->
-                                          
tabletInsertionEvent2.processRowByRow(
-                                              (row, collector) -> {
-                                                try {
-                                                  collector.collectRow(row);
-                                                  count3.addAndGet(row.size());
-                                                } catch (IOException e) {
-                                                  throw new 
RuntimeException(e);
-                                                }
-                                              }))));
-
-      // Calculate points in non-aligned tablets
-      Assert.assertEquals(expectedRowNumber * measurementNumber, count1.get());
-      Assert.assertEquals(expectedRowNumber * measurementNumber, count2.get());
-      Assert.assertEquals(expectedRowNumber * measurementNumber, count3.get());
-    } catch (final Exception e) {
-      e.printStackTrace();
-      fail(e.getMessage());
-    }
+    testTsFilePointNum(
+        alignedTsFile,
+        oneAlignedDevicePattern,
+        startTime,
+        endTime,
+        isQuery,
+        expectedRowNumber * measurementNumber);
+    testTsFilePointNum(
+        nonalignedTsFile,
+        oneNonAlignedDevicePattern,
+        startTime,
+        endTime,
+        isQuery,
+        expectedRowNumber * measurementNumber);
 
     final PipePattern oneAlignedMeasurementPattern;
     final PipePattern oneNonAlignedMeasurementPattern;
@@ -624,126 +441,20 @@ public class TsFileInsertionDataContainerTest {
         break;
     }
 
-    try (final TsFileInsertionDataContainer alignedContainer =
-            isQuery
-                ? new TsFileInsertionQueryDataContainer(
-                    alignedTsFile, oneAlignedMeasurementPattern, startTime, 
endTime)
-                : new TsFileInsertionScanDataContainer(
-                    alignedTsFile, oneAlignedMeasurementPattern, startTime, 
endTime, null, null);
-        final TsFileInsertionDataContainer nonalignedContainer =
-            isQuery
-                ? new TsFileInsertionQueryDataContainer(
-                    nonalignedTsFile, oneNonAlignedMeasurementPattern, 
startTime, endTime)
-                : new TsFileInsertionScanDataContainer(
-                    nonalignedTsFile,
-                    oneNonAlignedMeasurementPattern,
-                    startTime,
-                    endTime,
-                    null,
-                    null)) {
-      final AtomicInteger count1 = new AtomicInteger(0);
-      final AtomicInteger count2 = new AtomicInteger(0);
-      final AtomicInteger count3 = new AtomicInteger(0);
-
-      alignedContainer
-          .toTabletInsertionEvents()
-          .forEach(
-              event ->
-                  event
-                      .processRowByRow(
-                          (row, collector) -> {
-                            try {
-                              collector.collectRow(row);
-                              Assert.assertEquals(1, row.size());
-                              count1.incrementAndGet();
-                            } catch (IOException e) {
-                              throw new RuntimeException(e);
-                            }
-                          })
-                      .forEach(
-                          tabletInsertionEvent1 ->
-                              tabletInsertionEvent1
-                                  .processRowByRow(
-                                      (row, collector) -> {
-                                        try {
-                                          collector.collectRow(row);
-                                          Assert.assertEquals(1, row.size());
-                                          count2.incrementAndGet();
-                                        } catch (IOException e) {
-                                          throw new RuntimeException(e);
-                                        }
-                                      })
-                                  .forEach(
-                                      tabletInsertionEvent2 ->
-                                          tabletInsertionEvent2.processTablet(
-                                              (tablet, rowCollector) ->
-                                                  new 
PipeRawTabletInsertionEvent(tablet, false)
-                                                      .processRowByRow(
-                                                          (row, collector) -> {
-                                                            try {
-                                                              
rowCollector.collectRow(row);
-                                                              
Assert.assertEquals(1, row.size());
-                                                              
count3.incrementAndGet();
-                                                            } catch 
(IOException e) {
-                                                              throw new 
RuntimeException(e);
-                                                            }
-                                                          })))));
-
-      Assert.assertEquals(expectedRowNumber, count1.getAndSet(0));
-      Assert.assertEquals(expectedRowNumber, count2.getAndSet(0));
-      Assert.assertEquals(expectedRowNumber, count3.getAndSet(0));
-
-      nonalignedContainer
-          .toTabletInsertionEvents()
-          .forEach(
-              event ->
-                  event
-                      .processTablet(
-                          (tablet, rowCollector) ->
-                              new PipeRawTabletInsertionEvent(tablet, false)
-                                  .processRowByRow(
-                                      (row, collector) -> {
-                                        try {
-                                          rowCollector.collectRow(row);
-                                          Assert.assertEquals(1, row.size());
-                                          count1.incrementAndGet();
-                                        } catch (IOException e) {
-                                          throw new RuntimeException(e);
-                                        }
-                                      }))
-                      .forEach(
-                          tabletInsertionEvent1 ->
-                              tabletInsertionEvent1
-                                  .processRowByRow(
-                                      (row, collector) -> {
-                                        try {
-                                          collector.collectRow(row);
-                                          Assert.assertEquals(1, row.size());
-                                          count2.incrementAndGet();
-                                        } catch (IOException e) {
-                                          throw new RuntimeException(e);
-                                        }
-                                      })
-                                  .forEach(
-                                      tabletInsertionEvent2 ->
-                                          
tabletInsertionEvent2.processRowByRow(
-                                              (row, collector) -> {
-                                                try {
-                                                  collector.collectRow(row);
-                                                  Assert.assertEquals(1, 
row.size());
-                                                  count3.incrementAndGet();
-                                                } catch (IOException e) {
-                                                  throw new 
RuntimeException(e);
-                                                }
-                                              }))));
-
-      Assert.assertEquals(expectedRowNumber, count1.get());
-      Assert.assertEquals(expectedRowNumber, count2.get());
-      Assert.assertEquals(expectedRowNumber, count3.get());
-    } catch (final Exception e) {
-      e.printStackTrace();
-      fail(e.getMessage());
-    }
+    testTsFilePointNum(
+        alignedTsFile,
+        oneAlignedMeasurementPattern,
+        startTime,
+        endTime,
+        isQuery,
+        expectedRowNumber);
+    testTsFilePointNum(
+        nonalignedTsFile,
+        oneNonAlignedMeasurementPattern,
+        startTime,
+        endTime,
+        isQuery,
+        expectedRowNumber);
 
     final PipePattern notExistPattern;
     switch (patternFormat) {
@@ -756,23 +467,64 @@ public class TsFileInsertionDataContainerTest {
         break;
     }
 
-    try (final TsFileInsertionDataContainer alignedContainer =
-            isQuery
-                ? new TsFileInsertionQueryDataContainer(
-                    alignedTsFile, notExistPattern, startTime, endTime)
-                : new TsFileInsertionScanDataContainer(
-                    alignedTsFile, notExistPattern, startTime, endTime, null, null);
-        final TsFileInsertionDataContainer nonalignedContainer =
-            isQuery
-                ? new TsFileInsertionQueryDataContainer(
-                    nonalignedTsFile, notExistPattern, startTime, endTime)
-                : new TsFileInsertionScanDataContainer(
-                    nonalignedTsFile, notExistPattern, startTime, endTime, null, null)) {
+    testTsFilePointNum(alignedTsFile, notExistPattern, startTime, endTime, isQuery, 0);
+    testTsFilePointNum(nonalignedTsFile, notExistPattern, startTime, endTime, isQuery, 0);
+  }
+
+  private void testMixedTsFileWithEmptyChunk(final boolean isQuery) throws IOException {
+    final File tsFile = new File("0-0-1-0.tsfile");
+    resource = new TsFileResource(tsFile);
+    resource.updatePlanIndexes(0);
+    resource.setStatusForTest(TsFileResourceStatus.NORMAL);
+    try (final CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) {
+      writer.startChunkGroup("d1");
+      writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+          Arrays.asList("s0", "s1", "s2"),
+          new TimeRange[] {new TimeRange(10, 40)},
+          TSEncoding.PLAIN,
+          CompressionType.LZ4,
+          Arrays.asList(false, false, true));
+      writer.endChunkGroup();
+      writer.startChunkGroup("d2");
+      writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+          "s0", new TimeRange[] {new TimeRange(10, 40)}, TSEncoding.PLAIN, CompressionType.LZ4);
+      writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+          "s1",
+          new TimeRange[] {new TimeRange(40, 40), new TimeRange(50, 70)},
+          TSEncoding.PLAIN,
+          CompressionType.LZ4);
+      writer.endChunkGroup();
+      writer.endFile();
+    }
+
+    testTsFilePointNum(
+        resource.getTsFile(),
+        new PrefixPipePattern("root"),
+        Long.MIN_VALUE,
+        Long.MAX_VALUE,
+        isQuery,
+        115);
+    resource.remove();
+    resource = null;
+  }
+
+  private void testTsFilePointNum(
+      final File tsFile,
+      final PipePattern pattern,
+      final long startTime,
+      final long endTime,
+      final boolean isQuery,
+      final int expectedCount) {
+    try (final TsFileInsertionDataContainer tsFileContainer =
+        isQuery
+            ? new TsFileInsertionQueryDataContainer(tsFile, pattern, startTime, endTime)
+            : new TsFileInsertionScanDataContainer(
+                tsFile, pattern, startTime, endTime, null, null)) {
       final AtomicInteger count1 = new AtomicInteger(0);
       final AtomicInteger count2 = new AtomicInteger(0);
       final AtomicInteger count3 = new AtomicInteger(0);
 
-      alignedContainer
+      tsFileContainer
           .toTabletInsertionEvents()
           .forEach(
               event ->
@@ -781,9 +533,8 @@ public class TsFileInsertionDataContainerTest {
                           (row, collector) -> {
                             try {
                               collector.collectRow(row);
-                              Assert.assertEquals(0, row.size());
-                              count1.incrementAndGet();
-                            } catch (IOException e) {
+                              count1.addAndGet(getNonNullSize(row));
+                            } catch (final IOException e) {
                               throw new RuntimeException(e);
                             }
                           })
@@ -794,9 +545,8 @@ public class TsFileInsertionDataContainerTest {
                                       (row, collector) -> {
                                         try {
                                           collector.collectRow(row);
-                                          Assert.assertEquals(0, row.size());
-                                          count2.incrementAndGet();
-                                        } catch (IOException e) {
+                                          count2.addAndGet(getNonNullSize(row));
+                                        } catch (final IOException e) {
                                           throw new RuntimeException(e);
                                         }
                                       })
@@ -809,67 +559,28 @@ public class TsFileInsertionDataContainerTest {
                                                           (row, collector) -> {
                                                             try {
                                                               rowCollector.collectRow(row);
-                                                              Assert.assertEquals(0, row.size());
-                                                              count3.incrementAndGet();
-                                                            } catch (IOException e) {
+                                                              count3.addAndGet(getNonNullSize(row));
+                                                            } catch (final IOException e) {
                                                               throw new RuntimeException(e);
                                                             }
                                                           })))));
 
-      Assert.assertEquals(0, count1.getAndSet(0));
-      Assert.assertEquals(0, count2.getAndSet(0));
-      Assert.assertEquals(0, count3.getAndSet(0));
-
-      nonalignedContainer
-          .toTabletInsertionEvents()
-          .forEach(
-              event ->
-                  event
-                      .processTablet(
-                          (tablet, rowCollector) ->
-                              new PipeRawTabletInsertionEvent(tablet, false)
-                                  .processRowByRow(
-                                      (row, collector) -> {
-                                        try {
-                                          rowCollector.collectRow(row);
-                                          Assert.assertEquals(0, row.size());
-                                          count1.incrementAndGet();
-                                        } catch (IOException e) {
-                                          throw new RuntimeException(e);
-                                        }
-                                      }))
-                      .forEach(
-                          tabletInsertionEvent1 ->
-                              tabletInsertionEvent1
-                                  .processRowByRow(
-                                      (row, collector) -> {
-                                        try {
-                                          collector.collectRow(row);
-                                          Assert.assertEquals(0, row.size());
-                                          count2.incrementAndGet();
-                                        } catch (IOException e) {
-                                          throw new RuntimeException(e);
-                                        }
-                                      })
-                                  .forEach(
-                                      tabletInsertionEvent2 ->
-                                          tabletInsertionEvent2.processRowByRow(
-                                              (row, collector) -> {
-                                                try {
-                                                  collector.collectRow(row);
-                                                  Assert.assertEquals(0, row.size());
-                                                  count3.incrementAndGet();
-                                                } catch (IOException e) {
-                                                  throw new RuntimeException(e);
-                                                }
-                                              }))));
-
-      Assert.assertEquals(0, count1.get());
-      Assert.assertEquals(0, count2.get());
-      Assert.assertEquals(0, count3.get());
+      Assert.assertEquals(expectedCount, count1.get());
+      Assert.assertEquals(expectedCount, count2.get());
+      Assert.assertEquals(expectedCount, count3.get());
     } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     }
   }
+
+  private int getNonNullSize(final Row row) {
+    int count = 0;
+    for (int i = 0; i < row.size(); ++i) {
+      if (!row.isNull(i)) {
+        ++count;
+      }
+    }
+    return count;
+  }
 }


Reply via email to