This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new b8b1238c49 Fix chunk size overflow (#6029)
b8b1238c49 is described below

commit b8b1238c49b66c5c0cb84d6764619b064ffcc53f
Author: Liu Xuxin <[email protected]>
AuthorDate: Fri May 27 09:03:59 2022 +0800

    Fix chunk size overflow (#6029)
---
 .../engine/compaction/utils/CompactionUtils.java   | 37 ++++++----
 .../compaction/LevelCompactionMergeTest.java       | 82 ++++++++++++++++++++++
 2 files changed, 106 insertions(+), 13 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 936a451274..585039e98d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -72,10 +72,11 @@ public class CompactionUtils {
     throw new IllegalStateException("Utility class");
   }
 
-  private static Pair<ChunkMetadata, Chunk> readByAppendPageMerge(
+  private static List<Pair<ChunkMetadata, Chunk>> readByAppendPageMerge(
      Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap) throws IOException {
     ChunkMetadata newChunkMetadata = null;
     Chunk newChunk = null;
+    List<Pair<ChunkMetadata, Chunk>> newChunkList = new LinkedList<>();
     for (Entry<TsFileSequenceReader, List<ChunkMetadata>> entry :
         readerChunkMetadataMap.entrySet()) {
       TsFileSequenceReader reader = entry.getKey();
@@ -89,9 +90,17 @@ public class CompactionUtils {
           newChunk.mergeChunk(chunk);
           newChunkMetadata.mergeChunkMetadata(chunkMetadata);
         }
+        if (newChunk.getHeader().getDataSize() >= 8L * 1024L * 1024L) {
+          newChunkList.add(new Pair<>(newChunkMetadata, newChunk));
+          newChunkMetadata = null;
+          newChunk = null;
+        }
       }
     }
-    return new Pair<>(newChunkMetadata, newChunk);
+    if (newChunk != null) {
+      newChunkList.add(new Pair<>(newChunkMetadata, newChunk));
+    }
+    return newChunkList;
   }
 
   private static void readByDeserializePageMerge(
@@ -156,17 +165,19 @@ public class CompactionUtils {
       TsFileResource targetResource,
       RestorableTsFileIOWriter writer)
       throws IOException {
-    Pair<ChunkMetadata, Chunk> chunkPair = readByAppendPageMerge(entry.getValue());
-    ChunkMetadata newChunkMetadata = chunkPair.left;
-    Chunk newChunk = chunkPair.right;
-    if (newChunkMetadata != null && newChunk != null) {
-      // wait for limit write
-      MergeManager.mergeRateLimiterAcquire(
-          compactionWriteRateLimiter,
-          (long) newChunk.getHeader().getDataSize() + newChunk.getData().position());
-      writer.writeChunk(newChunk, newChunkMetadata);
-      targetResource.updateStartTime(device, newChunkMetadata.getStartTime());
-      targetResource.updateEndTime(device, newChunkMetadata.getEndTime());
+    List<Pair<ChunkMetadata, Chunk>> chunkPairList = readByAppendPageMerge(entry.getValue());
+    for (Pair<ChunkMetadata, Chunk> chunkPair : chunkPairList) {
+      ChunkMetadata newChunkMetadata = chunkPair.left;
+      Chunk newChunk = chunkPair.right;
+      if (newChunkMetadata != null && newChunk != null) {
+        // wait for limit write
+        MergeManager.mergeRateLimiterAcquire(
+            compactionWriteRateLimiter,
+            (long) newChunk.getHeader().getDataSize() + newChunk.getData().position());
+        writer.writeChunk(newChunk, newChunkMetadata);
+        targetResource.updateStartTime(device, newChunkMetadata.getStartTime());
+        targetResource.updateEndTime(device, newChunkMetadata.getEndTime());
+      }
     }
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
index 36567ab654..5e2ebfc835 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.compaction.TsFileManagement.CompactionMergeTask;
 import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
+import org.apache.iotdb.db.engine.compaction.utils.CompactionUtils;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
@@ -33,12 +34,21 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
+import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.apache.commons.io.FileUtils;
 import org.junit.After;
@@ -48,8 +58,11 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
 
@@ -372,4 +385,73 @@ public class LevelCompactionMergeTest extends LevelCompactionTest {
     
IoTDBDescriptor.getInstance().getConfig().setSeqFileNumInEachLevel(prevSeqLevelFileNum);
     IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(prevSeqLevelNum);
   }
+
+  @Test
+  public void testCompactLargeChunk() throws IOException, WriteProcessException, MetadataException {
+    Random random = new Random();
+    List<TsFileResource> resources = new ArrayList<>();
+    for (int i = 0; i < seqResources.size(); ++i) {
+      TsFileResource resource = seqResources.get(i);
+      resource.remove();
+      resource = new TsFileResource(resource.getTsFile());
+      try (TsFileWriter writer = new TsFileWriter(resource.getTsFile())) {
+        MeasurementSchema schema = new MeasurementSchema("s1", TSDataType.TEXT);
+        writer.registerTimeseries(new Path("root.test.d.s1"), schema);
+        for (int time = 1024 * i; time < 1024 * (i + 1); ++time) {
+          TSRecord record = new TSRecord(i, "root.test.d");
+          StringBuilder sb = new StringBuilder();
+          for (int j = 0; j < 40000; ++j) {
+            sb.append(random.nextInt(1024));
+          }
+          record.addTuple(DataPoint.getDataPoint(TSDataType.TEXT, "s1", sb.toString()));
+          writer.write(record);
+          resource.updateStartTime("root.test.d", time);
+          resource.updateEndTime("root.test.d", time);
+        }
+      }
+      resources.add(resource);
+    }
+    for (int i = 7; i < 25; ++i) {
+      TsFileResource resource =
+          new TsFileResource(
+              new File(
+                  resources.get(0).getTsFile().getParentFile(),
+                  String.format("%d-%d-0-0.tsfile", i, i)));
+      try (TsFileWriter writer = new TsFileWriter(resource.getTsFile())) {
+        MeasurementSchema schema = new MeasurementSchema("s1", TSDataType.TEXT);
+        writer.registerTimeseries(new Path("root.test.d.s1"), schema);
+        for (int time = 1024 * i; time < 1024 * (i + 1); ++time) {
+          TSRecord record = new TSRecord(i, "root.test.d");
+          StringBuilder sb = new StringBuilder();
+          for (int j = 0; j < 40000; ++j) {
+            sb.append(random.nextInt(1024));
+          }
+          record.addTuple(DataPoint.getDataPoint(TSDataType.TEXT, "s1", sb.toString()));
+          writer.write(record);
+          resource.updateStartTime("root.test.d", time);
+          resource.updateEndTime("root.test.d", time);
+        }
+      }
+      resources.add(resource);
+    }
+    IoTDB.metaManager.createTimeseries(
+        new PartialPath("root.test.d.s1"),
+        TSDataType.TEXT,
+        TSEncoding.PLAIN,
+        CompressionType.UNCOMPRESSED,
+        Collections.emptyMap());
+    IoTDBDescriptor.getInstance().getConfig().setMergeWriteThroughputMbPerSec(1024);
+    TsFileResource targetResource =
+        new TsFileResource(
+            new File(resources.get(0).getTsFile().getParentFile(), "0-0-1-0.tsfile"));
+    CompactionUtils.merge(
+        targetResource,
+        resources,
+        "root.test",
+        null,
+        new HashSet<>(),
+        true,
+        Collections.EMPTY_LIST,
+        null);
+  }
 }

Reply via email to