This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/object_type by this push: new 75c8629c6ec Muti FileNode 75c8629c6ec is described below commit 75c8629c6ec557182ae625e70d23cfa8b6c42627 Author: HTHou <hao...@apache.org> AuthorDate: Mon Jul 7 10:55:05 2025 +0800 Muti FileNode --- .../node/write/RelationalInsertTabletNode.java | 10 +-- .../plan/relational/planner/RelationPlanner.java | 8 +- .../dataregion/memtable/TsFileProcessor.java | 90 +++++++++++----------- 3 files changed, 57 insertions(+), 51 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java index 40690d012a8..cad719774f0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java @@ -59,7 +59,7 @@ public class RelationalInsertTabletNode extends InsertTabletNode { private boolean singleDevice; - private List<FileNode> fileNodeList; + private List<List<FileNode>> fileNodesList; public RelationalInsertTabletNode( PlanNodeId id, @@ -111,12 +111,12 @@ public class RelationalInsertTabletNode extends InsertTabletNode { this.singleDevice = true; } - public void setFileNodeList(List<FileNode> fileNodeList) { - this.fileNodeList = fileNodeList; + public void setFileNodeList(List<List<FileNode>> fileNodesList) { + this.fileNodesList = fileNodesList; } - public List<FileNode> getFileNodeList() { - return fileNodeList; + public List<List<FileNode>> getFileNodeList() { + return fileNodesList; } public Binary[] getObjectColumn() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java index 1d6ff3a5c26..83f3e71b2fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java @@ -1128,10 +1128,11 @@ public class RelationPlanner extends AstVisitor<RelationPlan, Void> { MeasurementSchema[] measurementSchemas = insertTabletStatement.getMeasurementSchemas(); stayConsistent(measurements, measurementSchemas); boolean hasObject = false; - List<FileNode> fileNodeList = new ArrayList<>(); + List<List<FileNode>> fileNodesList = new ArrayList<>(); for (int i = 0; i < insertTabletStatement.getDataTypes().length; i++) { if (insertTabletStatement.getDataTypes()[i] == TSDataType.OBJECT) { hasObject = true; + List<FileNode> fileNodes = new ArrayList<>(); for (int j = 0; j < insertTabletStatement.getTimes().length; j++) { Binary value = ((Binary[]) insertTabletStatement.getColumns()[i])[j]; boolean isEoF = value.getValues()[0] == 1; @@ -1141,9 +1142,10 @@ public class RelationPlanner extends AstVisitor<RelationPlan, Void> { byte[] content = new byte[value.getLength() - 9]; System.arraycopy(value.getValues(), 9, content, 0, value.getLength() - 9); FileNode fileNode = new FileNode(isEoF, offset, content); - fileNodeList.add(fileNode); + fileNodes.add(fileNode); ((Binary[]) insertTabletStatement.getColumns()[i])[j] = null; } + fileNodesList.add(fileNodes); } } @@ -1162,7 +1164,7 @@ public class RelationPlanner extends AstVisitor<RelationPlan, Void> { insertTabletStatement.getColumnCategories()); insertNode.setFailedMeasurementNumber(insertTabletStatement.getFailedMeasurementNumber()); if (hasObject) { - insertNode.setFileNodeList(fileNodeList); + insertNode.setFileNodeList(fileNodesList); } if (insertTabletStatement.isSingleDevice()) { insertNode.setSingleDevice(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 4cfad7920eb..114a11aa1b5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -569,51 +569,55 @@ public class TsFileProcessor { if (insertTabletNode instanceof RelationalInsertTabletNode) { RelationalInsertTabletNode relationalInsertTabletNode = (RelationalInsertTabletNode) insertTabletNode; - List<FileNode> fileNodeList = relationalInsertTabletNode.getFileNodeList(); - if (fileNodeList != null) { - for (int i = 0; i < fileNodeList.size(); i++) { - FileNode fileNode = fileNodeList.get(i); - String objectFileName = - insertTabletNode.getTimes()[i] - + "-" - + config.getDataNodeId() - + "-" - + DataRegion.objectFileId.incrementAndGet() - + ".bin"; - String objectTmpFileName = objectFileName + ".tmp"; - File objectTmpFile = new File(writer.getFile().getParent(), objectTmpFileName); - try (ObjectWriter writer = new ObjectWriter(objectTmpFile)) { - writer.write(fileNode); - } catch (Exception e) { - throw new WriteProcessException(e); - } - // TODO:[OBJECT] write file node wal - if (fileNode.isEOF()) { - File objectFile = new File(writer.getFile().getParent(), objectFileName); - try { - Files.move( - objectTmpFile.toPath(), objectFile.toPath(), StandardCopyOption.REPLACE_EXISTING); - } catch (IOException e) { + List<List<FileNode>> fileNodesList = relationalInsertTabletNode.getFileNodeList(); + if (fileNodesList != null) { + for (List<FileNode> fileNodeList : fileNodesList) { + for (int i = 0; i < fileNodeList.size(); i++) { + FileNode fileNode = fileNodeList.get(i); + String objectFileName = + insertTabletNode.getTimes()[i] + + "-" + + config.getDataNodeId() + + "-" + + DataRegion.objectFileId.incrementAndGet() + + ".bin"; + String objectTmpFileName = objectFileName + ".tmp"; + File objectTmpFile = new File(writer.getFile().getParent(), objectTmpFileName); + try (ObjectWriter writer = new ObjectWriter(objectTmpFile)) { + writer.write(fileNode); + } catch (Exception e) { throw new WriteProcessException(e); } - String relativePathString = - (sequence - ? IoTDBConstant.SEQUENCE_FOLDER_NAME - : IoTDBConstant.UNSEQUENCE_FOLDER_NAME) - + File.separator - + dataRegionInfo.getDataRegion().getDatabaseName() - + File.separator - + dataRegionInfo.getDataRegion().getDataRegionId() - + File.separator - + tsFileResource.getTsFileID().timePartitionId - + File.separator - + objectFileName; - byte[] filePathBytes = relativePathString.getBytes(StandardCharsets.UTF_8); - byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES]; - System.arraycopy( - BytesUtils.longToBytes(objectFile.length()), 0, valueBytes, 0, Long.BYTES); - System.arraycopy(filePathBytes, 0, valueBytes, 4, filePathBytes.length); - relationalInsertTabletNode.getObjectColumn()[i] = new Binary(valueBytes); + // TODO:[OBJECT] write file node wal + if (fileNode.isEOF()) { + File objectFile = new File(writer.getFile().getParent(), objectFileName); + try { + Files.move( + objectTmpFile.toPath(), + objectFile.toPath(), + StandardCopyOption.REPLACE_EXISTING); + } catch (IOException e) { + throw new WriteProcessException(e); + } + String relativePathString = + (sequence + ? IoTDBConstant.SEQUENCE_FOLDER_NAME + : IoTDBConstant.UNSEQUENCE_FOLDER_NAME) + + File.separator + + dataRegionInfo.getDataRegion().getDatabaseName() + + File.separator + + dataRegionInfo.getDataRegion().getDataRegionId() + + File.separator + + tsFileResource.getTsFileID().timePartitionId + + File.separator + + objectFileName; + byte[] filePathBytes = relativePathString.getBytes(StandardCharsets.UTF_8); + byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES]; + System.arraycopy( + BytesUtils.longToBytes(objectFile.length()), 0, valueBytes, 0, Long.BYTES); + System.arraycopy(filePathBytes, 0, valueBytes, 4, filePathBytes.length); + relationalInsertTabletNode.getObjectColumn()[i] = new Binary(valueBytes); + } } } }