This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/object_type by this push:
new 726cacce416 move object file to object dir
new d48f2c50b63 Merge branch 'object_type' of github.com:apache/iotdb into object_type
726cacce416 is described below
commit 726cacce41610f07d66dcaa4138424651c9368b6
Author: HTHou <[email protected]>
AuthorDate: Mon Jul 7 15:16:43 2025 +0800
move object file to object dir
---
.../dataregion/DataExecutionVisitor.java | 1 +
.../plan/planner/plan/node/write/FileNode.java | 1 -
.../node/write/RelationalInsertTabletNode.java | 24 ++++++++++++++++
.../plan/relational/planner/RelationPlanner.java | 28 ------------------
.../dataregion/memtable/TsFileProcessor.java | 33 +++++++++++++++-------
.../db/storageengine/rescon/disk/TierManager.java | 23 +++++++++++++++
.../apache/iotdb/commons/conf/IoTDBConstant.java | 1 +
7 files changed, 72 insertions(+), 39 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index 7431aa9a79d..34f051be137 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -91,6 +91,7 @@ public class DataExecutionVisitor extends
PlanVisitor<TSStatus, DataRegion> {
@Override
public TSStatus visitRelationalInsertTablet(
RelationalInsertTabletNode node, DataRegion dataRegion) {
+ node.handleObjectTypeValue();
return visitInsertTablet(node, dataRegion);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/FileNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/FileNode.java
index cd82fbf12d8..6eae25ac1df 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/FileNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/FileNode.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
// TODO:[OBJECT] WAL serde
-// TODO:[OBJECT] dispatch serde
public class FileNode {
private String filePath;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index 1f08a754453..fe8940bd5da 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -39,6 +39,7 @@ import org.apache.tsfile.file.metadata.IDeviceID.Factory;
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.utils.BytesUtils;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -393,4 +394,27 @@ public class RelationalInsertTabletNode extends
InsertTabletNode {
startOffset = endOffset;
}
}
+
+ public void handleObjectTypeValue() {
+ List<List<FileNode>> fileNodesList = new ArrayList<>();
+ for (int i = 0; i < dataTypes.length; i++) {
+ if (dataTypes[i] == TSDataType.OBJECT) {
+ List<FileNode> fileNodes = new ArrayList<>();
+ for (int j = 0; j < times.length; j++) {
+ Binary value = ((Binary[]) columns[i])[j];
+ boolean isEoF = value.getValues()[0] == 1;
+ byte[] offsetBytes = new byte[8];
+ System.arraycopy(value.getValues(), 1, offsetBytes, 0, 8);
+ long offset = BytesUtils.bytesToLong(offsetBytes);
+ byte[] content = new byte[value.getLength() - 9];
+ System.arraycopy(value.getValues(), 9, content, 0, value.getLength() - 9);
+ FileNode fileNode = new FileNode(isEoF, offset, content);
+ fileNodes.add(fileNode);
+ ((Binary[]) columns[i])[j] = null;
+ }
+ fileNodesList.add(fileNodes);
+ }
+ }
+ this.fileNodesList = fileNodesList;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index 83f3e71b2fb..fed7e954d40 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -30,7 +30,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileNod
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWritePlanNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.FileNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
@@ -125,10 +124,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.type.Type;
-import org.apache.tsfile.utils.Binary;
-import org.apache.tsfile.utils.BytesUtils;
import org.apache.tsfile.write.schema.MeasurementSchema;
import java.util.ArrayList;
@@ -1127,27 +1123,6 @@ public class RelationPlanner extends
AstVisitor<RelationPlan, Void> {
String[] measurements = insertTabletStatement.getMeasurements();
MeasurementSchema[] measurementSchemas =
insertTabletStatement.getMeasurementSchemas();
stayConsistent(measurements, measurementSchemas);
- boolean hasObject = false;
- List<List<FileNode>> fileNodesList = new ArrayList<>();
- for (int i = 0; i < insertTabletStatement.getDataTypes().length; i++) {
- if (insertTabletStatement.getDataTypes()[i] == TSDataType.OBJECT) {
- hasObject = true;
- List<FileNode> fileNodes = new ArrayList<>();
- for (int j = 0; j < insertTabletStatement.getTimes().length; j++) {
- Binary value = ((Binary[]) insertTabletStatement.getColumns()[i])[j];
- boolean isEoF = value.getValues()[0] == 1;
- byte[] offsetBytes = new byte[8];
- System.arraycopy(value.getValues(), 1, offsetBytes, 0, 8);
- long offset = BytesUtils.bytesToLong(offsetBytes);
- byte[] content = new byte[value.getLength() - 9];
- System.arraycopy(value.getValues(), 9, content, 0, value.getLength() - 9);
- FileNode fileNode = new FileNode(isEoF, offset, content);
- fileNodes.add(fileNode);
- ((Binary[]) insertTabletStatement.getColumns()[i])[j] = null;
- }
- fileNodesList.add(fileNodes);
- }
- }
RelationalInsertTabletNode insertNode =
new RelationalInsertTabletNode(
@@ -1163,9 +1138,6 @@ public class RelationPlanner extends
AstVisitor<RelationPlan, Void> {
insertTabletStatement.getRowCount(),
insertTabletStatement.getColumnCategories());
insertNode.setFailedMeasurementNumber(insertTabletStatement.getFailedMeasurementNumber());
- if (hasObject) {
- insertNode.setFileNodeList(fileNodesList);
- }
if (insertTabletStatement.isSingleDevice()) {
insertNode.setSingleDevice();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index a4880f50bb1..b521f8a1f5b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.exception.MetadataException;
@@ -73,6 +72,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.node.IWALNode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.AbstractResultListener;
import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
+import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
import org.apache.iotdb.db.storageengine.rescon.memory.MemTableManager;
import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
@@ -91,6 +91,7 @@ import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.fileSystem.FSFactoryProducer;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BytesUtils;
@@ -583,16 +584,32 @@ public class TsFileProcessor {
+ DataRegion.objectFileId.incrementAndGet()
+ ".bin";
String objectTmpFileName = objectFileName + ".tmp";
- File objectTmpFile = new File(writer.getFile().getParent(), objectTmpFileName);
- try (ObjectWriter writer = new ObjectWriter(objectTmpFile)) {
- writer.write(fileNode);
+ File objectTmpFile;
+ try {
+ String baseDir = TierManager.getInstance().getNextFolderForObjectFile();
+ String objectFileDir =
+ baseDir
+ + File.separator
+ + dataRegionInfo.getDataRegion().getDatabaseName()
+ + File.separator
+ + dataRegionInfo.getDataRegion().getDataRegionId()
+ + File.separator
+ + tsFileResource.getTsFileID().timePartitionId;
+
+ objectTmpFile =
+ FSFactoryProducer.getFSFactory().getFile(objectFileDir, objectTmpFileName);
+ try (ObjectWriter writer = new ObjectWriter(objectTmpFile)) {
+ writer.write(fileNode);
+ }
} catch (Exception e) {
results[i] =
RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
throw new WriteProcessException(e);
}
// TODO:[OBJECT] write file node wal
if (fileNode.isEOF()) {
- File objectFile = new File(writer.getFile().getParent(), objectFileName);
+ File objectFile =
+ FSFactoryProducer.getFSFactory()
+ .getFile(objectTmpFile.getParentFile(), objectFileName);
try {
Files.move(
objectTmpFile.toPath(),
@@ -603,11 +620,7 @@ public class TsFileProcessor {
throw new WriteProcessException(e);
}
String relativePathString =
- (sequence
- ? IoTDBConstant.SEQUENCE_FOLDER_NAME
- : IoTDBConstant.UNSEQUENCE_FOLDER_NAME)
- + File.separator
- + dataRegionInfo.getDataRegion().getDatabaseName()
+ dataRegionInfo.getDataRegion().getDatabaseName()
+ File.separator
+ dataRegionInfo.getDataRegion().getDataRegionId()
+ File.separator
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java
index b86f36b1c49..36974127aad 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java
@@ -65,6 +65,8 @@ public class TierManager {
*/
private final List<FolderManager> unSeqTiers = new ArrayList<>();
+ private final List<FolderManager> objectTiers = new ArrayList<>();
+
/** seq file folder's rawFsPath path -> tier level */
private final Map<String, Integer> seqDir2TierLevel = new HashMap<>();
@@ -151,6 +153,22 @@ public class TierManager {
for (String dir : unSeqDirs) {
unSeqDir2TierLevel.put(dir, tierLevel);
}
+
+ List<String> objectDirs =
+ Arrays.stream(tierDirs[tierLevel])
+ .filter(Objects::nonNull)
+ .map(
+ v ->
+ FSFactoryProducer.getFSFactory()
+ .getFile(v, IoTDBConstant.OBJECT_FOLDER_NAME)
+ .getPath())
+ .collect(Collectors.toList());
+
+ try {
+ objectTiers.add(new FolderManager(objectDirs, directoryStrategyType));
+ } catch (DiskSpaceInsufficientException e) {
+ logger.error("All disks of tier {} are full.", tierLevel, e);
+ }
}
tierDiskTotalSpace = getTierDiskSpace(DiskSpaceType.TOTAL);
@@ -160,6 +178,7 @@ public class TierManager {
long startTime = System.currentTimeMillis();
seqTiers.clear();
unSeqTiers.clear();
+ objectTiers.clear();
seqDir2TierLevel.clear();
unSeqDir2TierLevel.clear();
@@ -190,6 +209,10 @@ public class TierManager {
: unSeqTiers.get(tierLevel).getNextFolder();
}
+ public String getNextFolderForObjectFile() throws DiskSpaceInsufficientException {
+ return objectTiers.get(0).getNextFolder();
+ }
+
public List<String> getAllFilesFolders() {
List<String> folders = new ArrayList<>(seqDir2TierLevel.keySet());
folders.addAll(unSeqDir2TierLevel.keySet());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index ba568eae896..b76ab05f885 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -248,6 +248,7 @@ public class IoTDBConstant {
public static final String DATA_FOLDER_NAME = "data";
public static final String SEQUENCE_FOLDER_NAME = "sequence";
public static final String UNSEQUENCE_FOLDER_NAME = "unsequence";
+ public static final String OBJECT_FOLDER_NAME = "object";
public static final String FILE_NAME_SEPARATOR = "-";
public static final String CONSENSUS_FOLDER_NAME = "consensus";
public static final String DATA_REGION_FOLDER_NAME = "data_region";