This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7a2419ce1f1 Fix compaction metric and read device MetadataIndexNode
7a2419ce1f1 is described below
commit 7a2419ce1f1e77490ae8d80530cbbdd0df36cd2c
Author: shuwenwei <[email protected]>
AuthorDate: Fri May 31 21:29:46 2024 +0800
Fix compaction metric and read device MetadataIndexNode
---
.../impl/ReadChunkCompactionPerformer.java | 2 +-
.../execute/utils/MultiTsFileDeviceIterator.java | 45 +++++---
.../compaction/io/CompactionTsFileReader.java | 119 +++----------------
.../compaction/repair/RepairDataFileScanUtil.java | 18 ++-
.../inner/InnerSequenceCompactionSpeedTest.java | 126 +++++++++++++++++++++
.../utils/MultiTsFileDeviceIteratorTest.java | 3 +-
6 files changed, 189 insertions(+), 124 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
index b13aca307f9..67b98f175f8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
@@ -179,7 +179,7 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
throws IOException, MetadataException, InterruptedException {
writer.startChunkGroup(device);
MultiTsFileDeviceIterator.MultiTsFileNonAlignedMeasurementMetadataListIterator seriesIterator =
- deviceIterator.iterateNotAlignedSeriesAndChunkMetadataList(device);
+ deviceIterator.iterateNotAlignedSeriesAndChunkMetadataListOfCurrentDevice();
while (seriesIterator.hasNextSeries()) {
checkThreadInterrupted();
String series = seriesIterator.nextSeries();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
index 40b156058c3..aa53aed93cc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
@@ -38,6 +38,7 @@ import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.MetadataIndexNode;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.tsfile.read.TsFileDeviceIterator;
@@ -334,13 +335,12 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
* return MultiTsFileNonAlignedMeasurementMetadataListIterator, who iterates
the measurements of
* not aligned device
*
- * @param device the device to be iterated
* @return measurement iterator of not aligned device
* @throws IOException if io errors occurred
*/
public MultiTsFileNonAlignedMeasurementMetadataListIterator
- iterateNotAlignedSeriesAndChunkMetadataList(IDeviceID device) throws IOException {
- return new MultiTsFileNonAlignedMeasurementMetadataListIterator(readerMap, device);
+ iterateNotAlignedSeriesAndChunkMetadataListOfCurrentDevice() throws IOException {
+ return new MultiTsFileNonAlignedMeasurementMetadataListIterator();
}
/**
@@ -372,9 +372,12 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
if (!currentDevice.equals(iterator.current())) {
continue;
}
+ MetadataIndexNode firstMeasurementNodeOfCurrentDevice =
+ iterator.getFirstMeasurementNodeOfCurrentDevice();
TsFileSequenceReader reader = readerMap.get(tsFileResource);
List<AlignedChunkMetadata> alignedChunkMetadataList =
- reader.getAlignedChunkMetadata(currentDevice.left);
+ reader.getAlignedChunkMetadataByMetadataIndexNode(
+ currentDevice.left, firstMeasurementNodeOfCurrentDevice);
applyModificationForAlignedChunkMetadataList(tsFileResource,
alignedChunkMetadataList);
readerAndChunkMetadataList.add(new Pair<>(reader,
alignedChunkMetadataList));
}
@@ -465,8 +468,6 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
NonAligned measurement iterator.
*/
public class MultiTsFileNonAlignedMeasurementMetadataListIterator {
- private final Map<TsFileResource, TsFileSequenceReader> readerMap;
- private final IDeviceID device;
private final LinkedList<String> seriesInThisIteration = new
LinkedList<>();
// tsfile sequence reader -> series -> list<ChunkMetadata>
private final Map<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
@@ -479,14 +480,31 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
chunkMetadataMetadataListOfCurrentCompactingSeries;
- private MultiTsFileNonAlignedMeasurementMetadataListIterator(
- Map<TsFileResource, TsFileSequenceReader> readerMap, IDeviceID device) throws IOException {
- this.readerMap = readerMap;
- this.device = device;
+ private MultiTsFileNonAlignedMeasurementMetadataListIterator() throws IOException {
+ IDeviceID device = currentDevice.getLeft();
for (TsFileResource resource : tsFileResourcesSortedByAsc) {
+ TsFileDeviceIterator deviceIterator = deviceIteratorMap.get(resource);
TsFileSequenceReader reader = readerMap.get(resource);
- chunkMetadataIteratorMap.put(
- resource, reader.getMeasurementChunkMetadataListMapIterator(device));
+ if (deviceIterator == null || !device.equals(deviceIterator.current().getLeft())) {
+ chunkMetadataIteratorMap.put(
+ resource,
+ new Iterator<Map<String, List<ChunkMetadata>>>() {
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public Map<String, List<ChunkMetadata>> next() {
+ return Collections.emptyMap();
+ }
+ });
+ } else {
+ chunkMetadataIteratorMap.put(
+ resource,
+ reader.getMeasurementChunkMetadataListMapIterator(
+ deviceIterator.getFirstMeasurementNodeOfCurrentDevice()));
+ }
chunkMetadataCacheMap.put(reader, new TreeMap<>());
}
}
@@ -610,7 +628,8 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
readerAndChunkMetadataForThisSeries = new LinkedList<>();
- PartialPath path = CompactionPathUtils.getPath(device, currentCompactingSeries);
+ PartialPath path =
+ CompactionPathUtils.getPath(currentDevice.getLeft(), currentCompactingSeries);
for (TsFileResource resource : tsFileResourcesSortedByAsc) {
TsFileSequenceReader reader = readerMap.get(resource);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
index 09afe1dad01..b836352d64f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java
@@ -27,12 +27,9 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant
import org.apache.tsfile.file.IMetadataIndexEntry;
import org.apache.tsfile.file.header.ChunkHeader;
import org.apache.tsfile.file.metadata.ChunkMetadata;
-import org.apache.tsfile.file.metadata.IChunkMetadata;
-import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.MetadataIndexNode;
import org.apache.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.tsfile.file.metadata.enums.MetadataIndexNodeType;
-import org.apache.tsfile.read.TsFileDeviceIterator;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.utils.Pair;
@@ -43,9 +40,7 @@ import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
/**
* This class extends the TsFileSequenceReader class to read and manage TsFile
with a focus on
@@ -53,8 +48,8 @@ import java.util.concurrent.atomic.AtomicLong;
* data read and distinguishing between aligned and not aligned series during
compaction.
*/
public class CompactionTsFileReader extends TsFileSequenceReader {
- /** Tracks the total amount of data (in bytes) that has been read. */
- private AtomicLong readDataSize = new AtomicLong(0L);
+
+ private long metadataOffset = 0;
/** The type of compaction running. */
CompactionType compactionType;
@@ -73,13 +68,25 @@ public class CompactionTsFileReader extends
TsFileSequenceReader {
super(file);
this.tsFileInput = new CompactionTsFileInput(tsFileInput);
this.compactionType = compactionType;
+ this.metadataOffset = readFileMetadata().getMetaOffset();
}
@Override
protected ByteBuffer readData(long position, int totalSize) throws
IOException {
acquireReadDataSizeWithCompactionReadRateLimiter(totalSize);
ByteBuffer buffer = super.readData(position, totalSize);
- readDataSize.addAndGet(totalSize);
+ if (position >= metadataOffset) {
+ CompactionMetrics.getInstance()
+ .recordReadInfo(compactionType, CompactionIoDataType.METADATA, totalSize);
+ } else {
+ CompactionMetrics.getInstance()
+ .recordReadInfo(
+ compactionType,
+ readingAlignedSeries
+ ? CompactionIoDataType.ALIGNED
+ : CompactionIoDataType.NOT_ALIGNED,
+ totalSize);
+ }
return buffer;
}
@@ -95,20 +102,7 @@ public class CompactionTsFileReader extends
TsFileSequenceReader {
@Override
public Chunk readMemChunk(ChunkMetadata metaData) throws IOException {
- synchronized (this) {
- // using synchronized to avoid concurrent read that makes readDataSize
not correct
- long before = readDataSize.get();
- Chunk chunk = super.readMemChunk(metaData);
- long dataSize = readDataSize.get() - before;
- CompactionMetrics.getInstance()
- .recordReadInfo(
- compactionType,
- readingAlignedSeries
- ? CompactionIoDataType.ALIGNED
- : CompactionIoDataType.NOT_ALIGNED,
- dataSize);
- return chunk;
- }
+ return super.readMemChunk(metaData);
}
public ChunkHeader readChunkHeader(long position) throws IOException {
@@ -127,74 +121,12 @@ public class CompactionTsFileReader extends
TsFileSequenceReader {
return readData(startOffset, pageSize);
}
- @Override
- public TsFileDeviceIterator getAllDevicesIteratorWithIsAligned() throws
IOException {
- long before = readDataSize.get();
- TsFileDeviceIterator iterator = super.getAllDevicesIteratorWithIsAligned();
- long dataSize = readDataSize.get() - before;
- CompactionMetrics.getInstance()
- .recordReadInfo(compactionType, CompactionIoDataType.METADATA,
dataSize);
- return iterator;
- }
-
- @Override
- public List<IChunkMetadata> getChunkMetadataListByTimeseriesMetadataOffset(
- long startOffset, long endOffset) throws IOException {
- long before = readDataSize.get();
- List<IChunkMetadata> chunkMetadataList =
- super.getChunkMetadataListByTimeseriesMetadataOffset(startOffset,
endOffset);
- long dataSize = readDataSize.get() - before;
- CompactionMetrics.getInstance()
- .recordReadInfo(compactionType, CompactionIoDataType.METADATA,
dataSize);
- return chunkMetadataList;
- }
-
- @Override
- public void getDevicesAndEntriesOfOneLeafNode(
- Long startOffset, Long endOffset, Queue<Pair<IDeviceID, long[]>>
measurementNodeOffsetQueue)
- throws IOException {
- long before = readDataSize.get();
- super.getDevicesAndEntriesOfOneLeafNode(startOffset, endOffset,
measurementNodeOffsetQueue);
- long dataSize = readDataSize.get() - before;
- CompactionMetrics.getInstance()
- .recordReadInfo(compactionType, CompactionIoDataType.METADATA,
dataSize);
- }
-
- @Override
- public MetadataIndexNode readMetadataIndexNode(long start, long end, boolean
isDeviceLevel)
- throws IOException {
- long before = readDataSize.get();
- MetadataIndexNode metadataIndexNode = super.readMetadataIndexNode(start,
end, isDeviceLevel);
- long dataSize = readDataSize.get() - before;
- CompactionMetrics.getInstance()
- .recordReadInfo(compactionType, CompactionIoDataType.METADATA,
dataSize);
- return metadataIndexNode;
- }
-
- @Override
- public Map<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>>
- getTimeseriesMetadataOffsetByDevice(
- MetadataIndexNode measurementNode,
- Set<String> excludedMeasurementIds,
- boolean needChunkMetadata)
- throws IOException {
- long before = readDataSize.get();
- Map<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>> result =
- super.getTimeseriesMetadataOffsetByDevice(
- measurementNode, excludedMeasurementIds, needChunkMetadata);
- long dataSize = readDataSize.get() - before;
- CompactionMetrics.getInstance()
- .recordReadInfo(compactionType, CompactionIoDataType.METADATA,
dataSize);
- return result;
- }
-
public Map<String, Pair<TimeseriesMetadata, Pair<Long, Long>>>
getTimeseriesMetadataAndOffsetByDevice(
MetadataIndexNode measurementNode,
Set<String> excludedMeasurementIds,
boolean needChunkMetadata)
throws IOException {
- long before = readDataSize.get();
Map<String, Pair<TimeseriesMetadata, Pair<Long, Long>>>
timeseriesMetadataOffsetMap =
new LinkedHashMap<>();
List<IMetadataIndexEntry> childrenEntryList =
measurementNode.getChildren();
@@ -229,28 +161,9 @@ public class CompactionTsFileReader extends
TsFileSequenceReader {
nextLayerMeasurementNode, excludedMeasurementIds,
needChunkMetadata));
}
}
-
- long dataSize = readDataSize.get() - before;
- CompactionMetrics.getInstance()
- .recordReadInfo(compactionType, CompactionIoDataType.METADATA,
dataSize);
return timeseriesMetadataOffsetMap;
}
- @Override
- public void getDeviceTimeseriesMetadata(
- List<TimeseriesMetadata> timeseriesMetadataList,
- MetadataIndexNode measurementNode,
- Set<String> excludedMeasurementIds,
- boolean needChunkMetadata)
- throws IOException {
- long before = readDataSize.get();
- super.getDeviceTimeseriesMetadata(
- timeseriesMetadataList, measurementNode, excludedMeasurementIds,
needChunkMetadata);
- long dataSize = readDataSize.get() - before;
- CompactionMetrics.getInstance()
- .recordReadInfo(compactionType, CompactionIoDataType.METADATA,
dataSize);
- }
-
private void acquireReadDataSizeWithCompactionReadRateLimiter(int
readDataSize) {
CompactionTaskManager.getInstance().getCompactionReadOperationRateLimiter().acquire(1);
CompactionTaskManager.getInstance().getCompactionReadRateLimiter().acquire(readDataSize);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
index 3af6fad71b9..1de636c2109 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
@@ -39,6 +39,7 @@ import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.MetadataIndexNode;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
@@ -85,11 +86,13 @@ public class RepairDataFileScanUtil {
while (deviceIterator.hasNext()) {
Pair<IDeviceID, Boolean> deviceIsAlignedPair = deviceIterator.next();
IDeviceID device = deviceIsAlignedPair.getLeft();
+ MetadataIndexNode metadataIndexNode =
+ deviceIterator.getFirstMeasurementNodeOfCurrentDevice();
boolean isAligned = deviceIsAlignedPair.getRight();
if (isAligned) {
- checkAlignedDeviceSeries(reader, device);
+ checkAlignedDeviceSeries(reader, device, metadataIndexNode);
} else {
- checkNonAlignedDeviceSeries(reader, device);
+ checkNonAlignedDeviceSeries(reader, device, metadataIndexNode);
}
}
} catch (CompactionLastTimeCheckFailedException
lastTimeCheckFailedException) {
@@ -108,9 +111,11 @@ public class RepairDataFileScanUtil {
}
}
- private void checkAlignedDeviceSeries(TsFileSequenceReader reader, IDeviceID device)
+ private void checkAlignedDeviceSeries(
+ TsFileSequenceReader reader, IDeviceID device, MetadataIndexNode metadataIndexNode)
throws IOException {
- List<AlignedChunkMetadata> chunkMetadataList = reader.getAlignedChunkMetadata(device);
+ List<AlignedChunkMetadata> chunkMetadataList =
+ reader.getAlignedChunkMetadataByMetadataIndexNode(device, metadataIndexNode);
for (AlignedChunkMetadata alignedChunkMetadata : chunkMetadataList) {
IChunkMetadata timeChunkMetadata =
alignedChunkMetadata.getTimeChunkMetadata();
Chunk timeChunk = reader.readMemChunk((ChunkMetadata) timeChunkMetadata);
@@ -141,10 +146,11 @@ public class RepairDataFileScanUtil {
previousTime = Long.MIN_VALUE;
}
- private void checkNonAlignedDeviceSeries(TsFileSequenceReader reader, IDeviceID device)
+ private void checkNonAlignedDeviceSeries(
+ TsFileSequenceReader reader, IDeviceID device, MetadataIndexNode metadataIndexNode)
throws IOException {
Iterator<Map<String, List<ChunkMetadata>>> measurementChunkMetadataListMapIterator =
- reader.getMeasurementChunkMetadataListMapIterator(device);
+ reader.getMeasurementChunkMetadataListMapIterator(metadataIndexNode);
while (measurementChunkMetadataListMapIterator.hasNext()) {
Map<String, List<ChunkMetadata>> measurementChunkMetadataListMap =
measurementChunkMetadataListMapIterator.next();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSequenceCompactionSpeedTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSequenceCompactionSpeedTest.java
new file mode 100644
index 00000000000..097d4e6a6db
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSequenceCompactionSpeedTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.compaction.inner;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.common.TimeRange;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class InnerSequenceCompactionSpeedTest extends AbstractCompactionTest {
+ private final long compactionReadThroughputPerSec =
+ IoTDBDescriptor.getInstance().getConfig().getCompactionReadThroughputMbPerSec();
+
+ @Before
+ public void setUp()
+ throws IOException, WriteProcessException, MetadataException, InterruptedException {
+ super.setUp();
+ CompactionTaskManager.getInstance().setCompactionReadThroughputRate(1);
+ }
+
+ @After
+ public void tearDown() throws IOException, StorageEngineException {
+ super.tearDown();
+ CompactionTaskManager.getInstance()
+ .setCompactionReadThroughputRate(compactionReadThroughputPerSec);
+ }
+
+ @Test
+ public void testManyAlignedDeviceTsFile() throws IOException, InterruptedException {
+ List<String> deviceNames = new ArrayList<>();
+ for (int i = 0; i < 100000; i++) {
+ deviceNames.add("d" + i);
+ }
+ TsFileResource resource = createEmptyFileAndResource(true);
+ try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) {
+ for (String device : deviceNames) {
+ writer.startChunkGroup(device);
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Collections.singletonList("s0"),
+ new TimeRange[] {new TimeRange(1, 2)},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4);
+ writer.endChunkGroup();
+ }
+ writer.endFile();
+ }
+ seqResources.add(resource);
+ tsFileManager.add(resource, true);
+ long tsFileSize = resource.getTsFileSize();
+ Thread thread =
+ new Thread(
+ () -> {
+ InnerSpaceCompactionTask task =
+ new InnerSpaceCompactionTask(
+ tsFileManager, 0, seqResources, true, new ReadChunkCompactionPerformer(), 0);
+ task.start();
+ });
+ thread.start();
+ thread.join(TimeUnit.SECONDS.toMillis(30 + tsFileSize / IoTDBConstant.MB));
+ }
+
+ @Test
+ public void testManyNotAlignedDeviceTsFile() throws IOException {
+ List<String> deviceNames = new ArrayList<>();
+ for (int i = 0; i < 100000; i++) {
+ deviceNames.add("d" + i);
+ }
+ TsFileResource resource = createEmptyFileAndResource(true);
+ try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) {
+ for (String device : deviceNames) {
+ writer.startChunkGroup(device);
+ writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+ "s0", new TimeRange[] {new TimeRange(1, 2)}, TSEncoding.PLAIN, CompressionType.LZ4);
+ writer.endChunkGroup();
+ }
+ writer.endFile();
+ }
+ seqResources.add(resource);
+ tsFileManager.add(resource, true);
+ long tsFileSize = resource.getTsFileSize();
+ InnerSpaceCompactionTask task =
+ new InnerSpaceCompactionTask(
+ tsFileManager, 0, seqResources, true, new ReadChunkCompactionPerformer(), 0);
+ Assert.assertTrue(task.start());
+ Assert.assertTrue(
+ TimeUnit.SECONDS.toMillis(tsFileSize / IoTDBConstant.MB + 30) > task.getTimeCost());
+ }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/MultiTsFileDeviceIteratorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/MultiTsFileDeviceIteratorTest.java
index c9b9b62f200..863efce14cf 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/MultiTsFileDeviceIteratorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/MultiTsFileDeviceIteratorTest.java
@@ -133,7 +133,8 @@ public class MultiTsFileDeviceIteratorTest extends AbstractCompactionTest {
IDeviceID device = deviceIsAlignedPair.getLeft();
MultiTsFileDeviceIterator.MultiTsFileNonAlignedMeasurementMetadataListIterator
measurementIterator =
- multiTsFileDeviceIterator.iterateNotAlignedSeriesAndChunkMetadataList(device);
+ multiTsFileDeviceIterator
+ .iterateNotAlignedSeriesAndChunkMetadataListOfCurrentDevice();
while (measurementIterator.hasNextSeries()) {
String series = measurementIterator.nextSeries();
LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
readerAndChunkMetadataList =