This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new aaff44910bd Pipe: aggregate tablets with different measurements under 
the same table before write into tsfile (#15372)
aaff44910bd is described below

commit aaff44910bdcc7fd4603692020fbbdd190906740
Author: VGalaxies <[email protected]>
AuthorDate: Mon Apr 21 18:04:50 2025 +0800

    Pipe: aggregate tablets with different measurements under the same table 
before write into tsfile (#15372)
---
 .../util/builder/PipeTableModeTsFileBuilder.java   | 82 +++++++++++++++++++++-
 1 file changed, 79 insertions(+), 3 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
index b90f7b09580..20153bb1d67 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
@@ -25,10 +25,12 @@ import org.apache.commons.io.FileUtils;
 import org.apache.tsfile.exception.write.WriteProcessException;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.WriteUtils;
 import org.apache.tsfile.write.TsFileWriter;
 import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.record.Tablet.ColumnCategory;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,6 +38,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -48,6 +51,8 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 public class PipeTableModeTsFileBuilder extends PipeTsFileBuilder {
 
@@ -201,6 +206,76 @@ public class PipeTableModeTsFileBuilder extends PipeTsFileBuilder {
     return sealedFiles;
   }
 
+  /**
+   * Polls the leading run of tablets that match the first tablet (same table name, device index
+   * list, timestamps, row size and max row number) and merges their columns into one tablet.
+   * Columns whose schema has already been seen keep only their first occurrence.
+   *
+   * @param tablets tablet queue; every merged entry is removed from its head
+   * @return the aggregated pair, or {@code null} if {@code tablets} is empty
+   */
+  private <T extends Pair<Tablet, List<Pair<IDeviceID, Integer>>>> T tryBestToAggregateTablets(
+      final LinkedList<T> tablets) {
+    if (tablets.isEmpty()) {
+      return null;
+    }
+
+    final Pair<Tablet, List<Pair<IDeviceID, Integer>>> firstPair = tablets.peekFirst();
+    final Tablet firstTablet = firstPair.left;
+    final List<Pair<IDeviceID, Integer>> aggregationDeviceTimestampIndexList = firstPair.right;
+    final String aggregationTableName = firstTablet.getTableName();
+    final long[] aggregationTimestamps = firstTablet.getTimestamps();
+    final int aggregationRow = firstTablet.getRowSize();
+    final int aggregationMaxRow = firstTablet.getMaxRowNumber();
+
+    final List<IMeasurementSchema> aggregatedSchemas = new ArrayList<>();
+    final List<ColumnCategory> aggregatedColumnCategories = new ArrayList<>();
+    final List<Object> aggregatedValues = new ArrayList<>();
+    final List<BitMap> aggregatedBitMaps = new ArrayList<>();
+
+    // Poll tablets from the head until one fails the criteria set by the first tablet
+    while (!tablets.isEmpty()) {
+      final Pair<Tablet, List<Pair<IDeviceID, Integer>>> pair = tablets.peekFirst();
+      final Tablet tablet = pair.left;
+      // The table name must be read from the candidate tablet, not from the first one
+      if (Objects.equals(pair.right, aggregationDeviceTimestampIndexList)
+          && Objects.equals(tablet.getTableName(), aggregationTableName)
+          && Arrays.equals(tablet.getTimestamps(), aggregationTimestamps)
+          && tablet.getRowSize() == aggregationRow
+          && tablet.getMaxRowNumber() == aggregationMaxRow) {
+        aggregatedSchemas.addAll(tablet.getSchemas());
+        aggregatedColumnCategories.addAll(tablet.getColumnTypes());
+        aggregatedValues.addAll(Arrays.asList(tablet.getValues()));
+        aggregatedBitMaps.addAll(Arrays.asList(tablet.getBitMaps()));
+        tablets.pollFirst();
+      } else {
+        break;
+      }
+    }
+
+    // Keep only the first occurrence of each schema (and its category, values and bitmap)
+    final Set<IMeasurementSchema> seen = new HashSet<>();
+    final List<Integer> distinctIndices =
+        IntStream.range(0, aggregatedSchemas.size())
+            .filter(i -> seen.add(aggregatedSchemas.get(i)))
+            .boxed()
+            .collect(Collectors.toList());
+
+    return (T)
+        new Pair<>(
+            new Tablet(
+                aggregationTableName,
+                distinctIndices.stream().map(aggregatedSchemas::get).collect(Collectors.toList()),
+                distinctIndices.stream()
+                    .map(aggregatedColumnCategories::get)
+                    .collect(Collectors.toList()),
+                aggregationTimestamps,
+                distinctIndices.stream().map(aggregatedValues::get).toArray(),
+                distinctIndices.stream().map(aggregatedBitMaps::get).toArray(BitMap[]::new),
+                aggregationRow),
+            aggregationDeviceTimestampIndexList);
+  }
+
   private <T extends Pair<Tablet, List<Pair<IDeviceID, Integer>>>>
       void tryBestToWriteTabletsIntoOneFile(final Set<LinkedList<T>> 
device2TabletsLinkedList)
           throws IOException {
@@ -218,7 +293,7 @@ public class PipeTableModeTsFileBuilder extends PipeTsFileBuilder {
       final Set<String> columnNames = new HashSet<>();
 
       while (!tablets.isEmpty()) {
-        final T pair = tablets.peekFirst();
+        final T pair = tryBestToAggregateTablets(tablets);
         if (timestampsAreNonOverlapping(
             (Pair<Tablet, List<Pair<IDeviceID, Integer>>>) pair, 
deviceLastTimestampMap)) {
           final Tablet tablet = pair.left;
@@ -237,10 +312,11 @@ public class PipeTableModeTsFileBuilder extends PipeTsFileBuilder {
           }
 
           tabletsToWrite.add(pair);
+          continue;
+        } else {
           // NOTE: mutating a LinkedList that lives inside a Set violates the 
contract that the
           // element’s hashCode must remain stable while it’s in the set
-          tablets.pollFirst();
-          continue;
+          tablets.addFirst(pair);
         }
         break;
       }

Reply via email to