This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1244ac1ccbf36ed6c5a92f73c1531d2be7dc094e
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jul 31 20:00:21 2025 +0800

    Pipe: Optimized the table model writing latency by batching & Fixed the NPE caused by tablet event sorting
    
    (cherry picked from commit 6fd3870b90a0b92487b6bdd59dacdc7db59e87a8)
---
 .../evolvable/batch/PipeTabletEventPlainBatch.java | 130 ++++++++++++++-------
 .../request/PipeTransferTabletBatchReqV2.java      |  48 +++++++-
 .../sink/util/sorter/PipeTabletEventSorter.java    |   2 +-
 3 files changed, 137 insertions(+), 43 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index 31b10736499..a68c5ae9602 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -22,14 +22,17 @@ package 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBatchReqV2;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
-import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.write.record.Tablet;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -38,7 +41,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
 
@@ -51,13 +54,13 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
   private final List<String> insertNodeDataBases = new ArrayList<>();
   private final List<String> tabletDataBases = new ArrayList<>();
 
+  // database -> tableName -> Pair<size, tablets to batch>
+  private final Map<String, Map<String, Pair<Integer, List<Tablet>>>> 
tableModelTabletMap =
+      new HashMap<>();
+
   // Used to rate limit when transferring data
   private final Map<Pair<String, Long>, Long> pipe2BytesAccumulated = new 
HashMap<>();
 
-  PipeTabletEventPlainBatch(final int maxDelayInMs, final long 
requestMaxBatchSizeInBytes) {
-    super(maxDelayInMs, requestMaxBatchSizeInBytes, null);
-  }
-
   PipeTabletEventPlainBatch(
       final int maxDelayInMs,
       final long requestMaxBatchSizeInBytes,
@@ -66,9 +69,8 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
   }
 
   @Override
-  protected boolean constructBatch(final TabletInsertionEvent event)
-      throws WALPipeException, IOException {
-    final int bufferSize = buildTabletInsertionBuffer(event);
+  protected boolean constructBatch(final TabletInsertionEvent event) throws 
IOException {
+    final long bufferSize = buildTabletInsertionBuffer(event);
     totalBufferSize += bufferSize;
     pipe2BytesAccumulated.compute(
         new Pair<>(
@@ -89,11 +91,45 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
     binaryDataBases.clear();
     insertNodeDataBases.clear();
     tabletDataBases.clear();
+    tableModelTabletMap.clear();
 
     pipe2BytesAccumulated.clear();
   }
 
   public PipeTransferTabletBatchReqV2 toTPipeTransferReq() throws IOException {
+    for (final Map.Entry<String, Map<String, Pair<Integer, List<Tablet>>>> 
insertTablets :
+        tableModelTabletMap.entrySet()) {
+      final String databaseName = insertTablets.getKey();
+      for (final Map.Entry<String, Pair<Integer, List<Tablet>>> tabletEntry :
+          insertTablets.getValue().entrySet()) {
+        final List<Tablet> batchTablets = new ArrayList<>();
+        for (final Tablet tablet : tabletEntry.getValue().getRight()) {
+          boolean success = false;
+          for (final Tablet batchTablet : batchTablets) {
+            if (batchTablet.append(tablet, tabletEntry.getValue().getLeft())) {
+              success = true;
+              break;
+            }
+          }
+          if (!success) {
+            batchTablets.add(tablet);
+          }
+        }
+        for (final Tablet batchTablet : batchTablets) {
+          try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+              final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+            batchTablet.serialize(outputStream);
+            ReadWriteIOUtils.write(true, outputStream);
+            tabletBuffers.add(
+                ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size()));
+          }
+          tabletDataBases.add(databaseName);
+        }
+      }
+    }
+
+    tableModelTabletMap.clear();
+
     return PipeTransferTabletBatchReqV2.toTPipeTransferReq(
         binaryBuffers,
         insertNodeBuffers,
@@ -111,57 +147,71 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
     return pipe2BytesAccumulated;
   }
 
-  private int buildTabletInsertionBuffer(final TabletInsertionEvent event)
-      throws IOException, WALPipeException {
-    int databaseEstimateSize = 0;
+  private long buildTabletInsertionBuffer(final TabletInsertionEvent event) 
throws IOException {
+    long estimateSize = 0;
     final ByteBuffer buffer;
     if (event instanceof PipeInsertNodeTabletInsertionEvent) {
       final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent =
           (PipeInsertNodeTabletInsertionEvent) event;
-      // Read the bytebuffer from the wal file and transfer it directly 
without serializing or
-      // deserializing if possible
       final InsertNode insertNode = 
pipeInsertNodeTabletInsertionEvent.getInsertNode();
-      if (Objects.isNull(insertNode)) {
-        buffer = pipeInsertNodeTabletInsertionEvent.getByteBuffer();
-        binaryBuffers.add(buffer);
-        if (pipeInsertNodeTabletInsertionEvent.isTableModelEvent()) {
-          databaseEstimateSize =
-              
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName().length();
-          
binaryDataBases.add(pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
-        } else {
-          databaseEstimateSize = 4;
-          binaryDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER);
-        }
-      } else {
+      if (!(insertNode instanceof RelationalInsertTabletNode)) {
         buffer = insertNode.serializeToByteBuffer();
         insertNodeBuffers.add(buffer);
         if (pipeInsertNodeTabletInsertionEvent.isTableModelEvent()) {
-          databaseEstimateSize =
-              
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName().length();
+          estimateSize =
+              RamUsageEstimator.sizeOf(
+                  
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
           
insertNodeDataBases.add(pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
         } else {
-          databaseEstimateSize = 4;
+          estimateSize = 4;
           insertNodeDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER);
         }
+        estimateSize += buffer.limit();
+      } else {
+        for (final Tablet tablet :
+            ((PipeInsertNodeTabletInsertionEvent) event).convertToTablets()) {
+          estimateSize +=
+              constructTabletBatch(
+                  tablet, 
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
+        }
       }
     } else {
       final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
           (PipeRawTabletInsertionEvent) event;
-      try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
-          final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
-        pipeRawTabletInsertionEvent.convertToTablet().serialize(outputStream);
-        ReadWriteIOUtils.write(pipeRawTabletInsertionEvent.isAligned(), 
outputStream);
-        buffer = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
-      }
-      tabletBuffers.add(buffer);
       if (pipeRawTabletInsertionEvent.isTableModelEvent()) {
-        databaseEstimateSize = 
pipeRawTabletInsertionEvent.getTableModelDatabaseName().length();
-        
tabletDataBases.add(pipeRawTabletInsertionEvent.getTableModelDatabaseName());
+        estimateSize =
+            constructTabletBatch(
+                pipeRawTabletInsertionEvent.convertToTablet(),
+                pipeRawTabletInsertionEvent.getTableModelDatabaseName());
       } else {
-        databaseEstimateSize = 4;
+        try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+            final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+          
pipeRawTabletInsertionEvent.convertToTablet().serialize(outputStream);
+          ReadWriteIOUtils.write(pipeRawTabletInsertionEvent.isAligned(), 
outputStream);
+          buffer = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+        }
+        estimateSize = 4 + buffer.limit();
+        tabletBuffers.add(buffer);
         tabletDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER);
       }
     }
-    return buffer.limit() + databaseEstimateSize;
+
+    return estimateSize;
+  }
+
+  private long constructTabletBatch(final Tablet tablet, final String 
databaseName) {
+    final AtomicLong size = new AtomicLong(0);
+    final Pair<Integer, List<Tablet>> currentBatch =
+        tableModelTabletMap
+            .computeIfAbsent(
+                databaseName,
+                k -> {
+                  size.addAndGet(RamUsageEstimator.sizeOf(databaseName));
+                  return new HashMap<>();
+                })
+            .computeIfAbsent(tablet.getTableName(), k -> new Pair<>(0, new 
ArrayList<>()));
+    currentBatch.setLeft(currentBatch.getLeft() + tablet.getRowSize());
+    currentBatch.getRight().add(tablet);
+    return PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + 4;
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
index b7d7d44db85..d9c3fabfae8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
@@ -39,7 +39,9 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 public class PipeTransferTabletBatchReqV2 extends TPipeTransferReq {
@@ -62,6 +64,8 @@ public class PipeTransferTabletBatchReqV2 extends 
TPipeTransferReq {
 
     final List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
     final List<InsertTabletStatement> insertTabletStatementList = new 
ArrayList<>();
+    final Map<String, List<InsertRowStatement>> 
tableModelDatabaseInsertRowStatementMap =
+        new HashMap<>();
 
     for (final PipeTransferTabletBinaryReqV2 binaryReq : binaryReqs) {
       final InsertBaseStatement statement = binaryReq.constructStatement();
@@ -69,7 +73,22 @@ public class PipeTransferTabletBatchReqV2 extends 
TPipeTransferReq {
         continue;
       }
       if (statement.isWriteToTable()) {
-        statements.add(statement);
+        if (statement instanceof InsertRowStatement) {
+          tableModelDatabaseInsertRowStatementMap
+              .computeIfAbsent(statement.getDatabaseName().get(), k -> new 
ArrayList<>())
+              .add((InsertRowStatement) statement);
+        } else if (statement instanceof InsertTabletStatement) {
+          statements.add(statement);
+        } else if (statement instanceof InsertRowsStatement) {
+          tableModelDatabaseInsertRowStatementMap
+              .computeIfAbsent(statement.getDatabaseName().get(), k -> new 
ArrayList<>())
+              .addAll(((InsertRowsStatement) 
statement).getInsertRowStatementList());
+        } else {
+          throw new UnsupportedOperationException(
+              String.format(
+                  "unknown InsertBaseStatement %s constructed from 
PipeTransferTabletBinaryReqV2.",
+                  binaryReq));
+        }
         continue;
       }
       if (statement instanceof InsertRowStatement) {
@@ -93,7 +112,22 @@ public class PipeTransferTabletBatchReqV2 extends 
TPipeTransferReq {
         continue;
       }
       if (statement.isWriteToTable()) {
-        statements.add(statement);
+        if (statement instanceof InsertRowStatement) {
+          tableModelDatabaseInsertRowStatementMap
+              .computeIfAbsent(statement.getDatabaseName().get(), k -> new 
ArrayList<>())
+              .add((InsertRowStatement) statement);
+        } else if (statement instanceof InsertTabletStatement) {
+          statements.add(statement);
+        } else if (statement instanceof InsertRowsStatement) {
+          tableModelDatabaseInsertRowStatementMap
+              .computeIfAbsent(statement.getDatabaseName().get(), k -> new 
ArrayList<>())
+              .addAll(((InsertRowsStatement) 
statement).getInsertRowStatementList());
+        } else {
+          throw new UnsupportedOperationException(
+              String.format(
+                  "unknown InsertBaseStatement %s constructed from 
PipeTransferTabletBinaryReqV2.",
+                  insertNodeReq));
+        }
         continue;
       }
       if (statement instanceof InsertRowStatement) {
@@ -131,6 +165,16 @@ public class PipeTransferTabletBatchReqV2 extends 
TPipeTransferReq {
     if (!insertMultiTabletsStatement.isEmpty()) {
       statements.add(insertMultiTabletsStatement);
     }
+
+    for (final Map.Entry<String, List<InsertRowStatement>> insertRows :
+        tableModelDatabaseInsertRowStatementMap.entrySet()) {
+      final InsertRowsStatement statement = new InsertRowsStatement();
+      statement.setWriteToTable(true);
+      statement.setDatabaseName(insertRows.getKey());
+      statement.setInsertRowStatementList(insertRows.getValue());
+      statements.add(statement);
+    }
+
     return statements;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java
index 20847173ffc..c9857c9eaba 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java
@@ -153,7 +153,7 @@ public class PipeTabletEventSorter {
   private int getLastNonnullIndex(
       final int i, final BitMap originalBitMap, final BitMap 
deDuplicatedBitMap) {
     if (deDuplicatedIndex == null) {
-      if (originalBitMap.isMarked(index[i])) {
+      if (originalBitMap != null && originalBitMap.isMarked(index[i])) {
         deDuplicatedBitMap.mark(i);
       }
       return index[i];

Reply via email to