This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new f5f52a483eb [To dev/1.3] Pipe: Removed the useless ser-de in receiver
raw req & Improved the handling logic for rowCount and null value bitmaps in
insertTabletNode (#16133)
f5f52a483eb is described below
commit f5f52a483eb11709fcc8773512af9119535cd625
Author: Caideyipi <[email protected]>
AuthorDate: Wed Aug 13 10:17:46 2025 +0800
[To dev/1.3] Pipe: Removed the useless ser-de in receiver raw req &
Improved the handling logic for rowCount and null value bitmaps in
insertTabletNode (#16133)
---
.../request/PipeTransferTabletRawReq.java | 22 +------------
.../planner/plan/node/write/InsertTabletNode.java | 37 +++++++++++++++-------
.../plan/statement/crud/InsertTabletStatement.java | 32 +++++++++++++++++++
3 files changed, 58 insertions(+), 33 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
index 216a6329df9..47bf4d44897 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -22,18 +22,13 @@ package
org.apache.iotdb.db.pipe.sink.payload.evolvable.request;
import org.apache.iotdb.commons.exception.MetadataException;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
-import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.pipe.sink.util.PipeTabletEventSorter;
-import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
-import org.apache.iotdb.session.util.SessionUtils;
import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.record.Tablet;
-import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,22 +63,7 @@ public class PipeTransferTabletRawReq extends
TPipeTransferReq {
return new InsertTabletStatement();
}
- final TSInsertTabletReq request = new TSInsertTabletReq();
-
- for (final IMeasurementSchema measurementSchema : tablet.getSchemas()) {
- request.addToMeasurements(measurementSchema.getMeasurementId());
- request.addToTypes(measurementSchema.getType().ordinal());
- }
-
- request.setPrefixPath(tablet.deviceId);
- request.setIsAligned(isAligned);
- request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
- request.setValues(SessionUtils.getValueBuffer(tablet));
- request.setSize(tablet.rowSize);
- request.setMeasurements(
-
PathUtils.checkIsLegalSingleMeasurementsAndUpdate(request.getMeasurements()));
-
- return StatementGenerator.createStatement(request);
+ return new InsertTabletStatement(tablet, isAligned);
} catch (final MetadataException e) {
LOGGER.warn("Generate Statement from tablet {} error.", tablet, e);
return null;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index b00bb4a4be2..a9a07ec1cbe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -470,17 +470,17 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
}
}
- private void writeTimes(ByteBuffer buffer) {
+ private void writeTimes(final ByteBuffer buffer) {
ReadWriteIOUtils.write(rowCount, buffer);
- for (long time : times) {
- ReadWriteIOUtils.write(time, buffer);
+ for (int i = 0; i < rowCount; ++i) {
+ ReadWriteIOUtils.write(times[i], buffer);
}
}
- private void writeTimes(DataOutputStream stream) throws IOException {
+ private void writeTimes(final DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(rowCount, stream);
- for (long time : times) {
- ReadWriteIOUtils.write(time, stream);
+ for (int i = 0; i < rowCount; ++i) {
+ ReadWriteIOUtils.write(times[i], stream);
}
}
@@ -498,7 +498,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
ReadWriteIOUtils.write(BytesUtils.boolToByte(false), buffer);
} else {
ReadWriteIOUtils.write(BytesUtils.boolToByte(true), buffer);
- buffer.put(bitMaps[i].getByteArray());
+ buffer.put(bitMaps[i].getByteArray(), 0, rowCount / 8 + 1);
}
}
}
@@ -518,7 +518,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
ReadWriteIOUtils.write(BytesUtils.boolToByte(false), stream);
} else {
ReadWriteIOUtils.write(BytesUtils.boolToByte(true), stream);
- stream.write(bitMaps[i].getByteArray());
+ stream.write(bitMaps[i].getByteArray(), 0, rowCount / 8 + 1);
}
}
}
@@ -585,7 +585,12 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
case STRING:
Binary[] binaryValues = (Binary[]) column;
for (int j = 0; j < rowCount; j++) {
- ReadWriteIOUtils.write(binaryValues[j], buffer);
+ if (binaryValues[j] != null && binaryValues[j].getValues() != null) {
+ buffer.putInt(binaryValues[j].getLength());
+ buffer.put(binaryValues[j].getValues());
+ } else {
+ buffer.putInt(0);
+ }
}
break;
default:
@@ -633,7 +638,11 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
case STRING:
Binary[] binaryValues = (Binary[]) column;
for (int j = 0; j < rowCount; j++) {
- ReadWriteIOUtils.write(binaryValues[j], stream);
+ if (binaryValues[j] != null && binaryValues[j].getValues() != null) {
+ ReadWriteIOUtils.write(binaryValues[j], stream);
+ } else {
+ ReadWriteIOUtils.write(0, stream);
+ }
}
break;
default:
@@ -887,8 +896,12 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
case STRING:
Binary[] binaryValues = (Binary[]) column;
for (int j = start; j < end; j++) {
- buffer.putInt(binaryValues[j].getLength());
- buffer.put(binaryValues[j].getValues());
+ if (binaryValues[j] != null && binaryValues[j].getValues() != null) {
+ buffer.putInt(binaryValues[j].getLength());
+ buffer.put(binaryValues[j].getValues());
+ } else {
+ buffer.putInt(0);
+ }
}
break;
default:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index 99ed39caa44..7a5f66b6801 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.plan.statement.crud;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
@@ -28,6 +29,7 @@ import
org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
import
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
+import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
@@ -38,19 +40,23 @@ import
org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.utils.DateUtils;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
public class InsertTabletStatement extends InsertBaseStatement implements
ISchemaValidation {
private static final Logger LOGGER =
LoggerFactory.getLogger(InsertTabletStatement.class);
@@ -78,6 +84,32 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
this.recordedEndOfLogicalViewSchemaList = 0;
}
+ public InsertTabletStatement(final Tablet tablet, final boolean isAligned)
+ throws MetadataException {
+ this();
+ setMeasurements(
+ tablet.getSchemas().stream()
+ .map(MeasurementSchema::getMeasurementId)
+ .toArray(String[]::new));
+ setDataTypes(
+
tablet.getSchemas().stream().map(MeasurementSchema::getType).toArray(TSDataType[]::new));
+
setDevicePath(DataNodeDevicePathCache.getInstance().getPartialPath(tablet.deviceId));
+ setAligned(isAligned);
+ setTimes(tablet.timestamps);
+
setColumns(Arrays.stream(tablet.values).map(this::convertTableColumn).toArray());
+ setBitMaps(tablet.bitMaps);
+ setRowCount(tablet.rowSize);
+ }
+
+ private Object convertTableColumn(final Object input) {
+ return input instanceof LocalDate[]
+ ? Arrays.stream(((LocalDate[]) input))
+ .map(date -> Objects.nonNull(date) ?
DateUtils.parseDateExpressionToInt(date) : 0)
+ .mapToInt(Integer::intValue)
+ .toArray()
+ : input;
+ }
+
public int getRowCount() {
return rowCount;
}