This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch force_ci/object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/force_ci/object_type by this 
push:
     new 391429b6192 [To force_ci/object_type] Support insert object by sql & 
add IT (#16683)
391429b6192 is described below

commit 391429b61926b44d65b05f98f3fccd0181253698
Author: Haonan <[email protected]>
AuthorDate: Thu Nov 6 19:32:36 2025 +0800

    [To force_ci/object_type] Support insert object by sql & add IT (#16683)
---
 .../it/query/old/IoTDBSimpleQueryTableIT.java      |  63 ++++++-
 .../it/session/IoTDBSessionRelationalIT.java       | 183 +++++++++++++++++++++
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 iotdb-core/datanode/pom.xml                        |   5 -
 .../dataregion/DataExecutionVisitor.java           |   2 +-
 .../planner/plan/node/write/InsertRowNode.java     |  11 +-
 .../planner/plan/node/write/InsertTabletNode.java  |   2 +-
 .../plan/planner/plan/node/write/ObjectNode.java   |  25 ---
 .../plan/node/write/RelationalInsertRowsNode.java  |  45 ++++-
 .../node/write/RelationalInsertTabletNode.java     |   2 -
 .../plan/relational/sql/util/AstUtil.java          |  30 ++++
 .../scheduler/FragmentInstanceDispatcherImpl.java  |   2 +-
 .../compaction/execute/utils/CompactionUtils.java  |   9 +-
 .../org/apache/iotdb/db/utils/TabletDecoder.java   |   1 +
 .../compaction/CompactionDeleteObjectFileTest.java | 122 --------------
 15 files changed, 338 insertions(+), 165 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/old/IoTDBSimpleQueryTableIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/old/IoTDBSimpleQueryTableIT.java
index e1571b00129..d89c7a6b3e4 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/old/IoTDBSimpleQueryTableIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/old/IoTDBSimpleQueryTableIT.java
@@ -657,19 +657,26 @@ public class IoTDBSimpleQueryTableIT {
       statement.execute("CREATE DATABASE test");
       statement.execute("USE " + DATABASE_NAME);
       statement.execute(
-          "CREATE TABLE table1(device STRING TAG, s4 DATE FIELD, s5 TIMESTAMP 
FIELD, s6 BLOB FIELD, s7 STRING FIELD)");
+          "CREATE TABLE table1(device STRING TAG, "
+              + "s4 DATE FIELD, s5 TIMESTAMP FIELD, s6 BLOB FIELD, s7 STRING 
FIELD, s8 OBJECT FIELD)");
 
       for (int i = 1; i <= 10; i++) {
         statement.execute(
             String.format(
-                "insert into table1(time, device, s4, s5, s6, s7) values(%d, 
'd1', '%s', %d, %s, '%s')",
-                i, LocalDate.of(2024, 5, i % 31 + 1), i, "X'cafebabe'", i));
+                "insert into table1(time, device, s4, s5, s6, s7, s8) "
+                    + "values(%d, 'd1', '%s', %d, %s, '%s', %s)",
+                i,
+                LocalDate.of(2024, 5, i % 31 + 1),
+                i,
+                "X'cafebabe'",
+                i,
+                "to_object(true, 0, X'cafebabe')"));
       }
 
       try (ResultSet resultSet = statement.executeQuery("select * from 
table1")) {
         final ResultSetMetaData metaData = resultSet.getMetaData();
         final int columnCount = metaData.getColumnCount();
-        assertEquals(6, columnCount);
+        assertEquals(7, columnCount);
         HashMap<Integer, TSDataType> columnType = new HashMap<>();
         for (int i = 3; i <= columnCount; i++) {
           if (metaData.getColumnLabel(i).equals("s4")) {
@@ -680,6 +687,8 @@ public class IoTDBSimpleQueryTableIT {
             columnType.put(i, TSDataType.BLOB);
           } else if (metaData.getColumnLabel(i).equals("s7")) {
             columnType.put(i, TSDataType.TEXT);
+          } else if (metaData.getColumnLabel(i).equals("s8")) {
+            columnType.put(i, TSDataType.OBJECT);
           }
         }
         byte[] byteArray = new byte[] {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, 
(byte) 0xBE};
@@ -689,12 +698,58 @@ public class IoTDBSimpleQueryTableIT {
           long timestamp = resultSet.getLong(4);
           byte[] blob = resultSet.getBytes(5);
           String text = resultSet.getString(6);
+          String objectSizeString = resultSet.getString(7);
           assertEquals(2024 - 1900, date.getYear());
           assertEquals(5 - 1, date.getMonth());
           assertEquals(time % 31 + 1, date.getDate());
           assertEquals(time, timestamp);
           assertArrayEquals(byteArray, blob);
           assertEquals(String.valueOf(time), text);
+          assertEquals("(Object) 4 B", objectSizeString);
+        }
+      }
+      try (ResultSet resultSet = statement.executeQuery("select 
read_object(s8) from table1")) {
+        final ResultSetMetaData metaData = resultSet.getMetaData();
+        final int columnCount = metaData.getColumnCount();
+        assertEquals(1, columnCount);
+        byte[] byteArray = new byte[] {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, 
(byte) 0xBE};
+        while (resultSet.next()) {
+          byte[] blob = resultSet.getBytes(1);
+          assertArrayEquals(byteArray, blob);
+        }
+      }
+
+    } catch (SQLException e) {
+      fail();
+    }
+  }
+
+  @Test
+  public void testObjectDataType() {
+    try (Connection connection = 
EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE DATABASE test");
+      statement.execute("USE " + DATABASE_NAME);
+      statement.execute("CREATE TABLE table1(device STRING TAG, s8 OBJECT 
FIELD)");
+      statement.execute(
+          "insert into table1(time, device, s8) values(1, 'd1', 
to_object(false, 0, X'cafe'))");
+      statement.execute(
+          "insert into table1(time, device, s8) values(1, 'd1', 
to_object(true, 2, X'babe'))");
+
+      try (ResultSet resultSet = statement.executeQuery("select * from 
table1")) {
+        while (resultSet.next()) {
+          String objectSizeString = resultSet.getString(3);
+          assertEquals("(Object) 4 B", objectSizeString);
+        }
+      }
+      try (ResultSet resultSet = statement.executeQuery("select 
read_object(s8) from table1")) {
+        final ResultSetMetaData metaData = resultSet.getMetaData();
+        final int columnCount = metaData.getColumnCount();
+        assertEquals(1, columnCount);
+        byte[] byteArray = new byte[] {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, 
(byte) 0xBE};
+        while (resultSet.next()) {
+          byte[] blob = resultSet.getBytes(1);
+          assertArrayEquals(byteArray, blob);
         }
       }
 
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
index 8f76adb3be8..bba5681b58b 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.itbase.category.TableClusterIT;
 import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.session.TableSessionBuilder;
 
 import org.apache.tsfile.enums.ColumnCategory;
@@ -43,6 +44,7 @@ import 
org.apache.tsfile.exception.write.WriteProcessException;
 import org.apache.tsfile.file.metadata.TableSchema;
 import org.apache.tsfile.read.common.RowRecord;
 import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
 import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -60,6 +62,8 @@ import org.junit.runner.RunWith;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -847,6 +851,183 @@ public class IoTDBSessionRelationalIT {
     }
   }
 
+  @Test
+  public void insertObjectTest()
+      throws IoTDBConnectionException, StatementExecutionException, 
IOException {
+    String testObject =
+        System.getProperty("user.dir")
+            + File.separator
+            + "target"
+            + File.separator
+            + "test-classes"
+            + File.separator
+            + "ainode-example"
+            + File.separator
+            + "model.pt";
+    File object = new File(testObject);
+
+    try (ITableSession session = 
EnvFactory.getEnv().getTableSessionConnection()) {
+      session.executeNonQueryStatement("USE \"db1\"");
+      // insert table data by tablet
+      List<String> columnNameList =
+          Arrays.asList("region_id", "plant_id", "device_id", "temperature", 
"file");
+      List<TSDataType> dataTypeList =
+          Arrays.asList(
+              TSDataType.STRING,
+              TSDataType.STRING,
+              TSDataType.STRING,
+              TSDataType.FLOAT,
+              TSDataType.OBJECT);
+      List<ColumnCategory> columnTypeList =
+          new ArrayList<>(
+              Arrays.asList(
+                  ColumnCategory.TAG,
+                  ColumnCategory.TAG,
+                  ColumnCategory.TAG,
+                  ColumnCategory.FIELD,
+                  ColumnCategory.FIELD));
+      Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList, 
columnTypeList, 1);
+      int rowIndex = tablet.getRowSize();
+      tablet.addTimestamp(rowIndex, 1);
+      tablet.addValue(rowIndex, 0, "1");
+      tablet.addValue(rowIndex, 1, "5");
+      tablet.addValue(rowIndex, 2, "3");
+      tablet.addValue(rowIndex, 3, 37.6F);
+      tablet.addValue(rowIndex, 4, true, 0, 
Files.readAllBytes(Paths.get(testObject)));
+      session.insert(tablet);
+      tablet.reset();
+
+      try (SessionDataSet dataSet =
+          session.executeQueryStatement("select file from object_table where 
time = 1")) {
+        SessionDataSet.DataIterator iterator = dataSet.iterator();
+        while (iterator.next()) {
+          Assert.assertEquals(
+              
BytesUtils.parseObjectByteArrayToString(BytesUtils.longToBytes(object.length())),
+              iterator.getString(1));
+        }
+      }
+
+      try (SessionDataSet dataSet =
+          session.executeQueryStatement(
+              "select READ_OBJECT(file) from object_table where time = 1")) {
+        SessionDataSet.DataIterator iterator = dataSet.iterator();
+        while (iterator.next()) {
+          Binary binary = iterator.getBlob(1);
+          Assert.assertArrayEquals(Files.readAllBytes(Paths.get(testObject)), 
binary.getValues());
+        }
+      }
+    }
+  }
+
+  @Test
+  public void insertObjectSegmentsTest()
+      throws IoTDBConnectionException, StatementExecutionException, 
IOException {
+    String testObject =
+        System.getProperty("user.dir")
+            + File.separator
+            + "target"
+            + File.separator
+            + "test-classes"
+            + File.separator
+            + "ainode-example"
+            + File.separator
+            + "model.pt";
+    byte[] objectBytes = Files.readAllBytes(Paths.get(testObject));
+    List<byte[]> objectSegments = new ArrayList<>();
+    for (int i = 0; i < objectBytes.length; i += 512) {
+      objectSegments.add(Arrays.copyOfRange(objectBytes, i, Math.min(i + 512, 
objectBytes.length)));
+    }
+
+    try (ITableSession session = 
EnvFactory.getEnv().getTableSessionConnection()) {
+      session.executeNonQueryStatement("USE \"db1\"");
+      // insert table data by tablet
+      List<String> columnNameList =
+          Arrays.asList("region_id", "plant_id", "device_id", "temperature", 
"file");
+      List<TSDataType> dataTypeList =
+          Arrays.asList(
+              TSDataType.STRING,
+              TSDataType.STRING,
+              TSDataType.STRING,
+              TSDataType.FLOAT,
+              TSDataType.OBJECT);
+      List<ColumnCategory> columnTypeList =
+          new ArrayList<>(
+              Arrays.asList(
+                  ColumnCategory.TAG,
+                  ColumnCategory.TAG,
+                  ColumnCategory.TAG,
+                  ColumnCategory.FIELD,
+                  ColumnCategory.FIELD));
+      Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList, 
columnTypeList, 1);
+      for (int i = 0; i < objectSegments.size() - 1; i++) {
+        int rowIndex = tablet.getRowSize();
+        tablet.addTimestamp(rowIndex, 1);
+        tablet.addValue(rowIndex, 0, "1");
+        tablet.addValue(rowIndex, 1, "5");
+        tablet.addValue(rowIndex, 2, "3");
+        tablet.addValue(rowIndex, 3, 37.6F);
+        tablet.addValue(rowIndex, 4, false, i * 512L, objectSegments.get(i));
+        session.insert(tablet);
+        tablet.reset();
+      }
+
+      try (SessionDataSet dataSet =
+          session.executeQueryStatement("select file from object_table where 
time = 1")) {
+        SessionDataSet.DataIterator iterator = dataSet.iterator();
+        while (iterator.next()) {
+          assertNull(iterator.getString(1));
+        }
+      }
+
+      // insert segment with wrong offset
+      try {
+        int rowIndex = tablet.getRowSize();
+        tablet.addTimestamp(rowIndex, 1);
+        tablet.addValue(rowIndex, 0, "1");
+        tablet.addValue(rowIndex, 1, "5");
+        tablet.addValue(rowIndex, 2, "3");
+        tablet.addValue(rowIndex, 3, 37.6F);
+        tablet.addValue(rowIndex, 4, false, 512L, objectSegments.get(1));
+        session.insert(tablet);
+      } catch (StatementExecutionException e) {
+        Assert.assertEquals(TSStatusCode.OBJECT_INSERT_ERROR.getStatusCode(), 
e.getStatusCode());
+        Assert.assertEquals(
+            String.format(
+                "741: The file length %d is not equal to the offset %d",
+                ((objectSegments.size() - 1) * 512), 512L),
+            e.getMessage());
+      } finally {
+        tablet.reset();
+      }
+
+      // last segment
+      int rowIndex = tablet.getRowSize();
+      tablet.addTimestamp(rowIndex, 1);
+      tablet.addValue(rowIndex, 0, "1");
+      tablet.addValue(rowIndex, 1, "5");
+      tablet.addValue(rowIndex, 2, "3");
+      tablet.addValue(rowIndex, 3, 37.6F);
+      tablet.addValue(
+          rowIndex,
+          4,
+          true,
+          (objectSegments.size() - 1) * 512L,
+          objectSegments.get(objectSegments.size() - 1));
+      session.insert(tablet);
+      tablet.reset();
+
+      try (SessionDataSet dataSet =
+          session.executeQueryStatement(
+              "select READ_OBJECT(file) from object_table where time = 1")) {
+        SessionDataSet.DataIterator iterator = dataSet.iterator();
+        while (iterator.next()) {
+          Binary binary = iterator.getBlob(1);
+          Assert.assertArrayEquals(Files.readAllBytes(Paths.get(testObject)), 
binary.getValues());
+        }
+      }
+    }
+  }
+
   @Test
   public void autoCreateNontagColumnTest()
       throws IoTDBConnectionException, StatementExecutionException {
@@ -1628,6 +1809,7 @@ public class IoTDBSessionRelationalIT {
     int testNum = 14;
     Set<TSDataType> dataTypes = new HashSet<>();
     Collections.addAll(dataTypes, TSDataType.values());
+    dataTypes.remove(TSDataType.OBJECT);
     dataTypes.remove(TSDataType.VECTOR);
     dataTypes.remove(TSDataType.UNKNOWN);
 
@@ -1718,6 +1900,7 @@ public class IoTDBSessionRelationalIT {
     int testNum = 17;
     Set<TSDataType> dataTypes = new HashSet<>();
     Collections.addAll(dataTypes, TSDataType.values());
+    dataTypes.remove(TSDataType.OBJECT);
     dataTypes.remove(TSDataType.VECTOR);
     dataTypes.remove(TSDataType.UNKNOWN);
 
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index c9ab41e9090..84534adfadb 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -148,6 +148,7 @@ public enum TSStatusCode {
 
   // OBJECT
   OBJECT_NOT_EXISTS(740),
+  OBJECT_INSERT_ERROR(741),
 
   // Arithmetic
   NUMERIC_VALUE_OUT_OF_RANGE(750),
diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index e8409508a03..626d8265eed 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -371,11 +371,6 @@
             <version>1.3.0</version>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.gdal</groupId>
-            <artifactId>gdal</artifactId>
-            <version>3.11.0</version>
-        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index 643f62830c2..0ae796f3913 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -302,7 +302,7 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
       return StatusUtils.OK;
     } catch (final Exception e) {
       LOGGER.error("Error in executing plan node: {}", node, e);
-      return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      return 
RpcUtils.getStatus(TSStatusCode.OBJECT_INSERT_ERROR.getStatusCode(), 
e.getMessage());
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
index 04bef0b577c..760d36f1f5c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
@@ -367,6 +367,7 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue {
           case TEXT:
           case STRING:
           case BLOB:
+          case OBJECT:
             ReadWriteIOUtils.write((Binary) values[i], buffer);
             break;
           default:
@@ -426,6 +427,7 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue {
           case TEXT:
           case STRING:
           case BLOB:
+          case OBJECT:
             ReadWriteIOUtils.write((Binary) values[i], stream);
             break;
           default:
@@ -520,6 +522,7 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue {
         case TEXT:
         case STRING:
         case BLOB:
+        case OBJECT:
           values[i] = ReadWriteIOUtils.readBinary(buffer);
           break;
         default:
@@ -589,6 +592,7 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue {
         case TEXT:
         case STRING:
         case BLOB:
+        case OBJECT:
           size += ReadWriteIOUtils.sizeToWrite((Binary) values[i]);
           break;
         default:
@@ -668,6 +672,7 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue {
         case TEXT:
         case BLOB:
         case STRING:
+        case OBJECT:
           WALWriteUtils.write((Binary) values[i], buffer);
           break;
         default:
@@ -759,6 +764,7 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue {
         case TEXT:
         case STRING:
         case BLOB:
+        case OBJECT:
           values[i] = ReadWriteIOUtils.readBinary(stream);
           break;
         default:
@@ -849,6 +855,7 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue {
         case TEXT:
         case STRING:
         case BLOB:
+        case OBJECT:
           values[i] = ReadWriteIOUtils.readBinary(buffer);
           break;
         default:
@@ -889,7 +896,9 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue {
   }
 
   public TimeValuePair composeTimeValuePair(int columnIndex) {
-    if (columnIndex >= values.length || 
Objects.isNull(dataTypes[columnIndex])) {
+    if (columnIndex >= values.length
+        || Objects.isNull(dataTypes[columnIndex])
+        || dataTypes[columnIndex] == TSDataType.OBJECT) {
       return null;
     }
     Object value = values[columnIndex];
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index 52386b7e077..3d3c25c0613 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -106,7 +106,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
     }
     shouldCheckTTL = true;
     for (MeasurementSchema measurementSchema : measurementSchemas) {
-      if (measurementSchema.getType() == TSDataType.OBJECT) {
+      if (measurementSchema != null && measurementSchema.getType() == 
TSDataType.OBJECT) {
         shouldCheckTTL = false;
         break;
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
index ebe839bbc0c..10fb9bc3443 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
@@ -68,10 +68,6 @@ public class ObjectNode extends SearchNode implements 
WALEntryValue {
 
   private boolean isGeneratedByRemoteConsensusLeader;
 
-  private Long time = null;
-
-  private String table = null;
-
   public ObjectNode(boolean isEOF, long offset, byte[] content, String 
filePath) {
     super(new PlanNodeId(""));
     this.isEOF = isEOF;
@@ -89,27 +85,6 @@ public class ObjectNode extends SearchNode implements 
WALEntryValue {
     this.contentLength = contentLength;
   }
 
-  public long getTimestamp() {
-    calculateTimeAndTableName();
-    return time;
-  }
-
-  public String getTable() {
-    calculateTimeAndTableName();
-    return table;
-  }
-
-  private void calculateTimeAndTableName() {
-    if (time != null && table != null) {
-      return;
-    }
-    File file = new File(filePath);
-    String fileName = new File(filePath).getName();
-    String timeStr = fileName.substring(0, fileName.length() - 
".bin".length());
-    time = Long.parseLong(timeStr);
-    table = 
file.getParentFile().getParentFile().getParentFile().getParentFile().getName();
-  }
-
   public boolean isEOF() {
     return isEOF;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
index 77020d9220d..83f6bbec63e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
@@ -27,13 +27,19 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
 
+import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.IDeviceID.Factory;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -159,6 +165,7 @@ public class RelationalInsertRowsNode extends 
InsertRowsNode {
 
   @Override
   public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+    List<WritePlanNode> writePlanNodeList = new ArrayList<>();
     Map<TRegionReplicaSet, RelationalInsertRowsNode> splitMap = new 
HashMap<>();
     List<TEndPoint> redirectInfo = new ArrayList<>();
     for (int i = 0; i < getInsertRowNodeList().size(); i++) {
@@ -172,6 +179,9 @@ public class RelationalInsertRowsNode extends 
InsertRowsNode {
                   insertRowNode.getDeviceID(),
                   
TimePartitionUtils.getTimePartitionSlot(insertRowNode.getTime()),
                   analysis.getDatabaseName());
+      // handle object type
+      handleObjectValue(insertRowNode, dataRegionReplicaSet, 
writePlanNodeList);
+
       // Collect redirectInfo
       
redirectInfo.add(dataRegionReplicaSet.getDataNodeLocations().get(0).getClientRpcEndPoint());
       RelationalInsertRowsNode tmpNode = splitMap.get(dataRegionReplicaSet);
@@ -185,8 +195,41 @@ public class RelationalInsertRowsNode extends 
InsertRowsNode {
       }
     }
     analysis.setRedirectNodeList(redirectInfo);
+    writePlanNodeList.addAll(splitMap.values());
+
+    return writePlanNodeList;
+  }
 
-    return new ArrayList<>(splitMap.values());
+  private void handleObjectValue(
+      InsertRowNode insertRowNode,
+      TRegionReplicaSet dataRegionReplicaSet,
+      List<WritePlanNode> writePlanNodeList) {
+    for (int j = 0; j < insertRowNode.getDataTypes().length; j++) {
+      if (insertRowNode.getDataTypes()[j] == TSDataType.OBJECT) {
+        Object[] values = insertRowNode.getValues();
+        byte[] binary = ((Binary) values[j]).getValues();
+        ByteBuffer buffer = ByteBuffer.wrap(binary);
+        boolean isEoF = buffer.get() == 1;
+        long offset = buffer.getLong();
+        byte[] content = ReadWriteIOUtils.readBytes(buffer, 
buffer.remaining());
+        String relativePath =
+            TsFileNameGenerator.generateObjectFilePath(
+                dataRegionReplicaSet.getRegionId().getId(),
+                insertRowNode.getTime(),
+                insertRowNode.getDeviceID(),
+                insertRowNode.getMeasurements()[j]);
+        ObjectNode objectNode = new ObjectNode(isEoF, offset, content, 
relativePath);
+        objectNode.setDataRegionReplicaSet(dataRegionReplicaSet);
+        byte[] filePathBytes = relativePath.getBytes(StandardCharsets.UTF_8);
+        byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES];
+        System.arraycopy(
+            BytesUtils.longToBytes(offset + content.length), 0, valueBytes, 0, 
Long.BYTES);
+        System.arraycopy(filePathBytes, 0, valueBytes, Long.BYTES, 
filePathBytes.length);
+        ((Binary) values[j]).setValues(valueBytes);
+        insertRowNode.setValues(values);
+        writePlanNodeList.add(objectNode);
+      }
+    }
   }
 
   public RelationalInsertRowsNode emptyClone() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index ee93712e77d..e3a114211e1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -64,8 +64,6 @@ public class RelationalInsertTabletNode extends 
InsertTabletNode {
 
   private boolean singleDevice;
 
-  private Object[] convertedColumns;
-
   public RelationalInsertTabletNode(
       PlanNodeId id,
       PartialPath devicePath,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/AstUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/AstUtil.java
index cb17a90c04f..700d5e40beb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/AstUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/AstUtil.java
@@ -21,16 +21,22 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.sql.util;
 
 import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
 import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BinaryLiteral;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Identifier;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Literal;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LongLiteral;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullLiteral;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.StringLiteral;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.TableExpressionType;
 
 import com.google.common.graph.SuccessorsFunction;
 import com.google.common.graph.Traverser;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
 
 import java.time.ZoneId;
 import java.util.List;
@@ -113,6 +119,30 @@ public final class AstUtil {
       throw new SemanticException(
           String.format("Cannot insert identifier %s, please use string 
literal", expression));
     }
+    if (expression instanceof FunctionCall
+        && "to_object".equals(((FunctionCall) 
expression).getName().toString())) {
+      List<Expression> arguments = ((FunctionCall) expression).getArguments();
+      if (arguments.size() == 3
+          && arguments.get(0).getExpressionType() == 
TableExpressionType.BOOLEAN_LITERAL
+          && arguments.get(1).getExpressionType() == 
TableExpressionType.LONG_LITERAL
+          && arguments.get(2).getExpressionType() == 
TableExpressionType.BINARY_LITERAL) {
+        boolean isEOF =
+            (boolean)
+                ((BooleanLiteral) ((FunctionCall) 
expression).getArguments().get(0)).getTsValue();
+        long offset =
+            (long) ((LongLiteral) ((FunctionCall) 
expression).getArguments().get(1)).getTsValue();
+        byte[] content =
+            ((Binary)
+                    ((BinaryLiteral) ((FunctionCall) 
expression).getArguments().get(2))
+                        .getTsValue())
+                .getValues();
+        byte[] val = new byte[content.length + 9];
+        val[0] = (byte) (isEOF ? 1 : 0);
+        System.arraycopy(BytesUtils.longToBytes(offset), 0, val, 1, 8);
+        System.arraycopy(content, 0, val, 9, content.length);
+        return new Binary(val);
+      }
+    }
     throw new SemanticException("Unsupported expression: " + expression);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 7e61bea8a4a..18ab1cdc275 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -328,7 +328,7 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
     if (dispatchFailures.isEmpty()) {
       return immediateFuture(new FragInstanceDispatchResult(true));
     }
-    if (instances.size() == 1) {
+    if (instances.size() == 1 || dispatchFailures.size() == 1) {
       return immediateFuture(new 
FragInstanceDispatchResult(dispatchFailures.get(0)));
     } else {
       List<TSStatus> failureStatusList = new ArrayList<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
index 1668355db31..b0575837828 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
@@ -512,14 +512,19 @@ public class CompactionUtils {
     }
   }
 
-  public static void removeDeletedObjectFiles(TsFileResource resource)
-      throws IOException, IllegalPathException {
+  public static void removeDeletedObjectFiles(TsFileResource resource) {
+    // check for compaction recovery
+    if (!resource.tsFileExists()) {
+      return;
+    }
     try (MultiTsFileDeviceIterator deviceIterator =
         new MultiTsFileDeviceIterator(Collections.singletonList(resource))) {
       while (deviceIterator.hasNextDevice()) {
         deviceIterator.nextDevice();
         deviceIterator.getReaderAndChunkMetadataForCurrentAlignedSeries();
       }
+    } catch (Exception e) {
+      logger.warn("Failed to remove object files from file {}", 
resource.getTsFilePath(), e);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TabletDecoder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TabletDecoder.java
index 335d7a23b42..bd1eba9cfa8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TabletDecoder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TabletDecoder.java
@@ -192,6 +192,7 @@ public class TabletDecoder {
       case STRING:
       case BLOB:
       case TEXT:
+      case OBJECT:
         Binary[] binaryCol = new Binary[rowSize];
         if (encoding == TSEncoding.PLAIN) {
           // PlainEncoder uses var int, which may cause compatibility problem
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDeleteObjectFileTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDeleteObjectFileTest.java
deleted file mode 100644
index a4b35c01da6..00000000000
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDeleteObjectFileTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.storageengine.dataregion.compaction;
-
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.schema.table.TsTable;
-import org.apache.iotdb.commons.schema.table.column.FieldColumnSchema;
-import org.apache.iotdb.commons.schema.table.column.TagColumnSchema;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
-import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
-import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.SettleCompactionTask;
-import 
org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate;
-import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate;
-import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
-import 
org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-
-import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.exception.write.WriteProcessException;
-import org.apache.tsfile.file.metadata.StringArrayDeviceID;
-import org.apache.tsfile.file.metadata.enums.CompressionType;
-import org.apache.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.tsfile.read.common.TimeRange;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-public class CompactionDeleteObjectFileTest extends AbstractCompactionTest {
-  @Before
-  public void setUp()
-      throws IOException, WriteProcessException, MetadataException, 
InterruptedException {
-    super.setUp();
-  }
-
-  @After
-  public void tearDown() throws IOException, StorageEngineException {
-    super.tearDown();
-  }
-
-  @Test
-  public void test1() throws IOException {
-    createTable("tsfile_table", 100);
-    File dir = new File("/Users/shuww/Downloads/0708/1_副本");
-    List<TsFileResource> resources = new ArrayList<>();
-    for (File file : dir.listFiles()) {
-      if (!file.getName().endsWith(".tsfile")) {
-        continue;
-      }
-      TsFileResource resource = new TsFileResource(file);
-
-      try (ModificationFile modificationFile = resource.getExclusiveModFile()) 
{
-        modificationFile.write(
-            new TableDeletionEntry(
-                new DeletionPredicate(
-                    "tsfile_table",
-                    new IDPredicate.FullExactMatch(
-                        new StringArrayDeviceID(new String[] {"tsfile_table", 
"1", "5", "3"})),
-                    Arrays.asList("file")),
-                new TimeRange(-1, 0)));
-        modificationFile.write(
-            new TableDeletionEntry(
-                new DeletionPredicate(
-                    "tsfile_table",
-                    new IDPredicate.FullExactMatch(
-                        new StringArrayDeviceID(new String[] {"tsfile_table", 
"1", "5", "3"})),
-                    Arrays.asList("file")),
-                new TimeRange(2, 2)));
-      }
-      resource.deserialize();
-      resources.add(resource);
-    }
-
-    //        InnerSpaceCompactionTask task =
-    //            new InnerSpaceCompactionTask(
-    //                0, tsFileManager, resources, true, new 
ReadChunkCompactionPerformer(), 0);
-    SettleCompactionTask task =
-        new SettleCompactionTask(
-            0,
-            tsFileManager,
-            resources,
-            Collections.emptyList(),
-            true,
-            new FastCompactionPerformer(false),
-            0);
-    task.start();
-  }
-
-  public void createTable(String tableName, long ttl) {
-    TsTable tsTable = new TsTable(tableName);
-    tsTable.addColumnSchema(new TagColumnSchema("id_column", 
TSDataType.STRING));
-    tsTable.addColumnSchema(
-        new FieldColumnSchema("s1", TSDataType.STRING, TSEncoding.PLAIN, 
CompressionType.LZ4));
-    tsTable.addProp(TsTable.TTL_PROPERTY, ttl + "");
-    DataNodeTableCache.getInstance().preUpdateTable("Downloads", tsTable, 
null);
-    DataNodeTableCache.getInstance().commitUpdateTable("Downloads", tableName, 
null);
-  }
-}


Reply via email to