This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5cb31e84dd40c756a269a3c75f022e30c029e077
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Aug 8 09:27:19 2025 +0800

    Pipe: Repair the table model construction TabletBatch process causing memory expansion (#16123)
    
    * Pipe: Repair the table model construction TabletBatch process causing memory expansion
    
    * add ut
    
    * update PipeTabletEventPlainBatch
    
    * update PipeTabletEventPlainBatch
    
    (cherry picked from commit 256f3e726520beaeb4d5cac527873930c9d77cb9)
---
 .../evolvable/batch/PipeTabletEventPlainBatch.java | 108 ++++++++++++++-
 .../pipe/sink/PipeTabletEventPlainBatchTest.java   | 147 +++++++++++++++++++++
 .../db/pipe/sink/PipeTabletEventSorterTest.java    |   4 +-
 3 files changed, 251 insertions(+), 8 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index a68c5ae9602..0db905a6dc0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -28,16 +28,22 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
 import org.apache.tsfile.write.record.Tablet;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.time.LocalDate;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -102,23 +108,28 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
       final String databaseName = insertTablets.getKey();
       for (final Map.Entry<String, Pair<Integer, List<Tablet>>> tabletEntry :
           insertTablets.getValue().entrySet()) {
-        final List<Tablet> batchTablets = new ArrayList<>();
+        // needCopyFlag and tablet
+        final List<Pair<Boolean, Tablet>> batchTablets = new ArrayList<>();
         for (final Tablet tablet : tabletEntry.getValue().getRight()) {
           boolean success = false;
-          for (final Tablet batchTablet : batchTablets) {
-            if (batchTablet.append(tablet, tabletEntry.getValue().getLeft())) {
+          for (final Pair<Boolean, Tablet> tabletPair : batchTablets) {
+            if (tabletPair.getLeft()) {
+              tabletPair.setRight(copyTablet(tabletPair.getRight()));
+              tabletPair.setLeft(Boolean.FALSE);
+            }
+            if (tabletPair.getRight().append(tablet, 
tabletEntry.getValue().getLeft())) {
               success = true;
               break;
             }
           }
           if (!success) {
-            batchTablets.add(tablet);
+            batchTablets.add(new Pair<>(Boolean.TRUE, tablet));
           }
         }
-        for (final Tablet batchTablet : batchTablets) {
+        for (final Pair<Boolean, Tablet> tabletPair : batchTablets) {
           try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
               final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
-            batchTablet.serialize(outputStream);
+            tabletPair.getRight().serialize(outputStream);
             ReadWriteIOUtils.write(true, outputStream);
             tabletBuffers.add(
                 ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size()));
@@ -214,4 +225,89 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
     currentBatch.getRight().add(tablet);
     return PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + 4;
   }
+
+  public static Tablet copyTablet(final Tablet tablet) {
+    final Object[] copiedValues = new Object[tablet.getValues().length];
+    for (int i = 0; i < tablet.getValues().length; i++) {
+      if (tablet.getValues()[i] == null
+          || tablet.getSchemas() == null
+          || tablet.getSchemas().get(i) == null) {
+        continue;
+      }
+      copiedValues[i] =
+          copyValueList(
+              tablet.getValues()[i], tablet.getSchemas().get(i).getType(), 
tablet.getRowSize());
+    }
+
+    BitMap[] bitMaps = null;
+    if (tablet.getBitMaps() != null) {
+      bitMaps =
+          Arrays.stream(tablet.getBitMaps())
+              .map(
+                  bitMap -> {
+                    if (bitMap != null) {
+                      final byte[] data = bitMap.getByteArray();
+                      return new BitMap(bitMap.getSize(), Arrays.copyOf(data, 
data.length));
+                    }
+                    return null;
+                  })
+              .toArray(BitMap[]::new);
+    }
+
+    return new Tablet(
+        tablet.getTableName(),
+        new ArrayList<>(tablet.getSchemas()),
+        new ArrayList<>(tablet.getColumnTypes()),
+        Arrays.copyOf(tablet.getTimestamps(), tablet.getRowSize()),
+        copiedValues,
+        bitMaps,
+        tablet.getRowSize());
+  }
+
+  private static Object copyValueList(
+      final Object valueList, final TSDataType dataType, final int rowSize) {
+    switch (dataType) {
+      case BOOLEAN:
+        final boolean[] boolValues = (boolean[]) valueList;
+        final boolean[] copiedBoolValues = new boolean[rowSize];
+        System.arraycopy(boolValues, 0, copiedBoolValues, 0, rowSize);
+        return copiedBoolValues;
+      case INT32:
+        final int[] intValues = (int[]) valueList;
+        final int[] copiedIntValues = new int[rowSize];
+        System.arraycopy(intValues, 0, copiedIntValues, 0, rowSize);
+        return copiedIntValues;
+      case DATE:
+        final LocalDate[] dateValues = (LocalDate[]) valueList;
+        final LocalDate[] copiedDateValues = new LocalDate[rowSize];
+        System.arraycopy(dateValues, 0, copiedDateValues, 0, rowSize);
+        return copiedDateValues;
+      case INT64:
+      case TIMESTAMP:
+        final long[] longValues = (long[]) valueList;
+        final long[] copiedLongValues = new long[rowSize];
+        System.arraycopy(longValues, 0, copiedLongValues, 0, rowSize);
+        return copiedLongValues;
+      case FLOAT:
+        final float[] floatValues = (float[]) valueList;
+        final float[] copiedFloatValues = new float[rowSize];
+        System.arraycopy(floatValues, 0, copiedFloatValues, 0, rowSize);
+        return copiedFloatValues;
+      case DOUBLE:
+        final double[] doubleValues = (double[]) valueList;
+        final double[] copiedDoubleValues = new double[rowSize];
+        System.arraycopy(doubleValues, 0, copiedDoubleValues, 0, rowSize);
+        return copiedDoubleValues;
+      case TEXT:
+      case BLOB:
+      case STRING:
+        final Binary[] binaryValues = (Binary[]) valueList;
+        final Binary[] copiedBinaryValues = new Binary[rowSize];
+        System.arraycopy(binaryValues, 0, copiedBinaryValues, 0, rowSize);
+        return copiedBinaryValues;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Data type %s is not supported.", dataType));
+    }
+  }
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeTabletEventPlainBatchTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeTabletEventPlainBatchTest.java
new file mode 100644
index 00000000000..1815c205b57
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeTabletEventPlainBatchTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink;
+
+import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlainBatch;
+
+import org.apache.tsfile.write.record.Tablet;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+public class PipeTabletEventPlainBatchTest {
+
+  @Test
+  public void constructTabletBatch() {
+    Tablet tablet = PipeTabletEventSorterTest.generateTablet("test", 10, true, 
true);
+
+    Tablet copyTablet = PipeTabletEventPlainBatch.copyTablet(tablet);
+
+    Assert.assertNotSame(tablet, copyTablet);
+    Assert.assertEquals(tablet.getRowSize(), copyTablet.getRowSize());
+
+    for (int i = 0; i < tablet.getSchemas().size(); i++) {
+      for (int j = 0; j < tablet.getRowSize(); j++) {
+        Assert.assertEquals(tablet.getValue(j, i), copyTablet.getValue(j, i));
+      }
+    }
+  }
+
+  @Test
+  public void constructTabletBatch1() {
+    Tablet tablet = PipeTabletEventSorterTest.generateTablet("test", 10, true, 
true);
+    tablet.append(PipeTabletEventSorterTest.generateTablet("test", 10, true, 
true));
+    Tablet copyTablet = PipeTabletEventPlainBatch.copyTablet(tablet);
+
+    Assert.assertNotSame(tablet, copyTablet);
+    Assert.assertEquals(tablet.getRowSize(), copyTablet.getRowSize());
+
+    for (int i = 0; i < tablet.getSchemas().size(); i++) {
+      for (int j = 0; j < tablet.getRowSize(); j++) {
+        Assert.assertEquals(tablet.getValue(j, i), copyTablet.getValue(j, i));
+      }
+    }
+  }
+
+  @Test
+  public void constructTabletBatch2() {
+    Tablet tablet = PipeTabletEventSorterTest.generateTablet("test", 10, true, 
true);
+    tablet.append(PipeTabletEventSorterTest.generateTablet("test", 10, true, 
true));
+
+    tablet = PipeTabletEventPlainBatch.copyTablet(tablet);
+    tablet.getBitMaps()[1].markAll();
+    tablet.getValues()[1] = null;
+
+    Tablet copyTablet = PipeTabletEventPlainBatch.copyTablet(tablet);
+
+    Assert.assertNotSame(tablet, copyTablet);
+    Assert.assertEquals(tablet.getRowSize(), copyTablet.getRowSize());
+
+    for (int i = 0; i < tablet.getSchemas().size(); i++) {
+      if (i == 1) {
+        Assert.assertTrue(tablet.getBitMaps()[i].isAllMarked());
+        Assert.assertNull(copyTablet.getValues()[1]);
+        continue;
+      }
+
+      for (int j = 0; j < tablet.getRowSize(); j++) {
+        Assert.assertEquals(tablet.getValue(j, i), copyTablet.getValue(j, i));
+      }
+    }
+  }
+
+  @Test
+  public void constructTabletBatch3() {
+    Tablet tablet = PipeTabletEventSorterTest.generateTablet("test", 10, true, 
true);
+    tablet.append(PipeTabletEventSorterTest.generateTablet("test", 10, true, 
true));
+
+    tablet = PipeTabletEventPlainBatch.copyTablet(tablet);
+    tablet.getBitMaps()[1] = null;
+
+    Tablet copyTablet = PipeTabletEventPlainBatch.copyTablet(tablet);
+
+    Assert.assertNotSame(tablet, copyTablet);
+    Assert.assertEquals(tablet.getRowSize(), copyTablet.getRowSize());
+
+    for (int i = 0; i < tablet.getSchemas().size(); i++) {
+      if (i == 1) {
+        Assert.assertNull(tablet.getBitMaps()[i]);
+        continue;
+      }
+
+      for (int j = 0; j < tablet.getRowSize(); j++) {
+        Assert.assertEquals(tablet.getValue(j, i), copyTablet.getValue(j, i));
+      }
+    }
+  }
+
+  @Test
+  public void constructTabletBatch4() {
+    Tablet tablet = PipeTabletEventSorterTest.generateTablet("test", 10, true, 
true);
+    tablet.append(PipeTabletEventSorterTest.generateTablet("test", 10, true, 
true));
+
+    List<Integer> rowIndices = new ArrayList<>(tablet.getSchemas().size());
+    Random random = new Random();
+    for (int i = 0; i < tablet.getSchemas().size(); i++) {
+      int r = random.nextInt(tablet.getRowSize());
+      rowIndices.add(r);
+      tablet.addValue(tablet.getSchemas().get(i).getMeasurementName(), r, 
null);
+    }
+
+    Tablet copyTablet = PipeTabletEventPlainBatch.copyTablet(tablet);
+
+    Assert.assertNotSame(tablet, copyTablet);
+    Assert.assertEquals(tablet.getRowSize(), copyTablet.getRowSize());
+
+    for (int i = 0; i < tablet.getSchemas().size(); i++) {
+      for (int j = 0; j < tablet.getRowSize(); j++) {
+        if (rowIndices.get(i) == j) {
+          Assert.assertTrue(tablet.getBitMaps()[i].isMarked(j));
+          Assert.assertNull(tablet.getValue(j, i));
+          continue;
+        }
+        Assert.assertEquals(tablet.getValue(j, i), copyTablet.getValue(j, i));
+      }
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeTabletEventSorterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeTabletEventSorterTest.java
index 4e9a9a95005..eb5ffd475d3 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeTabletEventSorterTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeTabletEventSorterTest.java
@@ -277,7 +277,7 @@ public class PipeTabletEventSorterTest {
     }
   }
 
-  private Tablet generateTablet(
+  static Tablet generateTablet(
       final String tableName,
       final int deviceIDNum,
       final boolean hasDuplicates,
@@ -389,7 +389,7 @@ public class PipeTabletEventSorterTest {
     return tablet;
   }
 
-  public LocalDate getDate(final int value) {
+  public static LocalDate getDate(final int value) {
     Date date = new Date(value);
     SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
     try {

Reply via email to