This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch force_ci/object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b23460c57cc1437a712e2c5c63ddc639e4d623de
Author: shuwenwei <[email protected]>
AuthorDate: Tue Dec 2 17:06:10 2025 +0800

    fix object functions & add tests for object type & fix some bugs (#16829)
---
 .../it/query/object/IoTDBObjectQueryIT2.java       | 296 +++++++++++++++++++++
 .../scalar/IoTDBScalarFunctionTableIT.java         |  16 +-
 .../it/query/recent/IoTDBLengthFunctionIT.java     |   2 +-
 .../iotdb/udf/api/relational/access/Record.java    |  35 ++-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 .../impl/DataNodeInternalRPCServiceImpl.java       |   8 +
 .../operator/process/function/partition/Slice.java |  30 ++-
 .../expression/PatternExpressionComputation.java   |   5 +-
 .../aggregation/MaskedRecordIterator.java          |   2 -
 .../relational/aggregation}/RecordIterator.java    |  17 +-
 .../UserDefinedAggregateFunctionAccumulator.java   |   1 -
 .../GroupedUserDefinedAggregateAccumulator.java    |   2 +-
 .../relational/ColumnTransformerBuilder.java       |   8 +-
 .../plan/analyze/ClusterPartitionFetcher.java      |   4 +
 .../plan/planner/OperatorTreeGenerator.java        |   1 +
 .../plan/planner/TableOperatorGenerator.java       |   6 +-
 .../plan/node/write/RelationalInsertRowsNode.java  |   3 +
 .../relational/metadata/TableMetadataImpl.java     |   8 +-
 .../udf/UserDefineScalarFunctionTransformer.java   |   2 +-
 .../AbstractCastFunctionColumnTransformer.java     |  10 +
 ...r.java => AbstractLengthColumnTransformer.java} |  15 +-
 .../unary/scalar/BlobLengthColumnTransformer.java  |  25 +-
 .../scalar/CastFunctionColumnTransformer.java      |   3 +
 .../unary/scalar/LengthColumnTransformer.java      |  30 +--
 .../scalar/ObjectLengthColumnTransformer.java}     |  23 +-
 .../unary/scalar/ReadObjectColumnTransformer.java  |  44 +--
 .../scalar/TryCastFunctionColumnTransformer.java   |   3 +
 .../compaction/execute/utils/CompactionUtils.java  |   2 +
 .../org/apache/iotdb/db/utils/ObjectTypeUtils.java | 139 ++++++++++
 .../plan/function/RecordObjectTypeTest.java        | 135 ++++++++++
 .../scalar/BlobLengthColumnTransformerTest.java    |  22 +-
 .../unary/scalar/ObjectTypeFunctionTest.java       | 180 +++++++++++++
 .../object/ObjectTypeCompactionTest.java           | 261 ++++++++++++++++++
 .../src/main/thrift/datanode.thrift                |   8 +
 34 files changed, 1203 insertions(+), 144 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/object/IoTDBObjectQueryIT2.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/object/IoTDBObjectQueryIT2.java
new file mode 100644
index 00000000000..9a0dd60907e
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/object/IoTDBObjectQueryIT2.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.query.object;
+
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import org.apache.tsfile.utils.Binary;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.time.LocalDate;
+
+// Integration tests for OBJECT-typed fields in the table model: length(),
+// read_object(), casts, aggregations, and which clauses accept OBJECT columns.
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBObjectQueryIT2 {
+
+  private static final String DATABASE_NAME = "test";
+
+  // Creates table1 and inserts rows for devices d1..d10 at times 0..9. Each row
+  // stores X'cafebabe' as BLOB in s6 and as OBJECT in s8; s9 is never written.
+  @BeforeClass
+  public static void setUp() throws Exception {
+    
EnvFactory.getEnv().getConfig().getCommonConfig().setDataReplicationFactor(1);
+    EnvFactory.getEnv().initClusterEnvironment();
+    try (Connection connection = 
EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE DATABASE " + DATABASE_NAME);
+      statement.execute("USE " + DATABASE_NAME);
+      statement.execute(
+          "CREATE TABLE table1(device STRING TAG, s4 DATE FIELD, s5 TIMESTAMP 
FIELD, s6 BLOB FIELD, s7 STRING FIELD, s8 OBJECT FIELD, s9 OBJECT FIELD)");
+      for (int i = 1; i <= 10; i++) {
+        for (int j = 0; j < 10; j++) {
+          statement.execute(
+              String.format(
+                  "insert into table1(time, device, s4, s5, s6, s7, s8) "
+                      + "values(%d, '%s', '%s', %d, %s, '%s', %s)",
+                  j,
+                  "d" + i,
+                  LocalDate.of(2024, 5, i % 31 + 1),
+                  j,
+                  "X'cafebabe'",
+                  j,
+                  "to_object(true, 0, X'cafebabe')"));
+        }
+      }
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  // length() on an OBJECT column must report the 4 bytes written in setUp.
+  @Test
+  public void testObjectLength() throws IoTDBConnectionException, 
StatementExecutionException {
+    try (ITableSession session = 
EnvFactory.getEnv().getTableSessionConnection()) {
+      session.executeNonQueryStatement("USE " + DATABASE_NAME);
+      SessionDataSet sessionDataSet =
+          session.executeQueryStatement("select length(s8) from table1 limit 
1");
+      SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+      while (iterator.next()) {
+        long length = iterator.getLong(1);
+        Assert.assertEquals(4, length);
+      }
+    }
+  }
+
+  // read_object(col[, offset[, length]]) must return the requested byte range.
+  @Test
+  public void testReadObject() throws IoTDBConnectionException, 
StatementExecutionException {
+    try (ITableSession session = 
EnvFactory.getEnv().getTableSessionConnection()) {
+      session.executeNonQueryStatement("USE " + DATABASE_NAME);
+      SessionDataSet sessionDataSet =
+          session.executeQueryStatement("select read_object(s8) from table1 
where device = 'd2'");
+      SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+      byte[] expected = new byte[] {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, 
(byte) 0xBE};
+      while (iterator.next()) {
+        Binary blob = iterator.getBlob(1);
+        Assert.assertArrayEquals(expected, blob.getValues());
+      }
+
+      sessionDataSet =
+          session.executeQueryStatement(
+              "select read_object(s8, 1) from table1 where device = 'd3'");
+      iterator = sessionDataSet.iterator();
+      expected = new byte[] {(byte) 0xFE, (byte) 0xBA, (byte) 0xBE};
+      while (iterator.next()) {
+        Binary blob = iterator.getBlob(1);
+        Assert.assertArrayEquals(expected, blob.getValues());
+      }
+
+      sessionDataSet =
+          session.executeQueryStatement(
+              "select read_object(s8, 1, 2) from table1 where device = 'd1'");
+      iterator = sessionDataSet.iterator();
+      expected = new byte[] {(byte) 0xFE, (byte) 0xBA};
+      while (iterator.next()) {
+        Binary blob = iterator.getBlob(1);
+        Assert.assertArrayEquals(expected, blob.getValues());
+      }
+
+      sessionDataSet =
+          session.executeQueryStatement(
+              "select read_object(s8, 1, 1000) from table1 where device = 
'd1'");
+      iterator = sessionDataSet.iterator();
+      expected = new byte[] {(byte) 0xFE, (byte) 0xBA, (byte) 0xBE};
+      while (iterator.next()) {
+        Binary blob = iterator.getBlob(1);
+        Assert.assertArrayEquals(expected, blob.getValues());
+      }
+
+      sessionDataSet =
+          session.executeQueryStatement(
+              "select count(*) from table1 where device = 'd1' and s6 = 
read_object(s8)");
+      iterator = sessionDataSet.iterator();
+      while (iterator.next()) {
+        long count = iterator.getLong(1);
+        Assert.assertEquals(10, count);
+      }
+
+      // read_object are not pushed down. Read remote files
+      sessionDataSet =
+          session.executeQueryStatement(
+              "select read_object(t1_s8) from (select t1.s8 as t1_s8, t2.s8 as 
t2_s8 from table1 as t1 inner join table1 as t2 using(time))");
+      iterator = sessionDataSet.iterator();
+      expected = new byte[] {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 
0xBE};
+      while (iterator.next()) {
+        Binary blob = iterator.getBlob(1);
+        Assert.assertArrayEquals(expected, blob.getValues());
+      }
+    }
+  }
+
+  // Exercises OBJECT columns in functions and clauses; uses that are not
+  // supported for OBJECT must fail with StatementExecutionException.
+  @Test
+  public void testFunctionAndClauses()
+      throws IoTDBConnectionException, StatementExecutionException {
+    try (ITableSession session = 
EnvFactory.getEnv().getTableSessionConnection()) {
+      session.executeNonQueryStatement("USE " + DATABASE_NAME);
+
+      SessionDataSet sessionDataSet =
+          session.executeQueryStatement(
+              "select length(s8) from table1 where device = 'd2' and s8 is not 
null limit 1");
+      SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+      while (iterator.next()) {
+        Assert.assertEquals(4, iterator.getLong(1));
+      }
+      sessionDataSet =
+          session.executeQueryStatement(
+              "select count(s8), first(s8), last(s8), first_by(s8, time), 
last_by(s8, time) from table1 where device = 'd1' and cast(s8 as string) = 
'(Object) 4 B' and try_cast(s8 as string) = '(Object) 4 B'");
+      iterator = sessionDataSet.iterator();
+      while (iterator.next()) {
+        Assert.assertEquals(10, iterator.getLong(1));
+        Assert.assertEquals("(Object) 4 B", iterator.getString(2));
+        Assert.assertEquals("(Object) 4 B", iterator.getString(3));
+        Assert.assertEquals("(Object) 4 B", iterator.getString(4));
+        Assert.assertEquals("(Object) 4 B", iterator.getString(5));
+      }
+
+      sessionDataSet = session.executeQueryStatement("select coalesce(s9, s8) 
from table1");
+      iterator = sessionDataSet.iterator();
+      while (iterator.next()) {
+        Assert.assertEquals("(Object) 4 B", iterator.getString(1));
+      }
+
+      // MATCH_RECOGNIZE
+      Assert.assertThrows(
+          StatementExecutionException.class,
+          () ->
+              session.executeNonQueryStatement(
+                  "select m.cnt from table1 match_recognize (order by s8 
measures RPR_LAST(time) as cnt one row per match pattern (B+) define B as B.s6 
= prev(B.s6)) as m"));
+      Assert.assertThrows(
+          StatementExecutionException.class,
+          () ->
+              session.executeNonQueryStatement(
+                  "select m.cnt from table1 match_recognize (partition by s8 
measures RPR_LAST(time) as cnt one row per match pattern (B+) define B as B.s6 
= prev(B.s6)) as m"));
+
+      sessionDataSet =
+          session.executeQueryStatement(
+              "select m.value from table1 match_recognize(partition by s6 
measures prev(s8) as value one row per match pattern (B+) define B as 
B.s6=prev(B.s6)) as m");
+      iterator = sessionDataSet.iterator();
+      while (iterator.next()) {
+        Assert.assertEquals("(Object) 4 B", iterator.getString(1));
+      }
+
+      // WHERE
+      sessionDataSet =
+          session.executeQueryStatement(
+              "select time, s8 from table1 where device = 'd10' and s8 is not null");
+      iterator = sessionDataSet.iterator();
+      while (iterator.next()) {
+        Assert.assertEquals("(Object) 4 B", iterator.getString(2));
+      }
+
+      // GROUP BY
+      Assert.assertThrows(
+          StatementExecutionException.class,
+          () -> session.executeNonQueryStatement("select count(*) from table1 
group by s8"));
+
+      // ORDER BY
+      Assert.assertThrows(
+          StatementExecutionException.class,
+          () -> session.executeNonQueryStatement("select count(*) from table1 
order by s8"));
+
+      // FILL
+      Assert.assertThrows(
+          StatementExecutionException.class,
+          () ->
+              session.executeNonQueryStatement(
+                  "select time, s8 from table1 where device = 'd10' fill 
method linear"));
+      sessionDataSet =
+          session.executeQueryStatement(
+              "select time, s8 from table1 where device = 'd10' fill method previous");
+      iterator = sessionDataSet.iterator();
+      while (iterator.next()) {
+        Assert.assertEquals("(Object) 4 B", iterator.getString(2));
+      }
+
+      // HAVING
+      sessionDataSet =
+          session.executeQueryStatement(
+              "select device, count(s8) from table1 group by device having count(s8) > 0");
+      iterator = sessionDataSet.iterator();
+      while (iterator.next()) {
+        long count = iterator.getLong(2);
+        Assert.assertEquals(10, count);
+      }
+
+      // WINDOW
+      Assert.assertThrows(
+          StatementExecutionException.class,
+          () ->
+              session.executeNonQueryStatement(
+                  "select *, nth_value(s8,2) over(partition by s8) from 
table1"));
+      Assert.assertThrows(
+          StatementExecutionException.class,
+          () ->
+              session.executeNonQueryStatement(
+                  "select *, nth_value(s8,2) over(order by s8) from table1"));
+      session.executeNonQueryStatement(
+          "select *, nth_value(s8,2) over(partition by device) from table1");
+      session.executeNonQueryStatement(
+          "select *, lead(s8) over(partition by device order by time) from 
table1");
+      session.executeNonQueryStatement(
+          "select *, first_value(s8) over(partition by device) from table1");
+      session.executeNonQueryStatement(
+          "select *, last_value(s8) over(partition by device) from table1");
+      session.executeNonQueryStatement(
+          "select *, lag(s8) over(partition by device order by time) from 
table1");
+
+      // Table-value function
+      Assert.assertThrows(
+          StatementExecutionException.class,
+          () ->
+              session.executeNonQueryStatement(
+                  "select * from session(data => table1 partition by s8, 
timecol => 'time', gap => 1ms)"));
+      Assert.assertThrows(
+          StatementExecutionException.class,
+          () ->
+              session.executeNonQueryStatement(
+                  "select * from session(data => table1 order by s8, timecol 
=> 'time', gap => 1ms)"));
+      sessionDataSet =
+          session.executeQueryStatement(
+              "select * from hop(data => table1, timecol => 'time', slide => 
1ms, size => 2ms)");
+      iterator = sessionDataSet.iterator();
+      while (iterator.next()) {
+        String str = iterator.getString("s8");
+        Assert.assertEquals("(Object) 4 B", str);
+      }
+    }
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/old/builtinfunction/scalar/IoTDBScalarFunctionTableIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/old/builtinfunction/scalar/IoTDBScalarFunctionTableIT.java
index 01c2d591918..0e4b0a38221 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/old/builtinfunction/scalar/IoTDBScalarFunctionTableIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/old/builtinfunction/scalar/IoTDBScalarFunctionTableIT.java
@@ -1370,56 +1370,56 @@ public class IoTDBScalarFunctionTableIT {
     tableAssertTestFail(
         "select s1,Length(s1,1) from lengthTable",
         TSStatusCode.SEMANTIC_ERROR.getStatusCode()
-            + ": Scalar function length only accepts one argument and it must 
be text, string, or blob data type.",
+            + ": Scalar function length only accepts one argument and it must 
be text or string or blob or object data type.",
         DATABASE_NAME);
 
     // case 2: wrong data type
     tableAssertTestFail(
         "select s1,Length(s2) from lengthTable",
         TSStatusCode.SEMANTIC_ERROR.getStatusCode()
-            + ": Scalar function length only accepts one argument and it must 
be text, string, or blob data type.",
+            + ": Scalar function length only accepts one argument and it must 
be text or string or blob or object data type.",
         DATABASE_NAME);
 
     // case 3: wrong data type
     tableAssertTestFail(
         "select s1,Length(s3) from lengthTable",
         TSStatusCode.SEMANTIC_ERROR.getStatusCode()
-            + ": Scalar function length only accepts one argument and it must 
be text, string, or blob data type.",
+            + ": Scalar function length only accepts one argument and it must 
be text or string or blob or object data type.",
         DATABASE_NAME);
 
     // case 4: wrong data type
     tableAssertTestFail(
         "select s1,Length(s4) from lengthTable",
         TSStatusCode.SEMANTIC_ERROR.getStatusCode()
-            + ": Scalar function length only accepts one argument and it must 
be text, string, or blob data type.",
+            + ": Scalar function length only accepts one argument and it must 
be text or string or blob or object data type.",
         DATABASE_NAME);
 
     // case 5: wrong data type
     tableAssertTestFail(
         "select s1,Length(s5) from lengthTable",
         TSStatusCode.SEMANTIC_ERROR.getStatusCode()
-            + ": Scalar function length only accepts one argument and it must 
be text, string, or blob data type.",
+            + ": Scalar function length only accepts one argument and it must 
be text or string or blob or object data type.",
         DATABASE_NAME);
 
     // case 6: wrong data type
     tableAssertTestFail(
         "select s1,Length(s6) from lengthTable",
         TSStatusCode.SEMANTIC_ERROR.getStatusCode()
-            + ": Scalar function length only accepts one argument and it must 
be text, string, or blob data type.",
+            + ": Scalar function length only accepts one argument and it must 
be text or string or blob or object data type.",
         DATABASE_NAME);
 
     // case 7: wrong data type
     tableAssertTestFail(
         "select s1,Length(s7) from lengthTable",
         TSStatusCode.SEMANTIC_ERROR.getStatusCode()
-            + ": Scalar function length only accepts one argument and it must 
be text, string, or blob data type.",
+            + ": Scalar function length only accepts one argument and it must 
be text or string or blob or object data type.",
         DATABASE_NAME);
 
     // case 8: wrong data type
     tableAssertTestFail(
         "select s1,Length(s8) from lengthTable",
         TSStatusCode.SEMANTIC_ERROR.getStatusCode()
-            + ": Scalar function length only accepts one argument and it must 
be text, string, or blob data type.",
+            + ": Scalar function length only accepts one argument and it must 
be text or string or blob or object data type.",
         DATABASE_NAME);
   }
 
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBLengthFunctionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBLengthFunctionIT.java
index 57bf23918d5..7f3ba13b11d 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBLengthFunctionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBLengthFunctionIT.java
@@ -107,7 +107,7 @@ public class IoTDBLengthFunctionIT {
   public void testLengthFunctionOnInvalidInputs() {
     String expectedErrorMessage =
         TSStatusCode.SEMANTIC_ERROR.getStatusCode()
-            + ": Scalar function length only accepts one argument and it must 
be text, string, or blob data type.";
+            + ": Scalar function length only accepts one argument and it must 
be text or string or blob or object data type.";
 
     // Exception 1: Using LENGTH() on non-TEXT/BLOB/STRING types
     tableAssertTestFail("SELECT length(c_int) FROM table1", 
expectedErrorMessage, DATABASE_NAME);
diff --git 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java
 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java
index 8c6e2a7f357..4267f41dc8d 100644
--- 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java
+++ 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java
@@ -83,7 +83,7 @@ public interface Record {
    * Returns the Binary value at the specified column in this row.
    *
    * <p>Users need to ensure that the data type of the specified column is 
{@code TSDataType.TEXT},
-   * {@code TSDataType.STRING} or {@code TSDataType.BLOB}.
+   * {@code TSDataType.STRING} or {@code TSDataType.BLOB} or {@code 
TSDataType.OBJECT}.
    *
    * @param columnIndex index of the specified column
    * @return the Binary value at the specified column in this row
@@ -94,7 +94,7 @@ public interface Record {
    * Returns the String value at the specified column in this row.
    *
    * <p>Users need to ensure that the data type of the specified column is 
{@code TSDataType.TEXT}
-   * or {@code TSDataType.STRING}.
+   * or {@code TSDataType.STRING} or {@code TSDataType.OBJECT}.
    *
    * @param columnIndex index of the specified column
    * @return the String value at the specified column in this row
@@ -113,6 +113,37 @@ public interface Record {
 
   Object getObject(int columnIndex);
 
+  /**
+   * Returns the Binary representation of an object stored at the specified 
column in this row.
+   *
+   * <p>Users need to ensure that the data type of the specified column is 
{@code
+   * TSDataType.OBJECT}.
+   *
+   * <p>This method returns the entire binary data of the object and may 
require considerable memory
+   * if the stored object is large.
+   *
+   * <p>Use {@link #readObject(int, long, int)} to read only part of a large object.
+   *
+   * @param columnIndex index of the specified column
+   * @return the Binary content of the object at the specified column
+   */
+  Binary readObject(int columnIndex);
+
+  /**
+   * Returns a partial Binary segment of an object stored at the specified 
column in this row.
+   *
+   * <p>Users need to ensure that the data type of the specified column is 
{@code
+   * TSDataType.OBJECT}.
+   *
+   * <p>This method enables reading a subset of the stored object without 
materializing the entire
+   * binary data in memory, which is useful for large objects and streaming 
access patterns.
+   *
+   * @param columnIndex index of the specified column
+   * @param offset byte offset of the subsection read
+   * @param length number of bytes to read starting from the offset. If negative, read the entire
+   *     binary data from offset.
+   * @return the Binary content of the object segment at the specified column
+   */
+  Binary readObject(int columnIndex, long offset, int length);
+
   /**
    * Returns the actual data type of the value at the specified column in this 
row.
    *
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 3b6a08d6475..bb533ddbbd5 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -148,6 +148,7 @@ public enum TSStatusCode {
   // OBJECT
   OBJECT_NOT_EXISTS(740),
   OBJECT_INSERT_ERROR(741),
+  OBJECT_READ_ERROR(742),
 
   // Arithmetic
   NUMERIC_VALUE_OUT_OF_RANGE(750),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index dcc62fcd3c8..8f1b97b7196 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -201,6 +201,7 @@ import 
org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
 import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
 import org.apache.iotdb.db.trigger.executor.TriggerFireResult;
 import org.apache.iotdb.db.trigger.service.TriggerManagementService;
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.metrics.type.AutoGauge;
 import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -279,6 +280,7 @@ import 
org.apache.iotdb.mpp.rpc.thrift.TPushSingleTopicMetaReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaRespExceptionMessage;
+import org.apache.iotdb.mpp.rpc.thrift.TReadObjectReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeResp;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionMigrateResult;
@@ -3051,6 +3053,12 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
   }
 
+  // Internal RPC handler: returns object content for the requested relative
+  // path, offset and size by delegating to ObjectTypeUtils.readObjectContent.
+  // NOTE(review): relativePath is taken from the request as-is; presumably
+  // readObjectContent confines it to the object directory -- confirm.
+  // NOTE(review): the trailing `false` differs from the `true` passed by the
+  // query-side readObject callers; its meaning is defined in ObjectTypeUtils.
+  @Override
+  public ByteBuffer readObject(TReadObjectReq req) {
+    return ObjectTypeUtils.readObjectContent(
+        req.getRelativePath(), req.getOffset(), req.getSize(), false);
+  }
+
   public void handleClientExit() {
     // Do nothing
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java
index b7ff6f3e0fd..599efc4f240 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java
@@ -19,12 +19,14 @@
 
 package 
org.apache.iotdb.db.queryengine.execution.operator.process.function.partition;
 
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
 import org.apache.iotdb.udf.api.relational.access.Record;
 import org.apache.iotdb.udf.api.type.Type;
 
 import org.apache.tsfile.block.column.Column;
 import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
 import org.apache.tsfile.utils.DateUtils;
 
 import java.time.LocalDate;
@@ -35,6 +37,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.iotdb.udf.api.type.Type.OBJECT;
+
 /** Parts of partition. */
 public class Slice {
   private final Column[] requiredColumns;
@@ -171,9 +175,15 @@ public class Slice {
 
     @Override
     public String getString(int columnIndex) {
-      return originalColumns[columnIndex]
-          .getBinary(offset)
-          .getStringValue(TSFileConfig.STRING_CHARSET);
+      // OBJECT and BLOB columns are converted with the dedicated BytesUtils
+      // parsers; every other type is decoded with the TsFile string charset.
+      Binary binary = originalColumns[columnIndex].getBinary(offset);
+      Type type = dataTypes.get(columnIndex);
+      if (type == OBJECT) {
+        return BytesUtils.parseObjectByteArrayToString(binary.getValues());
+      } else if (type == Type.BLOB) {
+        return BytesUtils.parseBlobByteArrayToString(binary.getValues());
+      } else {
+        return binary.getStringValue(TSFileConfig.STRING_CHARSET);
+      }
     }
 
     @Override
@@ -186,6 +196,20 @@ public class Slice {
       return originalColumns[columnIndex].getObject(offset);
     }
 
+    @Override
+    public Binary readObject(int columnIndex, long offset, int length) {
+      if (getDataType(columnIndex) != Type.OBJECT) {
+        throw new UnsupportedOperationException("current column is not object 
column");
+      }
+      Binary binary = getBinary(columnIndex);
+      return new Binary(ObjectTypeUtils.readObjectContent(binary, offset, 
length, true).array());
+    }
+
+    // Whole-object read: delegates with offset 0 and length -1 (a negative
+    // length means "read everything from the offset" per the Record contract).
+    @Override
+    public Binary readObject(int columnIndex) {
+      return readObject(columnIndex, 0L, -1);
+    }
+
     @Override
     public Type getDataType(int columnIndex) {
       return dataTypes.get(columnIndex);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/expression/PatternExpressionComputation.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/expression/PatternExpressionComputation.java
index 6b5032627b2..cb8cbdccbb6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/expression/PatternExpressionComputation.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/expression/PatternExpressionComputation.java
@@ -35,6 +35,7 @@ import org.apache.tsfile.read.common.type.BlobType;
 import org.apache.tsfile.read.common.type.BooleanType;
 import org.apache.tsfile.read.common.type.DoubleType;
 import org.apache.tsfile.read.common.type.FloatType;
+import org.apache.tsfile.read.common.type.ObjectType;
 import org.apache.tsfile.read.common.type.Type;
 
 import java.util.ArrayList;
@@ -161,7 +162,9 @@ public class PatternExpressionComputation {
       return partition.getFloat(channel, position);
     } else if (type instanceof DoubleType) {
       return partition.getDouble(channel, position);
-    } else if (type instanceof AbstractVarcharType || type instanceof 
BlobType) {
+    } else if (type instanceof AbstractVarcharType
+        || type instanceof BlobType
+        || type instanceof ObjectType) {
       return partition.getBinary(channel, position);
     } else {
       throw new SemanticException("Unsupported type: " + 
type.getClass().getSimpleName());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java
index 5237fcfd12e..48095927eda 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java
@@ -19,8 +19,6 @@
 
 package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
 
-import org.apache.iotdb.commons.udf.access.RecordIterator;
-
 import org.apache.tsfile.block.column.Column;
 import org.apache.tsfile.read.common.type.Type;
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/access/RecordIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/RecordIterator.java
similarity index 88%
rename from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/access/RecordIterator.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/RecordIterator.java
index 6f5813955dd..9151ef31e0d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/access/RecordIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/RecordIterator.java
@@ -17,9 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.udf.access;
+package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
 
 import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
 import org.apache.iotdb.udf.api.relational.access.Record;
 import org.apache.iotdb.udf.api.type.Type;
 
@@ -138,6 +139,20 @@ public class RecordIterator implements Iterator<Record> {
       return childrenColumns.get(columnIndex).getObject(index);
     }
 
+    @Override
+    public Binary readObject(int columnIndex, long offset, int length) {
+      if (getDataType(columnIndex) != Type.OBJECT) {
+        throw new UnsupportedOperationException("current column is not object column");
+      }
+      Binary binary = getBinary(columnIndex);
+      return new Binary(ObjectTypeUtils.readObjectContent(binary, offset, 
length, true).array());
+    }
+
+    @Override
+    public Binary readObject(int columnIndex) {
+      return readObject(columnIndex, 0L, -1);
+    }
+
     @Override
     public Type getDataType(int columnIndex) {
       return 
UDFDataTypeTransformer.transformReadTypeToUDFDataType(dataTypes.get(columnIndex));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/UserDefinedAggregateFunctionAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/UserDefinedAggregateFunctionAccumulator.java
index 64c0896b3a1..70e1c0c5d44 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/UserDefinedAggregateFunctionAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/UserDefinedAggregateFunctionAccumulator.java
@@ -19,7 +19,6 @@
 
 package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
 
-import org.apache.iotdb.commons.udf.access.RecordIterator;
 import org.apache.iotdb.udf.api.State;
 import org.apache.iotdb.udf.api.customizer.analysis.AggregateFunctionAnalysis;
 import org.apache.iotdb.udf.api.relational.AggregateFunction;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java
index b3d24e923ae..9ac6b48db7c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java
@@ -19,9 +19,9 @@
 
 package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;
 
-import org.apache.iotdb.commons.udf.access.RecordIterator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.MaskedRecordIterator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.RecordIterator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.ObjectBigArray;
 import org.apache.iotdb.udf.api.State;
 import org.apache.iotdb.udf.api.relational.AggregateFunction;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
index a17a824248c..41279c9b3eb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
@@ -164,6 +164,7 @@ import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.Ln
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.Log10ColumnTransformer;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.LongToBytesColumnTransformer;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.LowerColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.ObjectLengthColumnTransformer;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.RTrim2ColumnTransformer;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.RTrimColumnTransformer;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.RadiansColumnTransformer;
@@ -230,6 +231,7 @@ import java.util.stream.Collectors;
 import static com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.iotdb.db.queryengine.plan.expression.unary.LikeExpression.getEscapeCharacter;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.PredicatePushIntoMetadataChecker.isStringLiteral;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl.isBlobType;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl.isCharType;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignatureTranslator.toTypeSignature;
@@ -782,9 +784,11 @@ public class ColumnTransformerBuilder
       if (children.size() == 1) {
         Type argumentType = first.getType();
         if (isCharType(argumentType)) {
-          return new LengthColumnTransformer(INT32, first);
+          return new LengthColumnTransformer(INT64, first);
+        } else if (isBlobType(argumentType)) {
+          return new BlobLengthColumnTransformer(INT64, first);
         } else {
-          return new BlobLengthColumnTransformer(INT32, first);
+          return new ObjectLengthColumnTransformer(INT64, first);
         }
       }
     } else if 
(TableBuiltinScalarFunction.UPPER.getFunctionName().equalsIgnoreCase(functionName))
 {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
index 8c54fd640f8..2274762341b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
@@ -308,6 +308,10 @@ public class ClusterPartitionFetcher implements 
IPartitionFetcher {
     return partitionCache.updateGroupIdToReplicaSetMap(req.getTimestamp(), 
req.getRegionRouteMap());
   }
 
+  public List<TRegionReplicaSet> getRegionReplicaSet(List<TConsensusGroupId> 
consensusGroupIds) {
+    return partitionCache.getRegionReplicaSet(consensusGroupIds);
+  }
+
   @Override
   public void invalidAllCache() {
     partitionCache.invalidAllCache();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 58164159183..44be3e69afb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -1435,6 +1435,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
         case TEXT:
         case STRING:
         case BLOB:
+        case OBJECT:
           previousFill[i] =
               filter == null
                   ? new BinaryPreviousFill()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index adf4efd8c5d..e699ee417de 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -268,6 +268,7 @@ import 
org.apache.tsfile.read.common.block.column.LongColumn;
 import org.apache.tsfile.read.common.type.BinaryType;
 import org.apache.tsfile.read.common.type.BlobType;
 import org.apache.tsfile.read.common.type.BooleanType;
+import org.apache.tsfile.read.common.type.ObjectType;
 import org.apache.tsfile.read.common.type.Type;
 import org.apache.tsfile.read.common.type.TypeFactory;
 import org.apache.tsfile.read.filter.basic.Filter;
@@ -3833,6 +3834,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
         case MAX:
         case MIN:
           if (BlobType.BLOB.equals(argumentType)
+              || ObjectType.OBJECT.equals(argumentType)
               || BinaryType.TEXT.equals(argumentType)
               || BooleanType.BOOLEAN.equals(argumentType)) {
             canUseStatistic = false;
@@ -3848,8 +3850,8 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
             descendingCount++;
           }
 
-          // first/last/first_by/last_by aggregation with BLOB type can not use statistics
-          if (BlobType.BLOB.equals(argumentType)) {
+          // first/last/first_by/last_by aggregation with BLOB or OBJECT type can not use statistics
+          if (BlobType.BLOB.equals(argumentType) || ObjectType.OBJECT.equals(argumentType)) {
             canUseStatistic = false;
             break;
           }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
index 83f6bbec63e..2297ddcebdd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
@@ -207,6 +207,9 @@ public class RelationalInsertRowsNode extends 
InsertRowsNode {
     for (int j = 0; j < insertRowNode.getDataTypes().length; j++) {
       if (insertRowNode.getDataTypes()[j] == TSDataType.OBJECT) {
         Object[] values = insertRowNode.getValues();
+        if (values[j] == null) {
+          continue;
+        }
         byte[] binary = ((Binary) values[j]).getValues();
         ByteBuffer buffer = ByteBuffer.wrap(binary);
         boolean isEoF = buffer.get() == 1;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
index 60190d4b287..7786876e19a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
@@ -267,13 +267,15 @@ public class TableMetadataImpl implements Metadata {
       return STRING;
     } else if 
(TableBuiltinScalarFunction.LENGTH.getFunctionName().equalsIgnoreCase(functionName))
 {
       if (!(argumentTypes.size() == 1
-          && (isCharType(argumentTypes.get(0)) || 
isBlobType(argumentTypes.get(0))))) {
+          && (isCharType(argumentTypes.get(0))
+              || isBlobType(argumentTypes.get(0))
+              || isObjectType(argumentTypes.get(0))))) {
         throw new SemanticException(
             "Scalar function "
                 + functionName.toLowerCase(Locale.ENGLISH)
-                + " only accepts one argument and it must be text, string, or blob data type.");
+                + " only accepts one argument and it must be text or string or blob or object data type.");
       }
-      return INT32;
+      return INT64;
     } else if 
(TableBuiltinScalarFunction.UPPER.getFunctionName().equalsIgnoreCase(functionName))
 {
       if (!(argumentTypes.size() == 1 && isCharType(argumentTypes.get(0)))) {
         throw new SemanticException(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
index 279e3c06fd4..47fd40ed73f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.transformation.dag.column.udf;
 
-import org.apache.iotdb.commons.udf.access.RecordIterator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.RecordIterator;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.multi.MultiColumnTransformer;
 import org.apache.iotdb.udf.api.relational.ScalarFunction;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractCastFunctionColumnTransformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractCastFunctionColumnTransformer.java
index ee40a3ec6f8..9ed1bb33075 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractCastFunctionColumnTransformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractCastFunctionColumnTransformer.java
@@ -341,6 +341,7 @@ public abstract class AbstractCastFunctionColumnTransformer 
extends UnaryColumnT
         case TEXT:
         case STRING:
         case BLOB:
+        case OBJECT:
           returnType.writeBinary(columnBuilder, value);
           break;
         default:
@@ -393,4 +394,13 @@ public abstract class 
AbstractCastFunctionColumnTransformer extends UnaryColumnT
           String.format("Cannot cast %s to %s type", stringValue, 
returnType.getDisplayName()));
     }
   }
+
+  protected void castObject(ColumnBuilder columnBuilder, Binary value) {
+    String stringValue = 
BytesUtils.parseObjectByteArrayToString(value.getValues());
+    if (returnType.getTypeEnum() == TypeEnum.STRING) {
+      returnType.writeBinary(columnBuilder, 
BytesUtils.valueOf(String.valueOf(stringValue)));
+    } else {
+      throw new UnsupportedOperationException(String.format(ERROR_MSG, 
returnType.getTypeEnum()));
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractLengthColumnTransformer.java
similarity index 77%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractLengthColumnTransformer.java
index 00448c6f575..08eb4769166 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractLengthColumnTransformer.java
@@ -24,12 +24,13 @@ import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.UnaryColu
 
 import org.apache.tsfile.block.column.Column;
 import org.apache.tsfile.block.column.ColumnBuilder;
-import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.Binary;
 
-public class LengthColumnTransformer extends UnaryColumnTransformer {
+public abstract class AbstractLengthColumnTransformer extends 
UnaryColumnTransformer {
 
-  public LengthColumnTransformer(Type returnType, ColumnTransformer 
childColumnTransformer) {
+  public AbstractLengthColumnTransformer(
+      Type returnType, ColumnTransformer childColumnTransformer) {
     super(returnType, childColumnTransformer);
   }
 
@@ -37,8 +38,7 @@ public class LengthColumnTransformer extends 
UnaryColumnTransformer {
   protected void doTransform(Column column, ColumnBuilder columnBuilder) {
     for (int i = 0, n = column.getPositionCount(); i < n; i++) {
       if (!column.isNull(i)) {
-        String currentValue = 
column.getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET);
-        columnBuilder.writeInt(currentValue.length());
+        columnBuilder.writeLong(transformNonNullValue(column.getBinary(i)));
       } else {
         columnBuilder.appendNull();
       }
@@ -49,11 +49,12 @@ public class LengthColumnTransformer extends 
UnaryColumnTransformer {
   protected void doTransform(Column column, ColumnBuilder columnBuilder, 
boolean[] selection) {
     for (int i = 0, n = column.getPositionCount(); i < n; i++) {
       if (selection[i] && !column.isNull(i)) {
-        String currentValue = 
column.getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET);
-        columnBuilder.writeInt(currentValue.length());
+        columnBuilder.writeLong(transformNonNullValue(column.getBinary(i)));
       } else {
         columnBuilder.appendNull();
       }
     }
   }
+
+  protected abstract long transformNonNullValue(Binary binary);
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/BlobLengthColumnTransformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/BlobLengthColumnTransformer.java
index e94d9ad3907..bce18fb0597 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/BlobLengthColumnTransformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/BlobLengthColumnTransformer.java
@@ -20,37 +20,18 @@
 package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar;
 
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
-import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.UnaryColumnTransformer;
 
-import org.apache.tsfile.block.column.Column;
-import org.apache.tsfile.block.column.ColumnBuilder;
 import org.apache.tsfile.read.common.type.Type;
 import org.apache.tsfile.utils.Binary;
 
-public class BlobLengthColumnTransformer extends UnaryColumnTransformer {
+public class BlobLengthColumnTransformer extends 
AbstractLengthColumnTransformer {
 
   public BlobLengthColumnTransformer(Type returnType, ColumnTransformer 
childColumnTransformer) {
     super(returnType, childColumnTransformer);
   }
 
   @Override
-  protected void doTransform(Column column, ColumnBuilder columnBuilder) {
-    doTransform(column, columnBuilder, null);
-  }
-
-  @Override
-  protected void doTransform(Column column, ColumnBuilder columnBuilder, 
boolean[] selection) {
-
-    int positionCount = column.getPositionCount();
-    for (int i = 0; i < positionCount; i++) {
-      if ((selection != null && !selection[i]) || column.isNull(i)) {
-        columnBuilder.appendNull();
-        continue;
-      }
-
-      Binary value = column.getBinary(i);
-      int length = value.getValues().length;
-      columnBuilder.writeInt(length);
-    }
+  protected long transformNonNullValue(Binary binary) {
+    return binary.getValues().length;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/CastFunctionColumnTransformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/CastFunctionColumnTransformer.java
index b9c8e31b1ab..624eabadc62 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/CastFunctionColumnTransformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/CastFunctionColumnTransformer.java
@@ -67,6 +67,9 @@ public class CastFunctionColumnTransformer extends 
AbstractCastFunctionColumnTra
       case BLOB:
         castBlob(columnBuilder, childType.getBinary(column, i));
         break;
+      case OBJECT:
+        castObject(columnBuilder, childType.getBinary(column, i));
+        break;
       default:
         throw new UnsupportedOperationException(
             String.format(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java
index 00448c6f575..c94530c83d4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java
@@ -20,40 +20,18 @@
 package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar;
 
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
-import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.UnaryColumnTransformer;
 
-import org.apache.tsfile.block.column.Column;
-import org.apache.tsfile.block.column.ColumnBuilder;
 import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.Binary;
 
-public class LengthColumnTransformer extends UnaryColumnTransformer {
-
+public class LengthColumnTransformer extends AbstractLengthColumnTransformer {
   public LengthColumnTransformer(Type returnType, ColumnTransformer 
childColumnTransformer) {
     super(returnType, childColumnTransformer);
   }
 
   @Override
-  protected void doTransform(Column column, ColumnBuilder columnBuilder) {
-    for (int i = 0, n = column.getPositionCount(); i < n; i++) {
-      if (!column.isNull(i)) {
-        String currentValue = 
column.getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET);
-        columnBuilder.writeInt(currentValue.length());
-      } else {
-        columnBuilder.appendNull();
-      }
-    }
-  }
-
-  @Override
-  protected void doTransform(Column column, ColumnBuilder columnBuilder, 
boolean[] selection) {
-    for (int i = 0, n = column.getPositionCount(); i < n; i++) {
-      if (selection[i] && !column.isNull(i)) {
-        String currentValue = 
column.getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET);
-        columnBuilder.writeInt(currentValue.length());
-      } else {
-        columnBuilder.appendNull();
-      }
-    }
+  protected long transformNonNullValue(Binary binary) {
+    return binary.getStringValue(TSFileConfig.STRING_CHARSET).length();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectLengthColumnTransformer.java
similarity index 57%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectLengthColumnTransformer.java
index 5237fcfd12e..5d39c6f6af3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectLengthColumnTransformer.java
@@ -17,26 +17,21 @@
  * under the License.
  */
 
-package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar;
 
-import org.apache.iotdb.commons.udf.access.RecordIterator;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
 
-import org.apache.tsfile.block.column.Column;
 import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.Binary;
 
-import java.util.List;
-
-public class MaskedRecordIterator extends RecordIterator {
-  private final int[] selectedPositions;
-
-  public MaskedRecordIterator(
-      List<Column> childrenColumns, List<Type> dataTypes, AggregationMask 
mask) {
-    super(childrenColumns, dataTypes, mask.getSelectedPositionCount());
-    this.selectedPositions = mask.getSelectedPositions();
+public class ObjectLengthColumnTransformer extends 
AbstractLengthColumnTransformer {
+  public ObjectLengthColumnTransformer(Type returnType, ColumnTransformer 
childColumnTransformer) {
+    super(returnType, childColumnTransformer);
   }
 
   @Override
-  protected int getCurrentIndex() {
-    return selectedPositions[currentIndex++];
+  protected long transformNonNullValue(Binary binary) {
+    return ObjectTypeUtils.getObjectLength(binary);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
index 9504c6c2282..3049a9bc441 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
@@ -19,25 +19,18 @@
 
 package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar;
 
-import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
-import org.apache.iotdb.db.exception.sql.SemanticException;
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.UnaryColumnTransformer;
 import org.apache.iotdb.db.utils.ObjectTypeUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.tsfile.block.column.Column;
 import org.apache.tsfile.block.column.ColumnBuilder;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.type.Type;
 import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
 import java.util.Optional;
 
 public class ReadObjectColumnTransformer extends UnaryColumnTransformer {
@@ -107,35 +100,14 @@ public class ReadObjectColumnTransformer extends 
UnaryColumnTransformer {
   }
 
   private Binary readObject(Binary binary) {
-    File file = ObjectTypeUtils.getObjectPathFromBinary(binary);
-    long actualReadSize = getActualReadSize(file);
+    Pair<Long, String> objectLengthPathPair = 
ObjectTypeUtils.parseObjectBinary(binary);
+    long fileLength = objectLengthPathPair.getLeft();
+    String relativePath = objectLengthPathPair.getRight();
+    int actualReadSize =
+        ObjectTypeUtils.getActualReadSize(relativePath, fileLength, offset, 
length);
     fragmentInstanceContext.ifPresent(
         context -> 
context.getMemoryReservationContext().reserveMemoryCumulatively(actualReadSize));
-    byte[] bytes = new byte[(int) actualReadSize];
-    ByteBuffer buffer = ByteBuffer.wrap(bytes);
-    try (FileChannel fileChannel = FileChannel.open(file.toPath(), 
StandardOpenOption.READ)) {
-      fileChannel.read(buffer, offset);
-    } catch (IOException e) {
-      throw new IoTDBRuntimeException(e, 
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
-    }
-    return new Binary(bytes);
-  }
-
-  private long getActualReadSize(File file) {
-    long fileSize = file.length();
-    if (offset >= fileSize) {
-      throw new SemanticException(
-          String.format(
-              "offset %d is greater than object size %d, file path is %s",
-              offset, fileSize, file.getAbsolutePath()));
-    }
-    long actualReadSize = Math.min(length < 0 ? fileSize : length, fileSize - 
offset);
-    if (actualReadSize > Integer.MAX_VALUE) {
-      throw new SemanticException(
-          String.format(
-              "Read object size %s is too large (size > 2G), file path is %s",
-              actualReadSize, file.getAbsolutePath()));
-    }
-    return actualReadSize;
+    return new Binary(
+        ObjectTypeUtils.readObjectContent(relativePath, offset, 
actualReadSize, true).array());
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/TryCastFunctionColumnTransformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/TryCastFunctionColumnTransformer.java
index c25fd321c34..419d1bbabf3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/TryCastFunctionColumnTransformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/TryCastFunctionColumnTransformer.java
@@ -69,6 +69,9 @@ public class TryCastFunctionColumnTransformer extends 
AbstractCastFunctionColumn
         case BLOB:
           castBlob(columnBuilder, childType.getBinary(column, i));
           break;
+        case OBJECT:
+          castObject(columnBuilder, childType.getBinary(column, i));
+          break;
         default:
           throw new UnsupportedOperationException(
               String.format(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
index b0575837828..0262eb36170 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
@@ -528,6 +528,7 @@ public class CompactionUtils {
     }
   }
 
+  @SuppressWarnings("java:S3776")
   public static void removeDeletedObjectFiles(
       TsFileSequenceReader reader,
       List<AbstractAlignedChunkMetadata> alignedChunkMetadataList,
@@ -581,6 +582,7 @@ public class CompactionUtils {
     }
   }
 
+  @SuppressWarnings("java:S3776")
   private static void removeDeletedObjectFiles(
       TsFileSequenceReader reader,
       AbstractAlignedChunkMetadata alignedChunkMetadata,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java
index c153061a90d..44ba3235911 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java
@@ -19,18 +19,36 @@
 
 package org.apache.iotdb.db.utils;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.commons.exception.ObjectFileNotExist;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
 import org.apache.iotdb.db.service.metrics.FileMetrics;
 import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
+import org.apache.iotdb.mpp.rpc.thrift.TReadObjectReq;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 
 public class ObjectTypeUtils {
@@ -40,6 +58,127 @@ public class ObjectTypeUtils {
 
   private ObjectTypeUtils() {}
 
+  /**
+   * Reads a range of the object file referenced by an encoded object {@link Binary}.
+   *
+   * <p>The binary is decoded with {@link #parseObjectBinary(Binary)} into the recorded object
+   * length and the relative file path. The requested range is clamped by {@link
+   * #getActualReadSize(String, long, long, long)} before the content is fetched.
+   *
+   * @param binary encoded object reference (8-byte length followed by the relative path)
+   * @param offset position in the object at which reading starts
+   * @param length number of bytes wanted; a negative value is replaced by the object length
+   * @param mayNotInCurrentNode whether a remote read is attempted when no local file is found
+   * @return a buffer holding the bytes that were read
+   */
+  public static ByteBuffer readObjectContent(
+      Binary binary, long offset, int length, boolean mayNotInCurrentNode) {
+    final Pair<Long, String> lengthAndPath = parseObjectBinary(binary);
+    final long objectSize = lengthAndPath.getLeft();
+    final String objectPath = lengthAndPath.getRight();
+
+    // A negative length is replaced by the recorded object size before clamping.
+    final long wantedLength;
+    if (length < 0) {
+      wantedLength = objectSize;
+    } else {
+      wantedLength = length;
+    }
+
+    final int bytesToRead = getActualReadSize(objectPath, objectSize, offset, wantedLength);
+    return readObjectContent(objectPath, offset, bytesToRead, mayNotInCurrentNode);
+  }
+
+  /**
+   * Reads {@code readSize} bytes of an object file starting at {@code offset}.
+   *
+   * <p>The file is first looked up through the tier manager. When it is found, it is read from
+   * local disk. Otherwise the content is fetched from another data node, but only if {@code
+   * mayNotInCurrentNode} is set.
+   *
+   * @param relativePath relative path of the object file
+   * @param offset position in the file at which reading starts
+   * @param readSize number of bytes to read
+   * @param mayNotInCurrentNode whether a remote read is attempted when no local file is found
+   * @return a buffer holding the bytes that were read
+   * @throws ObjectFileNotExist if no local file is found and remote reads are not allowed
+   */
+  public static ByteBuffer readObjectContent(
+      String relativePath, long offset, int readSize, boolean mayNotInCurrentNode) {
+    final Optional<File> localFile = TIER_MANAGER.getAbsoluteObjectFilePath(relativePath, false);
+    if (localFile.isPresent()) {
+      return readObjectContentFromLocalFile(localFile.get(), offset, readSize);
+    }
+    if (!mayNotInCurrentNode) {
+      throw new ObjectFileNotExist(relativePath);
+    }
+    return readObjectContentFromRemoteFile(relativePath, offset, readSize);
+  }
+
+  /**
+   * Reads {@code readSize} bytes of a local object file starting at {@code offset}.
+   *
+   * <p>A single {@link FileChannel#read(ByteBuffer, long)} call may transfer fewer bytes than the
+   * buffer can hold, so the read is repeated until the buffer is full or end-of-file is reached.
+   * If the file ends early, the returned buffer contains only the bytes actually read.
+   *
+   * @param file the object file to read
+   * @param offset absolute position in the file at which reading starts
+   * @param readSize number of bytes to read; narrowed to {@code int} for the backing array
+   * @return a flipped buffer whose remaining bytes are the content read
+   * @throws IoTDBRuntimeException with status {@code OBJECT_READ_ERROR} if the read fails
+   */
+  private static ByteBuffer readObjectContentFromLocalFile(File file, long offset, long readSize) {
+    byte[] bytes = new byte[(int) readSize];
+    ByteBuffer buffer = ByteBuffer.wrap(bytes);
+    try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
+      long position = offset;
+      while (buffer.hasRemaining()) {
+        final int bytesRead = fileChannel.read(buffer, position);
+        if (bytesRead < 0) {
+          // End of file reached before the requested range was filled; return what was read.
+          break;
+        }
+        position += bytesRead;
+      }
+    } catch (IOException e) {
+      throw new IoTDBRuntimeException(e, TSStatusCode.OBJECT_READ_ERROR.getStatusCode());
+    }
+    buffer.flip();
+    return buffer;
+  }
+
+  /**
+   * Fetches {@code readSize} bytes of an object file from the data nodes that host its region.
+   *
+   * <p>The first element of {@code relativePath} is parsed as the data region id, and the replica
+   * set of that region is looked up through the partition fetcher. Replicas are tried in order.
+   * From each one the content is requested in batches of at most 1 MiB. When a replica fails, the
+   * next one continues from the bytes already collected rather than starting over, so the buffer
+   * never receives more than {@code readSize} bytes in total.
+   *
+   * @param relativePath relative path of the object file; its first element must be the region id
+   * @param offset position in the file at which reading starts
+   * @param readSize number of bytes to read
+   * @return a flipped buffer whose remaining bytes are the content read
+   * @throws ObjectFileNotExist if the region has no replica set or no data node location
+   * @throws IoTDBRuntimeException with status {@code OBJECT_READ_ERROR} if the path does not start
+   *     with a region id or if the last replica also fails
+   */
+  private static ByteBuffer readObjectContentFromRemoteFile(
+      final String relativePath, final long offset, final int readSize) {
+    int regionId;
+    try {
+      regionId = Integer.parseInt(Paths.get(relativePath).getName(0).toString());
+    } catch (NumberFormatException e) {
+      throw new IoTDBRuntimeException(
+          "wrong object file path: " + relativePath,
+          TSStatusCode.OBJECT_READ_ERROR.getStatusCode());
+    }
+    TConsensusGroupId consensusGroupId =
+        new TConsensusGroupId(TConsensusGroupType.DataRegion, regionId);
+    List<TRegionReplicaSet> regionReplicaSetList =
+        ClusterPartitionFetcher.getInstance()
+            .getRegionReplicaSet(Collections.singletonList(consensusGroupId));
+    if (regionReplicaSetList.isEmpty()) {
+      throw new ObjectFileNotExist(relativePath);
+    }
+    TRegionReplicaSet regionReplicaSet = regionReplicaSetList.iterator().next();
+    if (regionReplicaSet.getDataNodeLocations().isEmpty()) {
+      throw new ObjectFileNotExist(relativePath);
+    }
+    final int batchSize = 1024 * 1024;
+    final TReadObjectReq req = new TReadObjectReq();
+    req.setRelativePath(relativePath);
+    ByteBuffer buffer = ByteBuffer.allocate(readSize);
+    for (int i = 0; i < regionReplicaSet.getDataNodeLocations().size(); i++) {
+      TDataNodeLocation dataNodeLocation = regionReplicaSet.getDataNodeLocations().get(i);
+      // Only the bytes still missing are requested: a previous replica may already have filled
+      // part of the buffer, and the request offset below is derived from buffer.position().
+      int toReadSizeInCurrentDataNode = buffer.remaining();
+      try (SyncDataNodeInternalServiceClient client =
+          Coordinator.getInstance()
+              .getInternalServiceClientManager()
+              .borrowClient(dataNodeLocation.getInternalEndPoint())) {
+        while (toReadSizeInCurrentDataNode > 0) {
+          req.setOffset(offset + buffer.position());
+          req.setSize(Math.min(toReadSizeInCurrentDataNode, batchSize));
+          toReadSizeInCurrentDataNode -= req.getSize();
+          ByteBuffer partial = client.readObject(req);
+          buffer.put(partial);
+        }
+      } catch (Exception e) {
+        logger.warn("Failed to read object from datanode: {}", dataNodeLocation, e);
+        // Give up only after the last replica has failed as well.
+        if (i == regionReplicaSet.getDataNodeLocations().size() - 1) {
+          throw new IoTDBRuntimeException(e, TSStatusCode.OBJECT_READ_ERROR.getStatusCode());
+        }
+        continue;
+      }
+      break;
+    }
+    buffer.flip();
+    return buffer;
+  }
+
+  /**
+   * Computes how many bytes can actually be read from an object for a requested range.
+   *
+   * <p>The result is the smaller of the requested length and the number of bytes between {@code
+   * offset} and the end of the object. A negative {@code length} is treated as the object size.
+   *
+   * @param filePath path of the object file, used only in error messages
+   * @param fileSize size of the object in bytes
+   * @param offset position at which reading starts
+   * @param length requested number of bytes; negative means the whole object size
+   * @return the number of bytes to read, guaranteed to fit in an {@code int}
+   * @throws SemanticException if {@code offset} is not smaller than {@code fileSize}, or if the
+   *     resulting size exceeds {@link Integer#MAX_VALUE}
+   */
+  public static int getActualReadSize(String filePath, long fileSize, long offset, long length) {
+    if (offset >= fileSize) {
+      throw new SemanticException(
+          String.format(
+              "offset %d is greater than or equal to object size %d, file path is %s",
+              offset, fileSize, filePath));
+    }
+
+    final long requested;
+    if (length < 0) {
+      requested = fileSize;
+    } else {
+      requested = length;
+    }
+    final long available = fileSize - offset;
+    final long actualReadSize = Math.min(requested, available);
+
+    if (actualReadSize <= Integer.MAX_VALUE) {
+      return (int) actualReadSize;
+    }
+    throw new SemanticException(
+        String.format(
+            "Read object size %s is too large (size > 2G), file path is %s",
+            actualReadSize, filePath));
+  }
+
+  /**
+   * Decodes an encoded object {@link Binary} into its recorded length and relative file path.
+   *
+   * <p>The first {@link Long#BYTES} bytes are read as a {@code long} length; the remaining bytes
+   * are decoded as the relative path using {@code TSFileConfig.STRING_CHARSET}.
+   *
+   * @param binary encoded object reference
+   * @return a pair of (object length, relative object file path)
+   */
+  public static Pair<Long, String> parseObjectBinary(Binary binary) {
+    final byte[] raw = binary.getValues();
+    final long objectLength = ByteBuffer.wrap(raw).getLong();
+    final String objectPath =
+        new String(raw, Long.BYTES, raw.length - Long.BYTES, TSFileConfig.STRING_CHARSET);
+    return new Pair<>(objectLength, objectPath);
+  }
+
+  /**
+   * Returns the object length stored in the leading eight bytes of an encoded object {@link
+   * Binary}.
+   *
+   * @param binary encoded object reference
+   * @return the {@code long} read from the start of the binary's bytes
+   */
+  public static long getObjectLength(Binary binary) {
+    return ByteBuffer.wrap(binary.getValues()).getLong();
+  }
+
   public static File getObjectPathFromBinary(Binary binary) {
     byte[] bytes = binary.getValues();
     String relativeObjectFilePath =
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java
new file mode 100644
index 00000000000..e0a6ca88825
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.function;
+
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.Slice;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.RecordIterator;
+import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
+import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.type.Type;
+
+import org.apache.tsfile.block.TsBlockBuilderStatus;
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilderStatus;
+import org.apache.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.tsfile.read.common.type.ObjectType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.Pair;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.Iterator;
+
+/**
+ * Verifies that OBJECT-typed columns are exposed correctly through {@link Record}, both via
+ * {@link RecordIterator} and via the record iterator produced by {@link Slice}.
+ */
+public class RecordObjectTypeTest {
+
+  /** Object-file folder obtained from the tier manager; test files are created in it. */
+  private File objectDir;
+
+  @Before
+  public void setup() {
+    try {
+      objectDir = new File(TierManager.getInstance().getNextFolderForObjectFile());
+    } catch (DiskSpaceInsufficientException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Deletes every entry directly under the object folder after each test. */
+  @After
+  public void tearDown() throws IOException {
+    File[] files = objectDir.listFiles();
+    if (files != null) {
+      for (File file : files) {
+        Files.delete(file.toPath());
+      }
+    }
+  }
+
+  /** Builds a one-row OBJECT column and runs the same checks against both iterator kinds. */
+  @Test
+  public void test() throws IOException {
+    BinaryColumnBuilder columnBuilder =
+        new BinaryColumnBuilder(new ColumnBuilderStatus(new TsBlockBuilderStatus(1024)), 1);
+    columnBuilder.writeBinary(new Binary(createObjectBinary().array()));
+    RecordIterator recordIterator =
+        new RecordIterator(
+            Collections.singletonList(columnBuilder.build()),
+            Collections.singletonList(ObjectType.OBJECT),
+            1);
+    Slice slice =
+        new Slice(
+            0,
+            1,
+            new Column[] {columnBuilder.build()},
+            Collections.singletonList(0),
+            Collections.emptyList(),
+            Collections.singletonList(Type.OBJECT));
+    testRecordIterator(recordIterator);
+    testRecordIterator(slice.getRequiredRecordIterator(false));
+  }
+
+  /**
+   * Asserts that the iterator yields exactly one record whose column 0 refers to the 100-byte
+   * test object written by {@link #createObjectBinary()}.
+   */
+  private void testRecordIterator(Iterator<Record> recordIterator) {
+    Assert.assertTrue(recordIterator.hasNext());
+    Record record = recordIterator.next();
+
+    // Full read: all 100 bytes, each byte equal to its index.
+    Binary result = record.readObject(0);
+    Assert.assertEquals(100, result.getLength());
+    for (int j = 0; j < 100; j++) {
+      Assert.assertEquals(j, result.getValues()[j]);
+    }
+
+    // Ranged read: two bytes starting at offset 10.
+    result = record.readObject(0, 10, 2);
+    Assert.assertArrayEquals(new byte[] {(byte) 10, (byte) 11}, result.getValues());
+
+    // getObject returns the encoded reference (length + relative path), not the content.
+    Object object = record.getObject(0);
+    Assert.assertTrue(object instanceof Binary);
+    Pair<Long, String> pair = ObjectTypeUtils.parseObjectBinary((Binary) object);
+    Assert.assertEquals(Long.valueOf(100L), pair.getLeft());
+    Assert.assertTrue(pair.getRight().startsWith("test_") && pair.getRight().endsWith(".bin"));
+
+    Assert.assertArrayEquals(((Binary) object).getValues(), record.getBinary(0).getValues());
+
+    Assert.assertEquals("(Object) 100 B", record.getString(0));
+    Assert.assertFalse(recordIterator.hasNext());
+  }
+
+  /**
+   * Writes a 100-byte file (byte {@code i} has value {@code i}) into the object folder and
+   * returns its encoded reference: an 8-byte length followed by the file name bytes.
+   */
+  private ByteBuffer createObjectBinary() throws IOException {
+    Path testFile1 = Files.createTempFile(objectDir.toPath(), "test_", ".bin");
+    byte[] content = new byte[100];
+    for (int i = 0; i < 100; i++) {
+      content[i] = (byte) i;
+    }
+    Files.write(testFile1, content);
+    String relativePath = testFile1.toFile().getName();
+    // Size the buffer from the encoded bytes, not the char count, so the put cannot overflow.
+    byte[] relativePathBytes = BytesUtils.stringToBytes(relativePath);
+    ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + relativePathBytes.length);
+    buffer.putLong(100L);
+    buffer.put(relativePathBytes);
+    buffer.flip();
+    return buffer;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/BlobLengthColumnTransformerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/BlobLengthColumnTransformerTest.java
index ffb00505d14..7fbbd6da2fa 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/BlobLengthColumnTransformerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/BlobLengthColumnTransformerTest.java
@@ -31,7 +31,7 @@ import org.mockito.Mockito;
 import java.nio.charset.StandardCharsets;
 import java.util.Optional;
 
-import static org.apache.tsfile.read.common.type.IntType.INT32;
+import static org.apache.tsfile.read.common.type.LongType.INT64;
 
 public class BlobLengthColumnTransformerTest {
 
@@ -69,13 +69,13 @@ public class BlobLengthColumnTransformerTest {
 
     ColumnTransformer childColumnTransformer = 
mockChildColumnTransformer(binaryColumn);
     BlobLengthColumnTransformer blobLengthColumnTransformer =
-        new BlobLengthColumnTransformer(INT32, childColumnTransformer);
+        new BlobLengthColumnTransformer(INT64, childColumnTransformer);
     blobLengthColumnTransformer.addReferenceCount();
     blobLengthColumnTransformer.evaluate();
     Column result = blobLengthColumnTransformer.getColumn();
 
     int expectedLength = input.getBytes(StandardCharsets.UTF_8).length;
-    Assert.assertEquals(expectedLength, result.getInt(0));
+    Assert.assertEquals(expectedLength, result.getLong(0));
   }
 
   @Test
@@ -87,13 +87,13 @@ public class BlobLengthColumnTransformerTest {
 
     ColumnTransformer childColumnTransformer = 
mockChildColumnTransformer(binaryColumn);
     BlobLengthColumnTransformer blobLengthColumnTransformer =
-        new BlobLengthColumnTransformer(INT32, childColumnTransformer);
+        new BlobLengthColumnTransformer(INT64, childColumnTransformer);
     blobLengthColumnTransformer.addReferenceCount();
     blobLengthColumnTransformer.evaluate();
     Column result = blobLengthColumnTransformer.getColumn();
 
     int expectedLength = inputBytes.length;
-    Assert.assertEquals(expectedLength, result.getInt(0));
+    Assert.assertEquals(expectedLength, result.getLong(0));
   }
 
   @Test
@@ -109,13 +109,13 @@ public class BlobLengthColumnTransformerTest {
     ColumnTransformer childColumnTransformer = 
mockChildColumnTransformer(binaryColumn);
 
     BlobLengthColumnTransformer blobLengthColumnTransformer =
-        new BlobLengthColumnTransformer(INT32, childColumnTransformer);
+        new BlobLengthColumnTransformer(INT64, childColumnTransformer);
     blobLengthColumnTransformer.addReferenceCount();
     blobLengthColumnTransformer.evaluate();
     Column result = blobLengthColumnTransformer.getColumn();
-    Assert.assertEquals(inputBytes1.length, result.getInt(0));
+    Assert.assertEquals(inputBytes1.length, result.getLong(0));
     Assert.assertTrue(result.isNull(1));
-    Assert.assertEquals(inputBytes2.length, result.getInt(2));
+    Assert.assertEquals(inputBytes2.length, result.getLong(2));
   }
 
   @Test
@@ -133,7 +133,7 @@ public class BlobLengthColumnTransformerTest {
     ColumnTransformer child =
         mockChildColumnTransformer(new BinaryColumn(values.length, 
Optional.empty(), values));
     BlobLengthColumnTransformer blobLengthColumnTransformer =
-        new BlobLengthColumnTransformer(INT32, child);
+        new BlobLengthColumnTransformer(INT64, child);
     blobLengthColumnTransformer.addReferenceCount();
     blobLengthColumnTransformer.evaluateWithSelection(booleans);
     Column result = blobLengthColumnTransformer.getColumn();
@@ -142,7 +142,7 @@ public class BlobLengthColumnTransformerTest {
     int expectedValue3 = bytes3.length;
 
     Assert.assertTrue(result.isNull(0));
-    Assert.assertEquals(expectedValue2, result.getInt(1));
-    Assert.assertEquals(expectedValue3, result.getInt(2));
+    Assert.assertEquals(expectedValue2, result.getLong(1));
+    Assert.assertEquals(expectedValue3, result.getLong(2));
   }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectTypeFunctionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectTypeFunctionTest.java
new file mode 100644
index 00000000000..0a0d73d2ac3
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectTypeFunctionTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar;
+
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
+
+import org.apache.tsfile.block.TsBlockBuilderStatus;
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilderStatus;
+import org.apache.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.tsfile.read.common.block.column.LongColumn;
+import org.apache.tsfile.read.common.type.BlobType;
+import org.apache.tsfile.read.common.type.LongType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+
+/**
+ * Unit tests for the scalar column transformers that operate on OBJECT values: {@link
+ * ObjectLengthColumnTransformer} and {@link ReadObjectColumnTransformer}.
+ */
+public class ObjectTypeFunctionTest {
+
+  /** Object-file folder obtained from the tier manager; test files are created in it. */
+  private File objectDir;
+
+  @Before
+  public void setup() {
+    try {
+      objectDir = new File(TierManager.getInstance().getNextFolderForObjectFile());
+    } catch (DiskSpaceInsufficientException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Deletes every entry directly under the object folder after each test. */
+  @After
+  public void tearDown() throws IOException {
+    File[] files = objectDir.listFiles();
+    if (files != null) {
+      for (File file : files) {
+        Files.delete(file.toPath());
+      }
+    }
+  }
+
+  /** The length transformer yields an INT64 column holding the recorded object length (100). */
+  @Test
+  public void testLength() throws IOException {
+    BinaryColumnBuilder columnBuilder =
+        new BinaryColumnBuilder(new ColumnBuilderStatus(new TsBlockBuilderStatus(1024)), 1);
+    columnBuilder.writeBinary(new Binary(createObjectBinary().array()));
+    ColumnTransformer childColumnTransformer = mockChildColumnTransformer(columnBuilder.build());
+    ObjectLengthColumnTransformer transformer =
+        new ObjectLengthColumnTransformer(LongType.INT64, childColumnTransformer);
+    transformer.addReferenceCount();
+    transformer.evaluate();
+    Column result = transformer.getColumn();
+    Assert.assertTrue(result instanceof LongColumn);
+    Assert.assertEquals(LongType.INT64, transformer.getType());
+    Assert.assertEquals(1, result.getPositionCount());
+    for (int i = 0; i < result.getPositionCount(); i++) {
+      Assert.assertEquals(100, result.getLong(i));
+    }
+  }
+
+  /** Reading without offset or length returns the whole 100-byte object as a BLOB. */
+  @Test
+  public void testReadObject1() throws IOException {
+    BinaryColumnBuilder columnBuilder =
+        new BinaryColumnBuilder(new ColumnBuilderStatus(new TsBlockBuilderStatus(1024)), 1);
+    columnBuilder.writeBinary(new Binary(createObjectBinary().array()));
+    ColumnTransformer childColumnTransformer = mockChildColumnTransformer(columnBuilder.build());
+    ReadObjectColumnTransformer transformer =
+        new ReadObjectColumnTransformer(BlobType.BLOB, childColumnTransformer, Optional.empty());
+    transformer.addReferenceCount();
+    transformer.evaluate();
+    Column result = transformer.getColumn();
+    Assert.assertTrue(result instanceof BinaryColumn);
+    Assert.assertEquals(BlobType.BLOB, transformer.getType());
+    Assert.assertEquals(1, result.getPositionCount());
+    for (int i = 0; i < result.getPositionCount(); i++) {
+      Assert.assertEquals(100, result.getBinary(i).getLength());
+      for (int j = 0; j < 100; j++) {
+        Assert.assertEquals(j, result.getBinary(i).getValues()[j]);
+      }
+    }
+  }
+
+  /** Reading with offset 10 and no length returns the trailing 90 bytes. */
+  @Test
+  public void testReadObject2() throws IOException {
+    BinaryColumnBuilder columnBuilder =
+        new BinaryColumnBuilder(new ColumnBuilderStatus(new TsBlockBuilderStatus(1024)), 1);
+    columnBuilder.writeBinary(new Binary(createObjectBinary().array()));
+    ColumnTransformer childColumnTransformer = mockChildColumnTransformer(columnBuilder.build());
+    ReadObjectColumnTransformer transformer =
+        new ReadObjectColumnTransformer(
+            BlobType.BLOB, 10, childColumnTransformer, Optional.empty());
+    transformer.addReferenceCount();
+    transformer.evaluate();
+    Column result = transformer.getColumn();
+    Assert.assertTrue(result instanceof BinaryColumn);
+    Assert.assertEquals(BlobType.BLOB, transformer.getType());
+    Assert.assertEquals(1, result.getPositionCount());
+    for (int i = 0; i < result.getPositionCount(); i++) {
+      Assert.assertEquals(90, result.getBinary(i).getLength());
+      for (int j = 10; j < 100; j++) {
+        Assert.assertEquals(j, result.getBinary(i).getValues()[j - 10]);
+      }
+    }
+  }
+
+  /** Reading with offset 10 and length 2 returns exactly bytes 10 and 11. */
+  @Test
+  public void testReadObject3() throws IOException {
+    BinaryColumnBuilder columnBuilder =
+        new BinaryColumnBuilder(new ColumnBuilderStatus(new TsBlockBuilderStatus(1024)), 1);
+    columnBuilder.writeBinary(new Binary(createObjectBinary().array()));
+    ColumnTransformer childColumnTransformer = mockChildColumnTransformer(columnBuilder.build());
+    ReadObjectColumnTransformer transformer =
+        new ReadObjectColumnTransformer(
+            BlobType.BLOB, 10, 2, childColumnTransformer, Optional.empty());
+    transformer.addReferenceCount();
+    transformer.evaluate();
+    Column result = transformer.getColumn();
+    Assert.assertTrue(result instanceof BinaryColumn);
+    Assert.assertEquals(BlobType.BLOB, transformer.getType());
+    Assert.assertEquals(1, result.getPositionCount());
+    for (int i = 0; i < result.getPositionCount(); i++) {
+      Assert.assertEquals(2, result.getBinary(i).getLength());
+      Assert.assertArrayEquals(new byte[] {(byte) 10, (byte) 11}, result.getBinary(i).getValues());
+    }
+  }
+
+  /**
+   * Writes a 100-byte file (byte {@code i} has value {@code i}) into the object folder and
+   * returns its encoded reference: an 8-byte length followed by the file name bytes.
+   */
+  private ByteBuffer createObjectBinary() throws IOException {
+    Path testFile1 = Files.createTempFile(objectDir.toPath(), "test_", ".bin");
+    byte[] content = new byte[100];
+    for (int i = 0; i < 100; i++) {
+      content[i] = (byte) i;
+    }
+    Files.write(testFile1, content);
+    String relativePath = testFile1.toFile().getName();
+    // Size the buffer from the encoded bytes, not the char count, so the put cannot overflow.
+    byte[] relativePathBytes = BytesUtils.stringToBytes(relativePath);
+    ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + relativePathBytes.length);
+    buffer.putLong(100L);
+    buffer.put(relativePathBytes);
+    buffer.flip();
+    return buffer;
+  }
+
+  /** Creates a mocked child transformer that returns {@code column} and does nothing else. */
+  private ColumnTransformer mockChildColumnTransformer(Column column) {
+    ColumnTransformer mockColumnTransformer = Mockito.mock(ColumnTransformer.class);
+    Mockito.when(mockColumnTransformer.getColumn()).thenReturn(column);
+    Mockito.doNothing().when(mockColumnTransformer).tryEvaluate();
+    Mockito.doNothing().when(mockColumnTransformer).clearCache();
+    Mockito.doNothing().when(mockColumnTransformer).evaluateWithSelection(Mockito.any());
+    return mockColumnTransformer;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java
new file mode 100644
index 00000000000..4bc11fbec57
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.compaction.object;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.schema.table.TsTable;
+import org.apache.iotdb.commons.schema.table.column.FieldColumnSchema;
+import org.apache.iotdb.commons.schema.table.column.TagColumnSchema;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CrossSpaceCompactionTask;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.SettleCompactionTask;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
+
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.ColumnSchema;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+
+/**
+ * Compaction tests for a table with an OBJECT-typed field and a TTL property.
+ *
+ * <p>Every test writes two TsFiles, each holding one row whose OBJECT value refers to a file on
+ * disk: one row is timestamped in the past and one in the future. After the compaction task runs,
+ * the test asserts that the object file of the past row no longer exists and the object file of
+ * the future row still does.
+ */
+public class ObjectTypeCompactionTest extends AbstractCompactionTest {
+
+  /** TsFile-level schema of table {@code t1}: one STRING tag and one OBJECT field. */
+  private static final TableSchema tableSchema =
+      new TableSchema(
+          "t1",
+          Arrays.asList(
+              new ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+              new ColumnSchema("s1", TSDataType.OBJECT, ColumnCategory.FIELD)));
+
+  /** Name of the test thread before {@link #setUp()} renamed it; restored in tearDown. */
+  private String threadName;
+
+  /** Directory in which the object files of the current test are created. */
+  private File objectDir;
+
+  /**
+   * Renames the current thread to a compaction-worker style name, registers table {@code t1} with
+   * a TTL property of 1 in the table cache, runs the base set-up and resolves the object
+   * directory from the {@link TierManager}.
+   */
+  @Before
+  @Override
+  public void setUp()
+      throws IOException, WriteProcessException, MetadataException, InterruptedException {
+    this.threadName = Thread.currentThread().getName();
+    // NOTE(review): presumably the compaction code inspects the worker thread name - confirm.
+    Thread.currentThread().setName("pool-1-IoTDB-Compaction-Worker-1");
+    DataNodeTableCache.getInstance().invalid(this.COMPACTION_TEST_SG);
+    createTable("t1", 1);
+    super.setUp();
+    try {
+      objectDir = new File(TierManager.getInstance().getNextFolderForObjectFile());
+    } catch (DiskSpaceInsufficientException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Runs the base tear-down, then restores the thread name, drops the cached table and deletes
+   * whatever is left in the object directory.
+   */
+  @After
+  @Override
+  public void tearDown() throws IOException, StorageEngineException {
+    try {
+      super.tearDown();
+    } finally {
+      // Always undo the global changes made in setUp, even if the base tear-down fails, so they
+      // cannot leak into later tests running on the same thread.
+      Thread.currentThread().setName(threadName);
+      DataNodeTableCache.getInstance().invalid(this.COMPACTION_TEST_SG);
+      // JUnit runs @After even when @Before failed, in which case objectDir was never assigned.
+      File[] files = objectDir == null ? null : objectDir.listFiles();
+      if (files != null) {
+        for (File file : files) {
+          Files.delete(file.toPath());
+        }
+      }
+    }
+  }
+
+  /**
+   * Registers a table with a STRING tag {@code device} and an OBJECT field {@code s1} in the
+   * {@link DataNodeTableCache} of the compaction test database.
+   *
+   * @param tableName name of the table to register
+   * @param ttl value stored under {@link TsTable#TTL_PROPERTY}
+   */
+  public void createTable(String tableName, long ttl) {
+    TsTable tsTable = new TsTable(tableName);
+    tsTable.addColumnSchema(new TagColumnSchema("device", TSDataType.STRING));
+    tsTable.addColumnSchema(
+        new FieldColumnSchema("s1", TSDataType.OBJECT, TSEncoding.PLAIN, CompressionType.LZ4));
+    tsTable.addProp(TsTable.TTL_PROPERTY, ttl + "");
+    DataNodeTableCache.getInstance().preUpdateTable(this.COMPACTION_TEST_SG, tsTable, null);
+    DataNodeTableCache.getInstance().commitUpdateTable(this.COMPACTION_TEST_SG, tableName, null);
+  }
+
+  /** Sequence inner-space compaction with {@link ReadChunkCompactionPerformer}. */
+  @Test
+  public void testSeqCompactionWithTTL() throws IOException, WriteProcessException {
+    Pair<TsFileResource, File> pair1 =
+        generateTsFileAndObject(true, System.currentTimeMillis() - 10000);
+    Pair<TsFileResource, File> pair2 =
+        generateTsFileAndObject(true, System.currentTimeMillis() + 1000000);
+    tsFileManager.add(pair1.getLeft(), true);
+    tsFileManager.add(pair2.getLeft(), true);
+    InnerSpaceCompactionTask task =
+        new InnerSpaceCompactionTask(
+            0,
+            tsFileManager,
+            tsFileManager.getTsFileList(true),
+            true,
+            new ReadChunkCompactionPerformer(),
+            0);
+    Assert.assertTrue(task.start());
+    Assert.assertFalse(pair1.getRight().exists());
+    Assert.assertTrue(pair2.getRight().exists());
+  }
+
+  /** Unsequence inner-space compaction with {@link FastCompactionPerformer}. */
+  @Test
+  public void testUnseqCompactionWithTTL() throws IOException, WriteProcessException {
+    Pair<TsFileResource, File> pair1 =
+        generateTsFileAndObject(false, System.currentTimeMillis() + 100000);
+    Pair<TsFileResource, File> pair2 =
+        generateTsFileAndObject(false, System.currentTimeMillis() - 1000000);
+    tsFileManager.add(pair1.getLeft(), false);
+    tsFileManager.add(pair2.getLeft(), false);
+    InnerSpaceCompactionTask task =
+        new InnerSpaceCompactionTask(
+            0,
+            tsFileManager,
+            tsFileManager.getTsFileList(false),
+            false,
+            new FastCompactionPerformer(false),
+            0);
+    Assert.assertTrue(task.start());
+    Assert.assertFalse(pair2.getRight().exists());
+    Assert.assertTrue(pair1.getRight().exists());
+  }
+
+  /** Unsequence inner-space compaction with {@link ReadPointCompactionPerformer}. */
+  @Test
+  public void testUnseqCompactionWithReadPointWithTTL() throws IOException, WriteProcessException {
+    Pair<TsFileResource, File> pair1 =
+        generateTsFileAndObject(false, System.currentTimeMillis() + 100000);
+    Pair<TsFileResource, File> pair2 =
+        generateTsFileAndObject(false, System.currentTimeMillis() - 1000000);
+    tsFileManager.add(pair1.getLeft(), false);
+    tsFileManager.add(pair2.getLeft(), false);
+    InnerSpaceCompactionTask task =
+        new InnerSpaceCompactionTask(
+            0,
+            tsFileManager,
+            tsFileManager.getTsFileList(false),
+            false,
+            new ReadPointCompactionPerformer(),
+            0);
+    Assert.assertTrue(task.start());
+    Assert.assertTrue(pair1.getRight().exists());
+    Assert.assertFalse(pair2.getRight().exists());
+  }
+
+  /** Cross-space compaction of one sequence and one unsequence file. */
+  @Test
+  public void testCrossCompactionWithTTL() throws IOException, WriteProcessException {
+    Pair<TsFileResource, File> pair1 =
+        generateTsFileAndObject(true, System.currentTimeMillis() + 100000);
+    Pair<TsFileResource, File> pair2 =
+        generateTsFileAndObject(false, System.currentTimeMillis() - 1000000);
+    tsFileManager.add(pair1.getLeft(), true);
+    tsFileManager.add(pair2.getLeft(), false);
+    CrossSpaceCompactionTask task =
+        new CrossSpaceCompactionTask(
+            0,
+            tsFileManager,
+            tsFileManager.getTsFileList(true),
+            tsFileManager.getTsFileList(false),
+            new FastCompactionPerformer(true),
+            1,
+            0);
+    Assert.assertTrue(task.start());
+    Assert.assertFalse(pair2.getRight().exists());
+    Assert.assertTrue(pair1.getRight().exists());
+  }
+
+  /** Settle compaction over two sequence files. */
+  @Test
+  public void testSettleCompaction() throws IOException, WriteProcessException {
+    Pair<TsFileResource, File> pair1 =
+        generateTsFileAndObject(true, System.currentTimeMillis() - 10000);
+    Pair<TsFileResource, File> pair2 =
+        generateTsFileAndObject(true, System.currentTimeMillis() + 1000000);
+    tsFileManager.add(pair1.getLeft(), true);
+    tsFileManager.add(pair2.getLeft(), true);
+    SettleCompactionTask task =
+        new SettleCompactionTask(
+            0,
+            tsFileManager,
+            tsFileManager.getTsFileList(true),
+            Collections.emptyList(),
+            true,
+            new FastCompactionPerformer(true),
+            0);
+    Assert.assertTrue(task.start());
+    Assert.assertFalse(pair1.getRight().exists());
+    Assert.assertTrue(pair2.getRight().exists());
+  }
+
+  /**
+   * Creates a 100-byte object file under {@code objectDir} and a TsFile with a single row for
+   * device {@code t1.d1} whose {@code s1} value is an 8-byte length (100) followed by the object
+   * file's name.
+   *
+   * @param seq whether the TsFile is created as a sequence file
+   * @param timestamp timestamp of the single written row
+   * @return the TsFile resource (status NORMAL) paired with the object file on disk
+   * @throws IOException if the object file or the TsFile cannot be written
+   */
+  private Pair<TsFileResource, File> generateTsFileAndObject(boolean seq, long timestamp)
+      throws IOException, WriteProcessException {
+    TsFileResource resource = createEmptyFileAndResource(seq);
+    Path testFile1 = Files.createTempFile(objectDir.toPath(), "test_", ".bin");
+    byte[] content = new byte[100];
+    for (int i = 0; i < 100; i++) {
+      content[i] = (byte) i;
+    }
+    Files.write(testFile1, content);
+    String relativePath = testFile1.toFile().getName();
+    // Size the buffer from the encoded bytes, not from String.length(): the char count and the
+    // encoded byte count differ for non-ASCII names and put() would overflow the buffer.
+    byte[] relativePathBytes = BytesUtils.stringToBytes(relativePath);
+    ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + relativePathBytes.length);
+    buffer.putLong(100L);
+    buffer.put(relativePathBytes);
+    buffer.flip();
+    IDeviceID deviceID = new StringArrayDeviceID("t1", "d1");
+    try (TsFileIOWriter writer = new TsFileIOWriter(resource.getTsFile())) {
+      writer.getSchema().registerTableSchema(tableSchema);
+      writer.startChunkGroup(deviceID);
+      AlignedChunkWriterImpl alignedChunkWriter =
+          new AlignedChunkWriterImpl(Arrays.asList(new MeasurementSchema("s1", TSDataType.OBJECT)));
+      alignedChunkWriter.write(timestamp);
+      alignedChunkWriter.write(timestamp, new Binary(buffer.array()), false);
+      alignedChunkWriter.sealCurrentPage();
+      alignedChunkWriter.writeToFileWriter(writer);
+      writer.endChunkGroup();
+      writer.endFile();
+    }
+    // NOTE(review): the resource's device time range is fixed at [1, 1] regardless of the row's
+    // timestamp - confirm this is intentional for the TTL checks.
+    resource.updateStartTime(deviceID, 1);
+    resource.updateEndTime(deviceID, 1);
+    resource.serialize();
+    resource.deserialize();
+    resource.setStatus(TsFileResourceStatus.NORMAL);
+    return new Pair<>(resource, testFile1.toFile());
+  }
+}
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index caaf44c16a7..469d2bb006a 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -773,6 +773,12 @@ struct TKillQueryInstanceReq {
   2: optional string allowedUsername
 }
 
+struct TReadObjectReq { // request argument of IDataNodeRPCService.readObject
+  1: string relativePath // presumably the object file's path relative to the object directory - TODO confirm
+  2: i64 offset // presumably the byte offset at which reading starts - TODO confirm
+  3: i32 size // presumably the number of bytes to read - TODO confirm
+}
+
 /**
 * END: Used for EXPLAIN ANALYZE
 **/
@@ -1257,6 +1263,8 @@ service IDataNodeRPCService {
    * Write an audit log entry to the DataNode's AuditEventLogger
    */
   common.TSStatus writeAuditLog(TAuditLogReq req);
+
+  binary readObject(TReadObjectReq req); // returns raw bytes for the request; presumably the slice [offset, offset + size) of the object file - TODO confirm
 }
 
 service MPPDataExchangeService {

Reply via email to