This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new b8b1238c49 Fix chunk size overflow (#6029)
b8b1238c49 is described below
commit b8b1238c49b66c5c0cb84d6764619b064ffcc53f
Author: Liu Xuxin <[email protected]>
AuthorDate: Fri May 27 09:03:59 2022 +0800
Fix chunk size overflow (#6029)
---
.../engine/compaction/utils/CompactionUtils.java | 37 ++++++----
.../compaction/LevelCompactionMergeTest.java | 82 ++++++++++++++++++++++
2 files changed, 106 insertions(+), 13 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 936a451274..585039e98d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -72,10 +72,11 @@ public class CompactionUtils {
throw new IllegalStateException("Utility class");
}
- private static Pair<ChunkMetadata, Chunk> readByAppendPageMerge(
+ private static List<Pair<ChunkMetadata, Chunk>> readByAppendPageMerge(
Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap)
throws IOException {
ChunkMetadata newChunkMetadata = null;
Chunk newChunk = null;
+ List<Pair<ChunkMetadata, Chunk>> newChunkList = new LinkedList<>();
for (Entry<TsFileSequenceReader, List<ChunkMetadata>> entry :
readerChunkMetadataMap.entrySet()) {
TsFileSequenceReader reader = entry.getKey();
@@ -89,9 +90,17 @@ public class CompactionUtils {
newChunk.mergeChunk(chunk);
newChunkMetadata.mergeChunkMetadata(chunkMetadata);
}
+ if (newChunk.getHeader().getDataSize() >= 8L * 1024L * 1024L) {
+ newChunkList.add(new Pair<>(newChunkMetadata, newChunk));
+ newChunkMetadata = null;
+ newChunk = null;
+ }
}
}
- return new Pair<>(newChunkMetadata, newChunk);
+ if (newChunk != null) {
+ newChunkList.add(new Pair<>(newChunkMetadata, newChunk));
+ }
+ return newChunkList;
}
private static void readByDeserializePageMerge(
@@ -156,17 +165,19 @@ public class CompactionUtils {
TsFileResource targetResource,
RestorableTsFileIOWriter writer)
throws IOException {
- Pair<ChunkMetadata, Chunk> chunkPair =
readByAppendPageMerge(entry.getValue());
- ChunkMetadata newChunkMetadata = chunkPair.left;
- Chunk newChunk = chunkPair.right;
- if (newChunkMetadata != null && newChunk != null) {
- // wait for limit write
- MergeManager.mergeRateLimiterAcquire(
- compactionWriteRateLimiter,
- (long) newChunk.getHeader().getDataSize() +
newChunk.getData().position());
- writer.writeChunk(newChunk, newChunkMetadata);
- targetResource.updateStartTime(device, newChunkMetadata.getStartTime());
- targetResource.updateEndTime(device, newChunkMetadata.getEndTime());
+ List<Pair<ChunkMetadata, Chunk>> chunkPairList =
readByAppendPageMerge(entry.getValue());
+ for (Pair<ChunkMetadata, Chunk> chunkPair : chunkPairList) {
+ ChunkMetadata newChunkMetadata = chunkPair.left;
+ Chunk newChunk = chunkPair.right;
+ if (newChunkMetadata != null && newChunk != null) {
+ // wait for limit write
+ MergeManager.mergeRateLimiterAcquire(
+ compactionWriteRateLimiter,
+ (long) newChunk.getHeader().getDataSize() +
newChunk.getData().position());
+ writer.writeChunk(newChunk, newChunkMetadata);
+ targetResource.updateStartTime(device,
newChunkMetadata.getStartTime());
+ targetResource.updateEndTime(device, newChunkMetadata.getEndTime());
+ }
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
index 36567ab654..5e2ebfc835 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import
org.apache.iotdb.db.engine.compaction.TsFileManagement.CompactionMergeTask;
import
org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
+import org.apache.iotdb.db.engine.compaction.utils.CompactionUtils;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
@@ -33,12 +34,21 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
+import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.commons.io.FileUtils;
import org.junit.After;
@@ -48,8 +58,11 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import static org.junit.Assert.assertEquals;
@@ -372,4 +385,73 @@ public class LevelCompactionMergeTest extends LevelCompactionTest {
IoTDBDescriptor.getInstance().getConfig().setSeqFileNumInEachLevel(prevSeqLevelFileNum);
IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(prevSeqLevelNum);
}
+
+ @Test
+ public void testCompactLargeChunk() throws IOException,
WriteProcessException, MetadataException {
+ Random random = new Random();
+ List<TsFileResource> resources = new ArrayList<>();
+ for (int i = 0; i < seqResources.size(); ++i) {
+ TsFileResource resource = seqResources.get(i);
+ resource.remove();
+ resource = new TsFileResource(resource.getTsFile());
+ try (TsFileWriter writer = new TsFileWriter(resource.getTsFile())) {
+ MeasurementSchema schema = new MeasurementSchema("s1",
TSDataType.TEXT);
+ writer.registerTimeseries(new Path("root.test.d.s1"), schema);
+ for (int time = 1024 * i; time < 1024 * (i + 1); ++time) {
+ TSRecord record = new TSRecord(i, "root.test.d");
+ StringBuilder sb = new StringBuilder();
+ for (int j = 0; j < 40000; ++j) {
+ sb.append(random.nextInt(1024));
+ }
+ record.addTuple(DataPoint.getDataPoint(TSDataType.TEXT, "s1",
sb.toString()));
+ writer.write(record);
+ resource.updateStartTime("root.test.d", time);
+ resource.updateEndTime("root.test.d", time);
+ }
+ }
+ resources.add(resource);
+ }
+ for (int i = 7; i < 25; ++i) {
+ TsFileResource resource =
+ new TsFileResource(
+ new File(
+ resources.get(0).getTsFile().getParentFile(),
+ String.format("%d-%d-0-0.tsfile", i, i)));
+ try (TsFileWriter writer = new TsFileWriter(resource.getTsFile())) {
+ MeasurementSchema schema = new MeasurementSchema("s1",
TSDataType.TEXT);
+ writer.registerTimeseries(new Path("root.test.d.s1"), schema);
+ for (int time = 1024 * i; time < 1024 * (i + 1); ++time) {
+ TSRecord record = new TSRecord(i, "root.test.d");
+ StringBuilder sb = new StringBuilder();
+ for (int j = 0; j < 40000; ++j) {
+ sb.append(random.nextInt(1024));
+ }
+ record.addTuple(DataPoint.getDataPoint(TSDataType.TEXT, "s1",
sb.toString()));
+ writer.write(record);
+ resource.updateStartTime("root.test.d", time);
+ resource.updateEndTime("root.test.d", time);
+ }
+ }
+ resources.add(resource);
+ }
+ IoTDB.metaManager.createTimeseries(
+ new PartialPath("root.test.d.s1"),
+ TSDataType.TEXT,
+ TSEncoding.PLAIN,
+ CompressionType.UNCOMPRESSED,
+ Collections.emptyMap());
+
IoTDBDescriptor.getInstance().getConfig().setMergeWriteThroughputMbPerSec(1024);
+ TsFileResource targetResource =
+ new TsFileResource(
+ new File(resources.get(0).getTsFile().getParentFile(),
"0-0-1-0.tsfile"));
+ CompactionUtils.merge(
+ targetResource,
+ resources,
+ "root.test",
+ null,
+ new HashSet<>(),
+ true,
+ Collections.EMPTY_LIST,
+ null);
+ }
}