This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new f5f52a483eb [To dev/1.3] Pipe: Removed the useless ser-de in receiver 
raw req & Improved the handling logic for rowCount and null value bitmaps in 
insertTabletNode (#16133)
f5f52a483eb is described below

commit f5f52a483eb11709fcc8773512af9119535cd625
Author: Caideyipi <[email protected]>
AuthorDate: Wed Aug 13 10:17:46 2025 +0800

    [To dev/1.3] Pipe: Removed the useless ser-de in receiver raw req & 
Improved the handling logic for rowCount and null value bitmaps in 
insertTabletNode (#16133)
---
 .../request/PipeTransferTabletRawReq.java          | 22 +------------
 .../planner/plan/node/write/InsertTabletNode.java  | 37 +++++++++++++++-------
 .../plan/statement/crud/InsertTabletStatement.java | 32 +++++++++++++++++++
 3 files changed, 58 insertions(+), 33 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
index 216a6329df9..47bf4d44897 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -22,18 +22,13 @@ package 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request;
 import org.apache.iotdb.commons.exception.MetadataException;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
-import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.db.pipe.sink.util.PipeTabletEventSorter;
-import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
-import org.apache.iotdb.session.util.SessionUtils;
 
 import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.apache.tsfile.write.record.Tablet;
-import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,22 +63,7 @@ public class PipeTransferTabletRawReq extends 
TPipeTransferReq {
         return new InsertTabletStatement();
       }
 
-      final TSInsertTabletReq request = new TSInsertTabletReq();
-
-      for (final IMeasurementSchema measurementSchema : tablet.getSchemas()) {
-        request.addToMeasurements(measurementSchema.getMeasurementId());
-        request.addToTypes(measurementSchema.getType().ordinal());
-      }
-
-      request.setPrefixPath(tablet.deviceId);
-      request.setIsAligned(isAligned);
-      request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
-      request.setValues(SessionUtils.getValueBuffer(tablet));
-      request.setSize(tablet.rowSize);
-      request.setMeasurements(
-          
PathUtils.checkIsLegalSingleMeasurementsAndUpdate(request.getMeasurements()));
-
-      return StatementGenerator.createStatement(request);
+      return new InsertTabletStatement(tablet, isAligned);
     } catch (final MetadataException e) {
       LOGGER.warn("Generate Statement from tablet {} error.", tablet, e);
       return null;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index b00bb4a4be2..a9a07ec1cbe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -470,17 +470,17 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
     }
   }
 
-  private void writeTimes(ByteBuffer buffer) {
+  private void writeTimes(final ByteBuffer buffer) {
     ReadWriteIOUtils.write(rowCount, buffer);
-    for (long time : times) {
-      ReadWriteIOUtils.write(time, buffer);
+    for (int i = 0; i < rowCount; ++i) {
+      ReadWriteIOUtils.write(times[i], buffer);
     }
   }
 
-  private void writeTimes(DataOutputStream stream) throws IOException {
+  private void writeTimes(final DataOutputStream stream) throws IOException {
     ReadWriteIOUtils.write(rowCount, stream);
-    for (long time : times) {
-      ReadWriteIOUtils.write(time, stream);
+    for (int i = 0; i < rowCount; ++i) {
+      ReadWriteIOUtils.write(times[i], stream);
     }
   }
 
@@ -498,7 +498,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
           ReadWriteIOUtils.write(BytesUtils.boolToByte(false), buffer);
         } else {
           ReadWriteIOUtils.write(BytesUtils.boolToByte(true), buffer);
-          buffer.put(bitMaps[i].getByteArray());
+          buffer.put(bitMaps[i].getByteArray(), 0, rowCount / 8 + 1);
         }
       }
     }
@@ -518,7 +518,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
           ReadWriteIOUtils.write(BytesUtils.boolToByte(false), stream);
         } else {
           ReadWriteIOUtils.write(BytesUtils.boolToByte(true), stream);
-          stream.write(bitMaps[i].getByteArray());
+          stream.write(bitMaps[i].getByteArray(), 0, rowCount / 8 + 1);
         }
       }
     }
@@ -585,7 +585,12 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
       case STRING:
         Binary[] binaryValues = (Binary[]) column;
         for (int j = 0; j < rowCount; j++) {
-          ReadWriteIOUtils.write(binaryValues[j], buffer);
+          if (binaryValues[j] != null && binaryValues[j].getValues() != null) {
+            buffer.putInt(binaryValues[j].getLength());
+            buffer.put(binaryValues[j].getValues());
+          } else {
+            buffer.putInt(0);
+          }
         }
         break;
       default:
@@ -633,7 +638,11 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
       case STRING:
         Binary[] binaryValues = (Binary[]) column;
         for (int j = 0; j < rowCount; j++) {
-          ReadWriteIOUtils.write(binaryValues[j], stream);
+          if (binaryValues[j] != null && binaryValues[j].getValues() != null) {
+            ReadWriteIOUtils.write(binaryValues[j], stream);
+          } else {
+            ReadWriteIOUtils.write(0, stream);
+          }
         }
         break;
       default:
@@ -887,8 +896,12 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
       case STRING:
         Binary[] binaryValues = (Binary[]) column;
         for (int j = start; j < end; j++) {
-          buffer.putInt(binaryValues[j].getLength());
-          buffer.put(binaryValues[j].getValues());
+          if (binaryValues[j] != null && binaryValues[j].getValues() != null) {
+            buffer.putInt(binaryValues[j].getLength());
+            buffer.put(binaryValues[j].getValues());
+          } else {
+            buffer.putInt(0);
+          }
         }
         break;
       default:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index 99ed39caa44..7a5f66b6801 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.queryengine.plan.statement.crud;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
@@ -28,6 +29,7 @@ import 
org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
 import 
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
@@ -38,19 +40,23 @@ import 
org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.utils.DateUtils;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 public class InsertTabletStatement extends InsertBaseStatement implements 
ISchemaValidation {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(InsertTabletStatement.class);
@@ -78,6 +84,32 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
     this.recordedEndOfLogicalViewSchemaList = 0;
   }
 
+  public InsertTabletStatement(final Tablet tablet, final boolean isAligned)
+      throws MetadataException {
+    this();
+    setMeasurements(
+        tablet.getSchemas().stream()
+            .map(MeasurementSchema::getMeasurementId)
+            .toArray(String[]::new));
+    setDataTypes(
+        
tablet.getSchemas().stream().map(MeasurementSchema::getType).toArray(TSDataType[]::new));
+    
setDevicePath(DataNodeDevicePathCache.getInstance().getPartialPath(tablet.deviceId));
+    setAligned(isAligned);
+    setTimes(tablet.timestamps);
+    
setColumns(Arrays.stream(tablet.values).map(this::convertTableColumn).toArray());
+    setBitMaps(tablet.bitMaps);
+    setRowCount(tablet.rowSize);
+  }
+
+  private Object convertTableColumn(final Object input) {
+    return input instanceof LocalDate[]
+        ? Arrays.stream(((LocalDate[]) input))
+            .map(date -> Objects.nonNull(date) ? 
DateUtils.parseDateExpressionToInt(date) : 0)
+            .mapToInt(Integer::intValue)
+            .toArray()
+        : input;
+  }
+
   public int getRowCount() {
     return rowCount;
   }

Reply via email to