This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new aaff44910bd Pipe: aggregate tablets with different measurements under
the same table before write into tsfile (#15372)
aaff44910bd is described below
commit aaff44910bdcc7fd4603692020fbbdd190906740
Author: VGalaxies <[email protected]>
AuthorDate: Mon Apr 21 18:04:50 2025 +0800
Pipe: aggregate tablets with different measurements under the same table
before write into tsfile (#15372)
---
.../util/builder/PipeTableModeTsFileBuilder.java | 82 +++++++++++++++++++++-
1 file changed, 79 insertions(+), 3 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
index b90f7b09580..20153bb1d67 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
@@ -25,10 +25,12 @@ import org.apache.commons.io.FileUtils;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.WriteUtils;
import org.apache.tsfile.write.TsFileWriter;
import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.record.Tablet.ColumnCategory;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +38,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -48,6 +51,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
public class PipeTableModeTsFileBuilder extends PipeTsFileBuilder {
@@ -201,6 +206,76 @@ public class PipeTableModeTsFileBuilder extends PipeTsFileBuilder {
return sealedFiles;
}
+  /**
+   * Polls the longest run of tablets at the head of {@code tablets} that can be merged column-wise
+   * and returns them as a single tablet.
+   *
+   * <p>A tablet joins the run only if its device/timestamp-index list, table name, row size, max
+   * row number and timestamps all equal those of the head tablet. Polling stops at the first
+   * tablet that does not match, so the order of the remaining tablets is left untouched. Columns
+   * are concatenated in polling order and, when the same {@link IMeasurementSchema} occurs more
+   * than once, only its first occurrence (with the matching values and bitMap) is kept.
+   *
+   * <p>Every tablet that takes part in the aggregation is removed from {@code tablets}; a caller
+   * that cannot write the result has to push it back itself.
+   *
+   * @param tablets pending tablets, head first; mutated by this method
+   * @return {@code null} if {@code tablets} is empty; the head element itself if no other tablet
+   *     could be merged into it; otherwise a new pair of the merged tablet and the head's
+   *     device/timestamp-index list
+   */
+  private <T extends Pair<Tablet, List<Pair<IDeviceID, Integer>>>> T tryBestToAggregateTablets(
+      final LinkedList<T> tablets) {
+    if (tablets.isEmpty()) {
+      return null;
+    }
+
+    // The head tablet is the basis of the aggregation: every candidate must match it exactly
+    final T firstPair = tablets.peekFirst();
+    final Tablet firstTablet = firstPair.left;
+    final List<Pair<IDeviceID, Integer>> aggregationDeviceTimestampIndexList = firstPair.right;
+    final String aggregationTableName = firstTablet.getTableName();
+    final long[] aggregationTimestamps = firstTablet.getTimestamps();
+    final int aggregationRow = firstTablet.getRowSize();
+    final int aggregationMaxRow = firstTablet.getMaxRowNumber();
+
+    // Column-aligned accumulators: index i of every list describes the same column
+    final List<IMeasurementSchema> aggregatedSchemas = new ArrayList<>();
+    final List<ColumnCategory> aggregatedColumnCategories = new ArrayList<>();
+    final List<Object> aggregatedValues = new ArrayList<>();
+    final List<BitMap> aggregatedBitMaps = new ArrayList<>();
+    int aggregatedTabletCount = 0;
+
+    // Poll tablets from the head for as long as they satisfy the aggregation criteria
+    while (!tablets.isEmpty()) {
+      final T pair = tablets.peekFirst();
+      final Tablet tablet = pair.left;
+      final boolean canAggregate =
+          Objects.equals(pair.right, aggregationDeviceTimestampIndexList)
+              // Compare the candidate's own table name; comparing the head's name with itself
+              // would always succeed and let tablets of different tables be merged
+              && Objects.equals(tablet.getTableName(), aggregationTableName)
+              && tablet.getRowSize() == aggregationRow
+              && tablet.getMaxRowNumber() == aggregationMaxRow
+              && Arrays.equals(tablet.getTimestamps(), aggregationTimestamps);
+      if (!canAggregate) {
+        // Stop at the first mismatch so that the remaining tablets keep their order
+        break;
+      }
+
+      aggregatedSchemas.addAll(tablet.getSchemas());
+      aggregatedColumnCategories.addAll(tablet.getColumnTypes());
+      aggregatedValues.addAll(Arrays.asList(tablet.getValues()));
+      final BitMap[] bitMaps = tablet.getBitMaps();
+      if (bitMaps != null) {
+        aggregatedBitMaps.addAll(Arrays.asList(bitMaps));
+      } else {
+        // Arrays.asList(null) would throw a NullPointerException. Add one fresh, unmarked bitMap
+        // per column (sized to the max row number) to keep this list aligned with the schemas.
+        // NOTE(review): assumes a null bitMap array means "no null values" — confirm against
+        // tsfile's Tablet.
+        for (int i = 0; i < tablet.getSchemas().size(); i++) {
+          aggregatedBitMaps.add(new BitMap(aggregationMaxRow));
+        }
+      }
+
+      tablets.pollFirst();
+      aggregatedTabletCount++;
+    }
+
+    if (aggregatedTabletCount == 1) {
+      // Nothing was merged into the head tablet: hand it back as-is instead of rebuilding it
+      return firstPair;
+    }
+
+    // Keep only the first occurrence of each schema, together with its category, values and bitMap
+    final Set<IMeasurementSchema> seen = new HashSet<>();
+    final List<Integer> distinctIndices =
+        IntStream.range(0, aggregatedSchemas.size())
+            .filter(i -> seen.add(aggregatedSchemas.get(i)))
+            .boxed()
+            .collect(Collectors.toList());
+
+    final Tablet aggregatedTablet =
+        new Tablet(
+            aggregationTableName,
+            distinctIndices.stream().map(aggregatedSchemas::get).collect(Collectors.toList()),
+            distinctIndices.stream()
+                .map(aggregatedColumnCategories::get)
+                .collect(Collectors.toList()),
+            aggregationTimestamps,
+            distinctIndices.stream().map(aggregatedValues::get).toArray(),
+            distinctIndices.stream().map(aggregatedBitMaps::get).toArray(BitMap[]::new),
+            aggregationRow);
+
+    // Unchecked: only sound when T is exactly Pair<Tablet, List<Pair<IDeviceID, Integer>>> and
+    // not a subclass — NOTE(review): confirm at the call sites
+    @SuppressWarnings("unchecked")
+    final T aggregatedPair =
+        (T) new Pair<>(aggregatedTablet, aggregationDeviceTimestampIndexList);
+    return aggregatedPair;
+  }
+
private <T extends Pair<Tablet, List<Pair<IDeviceID, Integer>>>>
void tryBestToWriteTabletsIntoOneFile(final Set<LinkedList<T>>
device2TabletsLinkedList)
throws IOException {
@@ -218,7 +293,7 @@ public class PipeTableModeTsFileBuilder extends PipeTsFileBuilder {
final Set<String> columnNames = new HashSet<>();
while (!tablets.isEmpty()) {
- final T pair = tablets.peekFirst();
+ final T pair = tryBestToAggregateTablets(tablets);
if (timestampsAreNonOverlapping(
(Pair<Tablet, List<Pair<IDeviceID, Integer>>>) pair,
deviceLastTimestampMap)) {
final Tablet tablet = pair.left;
@@ -237,10 +312,11 @@ public class PipeTableModeTsFileBuilder extends PipeTsFileBuilder {
}
tabletsToWrite.add(pair);
+ continue;
+ } else {
// NOTE: mutating a LinkedList that lives inside a Set violates the
contract that the
// element’s hashCode must remain stable while it’s in the set
- tablets.pollFirst();
- continue;
+ tablets.addFirst(pair);
}
break;
}