This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new 37adb3645e8 Pipe: Update TsFileInsertionScanDataContainer to support
partially sequential aligned chunks (#13168) (#13233)
37adb3645e8 is described below
commit 37adb3645e8dab1edc252ca1f8b7977473610470
Author: Caideyipi <[email protected]>
AuthorDate: Tue Aug 20 14:35:13 2024 +0800
Pipe: Update TsFileInsertionScanDataContainer to support partially
sequential aligned chunks (#13168) (#13233)
---
.../scan/TsFileInsertionScanDataContainer.java | 107 ++--
.../event/TsFileInsertionDataContainerTest.java | 553 +++++----------------
2 files changed, 200 insertions(+), 460 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index a841986064e..e2995f61fdc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -52,8 +52,10 @@ import java.io.File;
import java.io.IOException;
import java.time.LocalDate;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
@@ -71,6 +73,12 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
private boolean currentIsAligned;
private final List<MeasurementSchema> currentMeasurements = new
ArrayList<>();
+ // Cached time chunks of the current chunk group, and the state used to match value chunks to them
+ private final List<Chunk> timeChunkList = new ArrayList<>();
+ private final Map<String, Integer> measurementIndexMap = new HashMap<>();
+ private int lastIndex = -1;
+ private ChunkHeader firstChunkHeader4NextSequentialValueChunks;
+
private byte lastMarker = Byte.MIN_VALUE;
public TsFileInsertionScanDataContainer(
@@ -262,7 +270,6 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
private void moveToNextChunkReader() throws IOException,
IllegalStateException {
ChunkHeader chunkHeader;
- Chunk timeChunk = null;
final List<Chunk> valueChunkList = new ArrayList<>();
currentMeasurements.clear();
@@ -280,16 +287,8 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
case MetaMarker.TIME_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
- if (Objects.nonNull(timeChunk) && !currentMeasurements.isEmpty()) {
- chunkReader =
- isMultiPage
- ? new AlignedChunkReader(timeChunk, valueChunkList, filter)
- : new AlignedSinglePageWholeChunkReader(timeChunk,
valueChunkList);
- currentIsAligned = true;
- lastMarker = marker;
- return;
- }
-
+ // Notice that the data in one chunk group is either aligned or
non-aligned
+ // There is no need to consider non-aligned chunks when there are
value chunks
isMultiPage = marker == MetaMarker.CHUNK_HEADER || marker ==
MetaMarker.TIME_CHUNK_HEADER;
chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
@@ -302,9 +301,9 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
== TsFileConstant.TIME_COLUMN_MASK) {
- timeChunk =
+ timeChunkList.add(
new Chunk(
- chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize()));
+ chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize())));
break;
}
@@ -331,39 +330,61 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
return;
case MetaMarker.VALUE_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
- chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
+ if (Objects.isNull(firstChunkHeader4NextSequentialValueChunks)) {
+ chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
+
+ if (Objects.isNull(currentDevice)
+ || !pattern.matchesMeasurement(currentDevice,
chunkHeader.getMeasurementID())) {
+ tsFileSequenceReader.position(
+ tsFileSequenceReader.position() + chunkHeader.getDataSize());
+ break;
+ }
- if (Objects.isNull(currentDevice)
- || !pattern.matchesMeasurement(currentDevice,
chunkHeader.getMeasurementID())) {
- tsFileSequenceReader.position(
- tsFileSequenceReader.position() + chunkHeader.getDataSize());
- break;
- }
+ // Increase value index
+ final int valueIndex =
+ measurementIndexMap.compute(
+ chunkHeader.getMeasurementID(),
+ (measurement, index) -> Objects.nonNull(index) ? index + 1
: 0);
- // Do not record empty chunk
- if (chunkHeader.getDataSize() > 0) {
- valueChunkList.add(
- new Chunk(
- chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize())));
- currentMeasurements.add(
- new MeasurementSchema(chunkHeader.getMeasurementID(),
chunkHeader.getDataType()));
+ // Emit the collected value chunks when a non-sequential value chunk is encountered.
+ // An empty chunk is neither recorded nor treated as the end of the current value chunks.
+ if (chunkHeader.getDataSize() == 0) {
+ break;
+ }
+ boolean needReturn = false;
+ if (lastIndex >= 0 && valueIndex != lastIndex) {
+ needReturn = recordAlignedChunk(valueChunkList, marker);
+ }
+ lastIndex = valueIndex;
+ if (needReturn) {
+ firstChunkHeader4NextSequentialValueChunks = chunkHeader;
+ return;
+ }
+ } else {
+ chunkHeader = firstChunkHeader4NextSequentialValueChunks;
+ firstChunkHeader4NextSequentialValueChunks = null;
}
+
+ valueChunkList.add(
+ new Chunk(
+ chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize())));
+ currentMeasurements.add(
+ new MeasurementSchema(chunkHeader.getMeasurementID(),
chunkHeader.getDataType()));
break;
case MetaMarker.CHUNK_GROUP_HEADER:
// Return before "currentDevice" changes
- if (Objects.nonNull(timeChunk) && !currentMeasurements.isEmpty()) {
- chunkReader =
- isMultiPage
- ? new AlignedChunkReader(timeChunk, valueChunkList, filter)
- : new AlignedSinglePageWholeChunkReader(timeChunk,
valueChunkList);
- currentIsAligned = true;
- lastMarker = marker;
+ if (recordAlignedChunk(valueChunkList, marker)) {
return;
}
// TODO: Replace it by IDeviceID
final String deviceID =
((PlainDeviceID)
tsFileSequenceReader.readChunkGroupHeader().getDeviceID())
.toStringID();
+ // Clear because the cached data will never be used in the next
chunk group
+ lastIndex = -1;
+ timeChunkList.clear();
+ measurementIndexMap.clear();
+
currentDevice = pattern.mayOverlapWithDevice(deviceID) ? deviceID :
null;
break;
case MetaMarker.OPERATION_INDEX_RANGE:
@@ -375,14 +396,22 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
}
lastMarker = marker;
- if (Objects.nonNull(timeChunk) && !currentMeasurements.isEmpty()) {
+ if (!recordAlignedChunk(valueChunkList, marker)) {
+ chunkReader = null;
+ }
+ }
+
+ private boolean recordAlignedChunk(final List<Chunk> valueChunkList, final
byte marker)
+ throws IOException {
+ if (!valueChunkList.isEmpty()) {
chunkReader =
isMultiPage
- ? new AlignedChunkReader(timeChunk, valueChunkList, filter)
- : new AlignedSinglePageWholeChunkReader(timeChunk,
valueChunkList);
+ ? new AlignedChunkReader(timeChunkList.get(lastIndex),
valueChunkList, filter)
+ : new
AlignedSinglePageWholeChunkReader(timeChunkList.get(lastIndex), valueChunkList);
currentIsAligned = true;
- } else {
- chunkReader = null;
+ lastMarker = marker;
+ return true;
}
+ return false;
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index 74d500006f8..8e6ecbd80b7 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -26,10 +26,17 @@ import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import org.apache.iotdb.pipe.api.access.Row;
import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.TsFileGeneratorUtils;
import org.junit.After;
@@ -40,7 +47,9 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashSet;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -59,6 +68,7 @@ public class TsFileInsertionDataContainerTest {
private File alignedTsFile;
private File nonalignedTsFile;
+ private TsFileResource resource;
@After
public void tearDown() throws Exception {
@@ -68,6 +78,9 @@ public class TsFileInsertionDataContainerTest {
if (nonalignedTsFile != null) {
nonalignedTsFile.delete();
}
+ if (Objects.nonNull(resource)) {
+ resource.remove();
+ }
}
@Test
@@ -85,6 +98,10 @@ public class TsFileInsertionDataContainerTest {
}
public void testToTabletInsertionEvents(final boolean isQuery) throws
Exception {
+ // Test empty chunk
+ testMixedTsFileWithEmptyChunk(isQuery);
+
+ // Test the combinations of pipe and tsFile settings
final Set<Integer> deviceNumbers = new HashSet<>();
deviceNumbers.add(1);
deviceNumbers.add(2);
@@ -329,120 +346,20 @@ public class TsFileInsertionDataContainerTest {
break;
}
- try (final TsFileInsertionDataContainer alignedContainer =
- isQuery
- ? new TsFileInsertionQueryDataContainer(
- alignedTsFile, rootPattern, startTime, endTime)
- : new TsFileInsertionScanDataContainer(
- alignedTsFile, rootPattern, startTime, endTime, null,
null);
- final TsFileInsertionDataContainer nonalignedContainer =
- isQuery
- ? new TsFileInsertionQueryDataContainer(
- nonalignedTsFile, rootPattern, startTime, endTime)
- : new TsFileInsertionScanDataContainer(
- nonalignedTsFile, rootPattern, startTime, endTime, null,
null)) {
- final AtomicInteger count1 = new AtomicInteger(0);
- final AtomicInteger count2 = new AtomicInteger(0);
- final AtomicInteger count3 = new AtomicInteger(0);
-
- alignedContainer
- .toTabletInsertionEvents()
- .forEach(
- event ->
- event
- .processRowByRow(
- (row, collector) -> {
- try {
- collector.collectRow(row);
- Assert.assertEquals(measurementNumber,
row.size());
- count1.incrementAndGet();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- })
- .forEach(
- tabletInsertionEvent1 ->
- tabletInsertionEvent1
- .processRowByRow(
- (row, collector) -> {
- try {
- collector.collectRow(row);
-
Assert.assertEquals(measurementNumber, row.size());
- count2.incrementAndGet();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- })
- .forEach(
- tabletInsertionEvent2 ->
- tabletInsertionEvent2.processTablet(
- (tablet, rowCollector) ->
- new
PipeRawTabletInsertionEvent(tablet, false)
- .processRowByRow(
- (row, collector) -> {
- try {
-
rowCollector.collectRow(row);
-
Assert.assertEquals(
-
measurementNumber, row.size());
-
count3.incrementAndGet();
- } catch
(IOException e) {
- throw new
RuntimeException(e);
- }
- })))));
-
- Assert.assertEquals(count1.getAndSet(0), deviceNumber *
expectedRowNumber);
- Assert.assertEquals(count2.getAndSet(0), deviceNumber *
expectedRowNumber);
- Assert.assertEquals(count3.getAndSet(0), deviceNumber *
expectedRowNumber);
-
- nonalignedContainer
- .toTabletInsertionEvents()
- .forEach(
- event ->
- event
- .processTablet(
- (tablet, rowCollector) ->
- new PipeRawTabletInsertionEvent(tablet, false)
- .processRowByRow(
- (row, collector) -> {
- try {
- rowCollector.collectRow(row);
- count1.addAndGet(row.size());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }))
- .forEach(
- tabletInsertionEvent1 ->
- tabletInsertionEvent1
- .processRowByRow(
- (row, collector) -> {
- try {
- collector.collectRow(row);
- count2.addAndGet(row.size());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- })
- .forEach(
- tabletInsertionEvent2 ->
-
tabletInsertionEvent2.processRowByRow(
- (row, collector) -> {
- try {
- collector.collectRow(row);
- count3.addAndGet(row.size());
- } catch (IOException e) {
- throw new
RuntimeException(e);
- }
- }))));
-
- // Calculate points in non-aligned tablets
- Assert.assertEquals(deviceNumber * expectedRowNumber *
measurementNumber, count1.get());
- Assert.assertEquals(deviceNumber * expectedRowNumber *
measurementNumber, count2.get());
- Assert.assertEquals(deviceNumber * expectedRowNumber *
measurementNumber, count3.get());
- } catch (final Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ testTsFilePointNum(
+ alignedTsFile,
+ rootPattern,
+ startTime,
+ endTime,
+ isQuery,
+ deviceNumber * expectedRowNumber * measurementNumber);
+ testTsFilePointNum(
+ nonalignedTsFile,
+ rootPattern,
+ startTime,
+ endTime,
+ isQuery,
+ deviceNumber * expectedRowNumber * measurementNumber);
final AtomicReference<String> oneDeviceInAlignedTsFile = new
AtomicReference<>();
final AtomicReference<String> oneMeasurementInAlignedTsFile = new
AtomicReference<>();
@@ -493,120 +410,20 @@ public class TsFileInsertionDataContainerTest {
break;
}
- try (final TsFileInsertionDataContainer alignedContainer =
- isQuery
- ? new TsFileInsertionQueryDataContainer(
- alignedTsFile, oneAlignedDevicePattern, startTime, endTime)
- : new TsFileInsertionScanDataContainer(
- alignedTsFile, oneAlignedDevicePattern, startTime,
endTime, null, null);
- final TsFileInsertionDataContainer nonalignedContainer =
- isQuery
- ? new TsFileInsertionQueryDataContainer(
- nonalignedTsFile, oneNonAlignedDevicePattern, startTime,
endTime)
- : new TsFileInsertionScanDataContainer(
- nonalignedTsFile, oneNonAlignedDevicePattern, startTime,
endTime, null, null)) {
- final AtomicInteger count1 = new AtomicInteger(0);
- final AtomicInteger count2 = new AtomicInteger(0);
- final AtomicInteger count3 = new AtomicInteger(0);
-
- alignedContainer
- .toTabletInsertionEvents()
- .forEach(
- event ->
- event
- .processRowByRow(
- (row, collector) -> {
- try {
- collector.collectRow(row);
- Assert.assertEquals(measurementNumber,
row.size());
- count1.incrementAndGet();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- })
- .forEach(
- tabletInsertionEvent1 ->
- tabletInsertionEvent1
- .processRowByRow(
- (row, collector) -> {
- try {
- collector.collectRow(row);
-
Assert.assertEquals(measurementNumber, row.size());
- count2.incrementAndGet();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- })
- .forEach(
- tabletInsertionEvent2 ->
- tabletInsertionEvent2.processTablet(
- (tablet, rowCollector) ->
- new
PipeRawTabletInsertionEvent(tablet, false)
- .processRowByRow(
- (row, collector) -> {
- try {
-
rowCollector.collectRow(row);
-
Assert.assertEquals(
-
measurementNumber, row.size());
-
count3.incrementAndGet();
- } catch
(IOException e) {
- throw new
RuntimeException(e);
- }
- })))));
-
- Assert.assertEquals(expectedRowNumber, count1.getAndSet(0));
- Assert.assertEquals(expectedRowNumber, count2.getAndSet(0));
- Assert.assertEquals(expectedRowNumber, count3.getAndSet(0));
-
- nonalignedContainer
- .toTabletInsertionEvents()
- .forEach(
- event ->
- event
- .processTablet(
- (tablet, rowCollector) ->
- new PipeRawTabletInsertionEvent(tablet, false)
- .processRowByRow(
- (row, collector) -> {
- try {
- rowCollector.collectRow(row);
- count1.addAndGet(row.size());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }))
- .forEach(
- tabletInsertionEvent1 ->
- tabletInsertionEvent1
- .processRowByRow(
- (row, collector) -> {
- try {
- collector.collectRow(row);
- count2.addAndGet(row.size());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- })
- .forEach(
- tabletInsertionEvent2 ->
-
tabletInsertionEvent2.processRowByRow(
- (row, collector) -> {
- try {
- collector.collectRow(row);
- count3.addAndGet(row.size());
- } catch (IOException e) {
- throw new
RuntimeException(e);
- }
- }))));
-
- // Calculate points in non-aligned tablets
- Assert.assertEquals(expectedRowNumber * measurementNumber, count1.get());
- Assert.assertEquals(expectedRowNumber * measurementNumber, count2.get());
- Assert.assertEquals(expectedRowNumber * measurementNumber, count3.get());
- } catch (final Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ testTsFilePointNum(
+ alignedTsFile,
+ oneAlignedDevicePattern,
+ startTime,
+ endTime,
+ isQuery,
+ expectedRowNumber * measurementNumber);
+ testTsFilePointNum(
+ nonalignedTsFile,
+ oneNonAlignedDevicePattern,
+ startTime,
+ endTime,
+ isQuery,
+ expectedRowNumber * measurementNumber);
final PipePattern oneAlignedMeasurementPattern;
final PipePattern oneNonAlignedMeasurementPattern;
@@ -624,126 +441,20 @@ public class TsFileInsertionDataContainerTest {
break;
}
- try (final TsFileInsertionDataContainer alignedContainer =
- isQuery
- ? new TsFileInsertionQueryDataContainer(
- alignedTsFile, oneAlignedMeasurementPattern, startTime,
endTime)
- : new TsFileInsertionScanDataContainer(
- alignedTsFile, oneAlignedMeasurementPattern, startTime,
endTime, null, null);
- final TsFileInsertionDataContainer nonalignedContainer =
- isQuery
- ? new TsFileInsertionQueryDataContainer(
- nonalignedTsFile, oneNonAlignedMeasurementPattern,
startTime, endTime)
- : new TsFileInsertionScanDataContainer(
- nonalignedTsFile,
- oneNonAlignedMeasurementPattern,
- startTime,
- endTime,
- null,
- null)) {
- final AtomicInteger count1 = new AtomicInteger(0);
- final AtomicInteger count2 = new AtomicInteger(0);
- final AtomicInteger count3 = new AtomicInteger(0);
-
- alignedContainer
- .toTabletInsertionEvents()
- .forEach(
- event ->
- event
- .processRowByRow(
- (row, collector) -> {
- try {
- collector.collectRow(row);
- Assert.assertEquals(1, row.size());
- count1.incrementAndGet();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- })
- .forEach(
- tabletInsertionEvent1 ->
- tabletInsertionEvent1
- .processRowByRow(
- (row, collector) -> {
- try {
- collector.collectRow(row);
- Assert.assertEquals(1, row.size());
- count2.incrementAndGet();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- })
- .forEach(
- tabletInsertionEvent2 ->
- tabletInsertionEvent2.processTablet(
- (tablet, rowCollector) ->
- new
PipeRawTabletInsertionEvent(tablet, false)
- .processRowByRow(
- (row, collector) -> {
- try {
-
rowCollector.collectRow(row);
-
Assert.assertEquals(1, row.size());
-
count3.incrementAndGet();
- } catch
(IOException e) {
- throw new
RuntimeException(e);
- }
- })))));
-
- Assert.assertEquals(expectedRowNumber, count1.getAndSet(0));
- Assert.assertEquals(expectedRowNumber, count2.getAndSet(0));
- Assert.assertEquals(expectedRowNumber, count3.getAndSet(0));
-
- nonalignedContainer
- .toTabletInsertionEvents()
- .forEach(
- event ->
- event
- .processTablet(
- (tablet, rowCollector) ->
- new PipeRawTabletInsertionEvent(tablet, false)
- .processRowByRow(
- (row, collector) -> {
- try {
- rowCollector.collectRow(row);
- Assert.assertEquals(1, row.size());
- count1.incrementAndGet();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }))
- .forEach(
- tabletInsertionEvent1 ->
- tabletInsertionEvent1
- .processRowByRow(
- (row, collector) -> {
- try {
- collector.collectRow(row);
- Assert.assertEquals(1, row.size());
- count2.incrementAndGet();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- })
- .forEach(
- tabletInsertionEvent2 ->
-
tabletInsertionEvent2.processRowByRow(
- (row, collector) -> {
- try {
- collector.collectRow(row);
- Assert.assertEquals(1,
row.size());
- count3.incrementAndGet();
- } catch (IOException e) {
- throw new
RuntimeException(e);
- }
- }))));
-
- Assert.assertEquals(expectedRowNumber, count1.get());
- Assert.assertEquals(expectedRowNumber, count2.get());
- Assert.assertEquals(expectedRowNumber, count3.get());
- } catch (final Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ testTsFilePointNum(
+ alignedTsFile,
+ oneAlignedMeasurementPattern,
+ startTime,
+ endTime,
+ isQuery,
+ expectedRowNumber);
+ testTsFilePointNum(
+ nonalignedTsFile,
+ oneNonAlignedMeasurementPattern,
+ startTime,
+ endTime,
+ isQuery,
+ expectedRowNumber);
final PipePattern notExistPattern;
switch (patternFormat) {
@@ -756,23 +467,64 @@ public class TsFileInsertionDataContainerTest {
break;
}
- try (final TsFileInsertionDataContainer alignedContainer =
- isQuery
- ? new TsFileInsertionQueryDataContainer(
- alignedTsFile, notExistPattern, startTime, endTime)
- : new TsFileInsertionScanDataContainer(
- alignedTsFile, notExistPattern, startTime, endTime, null,
null);
- final TsFileInsertionDataContainer nonalignedContainer =
- isQuery
- ? new TsFileInsertionQueryDataContainer(
- nonalignedTsFile, notExistPattern, startTime, endTime)
- : new TsFileInsertionScanDataContainer(
- nonalignedTsFile, notExistPattern, startTime, endTime,
null, null)) {
+ testTsFilePointNum(alignedTsFile, notExistPattern, startTime, endTime,
isQuery, 0);
+ testTsFilePointNum(nonalignedTsFile, notExistPattern, startTime, endTime,
isQuery, 0);
+ }
+
+ private void testMixedTsFileWithEmptyChunk(final boolean isQuery) throws
IOException {
+ final File tsFile = new File("0-0-1-0.tsfile");
+ resource = new TsFileResource(tsFile);
+ resource.updatePlanIndexes(0);
+ resource.setStatusForTest(TsFileResourceStatus.NORMAL);
+ try (final CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource)) {
+ writer.startChunkGroup("d1");
+ writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+ Arrays.asList("s0", "s1", "s2"),
+ new TimeRange[] {new TimeRange(10, 40)},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ Arrays.asList(false, false, true));
+ writer.endChunkGroup();
+ writer.startChunkGroup("d2");
+ writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+ "s0", new TimeRange[] {new TimeRange(10, 40)}, TSEncoding.PLAIN,
CompressionType.LZ4);
+ writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+ "s1",
+ new TimeRange[] {new TimeRange(40, 40), new TimeRange(50, 70)},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4);
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+
+ testTsFilePointNum(
+ resource.getTsFile(),
+ new PrefixPipePattern("root"),
+ Long.MIN_VALUE,
+ Long.MAX_VALUE,
+ isQuery,
+ 115);
+ resource.remove();
+ resource = null;
+ }
+
+ private void testTsFilePointNum(
+ final File tsFile,
+ final PipePattern pattern,
+ final long startTime,
+ final long endTime,
+ final boolean isQuery,
+ final int expectedCount) {
+ try (final TsFileInsertionDataContainer tsFileContainer =
+ isQuery
+ ? new TsFileInsertionQueryDataContainer(tsFile, pattern,
startTime, endTime)
+ : new TsFileInsertionScanDataContainer(
+ tsFile, pattern, startTime, endTime, null, null)) {
final AtomicInteger count1 = new AtomicInteger(0);
final AtomicInteger count2 = new AtomicInteger(0);
final AtomicInteger count3 = new AtomicInteger(0);
- alignedContainer
+ tsFileContainer
.toTabletInsertionEvents()
.forEach(
event ->
@@ -781,9 +533,8 @@ public class TsFileInsertionDataContainerTest {
(row, collector) -> {
try {
collector.collectRow(row);
- Assert.assertEquals(0, row.size());
- count1.incrementAndGet();
- } catch (IOException e) {
+ count1.addAndGet(getNonNullSize(row));
+ } catch (final IOException e) {
throw new RuntimeException(e);
}
})
@@ -794,9 +545,8 @@ public class TsFileInsertionDataContainerTest {
(row, collector) -> {
try {
collector.collectRow(row);
- Assert.assertEquals(0, row.size());
- count2.incrementAndGet();
- } catch (IOException e) {
+
count2.addAndGet(getNonNullSize(row));
+ } catch (final IOException e) {
throw new RuntimeException(e);
}
})
@@ -809,67 +559,28 @@ public class TsFileInsertionDataContainerTest {
(row, collector) -> {
try {
rowCollector.collectRow(row);
-
Assert.assertEquals(0, row.size());
-
count3.incrementAndGet();
- } catch
(IOException e) {
+
count3.addAndGet(getNonNullSize(row));
+ } catch (final
IOException e) {
throw new
RuntimeException(e);
}
})))));
- Assert.assertEquals(0, count1.getAndSet(0));
- Assert.assertEquals(0, count2.getAndSet(0));
- Assert.assertEquals(0, count3.getAndSet(0));
-
- nonalignedContainer
- .toTabletInsertionEvents()
- .forEach(
- event ->
- event
- .processTablet(
- (tablet, rowCollector) ->
- new PipeRawTabletInsertionEvent(tablet, false)
- .processRowByRow(
- (row, collector) -> {
- try {
- rowCollector.collectRow(row);
- Assert.assertEquals(0, row.size());
- count1.incrementAndGet();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }))
- .forEach(
- tabletInsertionEvent1 ->
- tabletInsertionEvent1
- .processRowByRow(
- (row, collector) -> {
- try {
- collector.collectRow(row);
- Assert.assertEquals(0, row.size());
- count2.incrementAndGet();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- })
- .forEach(
- tabletInsertionEvent2 ->
-
tabletInsertionEvent2.processRowByRow(
- (row, collector) -> {
- try {
- collector.collectRow(row);
- Assert.assertEquals(0,
row.size());
- count3.incrementAndGet();
- } catch (IOException e) {
- throw new
RuntimeException(e);
- }
- }))));
-
- Assert.assertEquals(0, count1.get());
- Assert.assertEquals(0, count2.get());
- Assert.assertEquals(0, count3.get());
+ Assert.assertEquals(expectedCount, count1.get());
+ Assert.assertEquals(expectedCount, count2.get());
+ Assert.assertEquals(expectedCount, count3.get());
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
+
+ private int getNonNullSize(final Row row) {
+ int count = 0;
+ for (int i = 0; i < row.size(); ++i) {
+ if (!row.isNull(i)) {
+ ++count;
+ }
+ }
+ return count;
+ }
}