This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new 70134341618 [to rc/1.3.3] Check decoded page size in compaction (#13432)
70134341618 is described below

commit 701343416188665c287cf3d6adab7b6bbde81396
Author: shuwenwei <[email protected]>
AuthorDate: Mon Sep 9 09:33:08 2024 +0800

    [to rc/1.3.3] Check decoded page size in compaction (#13432)
    
    * check decoded page size in seq compaction
    
    * add comment in ut
    
    * fix comment
    
    * add comment
---
 .../ReadChunkAlignedSeriesCompactionExecutor.java  | 52 ++++++++++++++++++++--
 .../dataregion/wal/io/WALInputStream.java          |  1 -
 ...nkCompactionPerformerWithAlignedSeriesTest.java | 30 +++++++++++++
 3 files changed, 79 insertions(+), 4 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java
index e8288d4cf5e..2404c99f114 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java
@@ -496,9 +496,13 @@ public class ReadChunkAlignedSeriesCompactionExecutor {
     }
 
     private boolean canFlushPage(PageLoader timePage, List<PageLoader> 
valuePages) {
+      long count = timePage.getHeader().getStatistics().getCount();
       boolean largeEnough =
-          timePage.getHeader().getUncompressedSize() >= targetPageSize
-              || timePage.getHeader().getStatistics().getCount() >= 
targetPagePointNum;
+          count >= targetPagePointNum
+              || Math.max(
+                      estimateMemorySizeAsPageWriter(timePage),
+                      timePage.getHeader().getUncompressedSize())
+                  >= targetPageSize;
       if (timeSchema.getEncodingType() != timePage.getEncoding()
           || timeSchema.getCompressor() != timePage.getCompressionType()) {
         return false;
@@ -516,11 +520,53 @@ public class ReadChunkAlignedSeriesCompactionExecutor {
         if (valuePage.getModifiedStatus() == ModifiedStatus.PARTIAL_DELETED) {
           return false;
         }
-        if (valuePage.getHeader().getUncompressedSize() >= targetPageSize) {
+        if (Math.max(
+                valuePage.getHeader().getUncompressedSize(),
+                estimateMemorySizeAsPageWriter(valuePage))
+            >= targetPageSize) {
           largeEnough = true;
         }
       }
       return largeEnough;
     }
+
+    private long estimateMemorySizeAsPageWriter(PageLoader pageLoader) {
+      long count = pageLoader.getHeader().getStatistics().getCount();
+      long size;
+      switch (pageLoader.getDataType()) {
+        case INT32:
+        case DATE:
+          size = count * Integer.BYTES;
+          break;
+        case TIMESTAMP:
+        case INT64:
+        case VECTOR:
+          size = count * Long.BYTES;
+          break;
+        case FLOAT:
+          size = count * Float.BYTES;
+          break;
+        case DOUBLE:
+          size = count * Double.BYTES;
+          break;
+        case BOOLEAN:
+          size = count * Byte.BYTES;
+          break;
+        case TEXT:
+        case STRING:
+        case BLOB:
+          size = pageLoader.getHeader().getUncompressedSize();
+          break;
+        default:
+          throw new IllegalArgumentException(
+              "Unsupported data type: " + pageLoader.getDataType().toString());
+      }
+      // Due to the fact that the page writer in memory includes some other objects
+      // and has a special calculation method, the estimated size will actually be
+      // larger. So we simply adopt the method of multiplying by 1.05 times. If this
+      // is not done, the result here might be close to the target page size but
+      // slightly smaller.
+      return (long) (size * 1.05);
+    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
index 582e9448445..e3f544a8894 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -120,7 +120,6 @@ public class WALInputStream extends InputStream implements 
AutoCloseable {
       channel.read(metadataSizeBuf, position);
       metadataSizeBuf.flip();
       int metadataSize = metadataSizeBuf.getInt();
-      // -1 is for the endmarker
       endOffset = channel.size() - version.getVersionBytes().length - 
Integer.BYTES - metadataSize;
     } finally {
       if (version == WALFileVersion.V2) {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/NewReadChunkCompactionPerformerWithAlignedSeriesTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/NewReadChunkCompactionPerformerWithAlignedSeriesTest.java
index fd11c884620..cbf105aed4a 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/NewReadChunkCompactionPerformerWithAlignedSeriesTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/NewReadChunkCompactionPerformerWithAlignedSeriesTest.java
@@ -503,6 +503,36 @@ public class 
NewReadChunkCompactionPerformerWithAlignedSeriesTest extends Abstra
                 Collections.singletonList(targetResource), 
Collections.emptyList())));
   }
 
+  @Test
+  public void testCompactionByFlushPage() throws Exception {
+    // chunk1: [[1000,7000]] chunk2: [[8000,15000]]
+    TsFileResource seqResource1 =
+        generateSingleAlignedSeriesFile(
+            "d0",
+            Arrays.asList("s0", "s1", "s2"),
+            new TimeRange[] {new TimeRange(1000, 7000), new TimeRange(8000, 
15000)},
+            TSEncoding.RLE,
+            CompressionType.LZ4,
+            Arrays.asList(false, false, false),
+            true);
+    seqResources.add(seqResource1);
+    CompactionTaskSummary summary = new CompactionTaskSummary();
+    TsFileResource targetResource = performCompaction(summary);
+    seqResources.clear();
+    Assert.assertEquals(8, summary.getDeserializeChunkCount());
+    Assert.assertEquals(0, summary.getDirectlyFlushPageCount());
+    seqResources.add(targetResource);
+    summary = new CompactionTaskSummary();
+    // the point num of first page is less than 10000 because the page writer of it reach the page
+    // size limit
+    // chunk1: [[1000,10053], [10054,15000]]
+    performCompaction(summary);
+    Assert.assertEquals(4, summary.getDeserializeChunkCount());
+    // the first aligned page can be flushed directly
+    Assert.assertEquals(4, summary.getDirectlyFlushPageCount());
+    Assert.assertEquals(4, summary.getDeserializePageCount());
+  }
+
   @Test
   public void testSimpleCompactionByWritePoint() throws Exception {
     TsFileResource seqResource1 =

Reply via email to