This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch support_insert_object_through_sql in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit dcb2b237656e8d9a53b46ffeef2cd548e75d943c Author: HTHou <[email protected]> AuthorDate: Fri Oct 31 15:54:45 2025 +0800 [To new_object_type] Support insert object by sql --- .../planner/plan/node/write/InsertRowNode.java | 7 ++++ .../plan/node/write/RelationalInsertRowsNode.java | 39 +++++++++++++++++++++- .../node/write/RelationalInsertTabletNode.java | 2 -- 3 files changed, 45 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java index 04bef0b577c..3d2b3b79288 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java @@ -367,6 +367,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { case TEXT: case STRING: case BLOB: + case OBJECT: ReadWriteIOUtils.write((Binary) values[i], buffer); break; default: @@ -426,6 +427,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { case TEXT: case STRING: case BLOB: + case OBJECT: ReadWriteIOUtils.write((Binary) values[i], stream); break; default: @@ -520,6 +522,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { case TEXT: case STRING: case BLOB: + case OBJECT: values[i] = ReadWriteIOUtils.readBinary(buffer); break; default: @@ -589,6 +592,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { case TEXT: case STRING: case BLOB: + case OBJECT: size += ReadWriteIOUtils.sizeToWrite((Binary) values[i]); break; default: @@ -668,6 +672,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { case TEXT: case BLOB: case STRING: + case OBJECT: WALWriteUtils.write((Binary) values[i], buffer); break; default: @@ -759,6 +764,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { case TEXT: case STRING: case BLOB: + case OBJECT: values[i] = ReadWriteIOUtils.readBinary(stream); break; default: @@ -849,6 +855,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { case TEXT: case STRING: case BLOB: + case OBJECT: values[i] = ReadWriteIOUtils.readBinary(buffer); break; default: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java index 77020d9220d..009804b64d7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java @@ -27,13 +27,18 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.IDeviceID.Factory; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BytesUtils; import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -159,6 +164,7 @@ public class RelationalInsertRowsNode extends InsertRowsNode { @Override public List<WritePlanNode> splitByPartition(IAnalysis analysis) { + List<WritePlanNode> writePlanNodeList = new ArrayList<>(); Map<TRegionReplicaSet, RelationalInsertRowsNode> splitMap = new HashMap<>(); List<TEndPoint> redirectInfo = new ArrayList<>(); for (int i = 0; i < getInsertRowNodeList().size(); i++) { @@ -172,6 +178,9 @@ public class RelationalInsertRowsNode extends InsertRowsNode { insertRowNode.getDeviceID(), TimePartitionUtils.getTimePartitionSlot(insertRowNode.getTime()), analysis.getDatabaseName()); + // handle object type + handleObjectValue(insertRowNode, dataRegionReplicaSet, writePlanNodeList); + // Collect redirectInfo redirectInfo.add(dataRegionReplicaSet.getDataNodeLocations().get(0).getClientRpcEndPoint()); RelationalInsertRowsNode tmpNode = splitMap.get(dataRegionReplicaSet); @@ -185,8 +194,36 @@ public class RelationalInsertRowsNode extends InsertRowsNode { } } analysis.setRedirectNodeList(redirectInfo); + writePlanNodeList.addAll(splitMap.values()); + + return writePlanNodeList; + } - return new ArrayList<>(splitMap.values()); + private void handleObjectValue( + InsertRowNode insertRowNode, + TRegionReplicaSet dataRegionReplicaSet, + List<WritePlanNode> writePlanNodeList) { + for (int j = 0; j < insertRowNode.getDataTypes().length; j++) { + if (insertRowNode.getDataTypes()[j] == TSDataType.OBJECT) { + Object[] values = insertRowNode.getValues(); + byte[] content = ((Binary) values[j]).getValues(); + String relativePath = + TsFileNameGenerator.generateObjectFilePath( + dataRegionReplicaSet.getRegionId().getId(), + insertRowNode.getTime(), + insertRowNode.getDeviceID(), + insertRowNode.getMeasurements()[j]); + ObjectNode objectNode = new ObjectNode(true, 0, content, relativePath); + objectNode.setDataRegionReplicaSet(dataRegionReplicaSet); + byte[] filePathBytes = relativePath.getBytes(StandardCharsets.UTF_8); + byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES]; + System.arraycopy(BytesUtils.longToBytes(content.length), 0, valueBytes, 0, Long.BYTES); + System.arraycopy(filePathBytes, 0, valueBytes, Long.BYTES, filePathBytes.length); + ((Binary) values[j]).setValues(valueBytes); + insertRowNode.setValues(values); + writePlanNodeList.add(objectNode); + } + } } public RelationalInsertRowsNode emptyClone() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java index ee93712e77d..e3a114211e1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java @@ -64,8 +64,6 @@ public class RelationalInsertTabletNode extends InsertTabletNode { private boolean singleDevice; - private Object[] convertedColumns; - public RelationalInsertTabletNode( PlanNodeId id, PartialPath devicePath,
