This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new f1cbc133986 [to rc/1.3.3] Optimize column batch selection when there
are many value columns (#13457)
f1cbc133986 is described below
commit f1cbc133986c0f5fdde47b5e5dba00c2e32665cf
Author: shuwenwei <[email protected]>
AuthorDate: Tue Sep 10 15:00:57 2024 +0800
[to rc/1.3.3] Optimize column batch selection when there are many value
columns (#13457)
* optimize column batch selection when there are many value columns
* fix bug
* use array deque
---
...BatchedFastAlignedSeriesCompactionExecutor.java | 41 +++---
...edReadChunkAlignedSeriesCompactionExecutor.java | 42 +++---
.../utils/AlignedSeriesBatchCompactionUtils.java | 161 +++++++++++++--------
.../ReadChunkAlignedSeriesCompactionExecutor.java | 42 +++---
.../compaction/utils/BatchCompactionUtilsTest.java | 12 +-
5 files changed, 162 insertions(+), 136 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedFastAlignedSeriesCompactionExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedFastAlignedSeriesCompactionExecutor.java
index 8047c00ba95..9a31990b11a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedFastAlignedSeriesCompactionExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedFastAlignedSeriesCompactionExecutor.java
@@ -56,18 +56,15 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
public class BatchedFastAlignedSeriesCompactionExecutor
extends FastAlignedSeriesCompactionExecutor {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
- private final Set<String> compactedMeasurements;
+ private AlignedSeriesBatchCompactionUtils.BatchColumnSelection
batchColumnSelection;
private final IMeasurementSchema timeSchema;
private final List<IMeasurementSchema> valueMeasurementSchemas;
private final List<TsFileResource> sortedSourceFiles;
@@ -100,7 +97,9 @@ public class BatchedFastAlignedSeriesCompactionExecutor
summary);
timeSchema = measurementSchemas.remove(0);
valueMeasurementSchemas = measurementSchemas;
- this.compactedMeasurements = new HashSet<>();
+ this.batchColumnSelection =
+ new AlignedSeriesBatchCompactionUtils.BatchColumnSelection(
+ valueMeasurementSchemas, batchSize);
this.sortedSourceFiles = sortedSourceFiles;
this.alignedChunkMetadataCache = new HashMap<>();
this.batchCompactionPlan = new BatchCompactionPlan();
@@ -122,13 +121,9 @@ public class BatchedFastAlignedSeriesCompactionExecutor
List<AlignedChunkMetadata> filteredAlignedChunkMetadataList = new
ArrayList<>();
for (AlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList)
{
- List<String> selectedMeasurements =
- selectedValueMeasurementSchemas.stream()
- .map(IMeasurementSchema::getMeasurementId)
- .collect(Collectors.toList());
filteredAlignedChunkMetadataList.add(
- AlignedSeriesBatchCompactionUtils.filterAlignedChunkMetadata(
- alignedChunkMetadata, selectedMeasurements));
+ AlignedSeriesBatchCompactionUtils.filterAlignedChunkMetadataByIndex(
+ alignedChunkMetadata,
batchColumnSelection.getSelectedColumnIndexList()));
}
return filteredAlignedChunkMetadataList;
}
@@ -145,13 +140,16 @@ public class BatchedFastAlignedSeriesCompactionExecutor
private void compactFirstBatch()
throws PageException, IllegalPathException, IOException,
WriteProcessException {
- List<IMeasurementSchema> firstGroupMeasurements =
- AlignedSeriesBatchCompactionUtils.selectColumnBatchToCompact(
- valueMeasurementSchemas, compactedMeasurements, batchSize);
+ if (!batchColumnSelection.hasNext()) {
+ return;
+ }
+ batchColumnSelection.next();
+
List<IMeasurementSchema> currentBatchMeasurementSchemas =
- new ArrayList<>(firstGroupMeasurements.size() + 1);
+ new
ArrayList<>(batchColumnSelection.getCurrentSelectedColumnSchemaList().size() +
1);
currentBatchMeasurementSchemas.add(timeSchema);
- currentBatchMeasurementSchemas.addAll(firstGroupMeasurements);
+ currentBatchMeasurementSchemas.addAll(
+ batchColumnSelection.getCurrentSelectedColumnSchemaList());
FirstBatchFastAlignedSeriesCompactionExecutor executor =
new FirstBatchFastAlignedSeriesCompactionExecutor(
@@ -173,14 +171,13 @@ public class BatchedFastAlignedSeriesCompactionExecutor
private void compactLeftBatches()
throws PageException, IllegalPathException, IOException,
WriteProcessException {
- while (compactedMeasurements.size() < valueMeasurementSchemas.size()) {
- List<IMeasurementSchema> selectedValueColumnGroup =
- AlignedSeriesBatchCompactionUtils.selectColumnBatchToCompact(
- valueMeasurementSchemas, compactedMeasurements, batchSize);
+ while (batchColumnSelection.hasNext()) {
+ batchColumnSelection.next();
List<IMeasurementSchema> currentBatchMeasurementSchemas =
- new ArrayList<>(selectedValueColumnGroup.size() + 1);
+ new
ArrayList<>(batchColumnSelection.getCurrentSelectedColumnSchemaList().size() +
1);
currentBatchMeasurementSchemas.add(timeSchema);
- currentBatchMeasurementSchemas.addAll(selectedValueColumnGroup);
+ currentBatchMeasurementSchemas.addAll(
+ batchColumnSelection.getCurrentSelectedColumnSchemaList());
FollowingBatchFastAlignedSeriesCompactionExecutor executor =
new FollowingBatchFastAlignedSeriesCompactionExecutor(
compactionWriter,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedReadChunkAlignedSeriesCompactionExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedReadChunkAlignedSeriesCompactionExecutor.java
index fd84b37dc6c..a8245d62632 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedReadChunkAlignedSeriesCompactionExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedReadChunkAlignedSeriesCompactionExecutor.java
@@ -53,20 +53,17 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
public class BatchedReadChunkAlignedSeriesCompactionExecutor
extends ReadChunkAlignedSeriesCompactionExecutor {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
- private final Set<String> compactedMeasurements;
private final BatchCompactionPlan batchCompactionPlan = new
BatchCompactionPlan();
private final int batchSize =
IoTDBDescriptor.getInstance().getConfig().getCompactionMaxAlignedSeriesNumInOneBatch();
+ private final AlignedSeriesBatchCompactionUtils.BatchColumnSelection
batchColumnSelection;
private final LinkedList<Pair<TsFileSequenceReader,
List<AlignedChunkMetadata>>>
originReaderAndChunkMetadataList;
@@ -79,7 +76,8 @@ public class BatchedReadChunkAlignedSeriesCompactionExecutor
throws IOException {
super(device, targetResource, readerAndChunkMetadataList, writer, summary);
this.originReaderAndChunkMetadataList = readerAndChunkMetadataList;
- compactedMeasurements = new HashSet<>();
+ this.batchColumnSelection =
+ new AlignedSeriesBatchCompactionUtils.BatchColumnSelection(schemaList,
batchSize);
}
@Override
@@ -97,17 +95,16 @@ public class BatchedReadChunkAlignedSeriesCompactionExecutor
}
private void compactFirstBatch() throws IOException, PageException {
- List<IMeasurementSchema> firstBatchMeasurements =
- AlignedSeriesBatchCompactionUtils.selectColumnBatchToCompact(
- schemaList, compactedMeasurements, batchSize);
+ if (!batchColumnSelection.hasNext()) {
+ return;
+ }
+ batchColumnSelection.next();
LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>>
batchedReaderAndChunkMetadataList =
filterAlignedChunkMetadataList(
- readerAndChunkMetadataList,
- firstBatchMeasurements.stream()
- .map(IMeasurementSchema::getMeasurementId)
- .collect(Collectors.toList()));
+ readerAndChunkMetadataList,
batchColumnSelection.getSelectedColumnIndexList());
+
FirstBatchedReadChunkAlignedSeriesCompactionExecutor executor =
new FirstBatchedReadChunkAlignedSeriesCompactionExecutor(
device,
@@ -116,7 +113,7 @@ public class BatchedReadChunkAlignedSeriesCompactionExecutor
writer,
summary,
timeSchema,
- firstBatchMeasurements);
+ batchColumnSelection.getCurrentSelectedColumnSchemaList());
executor.execute();
LOGGER.debug(
"[Batch Compaction] current device is {}, first batch compacted time
chunk is {}",
@@ -125,17 +122,12 @@ public class
BatchedReadChunkAlignedSeriesCompactionExecutor
}
private void compactLeftBatches() throws PageException, IOException {
- while (compactedMeasurements.size() < schemaList.size()) {
- List<IMeasurementSchema> selectedColumnBatch =
- AlignedSeriesBatchCompactionUtils.selectColumnBatchToCompact(
- schemaList, compactedMeasurements, batchSize);
+ while (batchColumnSelection.hasNext()) {
+ batchColumnSelection.next();
LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>>
groupReaderAndChunkMetadataList =
filterAlignedChunkMetadataList(
- readerAndChunkMetadataList,
- selectedColumnBatch.stream()
- .map(IMeasurementSchema::getMeasurementId)
- .collect(Collectors.toList()));
+ readerAndChunkMetadataList,
batchColumnSelection.getSelectedColumnIndexList());
FollowingBatchedReadChunkAlignedSeriesGroupCompactionExecutor executor =
new FollowingBatchedReadChunkAlignedSeriesGroupCompactionExecutor(
device,
@@ -144,7 +136,7 @@ public class BatchedReadChunkAlignedSeriesCompactionExecutor
writer,
summary,
timeSchema,
- selectedColumnBatch);
+ batchColumnSelection.getCurrentSelectedColumnSchemaList());
executor.execute();
}
}
@@ -152,7 +144,7 @@ public class BatchedReadChunkAlignedSeriesCompactionExecutor
private LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>>
filterAlignedChunkMetadataList(
List<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>>
readerAndChunkMetadataList,
- List<String> selectedMeasurements) {
+ List<Integer> selectedMeasurementIndexs) {
LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>>
groupReaderAndChunkMetadataList = new LinkedList<>();
for (Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> pair :
readerAndChunkMetadataList) {
@@ -160,8 +152,8 @@ public class BatchedReadChunkAlignedSeriesCompactionExecutor
List<AlignedChunkMetadata> selectedColumnAlignedChunkMetadataList = new
LinkedList<>();
for (AlignedChunkMetadata alignedChunkMetadata :
alignedChunkMetadataList) {
selectedColumnAlignedChunkMetadataList.add(
- AlignedSeriesBatchCompactionUtils.filterAlignedChunkMetadata(
- alignedChunkMetadata, selectedMeasurements));
+
AlignedSeriesBatchCompactionUtils.filterAlignedChunkMetadataByIndex(
+ alignedChunkMetadata, selectedMeasurementIndexs));
}
groupReaderAndChunkMetadataList.add(
new Pair<>(pair.getLeft(), selectedColumnAlignedChunkMetadataList));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/AlignedSeriesBatchCompactionUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/AlignedSeriesBatchCompactionUtils.java
index c1e83101a12..79e7b72a8ac 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/AlignedSeriesBatchCompactionUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/AlignedSeriesBatchCompactionUtils.java
@@ -21,7 +21,6 @@ package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.ex
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus;
-import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.read.TsFileSequenceReader;
@@ -29,61 +28,19 @@ import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.schema.IMeasurementSchema;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.NoSuchElementException;
+import java.util.Queue;
public class AlignedSeriesBatchCompactionUtils {
private AlignedSeriesBatchCompactionUtils() {}
- public static List<IMeasurementSchema> selectColumnBatchToCompact(
- List<IMeasurementSchema> schemaList, Set<String> compactedMeasurements,
int batchSize) {
- // TODO: select batch by allocated memory and chunk size to perform more
strict memory control
- List<IMeasurementSchema> selectedColumnBatch = new ArrayList<>(batchSize);
- for (IMeasurementSchema schema : schemaList) {
- if (!isLargeDataType(schema.getType())) {
- continue;
- }
- if (compactedMeasurements.contains(schema.getMeasurementId())) {
- continue;
- }
- compactedMeasurements.add(schema.getMeasurementId());
- selectedColumnBatch.add(schema);
- if (selectedColumnBatch.size() >= batchSize) {
- return selectedColumnBatch;
- }
- if (compactedMeasurements.size() == schemaList.size()) {
- return selectedColumnBatch;
- }
- }
- for (IMeasurementSchema schema : schemaList) {
- if (compactedMeasurements.contains(schema.getMeasurementId())) {
- continue;
- }
- selectedColumnBatch.add(schema);
- compactedMeasurements.add(schema.getMeasurementId());
- if (selectedColumnBatch.size() >= batchSize) {
- break;
- }
- if (compactedMeasurements.size() == schemaList.size()) {
- break;
- }
- }
- return selectedColumnBatch;
- }
-
- private static boolean isLargeDataType(TSDataType dataType) {
- return dataType.equals(TSDataType.BLOB)
- || dataType.equals(TSDataType.TEXT)
- || dataType.equals(TSDataType.STRING);
- }
-
public static void markAlignedChunkHasDeletion(
LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>>
readerAndChunkMetadataList) {
@@ -106,27 +63,45 @@ public class AlignedSeriesBatchCompactionUtils {
}
}
- public static AlignedChunkMetadata filterAlignedChunkMetadata(
- AlignedChunkMetadata alignedChunkMetadata, List<String>
selectedMeasurements) {
- List<IChunkMetadata> valueChunkMetadataList =
- Arrays.asList(new IChunkMetadata[selectedMeasurements.size()]);
-
- Map<String, Integer> measurementIndex = new HashMap<>();
+ public static AlignedChunkMetadata filterAlignedChunkMetadataByIndex(
+ AlignedChunkMetadata alignedChunkMetadata, List<Integer>
selectedMeasurements) {
+ IChunkMetadata[] valueChunkMetadataArr = new
IChunkMetadata[selectedMeasurements.size()];
+ List<IChunkMetadata> originValueChunkMetadataList =
+ alignedChunkMetadata.getValueChunkMetadataList();
for (int i = 0; i < selectedMeasurements.size(); i++) {
- measurementIndex.put(selectedMeasurements.get(i), i);
+ int columnIndex = selectedMeasurements.get(i);
+ valueChunkMetadataArr[i] = originValueChunkMetadataList.get(columnIndex);
}
+ return new AlignedChunkMetadata(
+ alignedChunkMetadata.getTimeChunkMetadata(),
Arrays.asList(valueChunkMetadataArr));
+ }
- for (IChunkMetadata chunkMetadata :
alignedChunkMetadata.getValueChunkMetadataList()) {
- if (chunkMetadata == null) {
- continue;
+ public static AlignedChunkMetadata fillAlignedChunkMetadataBySchemaList(
+ AlignedChunkMetadata originAlignedChunkMetadata,
List<IMeasurementSchema> schemaList) {
+ List<IChunkMetadata> originValueChunkMetadataList =
+ originAlignedChunkMetadata.getValueChunkMetadataList();
+ if (originValueChunkMetadataList.size() == schemaList.size()) {
+ return originAlignedChunkMetadata;
+ }
+ IChunkMetadata[] newValueChunkMetadataArr = new
IChunkMetadata[schemaList.size()];
+ int currentValueChunkMetadataIndex = 0;
+ for (int i = 0; i < schemaList.size(); i++) {
+ IMeasurementSchema currentSchema = schemaList.get(i);
+ if (currentValueChunkMetadataIndex >=
originValueChunkMetadataList.size()) {
+ break;
}
- Integer idx = measurementIndex.get(chunkMetadata.getMeasurementUid());
- if (idx != null) {
- valueChunkMetadataList.set(idx, chunkMetadata);
+ IChunkMetadata currentValueChunkMetadata =
+ originValueChunkMetadataList.get(currentValueChunkMetadataIndex);
+ if (currentValueChunkMetadata != null
+ && currentSchema
+ .getMeasurementId()
+ .equals(currentValueChunkMetadata.getMeasurementUid())) {
+ newValueChunkMetadataArr[i] = currentValueChunkMetadata;
+ currentValueChunkMetadataIndex++;
}
}
return new AlignedChunkMetadata(
- alignedChunkMetadata.getTimeChunkMetadata(), valueChunkMetadataList);
+ originAlignedChunkMetadata.getTimeChunkMetadata(),
Arrays.asList(newValueChunkMetadataArr));
}
public static ModifiedStatus calculateAlignedPageModifiedStatus(
@@ -173,4 +148,70 @@ public class AlignedSeriesBatchCompactionUtils {
}
return status;
}
+
+ public static class BatchColumnSelection {
+ private final List<IMeasurementSchema> schemaList;
+ private final Queue<Integer> normalTypeSortedColumnIndexList;
+ private final Queue<Integer> binaryTypeSortedColumnIndexList;
+ private final int batchSize;
+ private int selectedColumnNum;
+
+ private List<Integer> columnIndexList;
+ private List<IMeasurementSchema> currentSelectedColumnSchemaList;
+
+ public BatchColumnSelection(List<IMeasurementSchema> valueSchemas, int
batchSize) {
+ this.schemaList = valueSchemas;
+ this.normalTypeSortedColumnIndexList = new
ArrayDeque<>(valueSchemas.size());
+ this.binaryTypeSortedColumnIndexList = new ArrayDeque<>();
+ for (int i = 0; i < valueSchemas.size(); i++) {
+ IMeasurementSchema schema = valueSchemas.get(i);
+ if (schema.getType().isBinary()) {
+ this.binaryTypeSortedColumnIndexList.add(i);
+ } else {
+ this.normalTypeSortedColumnIndexList.add(i);
+ }
+ }
+ this.batchSize = batchSize;
+ this.selectedColumnNum = 0;
+ }
+
+ public boolean hasNext() {
+ return selectedColumnNum < schemaList.size();
+ }
+
+ public void next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ selectColumnBatchToCompact();
+ }
+
+ public List<Integer> getSelectedColumnIndexList() {
+ return this.columnIndexList;
+ }
+
+ public List<IMeasurementSchema> getCurrentSelectedColumnSchemaList() {
+ return this.currentSelectedColumnSchemaList;
+ }
+
+ private void selectColumnBatchToCompact() {
+ // TODO: select batch by allocated memory and chunk size to perform more
strict memory control
+ this.columnIndexList = new ArrayList<>(batchSize);
+ this.currentSelectedColumnSchemaList = new ArrayList<>(batchSize);
+ while (!normalTypeSortedColumnIndexList.isEmpty()
+ || !binaryTypeSortedColumnIndexList.isEmpty()) {
+ Queue<Integer> sourceTypeSortedList =
+ binaryTypeSortedColumnIndexList.isEmpty()
+ ? normalTypeSortedColumnIndexList
+ : binaryTypeSortedColumnIndexList;
+ Integer columnIndex = sourceTypeSortedList.poll();
+ this.columnIndexList.add(columnIndex);
+
this.currentSelectedColumnSchemaList.add(this.schemaList.get(columnIndex));
+ selectedColumnNum++;
+ if (this.columnIndexList.size() >= batchSize) {
+ break;
+ }
+ }
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java
index 2404c99f114..61c85a4bd47 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.AlignedSeriesBatchCompactionUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk.loader.ChunkLoader;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk.loader.InstantChunkLoader;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk.loader.InstantPageLoader;
@@ -57,8 +58,6 @@ import org.apache.tsfile.write.schema.MeasurementSchema;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
@@ -96,6 +95,7 @@ public class ReadChunkAlignedSeriesCompactionExecutor {
this.targetResource = targetResource;
this.summary = summary;
collectValueColumnSchemaList();
+ fillAlignedChunkMetadataToMatchSchemaList();
int compactionFileLevel =
Integer.parseInt(this.targetResource.getTsFile().getName().split("-")[2]);
flushController = new
ReadChunkAlignedSeriesCompactionFlushController(compactionFileLevel);
@@ -120,10 +120,6 @@ public class ReadChunkAlignedSeriesCompactionExecutor {
flushController = new
ReadChunkAlignedSeriesCompactionFlushController(compactionFileLevel);
this.timeSchema = timeSchema;
this.schemaList = valueSchemaList;
- this.measurementSchemaListIndexMap = new HashMap<>();
- for (int i = 0; i < schemaList.size(); i++) {
- measurementSchemaListIndexMap.put(schemaList.get(i).getMeasurementId(),
i);
- }
this.chunkWriter = constructAlignedChunkWriter();
}
@@ -172,10 +168,18 @@ public class ReadChunkAlignedSeriesCompactionExecutor {
measurementSchemaMap.values().stream()
.sorted(Comparator.comparing(IMeasurementSchema::getMeasurementId))
.collect(Collectors.toList());
+ }
- this.measurementSchemaListIndexMap = new HashMap<>();
- for (int i = 0; i < schemaList.size(); i++) {
-
this.measurementSchemaListIndexMap.put(schemaList.get(i).getMeasurementId(), i);
+ private void fillAlignedChunkMetadataToMatchSchemaList() {
+ for (Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> pair :
readerAndChunkMetadataList) {
+ List<AlignedChunkMetadata> alignedChunkMetadataList = pair.getRight();
+ for (int i = 0; i < alignedChunkMetadataList.size(); i++) {
+ AlignedChunkMetadata alignedChunkMetadata =
alignedChunkMetadataList.get(i);
+ alignedChunkMetadataList.set(
+ i,
+
AlignedSeriesBatchCompactionUtils.fillAlignedChunkMetadataBySchemaList(
+ alignedChunkMetadata, schemaList));
+ }
}
}
@@ -210,17 +214,17 @@ public class ReadChunkAlignedSeriesCompactionExecutor {
throws IOException, PageException {
ChunkLoader timeChunk =
getChunkLoader(reader, (ChunkMetadata)
alignedChunkMetadata.getTimeChunkMetadata());
- List<ChunkLoader> valueChunks = Arrays.asList(new
ChunkLoader[schemaList.size()]);
- Collections.fill(valueChunks, getChunkLoader(reader, null));
+ List<ChunkLoader> valueChunks = new ArrayList<>(schemaList.size());
long pointNum = 0;
- for (IChunkMetadata chunkMetadata :
alignedChunkMetadata.getValueChunkMetadataList()) {
- if (chunkMetadata == null ||
!isValueChunkDataTypeMatchSchema(chunkMetadata)) {
+ for (int i = 0; i <
alignedChunkMetadata.getValueChunkMetadataList().size(); i++) {
+ IChunkMetadata chunkMetadata =
alignedChunkMetadata.getValueChunkMetadataList().get(i);
+ if (chunkMetadata == null
+ || !chunkMetadata.getDataType().equals(schemaList.get(i).getType()))
{
+ valueChunks.add(getChunkLoader(reader, null));
continue;
}
pointNum += chunkMetadata.getStatistics().getCount();
- ChunkLoader valueChunk = getChunkLoader(reader, (ChunkMetadata)
chunkMetadata);
- int idx =
measurementSchemaListIndexMap.get(chunkMetadata.getMeasurementUid());
- valueChunks.set(idx, valueChunk);
+ valueChunks.add(getChunkLoader(reader, (ChunkMetadata) chunkMetadata));
}
summary.increaseProcessPointNum(pointNum);
if (flushController.canFlushCurrentChunkWriter()) {
@@ -233,12 +237,6 @@ public class ReadChunkAlignedSeriesCompactionExecutor {
}
}
- private boolean isValueChunkDataTypeMatchSchema(IChunkMetadata
valueChunkMetadata) {
- String measurement = valueChunkMetadata.getMeasurementUid();
- IMeasurementSchema schema =
schemaList.get(measurementSchemaListIndexMap.get(measurement));
- return schema.getType() == valueChunkMetadata.getDataType();
- }
-
private ChunkLoader getChunkLoader(TsFileSequenceReader reader,
ChunkMetadata chunkMetadata)
throws IOException {
if (chunkMetadata == null || chunkMetadata.getStatistics().getCount() ==
0) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/BatchCompactionUtilsTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/BatchCompactionUtilsTest.java
index 813ee8cf1de..673f83f2a3d 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/BatchCompactionUtilsTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/BatchCompactionUtilsTest.java
@@ -61,9 +61,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
public class BatchCompactionUtilsTest extends AbstractCompactionTest {
@@ -88,11 +86,11 @@ public class BatchCompactionUtilsTest extends
AbstractCompactionTest {
measurementSchemas.add(new MeasurementSchema("s" + i,
TSDataType.INT32));
}
}
- Set<String> selectedColumns = new HashSet<>();
- List<IMeasurementSchema> iMeasurementSchemas =
- AlignedSeriesBatchCompactionUtils.selectColumnBatchToCompact(
- measurementSchemas, selectedColumns, 10);
- Assert.assertEquals(10, iMeasurementSchemas.size());
+ AlignedSeriesBatchCompactionUtils.BatchColumnSelection
batchColumnSelection =
+ new
AlignedSeriesBatchCompactionUtils.BatchColumnSelection(measurementSchemas, 10);
+ Assert.assertTrue(batchColumnSelection.hasNext());
+ batchColumnSelection.next();
+ Assert.assertEquals(10,
batchColumnSelection.getCurrentSelectedColumnSchemaList().size());
}
@Test