This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch use_pam_for_insert_tablet
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/use_pam_for_insert_tablet by 
this push:
     new 3641826e1e0 multiple fixes
3641826e1e0 is described below

commit 3641826e1e04601e5d9ca503fef8d760fb546dea
Author: Tian Jiang <[email protected]>
AuthorDate: Wed Apr 2 11:12:59 2025 +0800

    multiple fixes
---
 .../it/session/IoTDBSessionRelationalIT.java       |  5 ++
 .../iotdb/db/protocol/mqtt/MPPPublishHandler.java  |  3 +-
 .../v1/handler/StatementConstructionHandler.java   |  3 +-
 .../v1/handler/StatementConstructionHandler.java   |  3 +-
 .../v2/handler/StatementConstructionHandler.java   |  3 +-
 .../execution/executor/RegionWriteExecutor.java    |  3 +
 .../operator/process/AbstractIntoOperator.java     |  3 +-
 .../plan/parser/StatementGenerator.java            |  8 +--
 .../planner/plan/node/write/InsertTabletNode.java  | 48 ++++++++------
 .../plan/statement/crud/InsertTabletStatement.java | 75 +++++++++++-----------
 .../memtable/AlignedWritableMemChunk.java          |  9 +--
 .../rescon/memory/PrimitiveArrayManager.java       |  4 --
 .../java/org/apache/iotdb/db/utils/MemUtils.java   |  3 +-
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   | 12 ++--
 .../db/utils/datastructure/AlignedTVList.java      |  6 +-
 15 files changed, 103 insertions(+), 85 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
index 7d89b5efee6..464267a9231 100644
--- a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.isession.ITableSession;
 import org.apache.iotdb.isession.SessionDataSet;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.env.remote.env.RemoteServerEnv;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.TableClusterIT;
 import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
@@ -49,6 +50,7 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -1848,6 +1850,9 @@ public class IoTDBSessionRelationalIT {
           "INSERT INTO remove_attr_col (time, tag1, attr1, m1) VALUES (10, 
'tag:1', 'attr:10', 10.0)");
 
       // check WAL
+      if (EnvFactory.getEnv() instanceof RemoteServerEnv) {
+        return;
+      }
       for (DataNodeWrapper dataNodeWrapper : 
EnvFactory.getEnv().getDataNodeWrapperList()) {
         String walNodeDir = dataNodeWrapper.getWalDir() + File.separator + "0";
         File[] walFiles = new File(walNodeDir).listFiles(f -> 
f.getName().endsWith(".wal"));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index a1e652cbd5a..bf8b17c6823 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -259,7 +259,8 @@ public class MPPPublishHandler extends 
AbstractInterceptHandler {
       columnCategories[i] = TsTableColumnCategory.ATTRIBUTE;
     }
     insertStatement.setDataTypes(dataTypes);
-    insertStatement.setColumns(new TwoDArrayValueView(columns, dataTypes, 
rowSize));
+    insertStatement.setColumns(
+        new TwoDArrayValueView(columns, insertStatement::getDataTypes, 
rowSize));
     insertStatement.setColumnCategories(columnCategories);
 
     return insertStatement;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/StatementConstructionHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/StatementConstructionHandler.java
index 5d993e5428f..25856242aff 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/StatementConstructionHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/StatementConstructionHandler.java
@@ -169,7 +169,8 @@ public class StatementConstructionHandler {
           throw new IllegalArgumentException("Invalid input: " + 
rawDataType.get(columnIndex));
       }
     }
-    insertStatement.setColumns(new TwoDArrayValueView(columns, dataTypes, 
rowSize));
+    insertStatement.setColumns(
+        new TwoDArrayValueView(columns, insertStatement::getDataTypes, 
rowSize));
     insertStatement.setBitMaps(bitMaps);
     insertStatement.setRowCount(rowSize);
     insertStatement.setDataTypes(dataTypes);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/handler/StatementConstructionHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/handler/StatementConstructionHandler.java
index 61d4c7dc406..04fe0c045cd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/handler/StatementConstructionHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/handler/StatementConstructionHandler.java
@@ -173,7 +173,8 @@ public class StatementConstructionHandler {
     insertStatement.setBitMaps(bitMaps);
     insertStatement.setRowCount(insertTabletRequest.getTimestamps().size());
     insertStatement.setDataTypes(dataTypes);
-    insertStatement.setColumns(new TwoDArrayValueView(columns, dataTypes, 
rowSize));
+    insertStatement.setColumns(
+        new TwoDArrayValueView(columns, insertStatement::getDataTypes, 
rowSize));
     insertStatement.setAligned(insertTabletRequest.getIsAligned());
     return insertStatement;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/StatementConstructionHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/StatementConstructionHandler.java
index f9e5f1bb604..890aeb15cad 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/StatementConstructionHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/StatementConstructionHandler.java
@@ -183,7 +183,8 @@ public class StatementConstructionHandler {
     insertStatement.setBitMaps(bitMaps);
     insertStatement.setRowCount(insertTabletRequest.getTimestamps().size());
     insertStatement.setDataTypes(dataTypes);
-    insertStatement.setColumns(new TwoDArrayValueView(columns, dataTypes, 
rowSize));
+    insertStatement.setColumns(
+        new TwoDArrayValueView(columns, insertStatement::getDataTypes, 
rowSize));
     insertStatement.setAligned(insertTabletRequest.getIsAligned());
     return insertStatement;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
index fc3013b6240..2e3c5a742c8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
@@ -324,6 +324,9 @@ public class RegionWriteExecutor {
         }
         triggerCostTime += (System.nanoTime() - startTime);
       }
+      if (insertNode instanceof InsertTabletNode) {
+        ((InsertTabletNode) insertNode).decRefCount();
+      }
       PERFORMANCE_OVERVIEW_METRICS.recordScheduleTriggerCost(triggerCostTime);
       return status;
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java
index 15afe24a90c..3423326b499 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java
@@ -542,7 +542,8 @@ public abstract class AbstractIntoOperator implements 
ProcessOperator {
 
       insertTabletStatement.setTimes(new SingleArrayTimeView(times));
       insertTabletStatement.setBitMaps(bitMaps);
-      insertTabletStatement.setColumns(new TwoDArrayValueView(columns, 
dataTypes, rowCount));
+      insertTabletStatement.setColumns(
+          new TwoDArrayValueView(columns, insertTabletStatement::getDataTypes, 
rowCount));
 
       return insertTabletStatement;
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
index 50cddde149a..898196361f8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
@@ -348,15 +348,15 @@ public class StatementGenerator {
           new TwoDArrayValueView(
               QueryDataSetUtils.readTabletValuesFromBuffer(
                   valueBuffer, dataTypes, dataTypes.length, rowSize),
-              dataTypes,
+              insertTabletStatement::getDataTypes,
               rowSize));
     } else {
       long[][] timestamps = 
QueryDataSetUtils.readTimesFromBufferWithPam(timeBuffer, rowSize);
       if (timestamps.length != 0) {
         TimestampPrecisionUtils.checkTimestampPrecision(timestamps[0][0]);
         TimestampPrecisionUtils.checkTimestampPrecision(
-            timestamps[rowSize / PrimitiveArrayManager.ARRAY_SIZE][
-                rowSize % PrimitiveArrayManager.ARRAY_SIZE]);
+            timestamps[(rowSize - 1) / PrimitiveArrayManager.ARRAY_SIZE][
+                (rowSize - 1) % PrimitiveArrayManager.ARRAY_SIZE]);
       }
       insertTabletStatement.setTimes(
           new MultiArrayTimeView(PrimitiveArrayManager.ARRAY_SIZE, timestamps, 
rowSize));
@@ -364,7 +364,7 @@ public class StatementGenerator {
           new ThreeDArrayValueView(
               QueryDataSetUtils.readTabletValuesFromBufferWithPam(
                   valueBuffer, dataTypes, dataTypes.length, rowSize),
-              dataTypes,
+              insertTabletStatement::getDataTypes,
               rowSize,
               PrimitiveArrayManager.ARRAY_SIZE));
       insertTabletStatement.setRefCount(new AtomicInteger(1));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index 4b082b68746..2da909e6471 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -136,7 +136,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
     super(id, devicePath, isAligned, measurements, dataTypes);
     this.times = new SingleArrayTimeView(times);
     this.bitMaps = bitMaps;
-    this.columns = new TwoDArrayValueView(columns, dataTypes, rowCount);
+    this.columns = new TwoDArrayValueView(columns, this::getDataTypes, 
rowCount);
     this.rowCount = rowCount;
   }
 
@@ -155,7 +155,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
     this.measurementSchemas = measurementSchemas;
     this.times = new SingleArrayTimeView(times);
     this.bitMaps = bitMaps;
-    this.columns = new TwoDArrayValueView(columns, dataTypes, rowCount);
+    this.columns = new TwoDArrayValueView(columns, this::getDataTypes, 
rowCount);
     this.rowCount = rowCount;
   }
 
@@ -212,7 +212,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
     if (dataTypes == null || rowCount == 0) {
       throw new IllegalArgumentException("dataTypes and rowCount must be set 
first");
     }
-    this.columns = new TwoDArrayValueView(columns, dataTypes, rowCount);
+    this.columns = new TwoDArrayValueView(columns, this::getDataTypes, 
rowCount);
   }
 
   public int getRowCount() {
@@ -375,24 +375,29 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
           values,
           subTimes.length);
     } else {
-      int numOfArrays = PrimitiveArrayManager.numOfArrays(count);
+      int numOfArrays = PrimitiveArrayManager.getArrayRowCount(count);
       long[][] subTimes = new long[numOfArrays][];
       for (int i = 0; i < numOfArrays; i++) {
         subTimes[i] = (long[]) 
PrimitiveArrayManager.allocate(TSDataType.INT64);
       }
       Object[][] values = initTabletValuesWithPam(dataTypes.length, count, 
dataTypes);
-      return new InsertTabletNode(
-          getPlanNodeId(),
-          targetPath,
-          isAligned,
-          measurements,
-          dataTypes,
-          measurementSchemas,
-          new MultiArrayTimeView(PrimitiveArrayManager.ARRAY_SIZE, subTimes, 
count),
-          bitMaps,
-          new ThreeDArrayValueView(values, dataTypes, count, 
PrimitiveArrayManager.ARRAY_SIZE),
-          count,
-          new AtomicInteger(1));
+      InsertTabletNode insertTabletNode =
+          new InsertTabletNode(
+              getPlanNodeId(),
+              targetPath,
+              isAligned,
+              measurements,
+              dataTypes,
+              measurementSchemas,
+              new MultiArrayTimeView(PrimitiveArrayManager.ARRAY_SIZE, 
subTimes, count),
+              bitMaps,
+              null,
+              count,
+              new AtomicInteger(1));
+      insertTabletNode.setColumns(
+          new ThreeDArrayValueView(
+              values, insertTabletNode::getDataTypes, count, 
PrimitiveArrayManager.ARRAY_SIZE));
+      return insertTabletNode;
     }
   }
 
@@ -485,8 +490,11 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
   protected Object[][] initTabletValuesWithPam(
       int columnSize, int rowSize, TSDataType[] dataTypes) {
     Object[][] values = new Object[columnSize][];
-    int numOfArrays = PrimitiveArrayManager.numOfArrays(rowSize);
+    int numOfArrays = PrimitiveArrayManager.getArrayRowCount(rowSize);
     for (int i = 0; i < values.length; i++) {
+      if (dataTypes[i] == null) {
+        continue;
+      }
       for (int j = 0; j < numOfArrays; j++) {
         values[i][j] = PrimitiveArrayManager.allocate(dataTypes[i]);
       }
@@ -731,7 +739,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
         new TwoDArrayValueView(
             QueryDataSetUtils.readTabletValuesFromBuffer(
                 buffer, dataTypes, measurementSize, rowCount),
-            dataTypes,
+            this::getDataTypes,
             rowCount);
     isAligned = buffer.get() == 1;
   }
@@ -906,7 +914,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
         new TwoDArrayValueView(
             QueryDataSetUtils.readTabletValuesFromStream(
                 stream, dataTypes, measurementSize, rowCount),
-            dataTypes,
+            this::getDataTypes,
             rowCount);
     isAligned = stream.readByte() == 1;
   }
@@ -949,7 +957,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
         new TwoDArrayValueView(
             QueryDataSetUtils.readTabletValuesFromBuffer(
                 buffer, dataTypes, measurementSize, rowCount),
-            dataTypes,
+            this::getDataTypes,
             rowCount);
     isAligned = buffer.get() == 1;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index a91b0f8dd02..8970ed35839 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -70,6 +70,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 public class InsertTabletStatement extends InsertBaseStatement implements 
ISchemaValidation {
@@ -143,7 +144,7 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
     if (dataTypes == null || rowCount == 0) {
       throw new IllegalArgumentException("dataTypes and rowCount must be set 
first");
     }
-    this.columns = new TwoDArrayValueView(columns, dataTypes, rowCount);
+    this.columns = new TwoDArrayValueView(columns, this::getDataTypes, 
rowCount);
   }
 
   public BitMap[] getBitMaps() {
@@ -748,7 +749,7 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
         int arrayEnd = arrayStart + copyLength;
         tvList.putTimes(
             timestamps[arrayIndex],
-            bitMap.getRegion(current, current + copyLength),
+            bitMap == null ? null : bitMap.getRegion(current, current + 
copyLength),
             arrayStart,
             arrayEnd);
 
@@ -864,11 +865,11 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
 
   public static class TwoDArrayValueView implements ValueView {
 
-    private final TSDataType[] dataTypes;
+    private final Supplier<TSDataType[]> dataTypes;
     private Object[] values;
     private final int rowLength;
 
-    public TwoDArrayValueView(Object[] values, TSDataType[] dataTypes, int 
rowLength) {
+    public TwoDArrayValueView(Object[] values, Supplier<TSDataType[]> 
dataTypes, int rowLength) {
       this.values = values;
       this.dataTypes = dataTypes;
       this.rowLength = rowLength;
@@ -886,7 +887,7 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
 
     @Override
     public Object get(int rowIndex, int colIndex) {
-      switch (dataTypes[colIndex]) {
+      switch (dataTypes.get()[colIndex]) {
         case INT32:
         case DATE:
           return ((int[]) values[colIndex])[rowIndex];
@@ -906,13 +907,13 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
         case VECTOR:
         case UNKNOWN:
         default:
-          throw new IllegalArgumentException(dataTypes[colIndex].toString());
+          throw new 
IllegalArgumentException(dataTypes.get()[colIndex].toString());
       }
     }
 
     @Override
     public void set(int rowIndex, int colIndex, Object value) {
-      switch (dataTypes[colIndex]) {
+      switch (dataTypes.get()[colIndex]) {
         case INT32:
         case DATE:
           ((int[]) values[colIndex])[rowIndex] = ((int) value);
@@ -938,7 +939,7 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
         case VECTOR:
         case UNKNOWN:
         default:
-          throw new IllegalArgumentException(dataTypes[colIndex].toString());
+          throw new 
IllegalArgumentException(dataTypes.get()[colIndex].toString());
       }
     }
 
@@ -960,7 +961,7 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
 
     @Override
     public void serializeColumn(int colIndex, ByteBuffer buffer) {
-      switch (dataTypes[colIndex]) {
+      switch (dataTypes.get()[colIndex]) {
         case INT32:
         case DATE:
           int[] intValues = (int[]) values[colIndex];
@@ -1007,13 +1008,13 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
           break;
         default:
           throw new UnSupportedDataTypeException(
-              String.format(DATATYPE_UNSUPPORTED, dataTypes[colIndex]));
+              String.format(DATATYPE_UNSUPPORTED, dataTypes.get()[colIndex]));
       }
     }
 
     @Override
     public void serializeColumn(int colIndex, DataOutputStream stream) throws 
IOException {
-      switch (dataTypes[colIndex]) {
+      switch (dataTypes.get()[colIndex]) {
         case INT32:
         case DATE:
           int[] intValues = (int[]) values[colIndex];
@@ -1060,13 +1061,13 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
           break;
         default:
           throw new UnSupportedDataTypeException(
-              String.format(DATATYPE_UNSUPPORTED, dataTypes[colIndex]));
+              String.format(DATATYPE_UNSUPPORTED, dataTypes.get()[colIndex]));
       }
     }
 
     @Override
     public void serializeColumn(int colIndex, IWALByteBufferView buffer, int 
start, int end) {
-      switch (dataTypes[colIndex]) {
+      switch (dataTypes.get()[colIndex]) {
         case INT32:
         case DATE:
           int[] intValues = (int[]) values[colIndex];
@@ -1114,18 +1115,18 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
           break;
         default:
           throw new UnSupportedDataTypeException(
-              String.format(DATATYPE_UNSUPPORTED, dataTypes[colIndex]));
+              String.format(DATATYPE_UNSUPPORTED, dataTypes.get()[colIndex]));
       }
     }
 
     @Override
     public TSDataType[] dataTypes() {
-      return dataTypes;
+      return dataTypes.get();
     }
 
     @Override
     public void castTo(int colIndex, TSDataType newType) {
-      values[colIndex] = newType.castFromArray(dataTypes[colIndex], 
values[colIndex]);
+      values[colIndex] = newType.castFromArray(dataTypes.get()[colIndex], 
values[colIndex]);
     }
 
     @Override
@@ -1185,13 +1186,13 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
   @SuppressWarnings("SuspiciousSystemArraycopy")
   public static class ThreeDArrayValueView implements ValueView {
 
-    private final TSDataType[] dataTypes;
+    private final Supplier<TSDataType[]> dataTypes;
     private Object[][] values;
     private final int rowLength;
     private final int singleArraySize;
 
     public ThreeDArrayValueView(
-        Object[][] values, TSDataType[] dataTypes, int rowLength, int 
singleArraySize) {
+        Object[][] values, Supplier<TSDataType[]> dataTypes, int rowLength, 
int singleArraySize) {
       this.values = values;
       this.dataTypes = dataTypes;
       this.rowLength = rowLength;
@@ -1210,7 +1211,7 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
 
     @Override
     public Object get(int rowIndex, int colIndex) {
-      switch (dataTypes[colIndex]) {
+      switch (dataTypes.get()[colIndex]) {
         case INT32:
         case DATE:
           return ((int[]) values[colIndex][rowIndex / 
singleArraySize])[rowIndex % singleArraySize];
@@ -1235,13 +1236,13 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
         case VECTOR:
         case UNKNOWN:
         default:
-          throw new IllegalArgumentException(dataTypes[colIndex].toString());
+          throw new 
IllegalArgumentException(dataTypes.get()[colIndex].toString());
       }
     }
 
     @Override
     public void set(int rowIndex, int colIndex, Object value) {
-      switch (dataTypes[colIndex]) {
+      switch (dataTypes.get()[colIndex]) {
         case INT32:
         case DATE:
           ((int[]) values[colIndex][rowIndex / singleArraySize])[rowIndex % 
singleArraySize] =
@@ -1273,7 +1274,7 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
         case VECTOR:
         case UNKNOWN:
         default:
-          throw new IllegalArgumentException(dataTypes[colIndex].toString());
+          throw new 
IllegalArgumentException(dataTypes.get()[colIndex].toString());
       }
     }
 
@@ -1324,7 +1325,7 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
     @Override
     public void serializeColumn(int colIndex, ByteBuffer buffer) {
       int rowIndex = 0;
-      switch (dataTypes[colIndex]) {
+      switch (dataTypes.get()[colIndex]) {
         case INT32:
         case DATE:
           for (Object o : values[colIndex]) {
@@ -1407,14 +1408,14 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
           break;
         default:
           throw new UnSupportedDataTypeException(
-              String.format(DATATYPE_UNSUPPORTED, dataTypes[colIndex]));
+              String.format(DATATYPE_UNSUPPORTED, dataTypes.get()[colIndex]));
       }
     }
 
     @Override
     public void serializeColumn(int colIndex, DataOutputStream stream) throws 
IOException {
       int rowIndex = 0;
-      switch (dataTypes[colIndex]) {
+      switch (dataTypes.get()[colIndex]) {
         case INT32:
         case DATE:
           for (Object o : values[colIndex]) {
@@ -1497,7 +1498,7 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
           break;
         default:
           throw new UnSupportedDataTypeException(
-              String.format(DATATYPE_UNSUPPORTED, dataTypes[colIndex]));
+              String.format(DATATYPE_UNSUPPORTED, dataTypes.get()[colIndex]));
       }
     }
 
@@ -1505,7 +1506,7 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
     public void serializeColumn(int colIndex, IWALByteBufferView buffer, int 
start, int end) {
       int serializedCnt = 0;
       Object[] column = values[colIndex];
-      switch (dataTypes[colIndex]) {
+      switch (dataTypes.get()[colIndex]) {
         case INT32:
         case DATE:
           for (int i = start / singleArraySize; i < column.length; i++) {
@@ -1613,21 +1614,23 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
           break;
         default:
           throw new UnSupportedDataTypeException(
-              String.format(DATATYPE_UNSUPPORTED, dataTypes[colIndex]));
+              String.format(DATATYPE_UNSUPPORTED, dataTypes.get()[colIndex]));
       }
     }
 
     @Override
     public TSDataType[] dataTypes() {
-      return dataTypes;
+      return dataTypes.get();
     }
 
     @Override
     public void castTo(int colIndex, TSDataType newType) {
       for (int i = 0; i < values[colIndex].length; i++) {
         Object originalArray = values[colIndex][i];
-        values[colIndex][i] = newType.castFromArray(dataTypes[colIndex], 
originalArray);
-        release(originalArray);
+        values[colIndex][i] = newType.castFromArray(dataTypes.get()[colIndex], 
originalArray);
+        if (originalArray != values[colIndex][i]) {
+          release(originalArray);
+        }
       }
     }
 
@@ -1718,7 +1721,7 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
         int arrayEnd = arrayStart + copyLength;
         tvList.putValues(
             values[columnIndex][arrayIndex],
-            bitMap.getRegion(current, current + copyLength),
+            bitMap == null ? null : bitMap.getRegion(current, current + 
copyLength),
             arrayStart,
             arrayEnd,
             pos,
@@ -1755,7 +1758,7 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
           tvList.putAlignedValues(
               values[i][arrayIndex],
               columnIndices.get(i),
-              bitMaps[i].getRegion(current, current + copyLength),
+              bitMaps[i] == null ? null : bitMaps[i].getRegion(current, 
current + copyLength),
               arrayStart,
               arrayEnd,
               results,
@@ -1771,10 +1774,10 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
     public Object[] toTwoDArray() {
       Object[] twoDArray = new Object[values.length];
       for (int i = 0; i < values.length; i++) {
-        if (dataTypes[i] == null) {
+        if (dataTypes.get()[i] == null) {
           continue;
         }
-        switch (dataTypes[i]) {
+        switch (dataTypes.get()[i]) {
           case INT32:
           case DATE:
             twoDArray[i] = new int[rowLength];
@@ -1800,7 +1803,7 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
           case UNKNOWN:
           case VECTOR:
           default:
-            throw new UnSupportedDataTypeException(dataTypes[i].toString());
+            throw new 
UnSupportedDataTypeException(dataTypes.get()[i].toString());
         }
         int arrayIndex = 0;
         for (; arrayIndex < values[i].length - 1; arrayIndex++) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index 9b726718c23..afc5db33ccb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -507,10 +507,11 @@ public class AlignedWritableMemChunk extends 
AbstractWritableMemChunk {
               case TEXT:
               case STRING:
               case BLOB:
-                alignedChunkWriter.writeByColumn(
-                    time,
-                    isNull ? null : list.getBinaryByValueIndex(originRowIndex, 
columnIndex),
-                    isNull);
+                Binary value = list.getBinaryByValueIndex(originRowIndex, 
columnIndex);
+                if (!isNull && value == null) {
+                  System.out.println("");
+                }
+                alignedChunkWriter.writeByColumn(time, isNull ? null : value, 
isNull);
                 break;
               default:
                 break;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
index 65a040a1d89..f4bc88db2ef 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
@@ -126,10 +126,6 @@ public class PrimitiveArrayManager {
     // Empty constructor
   }
 
-  public static int numOfArrays(int numOfElements) {
-    return numOfElements / ARRAY_SIZE + numOfElements % ARRAY_SIZE > 0 ? 1 : 0;
-  }
-
   /**
    * Get or allocate primitive data lists according to type.
    *
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
index b94b01c2f95..96364c8d9ed 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
@@ -122,7 +122,8 @@ public class MemUtils {
     for (int i = start; i < end; i++) {
       if (results == null
           || results[i] == null
-          || results[i].code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          || results[i].code == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+              && columns.get(i, columnIndex) != null) {
         memSize += RamUsageEstimator.sizeOf(((Binary) columns.get(i, 
columnIndex)).getValues());
       }
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 05c5d9145a0..0b55962f0f1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -646,11 +646,8 @@ public class QueryDataSetUtils {
   }
 
   public static long[][] readTimesFromBufferWithPam(ByteBuffer buffer, int 
size) {
-    int numOfArray =
-        size / PrimitiveArrayManager.ARRAY_SIZE + size % 
PrimitiveArrayManager.ARRAY_SIZE > 0
-            ? 1
-            : 0;
-    long[][] times = new long[size][];
+    int numOfArray = PrimitiveArrayManager.getArrayRowCount(size);
+    long[][] times = new long[numOfArray][];
     for (int i = 0; i < numOfArray; i++) {
       times[i] = (long[]) PrimitiveArrayManager.allocate(TSDataType.INT64);
     }
@@ -792,9 +789,8 @@ public class QueryDataSetUtils {
       ByteBuffer buffer, TSDataType[] types, int columns, int size) {
     Object[][] values = new Object[columns][];
     int arraySize =
-        size / PrimitiveArrayManager.ARRAY_SIZE + size % 
PrimitiveArrayManager.ARRAY_SIZE == 0
-            ? 0
-            : 1;
+        size / PrimitiveArrayManager.ARRAY_SIZE
+            + (size % PrimitiveArrayManager.ARRAY_SIZE == 0 ? 0 : 1);
     for (int i = 0; i < columns; i++) {
       values[i] = new Object[arraySize];
       for (int j = 0; j < arraySize; j++) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index a4b6f7ba298..dda343c3de1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -842,7 +842,7 @@ public abstract class AlignedTVList extends TVList {
       // the remaining inputs can fit the last array, copy all remaining 
inputs into last array
       arrayCopy(column, columnIndex, current, arrayIdx, elementIdx, 
copyLength);
       for (int i = 0; i < copyLength; i++) {
-        if (bitMap.isMarked(current + i)
+        if (bitMap != null && bitMap.isMarked(current + i)
             || results != null
                 && results[current + i] != null
                 && results[current + i].code != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -850,8 +850,8 @@ public abstract class AlignedTVList extends TVList {
         }
       }
 
-      current += internalRemaining;
-      tvListPos += internalRemaining;
+      current += copyLength;
+      tvListPos += copyLength;
     }
   }
 


Reply via email to