This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 17e73ee7126 [To dev/1.3] [Pipe] Optimize memory usage (#17775)
17e73ee7126 is described below

commit 17e73ee71265a67abc60293bd50a6033c9c09aed
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 27 16:36:00 2026 +0800

    [To dev/1.3] [Pipe] Optimize memory usage (#17775)
---
 .../iotdb/db/pipe/event/common/row/PipeRow.java    |   5 +-
 .../db/pipe/event/common/row/PipeRowCollector.java |  25 ++-
 .../common/tablet/PipeRawTabletInsertionEvent.java |   1 +
 .../pipe/event/common/tablet/PipeTabletUtils.java  | 245 +++++++++++++++++++++
 .../tablet/TabletInsertionDataContainer.java       |  58 ++---
 .../query/TsFileInsertionQueryDataContainer.java   |   5 +-
 .../TsFileInsertionQueryDataTabletIterator.java    |  45 ++--
 .../scan/TsFileInsertionScanDataContainer.java     | 105 +++++----
 .../pipe/resource/memory/PipeMemoryWeightUtil.java |  12 +-
 .../batch/PipeTabletEventTsFileBatch.java          |   5 +-
 .../request/PipeTransferTabletBatchReq.java        |   6 +-
 .../request/PipeTransferTabletRawReq.java          |  19 +-
 .../sink/protocol/opcua/server/OpcUaNameSpace.java |   5 +-
 .../db/pipe/sink/util/PipeTabletEventSorter.java   |   8 +
 .../planner/plan/node/write/InsertTabletNode.java  |  21 +-
 .../plan/statement/crud/InsertTabletStatement.java |   3 +-
 .../rescon/quotas/DefaultOperationQuota.java       |  28 ++-
 .../org/apache/iotdb/db/utils/BitMapUtils.java     |  46 ++++
 .../pipe/event/PipeTabletInsertionEventTest.java   |  36 ++-
 .../event/common/tablet/PipeTabletUtilsTest.java   |  72 ++++++
 .../pipe/sink/PipeDataNodeThriftRequestTest.java   |  40 ++++
 .../planner/node/write/WritePlanNodeSplitTest.java |  10 +
 .../rescon/quotas/DefaultOperationQuotaTest.java   |  64 ++++++
 23 files changed, 727 insertions(+), 137 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
index 33bf0a5925c..a8579b32bb7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
@@ -152,7 +152,10 @@ public class PipeRow implements Row {
 
   @Override
   public boolean isNull(final int columnIndex) {
-    return bitMaps[columnIndex].isMarked(rowIndex);
+    return bitMaps != null
+        && columnIndex < bitMaps.length
+        && bitMaps[columnIndex] != null
+        && bitMaps[columnIndex].isMarked(rowIndex);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
index 42560f23a5c..c26b05f6756 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletEventConverter;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
@@ -66,27 +67,26 @@ public class PipeRowCollector extends 
PipeRawTabletEventConverter implements Row
       Pair<Integer, Integer> rowCountAndMemorySize =
           PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(pipeRow);
       tablet = new Tablet(deviceId, measurementSchemaList, 
rowCountAndMemorySize.getLeft());
-      tablet.initBitMaps();
       isAligned = pipeRow.isAligned();
     }
 
     final int rowIndex = tablet.rowSize;
-    tablet.addTimestamp(rowIndex, row.getTime());
+    PipeTabletUtils.putTimestamp(tablet, rowIndex, row.getTime());
     for (int i = 0; i < row.size(); i++) {
       final Object value = row.getObject(i);
-      if (value instanceof org.apache.iotdb.pipe.api.type.Binary) {
-        tablet.addValue(
-            measurementSchemaArray[i].getMeasurementId(),
-            rowIndex,
-            
PipeBinaryTransformer.transformToBinary((org.apache.iotdb.pipe.api.type.Binary) 
value));
-      } else {
-        tablet.addValue(measurementSchemaArray[i].getMeasurementId(), 
rowIndex, value);
-      }
+      PipeTabletUtils.putValue(
+          tablet,
+          rowIndex,
+          i,
+          measurementSchemaArray[i].getType(),
+          value instanceof org.apache.iotdb.pipe.api.type.Binary
+              ? PipeBinaryTransformer.transformToBinary(
+                  (org.apache.iotdb.pipe.api.type.Binary) value)
+              : value);
       if (row.isNull(i)) {
-        tablet.bitMaps[i].mark(rowIndex);
+        PipeTabletUtils.markNullValue(tablet, rowIndex, i);
       }
     }
-    tablet.rowSize++;
 
     if (tablet.rowSize == tablet.getMaxRowNumber()) {
       collectTabletInsertionEvent();
@@ -95,6 +95,7 @@ public class PipeRowCollector extends 
PipeRawTabletEventConverter implements Row
 
   private void collectTabletInsertionEvent() {
     if (tablet != null) {
+      PipeTabletUtils.compactBitMaps(tablet);
       tabletInsertionEventList.add(
           new PipeRawTabletInsertionEvent(
               tablet,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index d322291934f..261e3c7a8b3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -323,6 +323,7 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent
 
   public Tablet convertToTablet() {
     if (!shouldParseTimeOrPattern()) {
+      PipeTabletUtils.compactBitMaps(tablet);
       return tablet;
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java
new file mode 100644
index 00000000000..0a6b073b5b6
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tablet;
+
+import org.apache.iotdb.db.utils.BitMapUtils;
+
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+
+import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public final class PipeTabletUtils {
+
+  private PipeTabletUtils() {}
+
+  public static final class TabletStringInternPool {
+
+    private final Map<String, String> internedStrings = new HashMap<>();
+
+    public String intern(final String value) {
+      if (Objects.isNull(value)) {
+        return null;
+      }
+
+      final String internedValue = internedStrings.get(value);
+      if (Objects.nonNull(internedValue)) {
+        return internedValue;
+      }
+
+      internedStrings.put(value, value);
+      return value;
+    }
+
+    public void intern(final String[] values) {
+      if (Objects.isNull(values)) {
+        return;
+      }
+
+      for (int i = 0; i < values.length; ++i) {
+        values[i] = intern(values[i]);
+      }
+    }
+
+    public void intern(final List<String> values) {
+      if (Objects.isNull(values)) {
+        return;
+      }
+
+      for (int i = 0; i < values.size(); ++i) {
+        values.set(i, intern(values.get(i)));
+      }
+    }
+
+    public Tablet intern(final Tablet tablet) {
+      if (Objects.isNull(tablet)) {
+        return null;
+      }
+
+      tablet.setDeviceId(intern(tablet.deviceId));
+      internMeasurementSchemas(tablet.getSchemas());
+      return tablet;
+    }
+
+    public void internMeasurementSchemas(final List<MeasurementSchema> 
schemas) {
+      if (Objects.isNull(schemas)) {
+        return;
+      }
+
+      for (final MeasurementSchema schema : schemas) {
+        intern(schema);
+      }
+    }
+
+    public MeasurementSchema intern(final MeasurementSchema schema) {
+      if (Objects.isNull(schema)) {
+        return null;
+      }
+
+      schema.setMeasurementId(intern(schema.getMeasurementId()));
+      schema.setProps(intern(schema.getProps()));
+      return schema;
+    }
+
+    private Map<String, String> intern(final Map<String, String> props) {
+      if (Objects.isNull(props) || props.isEmpty()) {
+        return props;
+      }
+
+      final Map<String, String> internedProps = new HashMap<>(props.size());
+      for (final Map.Entry<String, String> entry : props.entrySet()) {
+        internedProps.put(intern(entry.getKey()), intern(entry.getValue()));
+      }
+      return internedProps;
+    }
+  }
+
+  public static Tablet internTablet(
+      final Tablet tablet, final TabletStringInternPool 
tabletStringInternPool) {
+    return Objects.nonNull(tabletStringInternPool) ? 
tabletStringInternPool.intern(tablet) : tablet;
+  }
+
+  public static void compactBitMaps(final Tablet tablet) {
+    if (Objects.isNull(tablet)) {
+      return;
+    }
+    tablet.bitMaps = compactBitMaps(tablet.bitMaps, tablet.rowSize);
+  }
+
+  public static BitMap[] compactBitMaps(final BitMap[] bitMaps, final int 
rowCount) {
+    return BitMapUtils.compactBitMaps(bitMaps, rowCount);
+  }
+
+  public static BitMap[] copyBitMapsOrCreateEmpty(final Tablet tablet) {
+    final BitMap[] bitMaps = tablet.bitMaps;
+    return Objects.nonNull(bitMaps)
+        ? Arrays.copyOf(bitMaps, bitMaps.length)
+        : new BitMap[getColumnCount(tablet)];
+  }
+
+  public static void markNullValue(final Tablet tablet, final int rowIndex, 
final int columnIndex) {
+    final BitMap[] bitMaps = ensureBitMaps(tablet, columnIndex + 1);
+    if (Objects.isNull(bitMaps[columnIndex])) {
+      bitMaps[columnIndex] = new BitMap(tablet.getMaxRowNumber());
+    }
+    bitMaps[columnIndex].mark(rowIndex);
+  }
+
+  public static void putTimestamp(final Tablet tablet, final int rowIndex, 
final long timestamp) {
+    tablet.timestamps[rowIndex] = timestamp;
+    tablet.rowSize = Math.max(tablet.rowSize, rowIndex + 1);
+  }
+
+  public static void putValue(
+      final Tablet tablet,
+      final int rowIndex,
+      final int columnIndex,
+      final TSDataType dataType,
+      final Object value) {
+    switch (dataType) {
+      case BOOLEAN:
+        ((boolean[]) tablet.values[columnIndex])[rowIndex] = (Boolean) value;
+        break;
+      case INT32:
+        ((int[]) tablet.values[columnIndex])[rowIndex] = (Integer) value;
+        break;
+      case DATE:
+        ((LocalDate[]) tablet.values[columnIndex])[rowIndex] = (LocalDate) 
value;
+        break;
+      case INT64:
+      case TIMESTAMP:
+        ((long[]) tablet.values[columnIndex])[rowIndex] = (Long) value;
+        break;
+      case FLOAT:
+        ((float[]) tablet.values[columnIndex])[rowIndex] = (Float) value;
+        break;
+      case DOUBLE:
+        ((double[]) tablet.values[columnIndex])[rowIndex] = (Double) value;
+        break;
+      case TEXT:
+      case BLOB:
+      case STRING:
+        ((Binary[]) tablet.values[columnIndex])[rowIndex] = toBinary(value);
+        break;
+      default:
+        throw new UnSupportedDataTypeException("Unsupported data type: " + 
dataType);
+    }
+    unmarkNullValue(tablet, rowIndex, columnIndex);
+  }
+
+  private static void unmarkNullValue(
+      final Tablet tablet, final int rowIndex, final int columnIndex) {
+    final BitMap[] bitMaps = tablet.bitMaps;
+    if (Objects.nonNull(bitMaps)
+        && columnIndex < bitMaps.length
+        && Objects.nonNull(bitMaps[columnIndex])) {
+      bitMaps[columnIndex].unmark(rowIndex);
+    }
+  }
+
+  private static BitMap[] ensureBitMaps(final Tablet tablet, final int 
minColumnCount) {
+    final int columnCount = Math.max(getColumnCount(tablet), minColumnCount);
+    BitMap[] bitMaps = tablet.bitMaps;
+    if (Objects.isNull(bitMaps)) {
+      bitMaps = new BitMap[columnCount];
+      tablet.bitMaps = bitMaps;
+    } else if (bitMaps.length < columnCount) {
+      final BitMap[] expandedBitMaps = new BitMap[columnCount];
+      System.arraycopy(bitMaps, 0, expandedBitMaps, 0, bitMaps.length);
+      bitMaps = expandedBitMaps;
+      tablet.bitMaps = bitMaps;
+    }
+    return bitMaps;
+  }
+
+  private static int getColumnCount(final Tablet tablet) {
+    if (Objects.nonNull(tablet.getSchemas())) {
+      return tablet.getSchemas().size();
+    }
+    return Objects.nonNull(tablet.values) ? tablet.values.length : 0;
+  }
+
+  private static Binary toBinary(final Object value) {
+    if (Objects.isNull(value)) {
+      return Binary.EMPTY_VALUE;
+    }
+    if (value instanceof Binary) {
+      return (Binary) value;
+    }
+    if (value instanceof byte[]) {
+      return new Binary((byte[]) value);
+    }
+    if (value instanceof String) {
+      return new Binary(((String) 
value).getBytes(TSFileConfig.STRING_CHARSET));
+    }
+    throw new IllegalArgumentException(
+        String.format("Expected Binary, byte[] or String, but was %s.", 
value.getClass()));
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
index 133dbb5bff8..d8c2bccaa97 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
@@ -198,6 +198,7 @@ public class TabletInsertionDataContainer {
     }
 
     this.rowCount = this.timestampColumn.length;
+    this.nullValueColumnBitmaps = 
PipeTabletUtils.compactBitMaps(nullValueColumnBitmaps, rowCount);
     if (this.rowCount == 0 && LOGGER.isDebugEnabled()) {
       LOGGER.debug(
           "InsertRowNode({}) is parsed to zero rows according to the 
pattern({}) and time range [{}, {}], the corresponding source event({}) will be 
ignored.",
@@ -217,7 +218,6 @@ public class TabletInsertionDataContainer {
     this.isAligned = insertTabletNode.isAligned();
 
     final long[] originTimestampColumn = insertTabletNode.getTimes();
-    final int originRowSize = originTimestampColumn.length;
     final List<Integer> rowIndexList = 
generateRowIndexList(originTimestampColumn);
     this.timestampColumn = rowIndexList.stream().mapToLong(i -> 
originTimestampColumn[i]).toArray();
 
@@ -243,18 +243,7 @@ public class TabletInsertionDataContainer {
     final String[] originColumnNameStringList = 
insertTabletNode.getMeasurements();
     final TSDataType[] originValueColumnTypes = 
insertTabletNode.getDataTypes();
     final Object[] originValueColumns = insertTabletNode.getColumns();
-    final BitMap[] originBitMapList =
-        (insertTabletNode.getBitMaps() == null
-            ? IntStream.range(0, originColumnSize)
-                .boxed()
-                .map(o -> new BitMap(originRowSize))
-                .toArray(BitMap[]::new)
-            : insertTabletNode.getBitMaps());
-    for (int i = 0; i < originBitMapList.length; i++) {
-      if (originBitMapList[i] == null) {
-        originBitMapList[i] = new BitMap(originRowSize);
-      }
-    }
+    final BitMap[] originBitMapList = insertTabletNode.getBitMaps();
 
     for (int i = 0; i < 
originColumnIndex2FilteredColumnIndexMapperList.length; i++) {
       if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) {
@@ -277,7 +266,7 @@ public class TabletInsertionDataContainer {
                   originValueColumns[i],
                   rowIndexList,
                   false,
-                  originBitMapList[i],
+                  getBitMap(originBitMapList, i),
                   bitMap);
         }
         this.nullValueColumnBitmaps[filteredColumnIndex] = bitMap;
@@ -285,6 +274,7 @@ public class TabletInsertionDataContainer {
     }
 
     this.rowCount = this.timestampColumn.length;
+    this.nullValueColumnBitmaps = 
PipeTabletUtils.compactBitMaps(nullValueColumnBitmaps, rowCount);
     if (rowCount == 0 && LOGGER.isDebugEnabled()) {
       LOGGER.debug(
           "InsertTabletNode({}) is parsed to zero rows according to the 
pattern({}) and time range [{}, {}], the corresponding source event({}) will be 
ignored.",
@@ -338,18 +328,7 @@ public class TabletInsertionDataContainer {
     }
     final Object[] originValueColumns =
         tablet.values; // we do not reduce value columns here by origin row 
size
-    final BitMap[] originBitMapList =
-        tablet.bitMaps == null
-            ? IntStream.range(0, originColumnSize)
-                .boxed()
-                .map(o -> new BitMap(tablet.getMaxRowNumber()))
-                .toArray(BitMap[]::new)
-            : tablet.bitMaps; // We do not reduce bitmaps here by origin row 
size
-    for (int i = 0; i < originBitMapList.length; i++) {
-      if (originBitMapList[i] == null) {
-        originBitMapList[i] = new BitMap(tablet.getMaxRowNumber());
-      }
-    }
+    final BitMap[] originBitMapList = tablet.bitMaps;
 
     for (int i = 0; i < 
originColumnIndex2FilteredColumnIndexMapperList.length; i++) {
       if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) {
@@ -372,7 +351,7 @@ public class TabletInsertionDataContainer {
                   originValueColumns[i],
                   rowIndexList,
                   false,
-                  originBitMapList[i],
+                  getBitMap(originBitMapList, i),
                   bitMap);
         }
         this.nullValueColumnBitmaps[filteredColumnIndex] = bitMap;
@@ -380,6 +359,7 @@ public class TabletInsertionDataContainer {
     }
 
     this.rowCount = this.timestampColumn.length;
+    this.nullValueColumnBitmaps = 
PipeTabletUtils.compactBitMaps(nullValueColumnBitmaps, rowCount);
     if (this.rowCount == 0 && LOGGER.isDebugEnabled()) {
       LOGGER.debug(
           "Tablet({}) is parsed to zero rows according to the pattern({}) and 
time range [{}, {}], the corresponding source event({}) will be ignored.",
@@ -471,7 +451,7 @@ public class TabletInsertionDataContainer {
                   : (int[]) originValueColumn;
           final int[] valueColumns = new int[rowIndexList.size()];
           for (int i = 0; i < rowIndexList.size(); ++i) {
-            if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+            if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i))) 
{
               valueColumns[i] = 0;
               nullValueColumnBitmap.mark(i);
             } else {
@@ -493,7 +473,7 @@ public class TabletInsertionDataContainer {
                     : (LocalDate[]) originValueColumn;
 
             for (int i = 0; i < rowIndexList.size(); ++i) {
-              if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+              if (isNullValue(originNullValueColumnBitmap, 
rowIndexList.get(i))) {
                 valueColumns[i] = EMPTY_LOCALDATE;
                 nullValueColumnBitmap.mark(i);
               } else {
@@ -507,7 +487,7 @@ public class TabletInsertionDataContainer {
                     ? new int[] {(int) originValueColumn}
                     : (int[]) originValueColumn;
             for (int i = 0; i < rowIndexList.size(); ++i) {
-              if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+              if (isNullValue(originNullValueColumnBitmap, 
rowIndexList.get(i))) {
                 valueColumns[i] = EMPTY_LOCALDATE;
                 nullValueColumnBitmap.mark(i);
               } else {
@@ -527,7 +507,7 @@ public class TabletInsertionDataContainer {
                   : (long[]) originValueColumn;
           final long[] valueColumns = new long[rowIndexList.size()];
           for (int i = 0; i < rowIndexList.size(); ++i) {
-            if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+            if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i))) 
{
               valueColumns[i] = 0L;
               nullValueColumnBitmap.mark(i);
             } else {
@@ -544,7 +524,7 @@ public class TabletInsertionDataContainer {
                   : (float[]) originValueColumn;
           final float[] valueColumns = new float[rowIndexList.size()];
           for (int i = 0; i < rowIndexList.size(); ++i) {
-            if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+            if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i))) 
{
               valueColumns[i] = 0F;
               nullValueColumnBitmap.mark(i);
             } else {
@@ -561,7 +541,7 @@ public class TabletInsertionDataContainer {
                   : (double[]) originValueColumn;
           final double[] valueColumns = new double[rowIndexList.size()];
           for (int i = 0; i < rowIndexList.size(); ++i) {
-            if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+            if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i))) 
{
               valueColumns[i] = 0D;
               nullValueColumnBitmap.mark(i);
             } else {
@@ -578,7 +558,7 @@ public class TabletInsertionDataContainer {
                   : (boolean[]) originValueColumn;
           final boolean[] valueColumns = new boolean[rowIndexList.size()];
           for (int i = 0; i < rowIndexList.size(); ++i) {
-            if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+            if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i))) 
{
               valueColumns[i] = false;
               nullValueColumnBitmap.mark(i);
             } else {
@@ -599,7 +579,7 @@ public class TabletInsertionDataContainer {
           for (int i = 0; i < rowIndexList.size(); ++i) {
             if (Objects.isNull(binaryValueColumns[rowIndexList.get(i)])
                 || 
Objects.isNull(binaryValueColumns[rowIndexList.get(i)].getValues())
-                || originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+                || isNullValue(originNullValueColumnBitmap, 
rowIndexList.get(i))) {
               valueColumns[i] = Binary.EMPTY_VALUE;
               nullValueColumnBitmap.mark(i);
             } else {
@@ -659,6 +639,14 @@ public class TabletInsertionDataContainer {
     }
   }
 
+  private static BitMap getBitMap(final BitMap[] bitMaps, final int index) {
+    return Objects.nonNull(bitMaps) && index < bitMaps.length ? bitMaps[index] 
: null;
+  }
+
+  private static boolean isNullValue(final BitMap bitMap, final int rowIndex) {
+    return Objects.nonNull(bitMap) && bitMap.isMarked(rowIndex);
+  }
+
   ////////////////////////////  process  ////////////////////////////
 
   public List<TabletInsertionEvent> processRowByRow(final BiConsumer<Row, 
RowCollector> consumer) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
index 4353e4984a2..e1fa58f5a0f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.utils.TestOnly;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
@@ -70,6 +71,7 @@ public class TsFileInsertionQueryDataContainer extends 
TsFileInsertionDataContai
   private final Iterator<Map.Entry<IDeviceID, List<String>>> 
deviceMeasurementsMapIterator;
   private final Map<IDeviceID, Boolean> deviceIsAlignedMap;
   private final Map<String, TSDataType> measurementDataTypeMap;
+  private final TabletStringInternPool tabletStringInternPool = new 
TabletStringInternPool();
 
   @TestOnly
   public TsFileInsertionQueryDataContainer(
@@ -385,7 +387,8 @@ public class TsFileInsertionQueryDataContainer extends 
TsFileInsertionDataContai
                               entry.getValue(),
                               timeFilterExpression,
                               allocatedMemoryBlockForTablet,
-                              currentModifications);
+                              currentModifications,
+                              tabletStringInternPool);
                     } catch (final Exception e) {
                       close();
                       throw new PipeException(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
index e16c7113da3..2e81f4aa335 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.db.pipe.event.common.tsfile.container.query;
 
 import org.apache.iotdb.commons.path.PatternTreeMap;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
@@ -37,6 +39,7 @@ import org.apache.tsfile.read.common.RowRecord;
 import org.apache.tsfile.read.expression.IExpression;
 import org.apache.tsfile.read.expression.QueryExpression;
 import org.apache.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -57,6 +60,7 @@ public class TsFileInsertionQueryDataTabletIterator 
implements Iterator<Tablet>
 
   private final String deviceId;
   private final List<String> measurements;
+  private final List<MeasurementSchema> schemas;
 
   private final IExpression timeFilterExpression;
 
@@ -76,20 +80,28 @@ public class TsFileInsertionQueryDataTabletIterator 
implements Iterator<Tablet>
       final List<String> measurements,
       final IExpression timeFilterExpression,
       final PipeMemoryBlock allocatedBlockForTablet,
-      final PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> 
currentModifications)
+      final PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> 
currentModifications,
+      final TabletStringInternPool tabletStringInternPool)
       throws IOException {
     this.tsFileReader = tsFileReader;
     this.measurementDataTypeMap = measurementDataTypeMap;
 
-    this.deviceId = deviceId;
+    this.deviceId = tabletStringInternPool.intern(deviceId);
     this.measurements =
         measurements.stream()
             .filter(
                 measurement ->
                     // time column in aligned time-series should not be a 
query column
                     measurement != null && !measurement.isEmpty())
+            .map(tabletStringInternPool::intern)
             .sorted()
             .collect(Collectors.toList());
+    this.schemas = new ArrayList<>();
+    for (final String measurement : this.measurements) {
+      final TSDataType dataType =
+          measurementDataTypeMap.get(this.deviceId + 
TsFileConstant.PATH_SEPARATOR + measurement);
+      schemas.add(new MeasurementSchema(measurement, dataType));
+    }
 
     this.timeFilterExpression = timeFilterExpression;
 
@@ -99,7 +111,7 @@ public class TsFileInsertionQueryDataTabletIterator 
implements Iterator<Tablet>
 
     this.measurementModsList =
         ModsOperationUtil.initializeMeasurementMods(
-            deviceId, this.measurements, currentModifications);
+            this.deviceId, this.measurements, currentModifications);
   }
 
   private QueryDataSet buildQueryDataSet() throws IOException {
@@ -133,18 +145,9 @@ public class TsFileInsertionQueryDataTabletIterator 
implements Iterator<Tablet>
   }
 
   private Tablet buildNextTablet() throws IOException {
-    final List<MeasurementSchema> schemas = new ArrayList<>();
-    for (final String measurement : measurements) {
-      final TSDataType dataType =
-          measurementDataTypeMap.get(deviceId + TsFileConstant.PATH_SEPARATOR 
+ measurement);
-      schemas.add(new MeasurementSchema(measurement, dataType));
-    }
-
     Tablet tablet = null;
     if (!queryDataSet.hasNext()) {
-      tablet = new Tablet(deviceId, schemas, 1);
-      tablet.initBitMaps();
-      return tablet;
+      return new Tablet(deviceId, schemas, 1);
     }
 
     boolean isFirstRow = true;
@@ -156,7 +159,6 @@ public class TsFileInsertionQueryDataTabletIterator 
implements Iterator<Tablet>
         Pair<Integer, Integer> rowCountAndMemorySize =
             PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(rowRecord);
         tablet = new Tablet(deviceId, schemas, 
rowCountAndMemorySize.getLeft());
-        tablet.initBitMaps();
         if (allocatedBlockForTablet.getMemoryUsageInBytes() < 
rowCountAndMemorySize.getRight()) {
           PipeDataNodeResourceManager.memory()
               .forceResize(allocatedBlockForTablet, 
rowCountAndMemorySize.getRight());
@@ -172,27 +174,30 @@ public class TsFileInsertionQueryDataTabletIterator 
implements Iterator<Tablet>
       final int fieldSize = fields.size();
       for (int i = 0; i < fieldSize; i++) {
         final Field field = fields.get(i);
-        final String measurement = measurements.get(i);
+        final TSDataType dataType = schemas.get(i).getType();
         // Check if this value is deleted by mods
         if (field == null
             || ModsOperationUtil.isDelete(rowRecord.getTimestamp(), 
measurementModsList.get(i))) {
-          tablet.bitMaps[i].mark(rowIndex);
+          if (dataType != null && dataType.isBinary()) {
+            PipeTabletUtils.putValue(tablet, rowIndex, i, dataType, 
Binary.EMPTY_VALUE);
+          }
+          PipeTabletUtils.markNullValue(tablet, rowIndex, i);
         } else {
-          tablet.addValue(measurement, rowIndex, 
field.getObjectValue(schemas.get(i).getType()));
+          PipeTabletUtils.putValue(
+              tablet, rowIndex, i, dataType, 
field.getObjectValue(schemas.get(i).getType()));
           isNeedFillTime = true;
         }
       }
       if (isNeedFillTime) {
-        tablet.addTimestamp(rowIndex, rowRecord.getTimestamp());
+        PipeTabletUtils.putTimestamp(tablet, rowIndex, 
rowRecord.getTimestamp());
       }
 
-      tablet.rowSize++;
-
       if (tablet.rowSize == tablet.getMaxRowNumber()) {
         break;
       }
     }
 
+    PipeTabletUtils.compactBitMaps(tablet);
     return tablet;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index 9366d0f62df..e903c7340e4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -26,6 +26,8 @@ import 
org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
@@ -93,6 +95,7 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
   private String currentDevice;
   private boolean currentIsAligned;
   private final List<MeasurementSchema> currentMeasurements = new 
ArrayList<>();
+  private final TabletStringInternPool tabletStringInternPool = new 
TabletStringInternPool();
 
   private final List<ModsOperationUtil.ModsInfo> modsInfos = new ArrayList<>();
   // Cached time chunk
@@ -272,7 +275,6 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
 
       if (!data.hasCurrent()) {
         tablet = new Tablet(currentDevice, currentMeasurements, 1);
-        tablet.initBitMaps();
         // Ignore the memory cost of tablet
         
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForTablet, 
0);
         return tablet;
@@ -288,7 +290,6 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
                 PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(data);
             tablet =
                 new Tablet(currentDevice, currentMeasurements, 
rowCountAndMemorySize.getLeft());
-            tablet.initBitMaps();
             if (allocatedMemoryBlockForTablet.getMemoryUsageInBytes()
                 < rowCountAndMemorySize.getRight()) {
               PipeDataNodeResourceManager.memory()
@@ -300,10 +301,8 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
           final int rowIndex = tablet.rowSize;
 
           if (putValueToColumns(data, tablet, rowIndex)) {
-            tablet.addTimestamp(rowIndex, data.currentTime());
+            PipeTabletUtils.putTimestamp(tablet, rowIndex, data.currentTime());
           }
-
-          tablet.rowSize++;
         }
 
         data.next();
@@ -318,13 +317,13 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
 
       if (tablet == null) {
         tablet = new Tablet(currentDevice, currentMeasurements, 1);
-        tablet.initBitMaps();
       }
 
       // Switch chunk reader iff current chunk is all consumed
       if (!data.hasCurrent()) {
         prepareData();
       }
+      PipeTabletUtils.compactBitMaps(tablet);
       return tablet;
     } catch (final Exception e) {
       close();
@@ -372,81 +371,100 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
   }
 
   private boolean putValueToColumns(final BatchData data, final Tablet tablet, 
final int rowIndex) {
-    final Object[] columns = tablet.values;
     boolean isNeedFillTime = false;
     if (data.getDataType() == TSDataType.VECTOR) {
-      for (int i = 0; i < columns.length; ++i) {
+      for (int i = 0; i < tablet.getSchemas().size(); ++i) {
         final TsPrimitiveType primitiveType = data.getVector()[i];
+        final TSDataType type = tablet.getSchemas().get(i).getType();
         if (Objects.isNull(primitiveType)
             || ModsOperationUtil.isDelete(data.currentTime(), 
modsInfos.get(i))) {
-          tablet.bitMaps[i].mark(rowIndex);
-          final TSDataType type = tablet.getSchemas().get(i).getType();
           if (type == TSDataType.TEXT || type == TSDataType.BLOB || type == 
TSDataType.STRING) {
-            ((Binary[]) columns[i])[rowIndex] = Binary.EMPTY_VALUE;
-          }
-          if (type == TSDataType.DATE) {
-            ((LocalDate[]) columns[i])[rowIndex] = EMPTY_DATE;
+            PipeTabletUtils.putValue(tablet, rowIndex, i, type, 
Binary.EMPTY_VALUE);
           }
+          PipeTabletUtils.markNullValue(tablet, rowIndex, i);
           continue;
         }
 
         isNeedFillTime = true;
-        switch (tablet.getSchemas().get(i).getType()) {
+        switch (type) {
           case BOOLEAN:
-            ((boolean[]) columns[i])[rowIndex] = primitiveType.getBoolean();
+            PipeTabletUtils.putValue(tablet, rowIndex, i, type, 
primitiveType.getBoolean());
             break;
           case INT32:
-            ((int[]) columns[i])[rowIndex] = primitiveType.getInt();
+            PipeTabletUtils.putValue(tablet, rowIndex, i, type, 
primitiveType.getInt());
             break;
           case DATE:
-            ((LocalDate[]) columns[i])[rowIndex] =
-                DateUtils.parseIntToLocalDate(primitiveType.getInt());
+            PipeTabletUtils.putValue(
+                tablet, rowIndex, i, type, 
DateUtils.parseIntToLocalDate(primitiveType.getInt()));
             break;
           case INT64:
           case TIMESTAMP:
-            ((long[]) columns[i])[rowIndex] = primitiveType.getLong();
+            PipeTabletUtils.putValue(tablet, rowIndex, i, type, 
primitiveType.getLong());
             break;
           case FLOAT:
-            ((float[]) columns[i])[rowIndex] = primitiveType.getFloat();
+            PipeTabletUtils.putValue(tablet, rowIndex, i, type, 
primitiveType.getFloat());
             break;
           case DOUBLE:
-            ((double[]) columns[i])[rowIndex] = primitiveType.getDouble();
+            PipeTabletUtils.putValue(tablet, rowIndex, i, type, 
primitiveType.getDouble());
             break;
           case TEXT:
           case BLOB:
           case STRING:
-            ((Binary[]) columns[i])[rowIndex] = primitiveType.getBinary();
+            final Binary binary = primitiveType.getBinary();
+            PipeTabletUtils.putValue(
+                tablet,
+                rowIndex,
+                i,
+                type,
+                Objects.isNull(binary) || Objects.isNull(binary.getValues())
+                    ? Binary.EMPTY_VALUE
+                    : binary);
             break;
           default:
             throw new UnSupportedDataTypeException("UnSupported" + 
primitiveType.getDataType());
         }
       }
     } else {
+      if (!modsInfos.isEmpty()
+          && ModsOperationUtil.isDelete(data.currentTime(), modsInfos.get(0))) 
{
+        return false;
+      }
+
       isNeedFillTime = true;
-      switch (tablet.getSchemas().get(0).getType()) {
+      final TSDataType type = tablet.getSchemas().get(0).getType();
+      switch (type) {
         case BOOLEAN:
-          ((boolean[]) columns[0])[rowIndex] = data.getBoolean();
+          PipeTabletUtils.putValue(tablet, rowIndex, 0, type, 
data.getBoolean());
           break;
         case INT32:
-          ((int[]) columns[0])[rowIndex] = data.getInt();
+          PipeTabletUtils.putValue(tablet, rowIndex, 0, type, data.getInt());
           break;
         case DATE:
-          ((LocalDate[]) columns[0])[rowIndex] = 
DateUtils.parseIntToLocalDate(data.getInt());
+          PipeTabletUtils.putValue(
+              tablet, rowIndex, 0, type, 
DateUtils.parseIntToLocalDate(data.getInt()));
           break;
         case INT64:
         case TIMESTAMP:
-          ((long[]) columns[0])[rowIndex] = data.getLong();
+          PipeTabletUtils.putValue(tablet, rowIndex, 0, type, data.getLong());
           break;
         case FLOAT:
-          ((float[]) columns[0])[rowIndex] = data.getFloat();
+          PipeTabletUtils.putValue(tablet, rowIndex, 0, type, data.getFloat());
           break;
         case DOUBLE:
-          ((double[]) columns[0])[rowIndex] = data.getDouble();
+          PipeTabletUtils.putValue(tablet, rowIndex, 0, type, 
data.getDouble());
           break;
         case TEXT:
         case BLOB:
         case STRING:
-          ((Binary[]) columns[0])[rowIndex] = data.getBinary();
+          final Binary binary = data.getBinary();
+          PipeTabletUtils.putValue(
+              tablet,
+              rowIndex,
+              0,
+              type,
+              Objects.isNull(binary) || Objects.isNull(binary.getValues())
+                  ? Binary.EMPTY_VALUE
+                  : binary);
           break;
         default:
           throw new UnSupportedDataTypeException("UnSupported" + 
data.getDataType());
@@ -560,13 +578,13 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
                     ? new ChunkReader(chunk, filter)
                     : new SinglePageWholeChunkReader(chunk);
             currentIsAligned = false;
+            final String measurementID =
+                tabletStringInternPool.intern(chunkHeader.getMeasurementID());
             currentMeasurements.add(
-                new MeasurementSchema(chunkHeader.getMeasurementID(), 
chunkHeader.getDataType()));
+                new MeasurementSchema(measurementID, 
chunkHeader.getDataType()));
             modsInfos.addAll(
                 ModsOperationUtil.initializeMeasurementMods(
-                    currentDevice,
-                    Collections.singletonList(chunkHeader.getMeasurementID()),
-                    currentModifications));
+                    currentDevice, Collections.singletonList(measurementID), 
currentModifications));
             return;
           }
         case MetaMarker.VALUE_CHUNK_HEADER:
@@ -615,9 +633,11 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
               }
 
               // Increase value index
+              final String measurementID =
+                  
tabletStringInternPool.intern(chunkHeader.getMeasurementID());
               final int valueIndex =
                   measurementIndexMap.compute(
-                      chunkHeader.getMeasurementID(),
+                      measurementID,
                       (measurement, index) -> Objects.nonNull(index) ? index + 
1 : 0);
 
               // Emit when encountered non-sequential value chunk, or the 
chunk size exceeds
@@ -677,13 +697,13 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
             valueChunkSize += chunkHeader.getDataSize();
             valueChunkPageMemorySize += currentValueChunkPageMemorySize;
             valueChunkList.add(chunk);
+            final String measurementID =
+                tabletStringInternPool.intern(chunkHeader.getMeasurementID());
             currentMeasurements.add(
-                new MeasurementSchema(chunkHeader.getMeasurementID(), 
chunkHeader.getDataType()));
+                new MeasurementSchema(measurementID, 
chunkHeader.getDataType()));
             modsInfos.addAll(
                 ModsOperationUtil.initializeMeasurementMods(
-                    currentDevice,
-                    Collections.singletonList(chunkHeader.getMeasurementID()),
-                    currentModifications));
+                    currentDevice, Collections.singletonList(measurementID), 
currentModifications));
             break;
           }
         case MetaMarker.CHUNK_GROUP_HEADER:
@@ -702,7 +722,10 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
             timeChunkPageMemorySizeList.clear();
             measurementIndexMap.clear();
 
-            currentDevice = pattern.mayOverlapWithDevice(deviceID) ? deviceID 
: null;
+            currentDevice =
+                pattern.mayOverlapWithDevice(deviceID)
+                    ? tabletStringInternPool.intern(deviceID)
+                    : null;
             break;
           }
         case MetaMarker.OPERATION_INDEX_RANGE:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
index a707f554c51..a22522666c2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
@@ -179,14 +179,14 @@ public class PipeMemoryWeightUtil {
       return new Pair<>(1, 0);
     }
 
-    // Calculate row number according to the max size of a pipe tablet.
-    // "-100" is the estimated size of other data structures in a pipe tablet.
+    // Calculate row number according to the max size of a pipe tablet. "100" 
is the estimated size
+    // of other data structures in a pipe tablet.
     // "*8" converts bytes to bits, because the bitmap size is 1 bit per 
schema.
-    // Here we estimate the max use of
     int sizeLimit =
-        Math.min(
-            
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
-            (int) (inputNum * rowBytesUsed * 1.2));
+        (int)
+            Math.min(
+                
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
+                Math.min(Integer.MAX_VALUE, 100 + inputNum * (double) 
rowBytesUsed * 1.2));
 
     int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount);
     rowNumber = Math.max(1, rowNumber);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index 36ca10daa72..2e69b239277 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import org.apache.iotdb.db.pipe.sink.util.PipeTabletEventSorter;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -361,7 +362,7 @@ public class PipeTabletEventTsFileBatch extends 
PipeTabletEventBatch {
         // Aggregate the current tablet's data
         aggregatedSchemas.addAll(tablet.getSchemas());
         aggregatedValues.addAll(Arrays.asList(tablet.values));
-        aggregatedBitMaps.addAll(Arrays.asList(tablet.bitMaps));
+        
aggregatedBitMaps.addAll(Arrays.asList(PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet)));
         // Remove the aggregated tablet
         tablets.pollFirst();
       } else {
@@ -563,7 +564,7 @@ public class PipeTabletEventTsFileBatch extends 
PipeTabletEventBatch {
                 .map(schema -> (MeasurementSchema) schema)
                 .toArray(MeasurementSchema[]::new);
         Object[] values = Arrays.copyOf(tablet.values, tablet.values.length);
-        BitMap[] bitMaps = Arrays.copyOf(tablet.bitMaps, 
tablet.bitMaps.length);
+        BitMap[] bitMaps = PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet);
 
         // convert date value to int refer to
         // 
org.apache.iotdb.db.storageengine.dataregion.memtable.WritableMemChunk.writeNonAlignedTablet
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
index 94a838ee0ad..266894060dc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
 import org.apache.iotdb.commons.utils.TestOnly;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
@@ -34,7 +35,6 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
-import org.apache.tsfile.write.record.Tablet;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -130,6 +130,7 @@ public class PipeTransferTabletBatchReq extends 
TPipeTransferReq {
   public static PipeTransferTabletBatchReq fromTPipeTransferReq(
       final TPipeTransferReq transferReq) {
     final PipeTransferTabletBatchReq batchReq = new 
PipeTransferTabletBatchReq();
+    final TabletStringInternPool tabletStringInternPool = new 
TabletStringInternPool();
 
     // Binary req, for rolling upgrading
     ReadWriteIOUtils.readInt(transferReq.body);
@@ -144,8 +145,7 @@ public class PipeTransferTabletBatchReq extends 
TPipeTransferReq {
     size = ReadWriteIOUtils.readInt(transferReq.body);
     for (int i = 0; i < size; ++i) {
       batchReq.tabletReqs.add(
-          PipeTransferTabletRawReq.toTPipeTransferRawReq(
-              Tablet.deserialize(transferReq.body), 
ReadWriteIOUtils.readBool(transferReq.body)));
+          PipeTransferTabletRawReq.toTPipeTransferRawReq(transferReq.body, 
tabletStringInternPool));
     }
 
     batchReq.version = transferReq.version;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
index 47bf4d44897..60619fd4268 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -22,6 +22,8 @@ package 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request;
 import org.apache.iotdb.commons.exception.MetadataException;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool;
 import org.apache.iotdb.db.pipe.sink.util.PipeTabletEventSorter;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -82,6 +84,17 @@ public class PipeTransferTabletRawReq extends 
TPipeTransferReq {
     return tabletReq;
   }
 
+  public static PipeTransferTabletRawReq toTPipeTransferRawReq(
+      final ByteBuffer buffer, final TabletStringInternPool 
tabletStringInternPool) {
+    final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq();
+
+    tabletReq.tablet =
+        PipeTabletUtils.internTablet(Tablet.deserialize(buffer), 
tabletStringInternPool);
+    tabletReq.isAligned = ReadWriteIOUtils.readBool(buffer);
+
+    return tabletReq;
+  }
+
   /////////////////////////////// Thrift ///////////////////////////////
 
   public static PipeTransferTabletRawReq toTPipeTransferReq(
@@ -105,10 +118,8 @@ public class PipeTransferTabletRawReq extends 
TPipeTransferReq {
   }
 
   public static PipeTransferTabletRawReq fromTPipeTransferReq(final 
TPipeTransferReq transferReq) {
-    final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq();
-
-    tabletReq.tablet = Tablet.deserialize(transferReq.body);
-    tabletReq.isAligned = ReadWriteIOUtils.readBool(transferReq.body);
+    final PipeTransferTabletRawReq tabletReq =
+        toTPipeTransferRawReq(transferReq.body, new TabletStringInternPool());
 
     tabletReq.version = transferReq.version;
     tabletReq.type = transferReq.type;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index 3a42ec87969..713a87b2e35 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -374,7 +374,10 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
 
       for (int rowIndex = 0; rowIndex < tablet.rowSize; ++rowIndex) {
         // Filter null value
-        if (tablet.bitMaps[columnIndex].isMarked(rowIndex)) {
+        if (tablet.bitMaps != null
+            && columnIndex < tablet.bitMaps.length
+            && tablet.bitMaps[columnIndex] != null
+            && tablet.bitMaps[columnIndex].isMarked(rowIndex)) {
           continue;
         }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/PipeTabletEventSorter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/PipeTabletEventSorter.java
index 17e7a7c13e2..50beb6405e9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/PipeTabletEventSorter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/PipeTabletEventSorter.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.pipe.sink.util;
 
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
+
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BitMap;
@@ -129,6 +131,7 @@ public class PipeTabletEventSorter {
   // Col: [6, 1]
   private void sortAndMayDeduplicateValuesAndBitMaps() {
     int columnIndex = 0;
+    boolean bitMapsModified = false;
     for (int i = 0, size = tablet.getSchemas().size(); i < size; i++) {
       final IMeasurementSchema schema = tablet.getSchemas().get(i);
       if (schema != null) {
@@ -145,10 +148,15 @@ public class PipeTabletEventSorter {
 
         if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) {
           tablet.bitMaps[columnIndex] = deDuplicatedBitMap;
+          bitMapsModified = true;
         }
         columnIndex++;
       }
     }
+
+    if (bitMapsModified) {
+      tablet.bitMaps = PipeTabletUtils.compactBitMaps(tablet.bitMaps, 
deDuplicatedSize);
+    }
   }
 
   private Object reorderValueListAndBitMap(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index faacc10eccd..6a1e9bf1bb4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -37,6 +37,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
+import org.apache.iotdb.db.utils.BitMapUtils;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -277,7 +278,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
         long[] subTimes = new long[count];
         int destLoc = 0;
         Object[] values = initTabletValues(dataTypes.length, count, dataTypes);
-        BitMap[] bitMaps = this.bitMaps == null ? null : 
initBitmaps(dataTypes.length, count);
+        BitMap[] bitMaps = initBitmapsForSplit(dataTypes.length, count);
         System.arraycopy(times, start, subTimes, destLoc, end - start);
         for (int k = 0; k < values.length; k++) {
           if (dataTypes[k] != null) {
@@ -302,6 +303,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
         subNode.setFailedMeasurementNumber(getFailedMeasurementNumber());
         subNode.setRange(locs);
         subNode.setDataRegionReplicaSet(entry.getKey());
+        subNode.bitMaps = BitMapUtils.compactBitMaps(subNode.bitMaps, 
subNode.rowCount);
         result.add(subNode);
       }
     }
@@ -366,6 +368,23 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
     return bitMaps;
   }
 
+  protected BitMap[] initBitmapsForSplit(int columnSize, int rowSize) {
+    if (this.bitMaps == null) {
+      return null;
+    }
+
+    final int sourceRowCount = rowCount > 0 ? rowCount : times == null ? 0 : 
times.length;
+    final BitMap[] splitBitMaps = new BitMap[columnSize];
+    boolean hasBitMap = false;
+    for (int i = 0; i < columnSize && i < this.bitMaps.length; ++i) {
+      if (this.bitMaps[i] != null && !this.bitMaps[i].isAllUnmarked()) {
+        splitBitMaps[i] = new BitMap(rowSize);
+        hasBitMap = true;
+      }
+    }
+    return hasBitMap ? splitBitMaps : null;
+  }
+
   @Override
   public void markFailedMeasurement(int index) {
     if (measurements[index] == null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index 2e393678c5d..525bb48f0dd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -34,6 +34,7 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDeviceP
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.utils.BitMapUtils;
 import org.apache.iotdb.db.utils.CommonUtils;
 
 import org.apache.tsfile.enums.TSDataType;
@@ -326,7 +327,7 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
       statement.setMeasurementSchemas(measurementSchemas);
       statement.setDataTypes(dataTypes);
       if (this.bitMaps != null) {
-        statement.setBitMaps(copiedBitMaps);
+        statement.setBitMaps(BitMapUtils.compactBitMaps(copiedBitMaps, 
rowCount));
       }
       statement.setFailedMeasurementIndex2Info(failedMeasurementIndex2Info);
       insertTabletStatementList.add(statement);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
index e4f1170ddb3..3da4db4b408 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
@@ -116,9 +116,7 @@ public class DefaultOperationQuota implements 
OperationQuota {
         case BATCH_INSERT:
           // InsertTabletStatement
           InsertTabletStatement insertTabletStatement = 
(InsertTabletStatement) s;
-          for (BitMap bitMap : insertTabletStatement.getBitMaps()) {
-            avgSize += bitMap.getSize();
-          }
+          avgSize += calculationWrite(insertTabletStatement.getBitMaps());
           break;
         case BATCH_INSERT_ONE_DEVICE:
           // InsertRowsOfOneDeviceStatement
@@ -151,10 +149,12 @@ public class DefaultOperationQuota implements 
OperationQuota {
             for (int i = 0;
                 i < 
insertMultiTabletsStatement.getInsertTabletStatementList().size();
                 i++) {
-              for (BitMap bitMap :
-                  
insertMultiTabletsStatement.getInsertTabletStatementList().get(i).getBitMaps()) 
{
-                avgSize += bitMap.getSize();
-              }
+              avgSize +=
+                  calculationWrite(
+                      insertMultiTabletsStatement
+                          .getInsertTabletStatementList()
+                          .get(i)
+                          .getBitMaps());
             }
           }
           break;
@@ -178,6 +178,20 @@ public class DefaultOperationQuota implements 
OperationQuota {
     return size;
   }
 
+  private long calculationWrite(BitMap[] bitMaps) {
+    if (bitMaps == null) {
+      return 0;
+    }
+
+    long size = 0;
+    for (BitMap bitMap : bitMaps) {
+      if (bitMap != null) {
+        size += bitMap.getSize();
+      }
+    }
+    return size;
+  }
+
   private long estimateConsume(int numReqs, long avgSize) {
     if (numReqs > 0) {
       return avgSize * numReqs;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/BitMapUtils.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/BitMapUtils.java
new file mode 100644
index 00000000000..ba30c8847b4
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/BitMapUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils;
+
+import org.apache.tsfile.utils.BitMap;
+
+import java.util.Objects;
+
+public final class BitMapUtils {
+
+  private BitMapUtils() {}
+
+  public static BitMap[] compactBitMaps(final BitMap[] bitMaps, final int 
rowCount) {
+    if (Objects.isNull(bitMaps)) {
+      return null;
+    }
+
+    boolean hasMarkedBitMap = false;
+    for (int i = 0; i < bitMaps.length; ++i) {
+      if (Objects.nonNull(bitMaps[i]) && bitMaps[i].isAllUnmarked()) {
+        bitMaps[i] = null;
+      }
+      if (Objects.nonNull(bitMaps[i])) {
+        hasMarkedBitMap = true;
+      }
+    }
+    return hasMarkedBitMap ? bitMaps : null;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
index 4a7d77eab8d..308c5458dc9 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.TabletInsertionDataContainer;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
@@ -217,7 +218,7 @@ public class PipeTabletInsertionEventTest {
     tabletForInsertRowNode.values = values;
     tabletForInsertRowNode.timestamps = new long[] {times[0]};
     tabletForInsertRowNode.rowSize = 1;
-    tabletForInsertRowNode.bitMaps = bitMapsForInsertRowNode;
+    tabletForInsertRowNode.bitMaps = 
PipeTabletUtils.compactBitMaps(bitMapsForInsertRowNode, 1);
 
     // create tablet for insertTabletNode
     BitMap[] bitMapsForInsertTabletNode = new BitMap[schemas.length];
@@ -253,7 +254,8 @@ public class PipeTabletInsertionEventTest {
     tabletForInsertTabletNode.values = values;
     tabletForInsertTabletNode.timestamps = times;
     tabletForInsertTabletNode.rowSize = times.length;
-    tabletForInsertTabletNode.bitMaps = bitMapsForInsertTabletNode;
+    tabletForInsertTabletNode.bitMaps =
+        PipeTabletUtils.compactBitMaps(bitMapsForInsertTabletNode, 
times.length);
   }
 
   @Test
@@ -318,6 +320,36 @@ public class PipeTabletInsertionEventTest {
     Assert.assertTrue(isAligned4);
   }
 
+  @Test
+  public void convertToTabletSkipsUnnecessaryBitMapsForTest() throws Exception 
{
+    final BitMap[] bitMaps = new BitMap[schemas.length];
+    bitMaps[0] = new BitMap(times.length);
+    bitMaps[1] = new BitMap(times.length);
+    bitMaps[1].mark(1);
+
+    final InsertTabletNode nodeWithSparseColumn =
+        new InsertTabletNode(
+            new PlanNodeId("plannode bitmap"),
+            new PartialPath(deviceId),
+            false,
+            measurementIds,
+            dataTypes,
+            schemas,
+            times,
+            bitMaps,
+            insertTabletNode.getColumns(),
+            times.length);
+
+    final Tablet tablet =
+        new TabletInsertionDataContainer(nodeWithSparseColumn, new 
PrefixPipePattern(pattern))
+            .convertToTablet();
+
+    Assert.assertNotNull(tablet.bitMaps);
+    Assert.assertNull(tablet.bitMaps[0]);
+    Assert.assertNotNull(tablet.bitMaps[1]);
+    Assert.assertTrue(tablet.bitMaps[1].isMarked(1));
+  }
+
   @Test
   public void convertToTabletWithFilteredRowsForTest() {
     TabletInsertionDataContainer container1 =
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtilsTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtilsTest.java
new file mode 100644
index 00000000000..8bf32bd066f
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtilsTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tablet;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class PipeTabletUtilsTest {
+
+  @Test
+  public void testPutValueUnmarksReusedNullRow() {
+    final List<MeasurementSchema> schemas =
+        Arrays.asList(
+            new MeasurementSchema("s1", TSDataType.FLOAT),
+            new MeasurementSchema("s2", TSDataType.FLOAT));
+    final Tablet tablet = new Tablet("root.sg.d1", schemas, 2);
+
+    PipeTabletUtils.markNullValue(tablet, 0, 0);
+    PipeTabletUtils.markNullValue(tablet, 0, 1);
+
+    PipeTabletUtils.putValue(tablet, 0, 0, TSDataType.FLOAT, 1.0f);
+    PipeTabletUtils.putTimestamp(tablet, 0, 1L);
+    PipeTabletUtils.compactBitMaps(tablet);
+
+    Assert.assertNull(tablet.bitMaps[0]);
+    Assert.assertTrue(tablet.bitMaps[1].isMarked(0));
+  }
+
+  @Test
+  public void testCopyBitMapsOrCreateEmptyWithNullBitMaps() {
+    final List<MeasurementSchema> schemas =
+        Arrays.asList(
+            new MeasurementSchema("s1", TSDataType.FLOAT),
+            new MeasurementSchema("s2", TSDataType.FLOAT));
+    final Tablet tablet = new Tablet("root.sg.d1", schemas, 2);
+    tablet.addTimestamp(0, 1L);
+    tablet.addValue("s1", 0, 1.0f);
+    tablet.addValue("s2", 0, 2.0f);
+
+    Assert.assertNull(tablet.bitMaps);
+
+    final BitMap[] bitMaps = PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet);
+
+    Assert.assertEquals(schemas.size(), bitMaps.length);
+    Assert.assertNull(bitMaps[0]);
+    Assert.assertNull(bitMaps[1]);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
index ee9b7218dab..4e4d11aacbf 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
@@ -321,6 +321,46 @@ public class PipeDataNodeThriftRequestTest {
     Assert.assertFalse(deserializedReq.getTabletReqs().get(0).getIsAligned());
   }
 
+  @Test
+  public void testPipeTransferTabletBatchReqInternsRepeatedMeasurementNames() 
throws IOException {
+    final List<ByteBuffer> tabletBuffers = new ArrayList<>();
+    tabletBuffers.add(
+        serializeTablet(createSingleValueTablet(new String("root.sg.d"), new 
String("s1")), false));
+    tabletBuffers.add(
+        serializeTablet(createSingleValueTablet(new String("root.sg.d"), new 
String("s1")), false));
+
+    final PipeTransferTabletBatchReq deserializedReq =
+        PipeTransferTabletBatchReq.fromTPipeTransferReq(
+            
PipeTransferTabletBatchReq.toTPipeTransferReq(Collections.emptyList(), 
tabletBuffers));
+    final Tablet firstTablet = 
deserializedReq.getTabletReqs().get(0).getTablet();
+    final Tablet secondTablet = 
deserializedReq.getTabletReqs().get(1).getTablet();
+
+    Assert.assertSame(firstTablet.deviceId, secondTablet.deviceId);
+    Assert.assertSame(
+        firstTablet.getSchemas().get(0).getMeasurementId(),
+        secondTablet.getSchemas().get(0).getMeasurementId());
+  }
+
+  private static Tablet createSingleValueTablet(final String deviceId, final 
String measurement) {
+    final List<MeasurementSchema> schemaList = new ArrayList<>();
+    schemaList.add(new MeasurementSchema(measurement, TSDataType.INT32));
+    final Tablet tablet = new Tablet(deviceId, schemaList, 1);
+    tablet.addTimestamp(0, 1);
+    tablet.addValue(measurement, 0, 1);
+    tablet.rowSize = 1;
+    return tablet;
+  }
+
+  private static ByteBuffer serializeTablet(final Tablet tablet, final boolean 
isAligned)
+      throws IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      tablet.serialize(outputStream);
+      ReadWriteIOUtils.write(isAligned, outputStream);
+      return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+  }
+
   @Test
   public void testPipeTransferFilePieceReq() throws IOException {
     final byte[] body = "testPipeTransferFilePieceReq".getBytes();
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
index a93a22b6e9f..305a3197cfe 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
@@ -44,6 +44,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOf
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.BitMap;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -201,6 +202,9 @@ public class WritePlanNodeSplitTest {
     insertTabletNode.setColumns(
         new Object[] {new int[] {-20, -10, 10, 20, 30, 40, 50, 60, 70, 80, 90, 
100}});
     insertTabletNode.setRowCount(insertTabletNode.getTimes().length);
+    final BitMap[] bitMaps = new BitMap[] {new 
BitMap(insertTabletNode.getRowCount())};
+    bitMaps[0].mark(2);
+    insertTabletNode.setBitMaps(bitMaps);
 
     DataPartitionQueryParam dataPartitionQueryParam = new 
DataPartitionQueryParam();
     
dataPartitionQueryParam.setDevicePath(insertTabletNode.getDevicePath().getFullPath());
@@ -219,6 +223,12 @@ public class WritePlanNodeSplitTest {
       Assert.assertEquals(tabletNode.getTimes().length, 2);
       TConsensusGroupId regionId = 
tabletNode.getDataRegionReplicaSet().getRegionId();
       Assert.assertEquals(getRegionIdByTime(tabletNode.getMinTime()), 
regionId.getId());
+      if (tabletNode.getTimes()[0] == 1) {
+        Assert.assertNotNull(tabletNode.getBitMaps());
+        Assert.assertTrue(tabletNode.getBitMaps()[0].isMarked(0));
+      } else {
+        Assert.assertNull(tabletNode.getBitMaps());
+      }
     }
 
     insertTabletNode = new InsertTabletNode(new PlanNodeId("plan node 2"));
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuotaTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuotaTest.java
new file mode 100644
index 00000000000..5f11039eeb3
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuotaTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.rescon.quotas;
+
+import org.apache.iotdb.common.rpc.thrift.TTimedQuota;
+import org.apache.iotdb.common.rpc.thrift.ThrottleType;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import org.apache.tsfile.utils.BitMap;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Map;
+
+public class DefaultOperationQuotaTest {
+
+  @Test
+  public void testCheckQuotaWithNullAndSparseBitMaps() throws Exception {
+    final DefaultOperationQuota quota = new 
DefaultOperationQuota(createQuotaLimiter());
+
+    final InsertTabletStatement tabletWithoutBitMaps = new 
InsertTabletStatement();
+    tabletWithoutBitMaps.setBitMaps(null);
+    quota.checkQuota(1, 0, tabletWithoutBitMaps);
+
+    final InsertTabletStatement tabletWithSparseBitMaps = new 
InsertTabletStatement();
+    final BitMap bitMap = new BitMap(8);
+    bitMap.mark(0);
+    tabletWithSparseBitMaps.setBitMaps(new BitMap[] {null, bitMap});
+    quota.checkQuota(1, 0, tabletWithSparseBitMaps);
+
+    final InsertMultiTabletsStatement multiTabletsStatement = new 
InsertMultiTabletsStatement();
+    multiTabletsStatement.setInsertTabletStatementList(
+        Arrays.asList(tabletWithoutBitMaps, tabletWithSparseBitMaps));
+    quota.checkQuota(1, 0, multiTabletsStatement);
+  }
+
+  private static QuotaLimiter createQuotaLimiter() {
+    final Map<ThrottleType, TTimedQuota> quotas = new 
EnumMap<>(ThrottleType.class);
+    for (final ThrottleType throttleType : ThrottleType.values()) {
+      quotas.put(throttleType, new TTimedQuota(60_000L, 1_000_000_000L));
+    }
+    return QuotaLimiter.fromThrottle(Collections.unmodifiableMap(quotas));
+  }
+}

Reply via email to