This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/object_type by this push:
     new 726cacce416 move object file to object dir
     new d48f2c50b63 Merge branch 'object_type' of github.com:apache/iotdb into 
object_type
726cacce416 is described below

commit 726cacce41610f07d66dcaa4138424651c9368b6
Author: HTHou <[email protected]>
AuthorDate: Mon Jul 7 15:16:43 2025 +0800

    move object file to object dir
---
 .../dataregion/DataExecutionVisitor.java           |  1 +
 .../plan/planner/plan/node/write/FileNode.java     |  1 -
 .../node/write/RelationalInsertTabletNode.java     | 24 ++++++++++++++++
 .../plan/relational/planner/RelationPlanner.java   | 28 ------------------
 .../dataregion/memtable/TsFileProcessor.java       | 33 +++++++++++++++-------
 .../db/storageengine/rescon/disk/TierManager.java  | 23 +++++++++++++++
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |  1 +
 7 files changed, 72 insertions(+), 39 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index 7431aa9a79d..34f051be137 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -91,6 +91,7 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
   @Override
   public TSStatus visitRelationalInsertTablet(
       RelationalInsertTabletNode node, DataRegion dataRegion) {
+    node.handleObjectTypeValue();
     return visitInsertTablet(node, dataRegion);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/FileNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/FileNode.java
index cd82fbf12d8..6eae25ac1df 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/FileNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/FileNode.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
 
 // TODO:[OBJECT] WAL serde
-// TODO:[OBJECT] dispatch serde
 public class FileNode {
 
   private String filePath;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index 1f08a754453..fe8940bd5da 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -39,6 +39,7 @@ import org.apache.tsfile.file.metadata.IDeviceID.Factory;
 import org.apache.tsfile.read.TimeValuePair;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.utils.BytesUtils;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -393,4 +394,27 @@ public class RelationalInsertTabletNode extends 
InsertTabletNode {
       startOffset = endOffset;
     }
   }
+
+  public void handleObjectTypeValue() {
+    List<List<FileNode>> fileNodesList = new ArrayList<>();
+    for (int i = 0; i < dataTypes.length; i++) {
+      if (dataTypes[i] == TSDataType.OBJECT) {
+        List<FileNode> fileNodes = new ArrayList<>();
+        for (int j = 0; j < times.length; j++) {
+          Binary value = ((Binary[]) columns[i])[j];
+          boolean isEoF = value.getValues()[0] == 1;
+          byte[] offsetBytes = new byte[8];
+          System.arraycopy(value.getValues(), 1, offsetBytes, 0, 8);
+          long offset = BytesUtils.bytesToLong(offsetBytes);
+          byte[] content = new byte[value.getLength() - 9];
+          System.arraycopy(value.getValues(), 9, content, 0, value.getLength() 
- 9);
+          FileNode fileNode = new FileNode(isEoF, offset, content);
+          fileNodes.add(fileNode);
+          ((Binary[]) columns[i])[j] = null;
+        }
+        fileNodesList.add(fileNodes);
+      }
+    }
+    this.fileNodesList = fileNodesList;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index 83f3e71b2fb..fed7e954d40 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -30,7 +30,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileNod
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWritePlanNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.FileNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
@@ -125,10 +124,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.type.Type;
-import org.apache.tsfile.utils.Binary;
-import org.apache.tsfile.utils.BytesUtils;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 
 import java.util.ArrayList;
@@ -1127,27 +1123,6 @@ public class RelationPlanner extends 
AstVisitor<RelationPlan, Void> {
     String[] measurements = insertTabletStatement.getMeasurements();
     MeasurementSchema[] measurementSchemas = 
insertTabletStatement.getMeasurementSchemas();
     stayConsistent(measurements, measurementSchemas);
-    boolean hasObject = false;
-    List<List<FileNode>> fileNodesList = new ArrayList<>();
-    for (int i = 0; i < insertTabletStatement.getDataTypes().length; i++) {
-      if (insertTabletStatement.getDataTypes()[i] == TSDataType.OBJECT) {
-        hasObject = true;
-        List<FileNode> fileNodes = new ArrayList<>();
-        for (int j = 0; j < insertTabletStatement.getTimes().length; j++) {
-          Binary value = ((Binary[]) insertTabletStatement.getColumns()[i])[j];
-          boolean isEoF = value.getValues()[0] == 1;
-          byte[] offsetBytes = new byte[8];
-          System.arraycopy(value.getValues(), 1, offsetBytes, 0, 8);
-          long offset = BytesUtils.bytesToLong(offsetBytes);
-          byte[] content = new byte[value.getLength() - 9];
-          System.arraycopy(value.getValues(), 9, content, 0, value.getLength() 
- 9);
-          FileNode fileNode = new FileNode(isEoF, offset, content);
-          fileNodes.add(fileNode);
-          ((Binary[]) insertTabletStatement.getColumns()[i])[j] = null;
-        }
-        fileNodesList.add(fileNodes);
-      }
-    }
 
     RelationalInsertTabletNode insertNode =
         new RelationalInsertTabletNode(
@@ -1163,9 +1138,6 @@ public class RelationPlanner extends 
AstVisitor<RelationPlan, Void> {
             insertTabletStatement.getRowCount(),
             insertTabletStatement.getColumnCategories());
     
insertNode.setFailedMeasurementNumber(insertTabletStatement.getFailedMeasurementNumber());
-    if (hasObject) {
-      insertNode.setFileNodeList(fileNodesList);
-    }
     if (insertTabletStatement.isSingleDevice()) {
       insertNode.setSingleDevice();
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index a4880f50bb1..b521f8a1f5b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.commons.exception.MetadataException;
@@ -73,6 +72,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
 import org.apache.iotdb.db.storageengine.dataregion.wal.node.IWALNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.AbstractResultListener;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
+import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
 import org.apache.iotdb.db.storageengine.rescon.memory.MemTableManager;
 import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager;
 import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
@@ -91,6 +91,7 @@ import org.apache.tsfile.file.metadata.ChunkMetadata;
 import org.apache.tsfile.file.metadata.IChunkMetadata;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.tsfile.read.filter.basic.Filter;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BytesUtils;
@@ -583,16 +584,32 @@ public class TsFileProcessor {
                     + DataRegion.objectFileId.incrementAndGet()
                     + ".bin";
             String objectTmpFileName = objectFileName + ".tmp";
-            File objectTmpFile = new File(writer.getFile().getParent(), 
objectTmpFileName);
-            try (ObjectWriter writer = new ObjectWriter(objectTmpFile)) {
-              writer.write(fileNode);
+            File objectTmpFile;
+            try {
+              String baseDir = 
TierManager.getInstance().getNextFolderForObjectFile();
+              String objectFileDir =
+                  baseDir
+                      + File.separator
+                      + dataRegionInfo.getDataRegion().getDatabaseName()
+                      + File.separator
+                      + dataRegionInfo.getDataRegion().getDataRegionId()
+                      + File.separator
+                      + tsFileResource.getTsFileID().timePartitionId;
+
+              objectTmpFile =
+                  FSFactoryProducer.getFSFactory().getFile(objectFileDir, 
objectTmpFileName);
+              try (ObjectWriter writer = new ObjectWriter(objectTmpFile)) {
+                writer.write(fileNode);
+              }
             } catch (Exception e) {
               results[i] = 
RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
               throw new WriteProcessException(e);
             }
             // TODO:[OBJECT] write file node wal
             if (fileNode.isEOF()) {
-              File objectFile = new File(writer.getFile().getParent(), 
objectFileName);
+              File objectFile =
+                  FSFactoryProducer.getFSFactory()
+                      .getFile(objectTmpFile.getParentFile(), objectFileName);
               try {
                 Files.move(
                     objectTmpFile.toPath(),
@@ -603,11 +620,7 @@ public class TsFileProcessor {
                 throw new WriteProcessException(e);
               }
               String relativePathString =
-                  (sequence
-                          ? IoTDBConstant.SEQUENCE_FOLDER_NAME
-                          : IoTDBConstant.UNSEQUENCE_FOLDER_NAME)
-                      + File.separator
-                      + dataRegionInfo.getDataRegion().getDatabaseName()
+                  dataRegionInfo.getDataRegion().getDatabaseName()
                       + File.separator
                       + dataRegionInfo.getDataRegion().getDataRegionId()
                       + File.separator
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java
index b86f36b1c49..36974127aad 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java
@@ -65,6 +65,8 @@ public class TierManager {
    */
   private final List<FolderManager> unSeqTiers = new ArrayList<>();
 
+  private final List<FolderManager> objectTiers = new ArrayList<>();
+
   /** seq file folder's rawFsPath path -> tier level */
   private final Map<String, Integer> seqDir2TierLevel = new HashMap<>();
 
@@ -151,6 +153,22 @@ public class TierManager {
       for (String dir : unSeqDirs) {
         unSeqDir2TierLevel.put(dir, tierLevel);
       }
+
+      List<String> objectDirs =
+          Arrays.stream(tierDirs[tierLevel])
+              .filter(Objects::nonNull)
+              .map(
+                  v ->
+                      FSFactoryProducer.getFSFactory()
+                          .getFile(v, IoTDBConstant.OBJECT_FOLDER_NAME)
+                          .getPath())
+              .collect(Collectors.toList());
+
+      try {
+        objectTiers.add(new FolderManager(objectDirs, directoryStrategyType));
+      } catch (DiskSpaceInsufficientException e) {
+        logger.error("All disks of tier {} are full.", tierLevel, e);
+      }
     }
 
     tierDiskTotalSpace = getTierDiskSpace(DiskSpaceType.TOTAL);
@@ -160,6 +178,7 @@ public class TierManager {
     long startTime = System.currentTimeMillis();
     seqTiers.clear();
     unSeqTiers.clear();
+    objectTiers.clear();
     seqDir2TierLevel.clear();
     unSeqDir2TierLevel.clear();
 
@@ -190,6 +209,10 @@ public class TierManager {
         : unSeqTiers.get(tierLevel).getNextFolder();
   }
 
+  public String getNextFolderForObjectFile() throws 
DiskSpaceInsufficientException {
+    return objectTiers.get(0).getNextFolder();
+  }
+
   public List<String> getAllFilesFolders() {
     List<String> folders = new ArrayList<>(seqDir2TierLevel.keySet());
     folders.addAll(unSeqDir2TierLevel.keySet());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index ba568eae896..b76ab05f885 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -248,6 +248,7 @@ public class IoTDBConstant {
   public static final String DATA_FOLDER_NAME = "data";
   public static final String SEQUENCE_FOLDER_NAME = "sequence";
   public static final String UNSEQUENCE_FOLDER_NAME = "unsequence";
+  public static final String OBJECT_FOLDER_NAME = "object";
   public static final String FILE_NAME_SEPARATOR = "-";
   public static final String CONSENSUS_FOLDER_NAME = "consensus";
   public static final String DATA_REGION_FOLDER_NAME = "data_region";

Reply via email to