This is an automated email from the ASF dual-hosted git repository.
bensonchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3d87ecc1346 Refactor MultiTsFileDeviceIterator.MeasurementIterator (#12221)
3d87ecc1346 is described below
commit 3d87ecc1346a5b04c08dad3498fb1d14bd82bbe7
Author: shuwenwei <[email protected]>
AuthorDate: Fri Apr 26 15:00:46 2024 +0800
Refactor MultiTsFileDeviceIterator.MeasurementIterator (#12221)
refactor MultiTsFileDeviceIterator.MeasurementIterator
---
.../impl/ReadChunkCompactionPerformer.java | 12 +-
.../execute/utils/MultiTsFileDeviceIterator.java | 146 ++++++++++++---------
.../utils/MultiTsFileDeviceIteratorTest.java | 70 ++++++++++
3 files changed, 157 insertions(+), 71 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
index b56d6a76235..512d6c12dfc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
@@ -175,15 +175,13 @@ public class ReadChunkCompactionPerformer implements
ISeqCompactionPerformer {
MultiTsFileDeviceIterator deviceIterator)
throws IOException, MetadataException, InterruptedException {
writer.startChunkGroup(device);
- MultiTsFileDeviceIterator.MeasurementIterator seriesIterator =
- deviceIterator.iterateNotAlignedSeries(device, true);
+
MultiTsFileDeviceIterator.MultiTsFileNonAlignedMeasurementMetadataListIterator
seriesIterator =
+ deviceIterator.iterateNotAlignedSeriesAndChunkMetadataList(device);
while (seriesIterator.hasNextSeries()) {
checkThreadInterrupted();
+ String series = seriesIterator.nextSeries();
// TODO: we can provide a configuration item to enable concurrent
between each series
- PartialPath p = CompactionPathUtils.getPath(device,
seriesIterator.nextSeries());
- // TODO: seriesIterator needs to be refactor.
- // This statement must be called before next hasNextSeries() called, or
it may be trapped in a
- // dead-loop.
+ PartialPath path = CompactionPathUtils.getPath(device, series);
LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
readerAndChunkMetadataList =
seriesIterator.getMetadataListForCurrentSeries();
// remove the chunk metadata whose data type not match the data type of
last chunk
@@ -191,7 +189,7 @@ public class ReadChunkCompactionPerformer implements
ISeqCompactionPerformer {
filterDataTypeNotMatchedChunkMetadata(readerAndChunkMetadataList);
SingleSeriesCompactionExecutor compactionExecutorOfCurrentTimeSeries =
new SingleSeriesCompactionExecutor(
- p, readerAndChunkMetadataList, writer, targetResource, summary);
+ path, readerAndChunkMetadataList, writer, targetResource,
summary);
compactionExecutorOfCurrentTimeSeries.execute();
}
writer.endChunkGroup();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
index 100207823d6..ee0065ff6ae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
@@ -326,15 +326,16 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
}
/**
- * return MeasurementIterator, who iterates the measurements of not aligned
device
+ * return MultiTsFileNonAlignedMeasurementMetadataListIterator, who iterates
the measurements of
+ * not aligned device
*
- * @param device the full path of the device to be iterated
+ * @param device the device to be iterated
* @return measurement iterator of not aligned device
* @throws IOException if io errors occurred
*/
- public MeasurementIterator iterateNotAlignedSeries(
- IDeviceID device, boolean derserializeTimeseriesMetadata) throws
IOException {
- return new MeasurementIterator(readerMap, device,
derserializeTimeseriesMetadata);
+ public MultiTsFileNonAlignedMeasurementMetadataListIterator
+ iterateNotAlignedSeriesAndChunkMetadataList(IDeviceID device) throws
IOException {
+ return new MultiTsFileNonAlignedMeasurementMetadataListIterator(readerMap,
device);
}
/**
@@ -435,34 +436,30 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
/*
NonAligned measurement iterator.
*/
- public class MeasurementIterator {
- private Map<TsFileResource, TsFileSequenceReader> readerMap;
- private IDeviceID device;
- private String currentCompactingSeries = null;
- private LinkedList<String> seriesInThisIteration = new LinkedList<>();
+ public class MultiTsFileNonAlignedMeasurementMetadataListIterator {
+ private final Map<TsFileResource, TsFileSequenceReader> readerMap;
+ private final IDeviceID device;
+ private final LinkedList<String> seriesInThisIteration = new
LinkedList<>();
// tsfile sequence reader -> series -> list<ChunkMetadata>
- private Map<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
chunkMetadataCacheMap =
- new HashMap<>();
+ private final Map<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
+ chunkMetadataCacheMap = new HashMap<>();
// this map cache the chunk metadata list iterator for each tsfile
// the iterator return a batch of series and all chunk metadata of these
series in this tsfile
- private Map<TsFileResource, Iterator<Map<String, List<ChunkMetadata>>>>
+ private final Map<TsFileResource, Iterator<Map<String,
List<ChunkMetadata>>>>
chunkMetadataIteratorMap = new HashMap<>();
+ private String currentCompactingSeries = null;
+ private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
+ chunkMetadataMetadataListOfCurrentCompactingSeries;
- private MeasurementIterator(
- Map<TsFileResource, TsFileSequenceReader> readerMap,
- IDeviceID device,
- boolean needDeserializeTimeseries)
- throws IOException {
+ private MultiTsFileNonAlignedMeasurementMetadataListIterator(
+ Map<TsFileResource, TsFileSequenceReader> readerMap, IDeviceID device)
throws IOException {
this.readerMap = readerMap;
this.device = device;
-
- if (needDeserializeTimeseries) {
- for (TsFileResource resource : tsFileResourcesSortedByAsc) {
- TsFileSequenceReader reader = readerMap.get(resource);
- chunkMetadataIteratorMap.put(
- resource,
reader.getMeasurementChunkMetadataListMapIterator(device));
- chunkMetadataCacheMap.put(reader, new TreeMap<>());
- }
+ for (TsFileResource resource : tsFileResourcesSortedByAsc) {
+ TsFileSequenceReader reader = readerMap.get(resource);
+ chunkMetadataIteratorMap.put(
+ resource,
reader.getMeasurementChunkMetadataListMapIterator(device));
+ chunkMetadataCacheMap.put(reader, new TreeMap<>());
}
}
@@ -481,47 +478,61 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
@SuppressWarnings("squid:S3776")
private boolean collectSeries() {
String lastSeries = null;
- List<String> tempCollectedSeries = new ArrayList<>();
+ List<String> collectedSeries = new ArrayList<>();
for (TsFileResource resource : tsFileResourcesSortedByAsc) {
- TsFileSequenceReader reader = readerMap.get(resource);
- Map<String, List<ChunkMetadata>> chunkMetadataListMap =
chunkMetadataCacheMap.get(reader);
- if (chunkMetadataListMap.size() == 0) {
- if (chunkMetadataIteratorMap.get(resource).hasNext()) {
- chunkMetadataListMap =
chunkMetadataIteratorMap.get(resource).next();
- if (chunkMetadataListMap.containsKey("")) {
- // encounter deleted aligned series, then remove it
- chunkMetadataListMap.remove("");
- }
- chunkMetadataCacheMap.put(reader, chunkMetadataListMap);
- } else {
- continue;
- }
+ Map<String, List<ChunkMetadata>> batchMeasurementChunkMetadataList =
+ getBatchMeasurementChunkMetadataListFromCache(resource);
+ if (batchMeasurementChunkMetadataList.isEmpty()) {
+ continue;
}
+
// get the min last series in the current chunk metadata
- String maxSeries = Collections.max(chunkMetadataListMap.keySet());
+ String maxSeriesOfCurrentBatch =
+ Collections.max(batchMeasurementChunkMetadataList.keySet());
if (lastSeries == null) {
- lastSeries = maxSeries;
- } else {
- if (maxSeries.compareTo(lastSeries) < 0) {
- lastSeries = maxSeries;
- }
+ lastSeries = maxSeriesOfCurrentBatch;
+ } else if (maxSeriesOfCurrentBatch.compareTo(lastSeries) < 0) {
+ lastSeries = maxSeriesOfCurrentBatch;
}
- tempCollectedSeries.addAll(chunkMetadataListMap.keySet());
+ collectedSeries.addAll(batchMeasurementChunkMetadataList.keySet());
}
- if (!tempCollectedSeries.isEmpty()) {
- if (!hasRemainingSeries()) {
- lastSeries = Collections.max(tempCollectedSeries);
- }
- String finalLastSeries = lastSeries;
- List<String> finalCollectedSeriesInThisIteration =
- tempCollectedSeries.stream()
- .filter(series -> series.compareTo(finalLastSeries) <= 0)
- .collect(Collectors.toList());
- seriesInThisIteration.addAll(finalCollectedSeriesInThisIteration);
- return true;
- } else {
+
+ if (collectedSeries.isEmpty()) {
return false;
}
+ if (!hasRemainingSeries()) {
+ lastSeries = Collections.max(collectedSeries);
+ }
+
+ String finalLastSeries = lastSeries;
+ seriesInThisIteration.addAll(
+ collectedSeries.stream()
+ .filter(series -> series.compareTo(finalLastSeries) <= 0)
+ .sorted()
+ .distinct()
+ .collect(Collectors.toList()));
+ return true;
+ }
+
+ private Map<String, List<ChunkMetadata>>
getBatchMeasurementChunkMetadataListFromCache(
+ TsFileResource resource) {
+ TsFileSequenceReader reader = readerMap.get(resource);
+ Map<String, List<ChunkMetadata>>
cachedBatchMeasurementChunkMetadataListMap =
+ chunkMetadataCacheMap.get(reader);
+ if (!cachedBatchMeasurementChunkMetadataListMap.isEmpty()) {
+ return cachedBatchMeasurementChunkMetadataListMap;
+ }
+ Iterator<Map<String, List<ChunkMetadata>>>
batchMeasurementChunkMetadataListIterator =
+ chunkMetadataIteratorMap.get(resource);
+ if (!batchMeasurementChunkMetadataListIterator.hasNext()) {
+ return cachedBatchMeasurementChunkMetadataListMap;
+ }
+ Map<String, List<ChunkMetadata>> newBatchMeasurementChunkMetadataListMap
=
+ batchMeasurementChunkMetadataListIterator.next();
+ // if encounter deleted aligned series, then remove it
+ newBatchMeasurementChunkMetadataListMap.remove("");
+ chunkMetadataCacheMap.put(reader,
newBatchMeasurementChunkMetadataListMap);
+ return newBatchMeasurementChunkMetadataListMap;
}
private boolean hasRemainingSeries() {
@@ -537,15 +548,21 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
return !seriesInThisIteration.isEmpty() || collectSeries();
}
- public String nextSeries() {
+ public String nextSeries() throws IllegalPathException {
if (!hasNextSeries()) {
return null;
} else {
- currentCompactingSeries = seriesInThisIteration.removeFirst();
+ chunkMetadataMetadataListOfCurrentCompactingSeries =
+ calculateMetadataListForCurrentSeries();
return currentCompactingSeries;
}
}
+ public LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
+ getMetadataListForCurrentSeries() {
+ return chunkMetadataMetadataListOfCurrentCompactingSeries;
+ }
+
/**
* Collect all the chunk metadata of current series from the source files.
*
@@ -556,11 +573,12 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
* @throws IllegalPathException if path is illegal
*/
@SuppressWarnings("squid:S1319")
- public LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
- getMetadataListForCurrentSeries() throws IllegalPathException {
- if (currentCompactingSeries == null) {
+ private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
+ calculateMetadataListForCurrentSeries() throws IllegalPathException {
+ if (seriesInThisIteration.isEmpty()) {
return new LinkedList<>();
}
+ currentCompactingSeries = seriesInThisIteration.removeFirst();
LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
readerAndChunkMetadataForThisSeries = new LinkedList<>();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/MultiTsFileDeviceIteratorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/MultiTsFileDeviceIteratorTest.java
index 16ef1be97c2..0e9c3cbb72d 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/MultiTsFileDeviceIteratorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/MultiTsFileDeviceIteratorTest.java
@@ -40,9 +40,14 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.TsFileGeneratorUtils;
import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -54,6 +59,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -82,6 +88,70 @@ public class MultiTsFileDeviceIteratorTest extends
AbstractCompactionTest {
Thread.currentThread().setName(oldThreadName);
}
+ @Test
+ public void testMeasurementIterator() throws IOException, MetadataException {
+ TsFileResource resource1 = createEmptyFileAndResource(true);
+ try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource1)) {
+ writer.startChunkGroup("d1");
+ for (int i = 1000; i < 2000; i++) {
+ writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+ "s" + i,
+ new TimeRange[] {new TimeRange(10, 20)},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4);
+ }
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ resource1.setStatusForTest(TsFileResourceStatus.COMPACTING);
+
+ seqResources.add(resource1);
+
+ TsFileResource resource2 = createEmptyFileAndResource(true);
+ try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource2)) {
+ writer.startChunkGroup("d1");
+ for (int i = 1000; i < 5000; i++) {
+ writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+ "s" + i,
+ new TimeRange[] {new TimeRange(30, 40)},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4);
+ }
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ resource2.setStatusForTest(TsFileResourceStatus.COMPACTING);
+
+ seqResources.add(resource2);
+
+ List<String> measurementSet = new ArrayList<>(4000);
+ try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
+ new MultiTsFileDeviceIterator(seqResources)) {
+ while (multiTsFileDeviceIterator.hasNextDevice()) {
+ Pair<IDeviceID, Boolean> deviceIsAlignedPair =
multiTsFileDeviceIterator.nextDevice();
+ IDeviceID device = deviceIsAlignedPair.getLeft();
+
MultiTsFileDeviceIterator.MultiTsFileNonAlignedMeasurementMetadataListIterator
+ measurementIterator =
+
multiTsFileDeviceIterator.iterateNotAlignedSeriesAndChunkMetadataList(device);
+ while (measurementIterator.hasNextSeries()) {
+ String series = measurementIterator.nextSeries();
+ LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
readerAndChunkMetadataList =
+ measurementIterator.getMetadataListForCurrentSeries();
+ measurementSet.add(series);
+ }
+ }
+ }
+ new InnerSpaceCompactionTask(
+ 0, tsFileManager, seqResources, true, new
ReadChunkCompactionPerformer(), 0)
+ .start();
+ TsFileResource targetFile = tsFileManager.getTsFileList(true).get(0);
+ try (TsFileSequenceReader reader =
+ new TsFileSequenceReader(targetFile.getTsFile().getAbsolutePath())) {
+ Assert.assertEquals(4000, reader.getAllMeasurements().size());
+ }
+ Assert.assertEquals(4000, measurementSet.size());
+ }
+
@Test
public void
getNonAlignedDevicesFromDifferentFilesWithFourLayersInNodeTreeTest()
throws MetadataException, IOException, WriteProcessException {