This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/object_type by this push:
new aca5f97d06b Insert FileNode
aca5f97d06b is described below
commit aca5f97d06b10c36c62ac788b45a0922a2f2c354
Author: HTHou <[email protected]>
AuthorDate: Thu Jul 3 16:36:34 2025 +0800
Insert FileNode
---
.../main/java/org/apache/iotdb/ObjectExample.java | 87 ++++++++++++++++++++++
.../apache/iotdb/session/util/SessionUtils.java | 2 +
.../plan/planner/plan/node/write/FileNode.java | 38 ++++++++++
.../planner/plan/node/write/InsertTabletNode.java | 8 ++
.../node/write/RelationalInsertTabletNode.java | 10 +++
.../plan/relational/planner/RelationPlanner.java | 34 +++++++++
.../plan/relational/type/InternalTypeManager.java | 2 +
.../dataregion/memtable/TsFileProcessor.java | 13 ++++
.../iotdb/db/utils/EncodingInferenceUtils.java | 1 +
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 1 +
10 files changed, 196 insertions(+)
diff --git a/example/session/src/main/java/org/apache/iotdb/ObjectExample.java
b/example/session/src/main/java/org/apache/iotdb/ObjectExample.java
new file mode 100644
index 00000000000..1aa286e7ac4
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/ObjectExample.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb;
+
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.TableSessionBuilder;
+
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.record.Tablet;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class ObjectExample {
+ private static final String LOCAL_URL = "127.0.0.1:6667";
+
+ public static void main(String[] args) {
+
+ // don't specify database in constructor
+ try (ITableSession session =
+ new TableSessionBuilder()
+ .nodeUrls(Collections.singletonList(LOCAL_URL))
+ .username("root")
+ .password("root")
+ .build()) {
+ session.executeNonQueryStatement("CREATE DATABASE test1");
+ session.executeNonQueryStatement("use test1");
+
+ // insert table data by tablet
+ List<String> columnNameList =
+ Arrays.asList("region_id", "plant_id", "device_id", "temperature",
"file");
+ List<TSDataType> dataTypeList =
+ Arrays.asList(
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.FLOAT,
+ TSDataType.OBJECT);
+ List<ColumnCategory> columnTypeList =
+ new ArrayList<>(
+ Arrays.asList(
+ ColumnCategory.TAG,
+ ColumnCategory.TAG,
+ ColumnCategory.TAG,
+ ColumnCategory.FIELD,
+ ColumnCategory.FIELD));
+ Tablet tablet = new Tablet("test1", columnNameList, dataTypeList,
columnTypeList, 1);
+ int rowIndex = tablet.getRowSize();
+ tablet.addTimestamp(rowIndex, 1);
+ tablet.addValue(rowIndex, 0, "1");
+ tablet.addValue(rowIndex, 1, "5");
+ tablet.addValue(rowIndex, 2, "3");
+ tablet.addValue(rowIndex, 3, 37.6F);
+ tablet.addValue(rowIndex, 4, true, 0,
"123456".getBytes(StandardCharsets.UTF_8));
+ session.insert(tablet);
+ tablet.reset();
+
+ } catch (IoTDBConnectionException e) {
+ e.printStackTrace();
+ } catch (StatementExecutionException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
index 8534b808754..87b30bd2dd3 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
@@ -129,6 +129,7 @@ public class SessionUtils {
case TEXT:
case BLOB:
case STRING:
+ case OBJECT:
valueOccupation += rowSize * 4;
Binary[] binaries = (Binary[]) values[columnIndex];
for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) {
@@ -319,6 +320,7 @@ public class SessionUtils {
case TEXT:
case STRING:
case BLOB:
+ case OBJECT:
Binary[] binaryValues = (Binary[]) tablet.getValues()[i];
for (int index = 0; index < tablet.getRowSize(); index++) {
if (!tablet.isNull(index, i) && binaryValues[index] != null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/FileNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/FileNode.java
new file mode 100644
index 00000000000..55b756a4d3f
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/FileNode.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
+
+public class FileNode {
+
+ private String filePath;
+
+ private long offset;
+
+ private byte[] content;
+
+ private boolean isEOF;
+
+ public FileNode(String filePath, boolean isEOF, long offset, byte[] content)
{
+ this.filePath = filePath;
+ this.isEOF = isEOF;
+ this.offset = offset;
+ this.content = content;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index c8b840cc6bb..a1f909b2b8e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -388,6 +388,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
case TEXT:
case BLOB:
case STRING:
+ case OBJECT:
values[i] = new Binary[rowSize];
break;
case FLOAT:
@@ -641,6 +642,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
case TEXT:
case BLOB:
case STRING:
+ case OBJECT:
Binary[] binaryValues = (Binary[]) column;
for (int j = 0; j < rowCount; j++) {
if (binaryValues[j] != null) {
@@ -693,6 +695,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
case STRING:
case TEXT:
case BLOB:
+ case OBJECT:
Binary[] binaryValues = (Binary[]) column;
for (int j = 0; j < rowCount; j++) {
if (binaryValues[j] != null) {
@@ -833,6 +836,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
case TEXT:
case BLOB:
case STRING:
+ case OBJECT:
Binary[] binaryValues = (Binary[]) column;
for (int j = start; j < end; j++) {
size += ReadWriteIOUtils.sizeToWrite(binaryValues[j]);
@@ -964,6 +968,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
case STRING:
case TEXT:
case BLOB:
+ case OBJECT:
Binary[] binaryValues = (Binary[]) column;
for (int j = start; j < end; j++) {
if (binaryValues[j] != null) {
@@ -1127,6 +1132,7 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
case TEXT:
case BLOB:
case STRING:
+ case OBJECT:
if (!Arrays.equals((Binary[]) this.columns[i], (Binary[])
columns[i])) {
return false;
}
@@ -1199,6 +1205,8 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
Binary[] binaryValues = (Binary[]) columns[measurementIndex];
value = new TsPrimitiveType.TsBinary(binaryValues[lastIdx]);
break;
+ case OBJECT:
+ return null;
default:
throw new UnSupportedDataTypeException(
String.format(DATATYPE_UNSUPPORTED, dataTypes[measurementIndex]));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index 0d4b698108e..b04aebd0867 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -58,6 +58,8 @@ public class RelationalInsertTabletNode extends
InsertTabletNode {
private boolean singleDevice;
+ private List<FileNode> fileNodeList;
+
public RelationalInsertTabletNode(
PlanNodeId id,
PartialPath devicePath,
@@ -108,6 +110,14 @@ public class RelationalInsertTabletNode extends
InsertTabletNode {
this.singleDevice = true;
}
+ public void setFileNodeList(List<FileNode> fileNodeList) {
+ this.fileNodeList = fileNodeList;
+ }
+
+ public List<FileNode> getFileNodeList() {
+ return fileNodeList;
+ }
+
@Override
public IDeviceID getDeviceID(int rowIdx) {
if (singleDevice) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index fed7e954d40..1e7ebdebcf5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileNod
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWritePlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.FileNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
@@ -124,9 +125,13 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
import org.apache.tsfile.write.schema.MeasurementSchema;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -1123,6 +1128,32 @@ public class RelationPlanner extends
AstVisitor<RelationPlan, Void> {
String[] measurements = insertTabletStatement.getMeasurements();
MeasurementSchema[] measurementSchemas =
insertTabletStatement.getMeasurementSchemas();
stayConsistent(measurements, measurementSchemas);
+ boolean hasObject = false;
+ List<FileNode> fileNodeList = new ArrayList<>();
+ for (int i = 0; i < insertTabletStatement.getDataTypes().length; i++) {
+ if (insertTabletStatement.getDataTypes()[i] == TSDataType.OBJECT) {
+ hasObject = true;
+ for (int j = 0; j < insertTabletStatement.getTimes().length; j++) {
+ Binary value = ((Binary[]) insertTabletStatement.getColumns()[i])[j];
+ boolean isEoF = value.getValues()[0] == 1;
+ byte[] offsetBytes = new byte[8];
+ System.arraycopy(value.getValues(), 1, offsetBytes, 0, 8);
+ long offset = BytesUtils.bytesToLong(offsetBytes);
+ byte[] content = new byte[value.getLength() - 9];
+ System.arraycopy(value.getValues(), 9, content, 0, value.getLength()
- 9);
+ // Generate File name
+ String fileName = "assssd";
+ FileNode fileNode = new FileNode(fileName, isEoF, offset, content);
+ fileNodeList.add(fileNode);
+ if (isEoF) {
+ ((Binary[]) insertTabletStatement.getColumns()[i])[j] =
+ new Binary(fileName.getBytes(StandardCharsets.UTF_8));
+ } else {
+ ((Binary[]) insertTabletStatement.getColumns()[i])[j] = null;
+ }
+ }
+ }
+ }
RelationalInsertTabletNode insertNode =
new RelationalInsertTabletNode(
@@ -1138,6 +1169,9 @@ public class RelationPlanner extends
AstVisitor<RelationPlan, Void> {
insertTabletStatement.getRowCount(),
insertTabletStatement.getColumnCategories());
insertNode.setFailedMeasurementNumber(insertTabletStatement.getFailedMeasurementNumber());
+ if (hasObject) {
+ insertNode.setFileNodeList(fileNodeList);
+ }
if (insertTabletStatement.isSingleDevice()) {
insertNode.setSingleDevice();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/InternalTypeManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/InternalTypeManager.java
index 6f2aaef76ab..51506ed9fa0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/InternalTypeManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/InternalTypeManager.java
@@ -105,6 +105,8 @@ public class InternalTypeManager implements TypeManager {
return TSDataType.BLOB;
case STRING:
return TSDataType.STRING;
+ case OBJECT:
+ return TSDataType.OBJECT;
default:
throw new IllegalArgumentException();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index b2c96a87f3b..e34f1b22339 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -43,10 +43,12 @@ import
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.FileNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
import org.apache.iotdb.db.service.metrics.WritingMetrics;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
@@ -558,6 +560,17 @@ public class TsFileProcessor {
boolean noFailure,
long[] infoForMetrics)
throws WriteProcessException {
+ if (insertTabletNode instanceof RelationalInsertTabletNode) {
+ List<FileNode> fileNodeList =
+ ((RelationalInsertTabletNode) insertTabletNode).getFileNodeList();
+ if (fileNodeList != null) {
+ for (FileNode fileNode : fileNodeList) {
+ System.out.println(fileNode);
+ // fileNode.writeFile();
+ // TODO write file node wal
+ }
+ }
+ }
ensureMemTable(infoForMetrics);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/EncodingInferenceUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/EncodingInferenceUtils.java
index 402921338a4..d9d4cecc4e4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/EncodingInferenceUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/EncodingInferenceUtils.java
@@ -51,6 +51,7 @@ public class EncodingInferenceUtils {
case TEXT:
case BLOB:
case STRING:
+ case OBJECT:
return conf.getDefaultTextEncoding();
default:
throw new UnSupportedDataTypeException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index cd4ca79ac43..876a63a197d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -753,6 +753,7 @@ public class QueryDataSetUtils {
case TEXT:
case BLOB:
case STRING:
+ case OBJECT:
Binary[] binaryValues = new Binary[size];
for (int index = 0; index < size; index++) {
int binarySize = buffer.getInt();