This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new 70134341618 [to rc/1.3.3] Check decoded page size in compaction (#13432)
70134341618 is described below
commit 701343416188665c287cf3d6adab7b6bbde81396
Author: shuwenwei <[email protected]>
AuthorDate: Mon Sep 9 09:33:08 2024 +0800
[to rc/1.3.3] Check decoded page size in compaction (#13432)
* check decoded page size in seq compaction
* add comment in ut
* fix comment
* add comment
---
.../ReadChunkAlignedSeriesCompactionExecutor.java | 52 ++++++++++++++++++++--
.../dataregion/wal/io/WALInputStream.java | 1 -
...nkCompactionPerformerWithAlignedSeriesTest.java | 30 +++++++++++++
3 files changed, 79 insertions(+), 4 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java
index e8288d4cf5e..2404c99f114 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java
@@ -496,9 +496,13 @@ public class ReadChunkAlignedSeriesCompactionExecutor {
}
private boolean canFlushPage(PageLoader timePage, List<PageLoader>
valuePages) {
+ long count = timePage.getHeader().getStatistics().getCount();
boolean largeEnough =
- timePage.getHeader().getUncompressedSize() >= targetPageSize
- || timePage.getHeader().getStatistics().getCount() >=
targetPagePointNum;
+ count >= targetPagePointNum
+ || Math.max(
+ estimateMemorySizeAsPageWriter(timePage),
+ timePage.getHeader().getUncompressedSize())
+ >= targetPageSize;
if (timeSchema.getEncodingType() != timePage.getEncoding()
|| timeSchema.getCompressor() != timePage.getCompressionType()) {
return false;
@@ -516,11 +520,53 @@ public class ReadChunkAlignedSeriesCompactionExecutor {
if (valuePage.getModifiedStatus() == ModifiedStatus.PARTIAL_DELETED) {
return false;
}
- if (valuePage.getHeader().getUncompressedSize() >= targetPageSize) {
+ if (Math.max(
+ valuePage.getHeader().getUncompressedSize(),
+ estimateMemorySizeAsPageWriter(valuePage))
+ >= targetPageSize) {
largeEnough = true;
}
}
return largeEnough;
}
+
+ private long estimateMemorySizeAsPageWriter(PageLoader pageLoader) {
+ // Estimates how many bytes this page would occupy once decoded into a page writer:
+ // fixed-width types use point count * value width, while variable-width types
+ // (TEXT/STRING/BLOB) fall back to the page's uncompressed size.
+ long count = pageLoader.getHeader().getStatistics().getCount();
+ long size;
+ switch (pageLoader.getDataType()) {
+ case INT32:
+ case DATE:
+ size = count * Integer.BYTES;
+ break;
+ case TIMESTAMP:
+ case INT64:
+ case VECTOR: // NOTE(review): presumably the aligned time page (8-byte timestamps) — confirm
+ size = count * Long.BYTES;
+ break;
+ case FLOAT:
+ size = count * Float.BYTES;
+ break;
+ case DOUBLE:
+ size = count * Double.BYTES;
+ break;
+ case BOOLEAN:
+ size = count * Byte.BYTES;
+ break;
+ case TEXT:
+ case STRING:
+ case BLOB:
+ size = pageLoader.getHeader().getUncompressedSize();
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported data type: " + pageLoader.getDataType().toString());
+ }
+ // The in-memory page writer holds extra objects and sizes itself differently, so pad the
+ // estimate by 5%; otherwise it can land just below the target page size.
+ return (long) (size * 1.05);
+ }
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
index 582e9448445..e3f544a8894 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -120,7 +120,6 @@ public class WALInputStream extends InputStream implements AutoCloseable {
channel.read(metadataSizeBuf, position);
metadataSizeBuf.flip();
int metadataSize = metadataSizeBuf.getInt();
- // -1 is for the endmarker
endOffset = channel.size() - version.getVersionBytes().length -
Integer.BYTES - metadataSize;
} finally {
if (version == WALFileVersion.V2) {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/NewReadChunkCompactionPerformerWithAlignedSeriesTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/NewReadChunkCompactionPerformerWithAlignedSeriesTest.java
index fd11c884620..cbf105aed4a 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/NewReadChunkCompactionPerformerWithAlignedSeriesTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/NewReadChunkCompactionPerformerWithAlignedSeriesTest.java
@@ -503,6 +503,36 @@ public class NewReadChunkCompactionPerformerWithAlignedSeriesTest extends Abstra
Collections.singletonList(targetResource),
Collections.emptyList())));
}
+ @Test
+ public void testCompactionByFlushPage() throws Exception {
+ // Source file layout: chunk1: [[1000,7000]] chunk2: [[8000,15000]]
+ TsFileResource seqResource1 =
+ generateSingleAlignedSeriesFile(
+ "d0",
+ Arrays.asList("s0", "s1", "s2"),
+ new TimeRange[] {new TimeRange(1000, 7000), new TimeRange(8000, 15000)},
+ TSEncoding.RLE,
+ CompressionType.LZ4,
+ Arrays.asList(false, false, false),
+ true);
+ seqResources.add(seqResource1);
+ CompactionTaskSummary summary = new CompactionTaskSummary();
+ TsFileResource targetResource = performCompaction(summary);
+ seqResources.clear();
+ Assert.assertEquals(8, summary.getDeserializeChunkCount());
+ Assert.assertEquals(0, summary.getDirectlyFlushPageCount());
+ seqResources.add(targetResource);
+ summary = new CompactionTaskSummary();
+ // The first page of the compacted chunk holds fewer than 10000 points because its page
+ // writer reached the page size limit first, giving this layout:
+ // chunk1: [[1000,10053], [10054,15000]]
+ performCompaction(summary);
+ Assert.assertEquals(4, summary.getDeserializeChunkCount());
+ // The first aligned page (time page plus value pages) can be flushed without deserializing it
+ Assert.assertEquals(4, summary.getDirectlyFlushPageCount());
+ Assert.assertEquals(4, summary.getDeserializePageCount());
+ }
+
@Test
public void testSimpleCompactionByWritePoint() throws Exception {
TsFileResource seqResource1 =