This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch force_ci/object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/force_ci/object_type by this
push:
new d405afaf06b fix object functions & add tests for object type & fix
some bugs (#16829)
d405afaf06b is described below
commit d405afaf06bb2bced1daa6ada028b6225753ad4b
Author: shuwenwei <[email protected]>
AuthorDate: Tue Dec 2 17:06:10 2025 +0800
fix object functions & add tests for object type & fix some bugs (#16829)
---
.../it/query/object/IoTDBObjectQueryIT2.java | 296 +++++++++++++++++++++
.../scalar/IoTDBScalarFunctionTableIT.java | 16 +-
.../it/query/recent/IoTDBLengthFunctionIT.java | 2 +-
.../iotdb/udf/api/relational/access/Record.java | 35 ++-
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../impl/DataNodeInternalRPCServiceImpl.java | 8 +
.../operator/process/function/partition/Slice.java | 30 ++-
.../expression/PatternExpressionComputation.java | 5 +-
.../aggregation/MaskedRecordIterator.java | 2 -
.../relational/aggregation}/RecordIterator.java | 17 +-
.../UserDefinedAggregateFunctionAccumulator.java | 1 -
.../GroupedUserDefinedAggregateAccumulator.java | 2 +-
.../relational/ColumnTransformerBuilder.java | 8 +-
.../plan/analyze/ClusterPartitionFetcher.java | 4 +
.../plan/planner/OperatorTreeGenerator.java | 1 +
.../plan/planner/TableOperatorGenerator.java | 6 +-
.../plan/node/write/RelationalInsertRowsNode.java | 3 +
.../relational/metadata/TableMetadataImpl.java | 8 +-
.../udf/UserDefineScalarFunctionTransformer.java | 2 +-
.../AbstractCastFunctionColumnTransformer.java | 10 +
...r.java => AbstractLengthColumnTransformer.java} | 15 +-
.../unary/scalar/BlobLengthColumnTransformer.java | 25 +-
.../scalar/CastFunctionColumnTransformer.java | 3 +
.../unary/scalar/LengthColumnTransformer.java | 30 +--
.../scalar/ObjectLengthColumnTransformer.java} | 23 +-
.../unary/scalar/ReadObjectColumnTransformer.java | 44 +--
.../scalar/TryCastFunctionColumnTransformer.java | 3 +
.../compaction/execute/utils/CompactionUtils.java | 2 +
.../org/apache/iotdb/db/utils/ObjectTypeUtils.java | 139 ++++++++++
.../plan/function/RecordObjectTypeTest.java | 135 ++++++++++
.../scalar/BlobLengthColumnTransformerTest.java | 22 +-
.../unary/scalar/ObjectTypeFunctionTest.java | 180 +++++++++++++
.../object/ObjectTypeCompactionTest.java | 261 ++++++++++++++++++
.../src/main/thrift/datanode.thrift | 8 +
34 files changed, 1203 insertions(+), 144 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/object/IoTDBObjectQueryIT2.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/object/IoTDBObjectQueryIT2.java
new file mode 100644
index 00000000000..9a0dd60907e
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/object/IoTDBObjectQueryIT2.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.query.object;
+
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import org.apache.tsfile.utils.Binary;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.time.LocalDate;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBObjectQueryIT2 {
+
+ private static final String DATABASE_NAME = "test";
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+
EnvFactory.getEnv().getConfig().getCommonConfig().setDataReplicationFactor(1);
+ EnvFactory.getEnv().initClusterEnvironment();
+ try (Connection connection =
EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ Statement statement = connection.createStatement()) {
+ statement.execute("CREATE DATABASE " + DATABASE_NAME);
+ statement.execute("USE " + DATABASE_NAME);
+ statement.execute(
+ "CREATE TABLE table1(device STRING TAG, s4 DATE FIELD, s5 TIMESTAMP
FIELD, s6 BLOB FIELD, s7 STRING FIELD, s8 OBJECT FIELD, s9 OBJECT FIELD)");
+ for (int i = 1; i <= 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ statement.execute(
+ String.format(
+ "insert into table1(time, device, s4, s5, s6, s7, s8) "
+ + "values(%d, '%s', '%s', %d, %s, '%s', %s)",
+ j,
+ "d" + i,
+ LocalDate.of(2024, 5, i % 31 + 1),
+ j,
+ "X'cafebabe'",
+ j,
+ "to_object(true, 0, X'cafebabe')"));
+ }
+ }
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testObjectLength() throws IoTDBConnectionException,
StatementExecutionException {
+ try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnection()) {
+ session.executeNonQueryStatement("USE " + DATABASE_NAME);
+ SessionDataSet sessionDataSet =
+ session.executeQueryStatement("select length(s8) from table1 limit
1");
+ SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+ while (iterator.next()) {
+ long length = iterator.getLong(1);
+ Assert.assertEquals(4, length);
+ }
+ }
+ }
+
+ @Test
+ public void testReadObject() throws IoTDBConnectionException,
StatementExecutionException {
+ try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnection()) {
+ session.executeNonQueryStatement("USE " + DATABASE_NAME);
+ SessionDataSet sessionDataSet =
+ session.executeQueryStatement("select read_object(s8) from table1
where device = 'd2'");
+ SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+ byte[] expected = new byte[] {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA,
(byte) 0xBE};
+ while (iterator.next()) {
+ Binary blob = iterator.getBlob(1);
+ Assert.assertArrayEquals(expected, blob.getValues());
+ }
+
+ sessionDataSet =
+ session.executeQueryStatement(
+ "select read_object(s8, 1) from table1 where device = 'd3'");
+ iterator = sessionDataSet.iterator();
+ expected = new byte[] {(byte) 0xFE, (byte) 0xBA, (byte) 0xBE};
+ while (iterator.next()) {
+ Binary blob = iterator.getBlob(1);
+ Assert.assertArrayEquals(expected, blob.getValues());
+ }
+
+ sessionDataSet =
+ session.executeQueryStatement(
+ "select read_object(s8, 1, 2) from table1 where device = 'd1'");
+ iterator = sessionDataSet.iterator();
+ expected = new byte[] {(byte) 0xFE, (byte) 0xBA};
+ while (iterator.next()) {
+ Binary blob = iterator.getBlob(1);
+ Assert.assertArrayEquals(expected, blob.getValues());
+ }
+
+ sessionDataSet =
+ session.executeQueryStatement(
+ "select read_object(s8, 1, 1000) from table1 where device =
'd1'");
+ iterator = sessionDataSet.iterator();
+ expected = new byte[] {(byte) 0xFE, (byte) 0xBA, (byte) 0xBE};
+ while (iterator.next()) {
+ Binary blob = iterator.getBlob(1);
+ Assert.assertArrayEquals(expected, blob.getValues());
+ }
+
+ sessionDataSet =
+ session.executeQueryStatement(
+ "select count(*) from table1 where device = 'd1' and s6 =
read_object(s8)");
+ iterator = sessionDataSet.iterator();
+ while (iterator.next()) {
+ long count = iterator.getLong(1);
+ Assert.assertEquals(10, count);
+ }
+
+ // read_object are not pushed down. Read remote files
+ sessionDataSet =
+ session.executeQueryStatement(
+ "select read_object(t1_s8) from (select t1.s8 as t1_s8, t2.s8 as
t2_s8 from table1 as t1 inner join table1 as t2 using(time))");
+ iterator = sessionDataSet.iterator();
+ expected = new byte[] {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte)
0xBE};
+ while (iterator.next()) {
+ Binary blob = iterator.getBlob(1);
+ Assert.assertArrayEquals(expected, blob.getValues());
+ }
+ }
+ }
+
+ @Test
+ public void testFunctionAndClauses()
+ throws IoTDBConnectionException, StatementExecutionException {
+ try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnection()) {
+ session.executeNonQueryStatement("USE " + DATABASE_NAME);
+
+ SessionDataSet sessionDataSet =
+ session.executeQueryStatement(
+ "select length(s8) from table1 where device = 'd2' and s8 is not
null limit 1");
+ SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+ while (iterator.next()) {
+ Assert.assertEquals(4, iterator.getLong(1));
+ }
+ sessionDataSet =
+ session.executeQueryStatement(
+ "select count(s8), first(s8), last(s8), first_by(s8, time),
last_by(s8, time) from table1 where device = 'd1' and cast(s8 as string) =
'(Object) 4 B' and try_cast(s8 as string) = '(Object) 4 B'");
+ iterator = sessionDataSet.iterator();
+ while (iterator.next()) {
+ Assert.assertEquals(10, iterator.getLong(1));
+ Assert.assertEquals("(Object) 4 B", iterator.getString(2));
+ Assert.assertEquals("(Object) 4 B", iterator.getString(3));
+ Assert.assertEquals("(Object) 4 B", iterator.getString(4));
+ Assert.assertEquals("(Object) 4 B", iterator.getString(5));
+ }
+
+ sessionDataSet = session.executeQueryStatement("select coalesce(s9, s8)
from table1");
+ iterator = sessionDataSet.iterator();
+ while (iterator.next()) {
+ Assert.assertEquals("(Object) 4 B", iterator.getString(1));
+ }
+
+ // MATCH_RECOGNIZE
+ Assert.assertThrows(
+ StatementExecutionException.class,
+ () ->
+ session.executeNonQueryStatement(
+ "select m.cnt from table1 match_recognize (order by s8
measures RPR_LAST(time) as cnt one row per match pattern (B+) define B as B.s6
= prev(B.s6)) as m"));
+ Assert.assertThrows(
+ StatementExecutionException.class,
+ () ->
+ session.executeNonQueryStatement(
+ "select m.cnt from table1 match_recognize (partition by s8
measures RPR_LAST(time) as cnt one row per match pattern (B+) define B as B.s6
= prev(B.s6)) as m"));
+
+ sessionDataSet =
+ session.executeQueryStatement(
+ "select m.value from table1 match_recognize(partition by s6
measures prev(s8) as value one row per match pattern (B+) define B as
B.s6=prev(B.s6)) as m");
+ iterator = sessionDataSet.iterator();
+ while (iterator.next()) {
+ Assert.assertEquals("(Object) 4 B", iterator.getString(1));
+ }
+
+ // WHERE
+ session.executeQueryStatement(
+ "select time, s8 from table1 where device = 'd10' and s8 is not
null");
+ iterator = sessionDataSet.iterator();
+ while (iterator.next()) {
+ Assert.assertEquals("(Object) 4 B", iterator.getString(2));
+ }
+
+ // GROUP BY
+ Assert.assertThrows(
+ StatementExecutionException.class,
+ () -> session.executeNonQueryStatement("select count(*) from table1
group by s8"));
+
+ // ORDER BY
+ Assert.assertThrows(
+ StatementExecutionException.class,
+ () -> session.executeNonQueryStatement("select count(*) from table1
order by s8"));
+
+ // FILL
+ Assert.assertThrows(
+ StatementExecutionException.class,
+ () ->
+ session.executeNonQueryStatement(
+ "select time, s8 from table1 where device = 'd10' fill
method linear"));
+ session.executeQueryStatement(
+ "select time, s8 from table1 where device = 'd10' fill method
previous");
+ iterator = sessionDataSet.iterator();
+ while (iterator.next()) {
+ Assert.assertEquals("(Object) 4 B", iterator.getString(2));
+ }
+
+ // HAVING
+ session.executeQueryStatement(
+ "select device, count(s8) from table1 group by device having
count(s8) > 0");
+ iterator = sessionDataSet.iterator();
+ while (iterator.next()) {
+ long count = iterator.getLong(2);
+ Assert.assertEquals(10, count);
+ }
+
+ // WINDOW
+ Assert.assertThrows(
+ StatementExecutionException.class,
+ () ->
+ session.executeNonQueryStatement(
+ "select *, nth_value(s8,2) over(partition by s8) from
table1"));
+ Assert.assertThrows(
+ StatementExecutionException.class,
+ () ->
+ session.executeNonQueryStatement(
+ "select *, nth_value(s8,2) over(order by s8) from table1"));
+ session.executeNonQueryStatement(
+ "select *, nth_value(s8,2) over(partition by device) from table1");
+ session.executeNonQueryStatement(
+ "select *, lead(s8) over(partition by device order by time) from
table1");
+ session.executeNonQueryStatement(
+ "select *, first_value(s8) over(partition by device) from table1");
+ session.executeNonQueryStatement(
+ "select *, last_value(s8) over(partition by device) from table1");
+ session.executeNonQueryStatement(
+ "select *, lag(s8) over(partition by device order by time) from
table1");
+
+ // Table-value function
+ Assert.assertThrows(
+ StatementExecutionException.class,
+ () ->
+ session.executeNonQueryStatement(
+ "select * from session(data => table1 partition by s8,
timecol => 'time', gap => 1ms)"));
+ Assert.assertThrows(
+ StatementExecutionException.class,
+ () ->
+ session.executeNonQueryStatement(
+ "select * from session(data => table1 order by s8, timecol
=> 'time', gap => 1ms)"));
+ sessionDataSet =
+ session.executeQueryStatement(
+ "select * from hop(data => table1, timecol => 'time', slide =>
1ms, size => 2ms)");
+ iterator = sessionDataSet.iterator();
+ while (iterator.next()) {
+ String str = iterator.getString("s8");
+ Assert.assertEquals("(Object) 4 B", str);
+ }
+ }
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/old/builtinfunction/scalar/IoTDBScalarFunctionTableIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/old/builtinfunction/scalar/IoTDBScalarFunctionTableIT.java
index 01c2d591918..0e4b0a38221 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/old/builtinfunction/scalar/IoTDBScalarFunctionTableIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/old/builtinfunction/scalar/IoTDBScalarFunctionTableIT.java
@@ -1370,56 +1370,56 @@ public class IoTDBScalarFunctionTableIT {
tableAssertTestFail(
"select s1,Length(s1,1) from lengthTable",
TSStatusCode.SEMANTIC_ERROR.getStatusCode()
- + ": Scalar function length only accepts one argument and it must
be text, string, or blob data type.",
+ + ": Scalar function length only accepts one argument and it must
be text or string or blob or object data type.",
DATABASE_NAME);
// case 2: wrong data type
tableAssertTestFail(
"select s1,Length(s2) from lengthTable",
TSStatusCode.SEMANTIC_ERROR.getStatusCode()
- + ": Scalar function length only accepts one argument and it must
be text, string, or blob data type.",
+ + ": Scalar function length only accepts one argument and it must
be text or string or blob or object data type.",
DATABASE_NAME);
// case 3: wrong data type
tableAssertTestFail(
"select s1,Length(s3) from lengthTable",
TSStatusCode.SEMANTIC_ERROR.getStatusCode()
- + ": Scalar function length only accepts one argument and it must
be text, string, or blob data type.",
+ + ": Scalar function length only accepts one argument and it must
be text or string or blob or object data type.",
DATABASE_NAME);
// case 4: wrong data type
tableAssertTestFail(
"select s1,Length(s4) from lengthTable",
TSStatusCode.SEMANTIC_ERROR.getStatusCode()
- + ": Scalar function length only accepts one argument and it must
be text, string, or blob data type.",
+ + ": Scalar function length only accepts one argument and it must
be text or string or blob or object data type.",
DATABASE_NAME);
// case 5: wrong data type
tableAssertTestFail(
"select s1,Length(s5) from lengthTable",
TSStatusCode.SEMANTIC_ERROR.getStatusCode()
- + ": Scalar function length only accepts one argument and it must
be text, string, or blob data type.",
+ + ": Scalar function length only accepts one argument and it must
be text or string or blob or object data type.",
DATABASE_NAME);
// case 6: wrong data type
tableAssertTestFail(
"select s1,Length(s6) from lengthTable",
TSStatusCode.SEMANTIC_ERROR.getStatusCode()
- + ": Scalar function length only accepts one argument and it must
be text, string, or blob data type.",
+ + ": Scalar function length only accepts one argument and it must
be text or string or blob or object data type.",
DATABASE_NAME);
// case 7: wrong data type
tableAssertTestFail(
"select s1,Length(s7) from lengthTable",
TSStatusCode.SEMANTIC_ERROR.getStatusCode()
- + ": Scalar function length only accepts one argument and it must
be text, string, or blob data type.",
+ + ": Scalar function length only accepts one argument and it must
be text or string or blob or object data type.",
DATABASE_NAME);
// case 8: wrong data type
tableAssertTestFail(
"select s1,Length(s8) from lengthTable",
TSStatusCode.SEMANTIC_ERROR.getStatusCode()
- + ": Scalar function length only accepts one argument and it must
be text, string, or blob data type.",
+ + ": Scalar function length only accepts one argument and it must
be text or string or blob or object data type.",
DATABASE_NAME);
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBLengthFunctionIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBLengthFunctionIT.java
index 57bf23918d5..7f3ba13b11d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBLengthFunctionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBLengthFunctionIT.java
@@ -107,7 +107,7 @@ public class IoTDBLengthFunctionIT {
public void testLengthFunctionOnInvalidInputs() {
String expectedErrorMessage =
TSStatusCode.SEMANTIC_ERROR.getStatusCode()
- + ": Scalar function length only accepts one argument and it must
be text, string, or blob data type.";
+ + ": Scalar function length only accepts one argument and it must
be text or string or blob or object data type.";
// Exception 1: Using LENGTH() on non-TEXT/BLOB/STRING types
tableAssertTestFail("SELECT length(c_int) FROM table1",
expectedErrorMessage, DATABASE_NAME);
diff --git
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java
index 8c6e2a7f357..4267f41dc8d 100644
---
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java
+++
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java
@@ -83,7 +83,7 @@ public interface Record {
* Returns the Binary value at the specified column in this row.
*
* <p>Users need to ensure that the data type of the specified column is
{@code TSDataType.TEXT},
- * {@code TSDataType.STRING} or {@code TSDataType.BLOB}.
+ * {@code TSDataType.STRING} or {@code TSDataType.BLOB} or {@code
TSDataType.OBJECT}.
*
* @param columnIndex index of the specified column
* @return the Binary value at the specified column in this row
@@ -94,7 +94,7 @@ public interface Record {
* Returns the String value at the specified column in this row.
*
* <p>Users need to ensure that the data type of the specified column is
{@code TSDataType.TEXT}
- * or {@code TSDataType.STRING}.
+ * or {@code TSDataType.STRING} or {@code TSDataType.OBJECT}.
*
* @param columnIndex index of the specified column
* @return the String value at the specified column in this row
@@ -113,6 +113,37 @@ public interface Record {
Object getObject(int columnIndex);
+ /**
+ * Returns the Binary representation of an object stored at the specified
column in this row.
+ *
+ * <p>Users need to ensure that the data type of the specified column is
{@code
+ * TSDataType.OBJECT}.
+ *
+ * <p>This method returns the entire binary data of the object and may
require considerable memory
+ * if the stored object is large.
+ *
+ * @param columnIndex index of the specified column
+ * @return the Binary content of the object at the specified column
+ */
+ Binary readObject(int columnIndex);
+
+ /**
+ * Returns a partial Binary segment of an object stored at the specified
column in this row.
+ *
+ * <p>Users need to ensure that the data type of the specified column is
{@code
+ * TSDataType.OBJECT}.
+ *
+ * <p>This method enables reading a subset of the stored object without
materializing the entire
+ * binary data in memory, which is useful for large objects and streaming
access patterns.
+ *
+ * @param columnIndex index of the specified column
+ * @param offset byte offset of the subsection read
+ * @param length number of bytes to read starting from the offset. If length
< 0, read the entire
+ * binary data from offset.
+ * @return the Binary content of the object segment at the specified column
+ */
+ Binary readObject(int columnIndex, long offset, int length);
+
/**
* Returns the actual data type of the value at the specified column in this
row.
*
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 246a1f66982..527e6eae3f4 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -148,6 +148,7 @@ public enum TSStatusCode {
// OBJECT
OBJECT_NOT_EXISTS(740),
OBJECT_INSERT_ERROR(741),
+ OBJECT_READ_ERROR(742),
// Arithmetic
NUMERIC_VALUE_OUT_OF_RANGE(750),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index dcc62fcd3c8..8f1b97b7196 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -201,6 +201,7 @@ import
org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
import org.apache.iotdb.db.trigger.executor.TriggerFireResult;
import org.apache.iotdb.db.trigger.service.TriggerManagementService;
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.metrics.type.AutoGauge;
import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -279,6 +280,7 @@ import
org.apache.iotdb.mpp.rpc.thrift.TPushSingleTopicMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaRespExceptionMessage;
+import org.apache.iotdb.mpp.rpc.thrift.TReadObjectReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeResp;
import org.apache.iotdb.mpp.rpc.thrift.TRegionMigrateResult;
@@ -3051,6 +3053,12 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
+ @Override
+ public ByteBuffer readObject(TReadObjectReq req) {
+ return ObjectTypeUtils.readObjectContent(
+ req.getRelativePath(), req.getOffset(), req.getSize(), false);
+ }
+
public void handleClientExit() {
// Do nothing
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java
index b7ff6f3e0fd..599efc4f240 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java
@@ -19,12 +19,14 @@
package
org.apache.iotdb.db.queryengine.execution.operator.process.function.partition;
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
import org.apache.iotdb.udf.api.relational.access.Record;
import org.apache.iotdb.udf.api.type.Type;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
import org.apache.tsfile.utils.DateUtils;
import java.time.LocalDate;
@@ -35,6 +37,8 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.iotdb.udf.api.type.Type.OBJECT;
+
/** Parts of partition. */
public class Slice {
private final Column[] requiredColumns;
@@ -171,9 +175,15 @@ public class Slice {
@Override
public String getString(int columnIndex) {
- return originalColumns[columnIndex]
- .getBinary(offset)
- .getStringValue(TSFileConfig.STRING_CHARSET);
+ Binary binary = originalColumns[columnIndex].getBinary(offset);
+ Type type = dataTypes.get(columnIndex);
+ if (type == OBJECT) {
+ return BytesUtils.parseObjectByteArrayToString(binary.getValues());
+ } else if (type == Type.BLOB) {
+ return BytesUtils.parseBlobByteArrayToString(binary.getValues());
+ } else {
+ return binary.getStringValue(TSFileConfig.STRING_CHARSET);
+ }
}
@Override
@@ -186,6 +196,20 @@ public class Slice {
return originalColumns[columnIndex].getObject(offset);
}
+ @Override
+ public Binary readObject(int columnIndex, long offset, int length) {
+ if (getDataType(columnIndex) != Type.OBJECT) {
+ throw new UnsupportedOperationException("current column is not object
column");
+ }
+ Binary binary = getBinary(columnIndex);
+ return new Binary(ObjectTypeUtils.readObjectContent(binary, offset,
length, true).array());
+ }
+
+ @Override
+ public Binary readObject(int columnIndex) {
+ return readObject(columnIndex, 0L, -1);
+ }
+
@Override
public Type getDataType(int columnIndex) {
return dataTypes.get(columnIndex);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/expression/PatternExpressionComputation.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/expression/PatternExpressionComputation.java
index 6b5032627b2..cb8cbdccbb6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/expression/PatternExpressionComputation.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/expression/PatternExpressionComputation.java
@@ -35,6 +35,7 @@ import org.apache.tsfile.read.common.type.BlobType;
import org.apache.tsfile.read.common.type.BooleanType;
import org.apache.tsfile.read.common.type.DoubleType;
import org.apache.tsfile.read.common.type.FloatType;
+import org.apache.tsfile.read.common.type.ObjectType;
import org.apache.tsfile.read.common.type.Type;
import java.util.ArrayList;
@@ -161,7 +162,9 @@ public class PatternExpressionComputation {
return partition.getFloat(channel, position);
} else if (type instanceof DoubleType) {
return partition.getDouble(channel, position);
- } else if (type instanceof AbstractVarcharType || type instanceof
BlobType) {
+ } else if (type instanceof AbstractVarcharType
+ || type instanceof BlobType
+ || type instanceof ObjectType) {
return partition.getBinary(channel, position);
} else {
throw new SemanticException("Unsupported type: " +
type.getClass().getSimpleName());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java
index 5237fcfd12e..48095927eda 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java
@@ -19,8 +19,6 @@
package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
-import org.apache.iotdb.commons.udf.access.RecordIterator;
-
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.read.common.type.Type;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/access/RecordIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/RecordIterator.java
similarity index 88%
rename from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/access/RecordIterator.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/RecordIterator.java
index 6f5813955dd..9151ef31e0d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/access/RecordIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/RecordIterator.java
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.commons.udf.access;
+package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
import org.apache.iotdb.udf.api.relational.access.Record;
import org.apache.iotdb.udf.api.type.Type;
@@ -138,6 +139,20 @@ public class RecordIterator implements Iterator<Record> {
return childrenColumns.get(columnIndex).getObject(index);
}
+ @Override
+ public Binary readObject(int columnIndex, long offset, int length) {
+ if (getDataType(columnIndex) != Type.OBJECT) {
+ throw new UnsupportedOperationException("current column is not object
column");
+ }
+ Binary binary = getBinary(columnIndex);
+ return new Binary(ObjectTypeUtils.readObjectContent(binary, offset,
length, true).array());
+ }
+
+ @Override
+ public Binary readObject(int columnIndex) {
+ return readObject(columnIndex, 0L, -1);
+ }
+
@Override
public Type getDataType(int columnIndex) {
return
UDFDataTypeTransformer.transformReadTypeToUDFDataType(dataTypes.get(columnIndex));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/UserDefinedAggregateFunctionAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/UserDefinedAggregateFunctionAccumulator.java
index 64c0896b3a1..70e1c0c5d44 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/UserDefinedAggregateFunctionAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/UserDefinedAggregateFunctionAccumulator.java
@@ -19,7 +19,6 @@
package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
-import org.apache.iotdb.commons.udf.access.RecordIterator;
import org.apache.iotdb.udf.api.State;
import org.apache.iotdb.udf.api.customizer.analysis.AggregateFunctionAnalysis;
import org.apache.iotdb.udf.api.relational.AggregateFunction;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java
index b3d24e923ae..9ac6b48db7c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java
@@ -19,9 +19,9 @@
package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;
-import org.apache.iotdb.commons.udf.access.RecordIterator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.MaskedRecordIterator;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.RecordIterator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.ObjectBigArray;
import org.apache.iotdb.udf.api.State;
import org.apache.iotdb.udf.api.relational.AggregateFunction;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
index a17a824248c..41279c9b3eb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
@@ -164,6 +164,7 @@ import
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.Ln
import
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.Log10ColumnTransformer;
import
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.LongToBytesColumnTransformer;
import
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.LowerColumnTransformer;
+import
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.ObjectLengthColumnTransformer;
import
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.RTrim2ColumnTransformer;
import
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.RTrimColumnTransformer;
import
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.RadiansColumnTransformer;
@@ -230,6 +231,7 @@ import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static
org.apache.iotdb.db.queryengine.plan.expression.unary.LikeExpression.getEscapeCharacter;
import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.PredicatePushIntoMetadataChecker.isStringLiteral;
+import static
org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl.isBlobType;
import static
org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl.isCharType;
import static
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType;
import static
org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignatureTranslator.toTypeSignature;
@@ -782,9 +784,11 @@ public class ColumnTransformerBuilder
if (children.size() == 1) {
Type argumentType = first.getType();
if (isCharType(argumentType)) {
- return new LengthColumnTransformer(INT32, first);
+ return new LengthColumnTransformer(INT64, first);
+ } else if (isBlobType(argumentType)) {
+ return new BlobLengthColumnTransformer(INT64, first);
} else {
- return new BlobLengthColumnTransformer(INT32, first);
+ return new ObjectLengthColumnTransformer(INT64, first);
}
}
} else if
(TableBuiltinScalarFunction.UPPER.getFunctionName().equalsIgnoreCase(functionName))
{
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
index 8c54fd640f8..2274762341b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
@@ -308,6 +308,10 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
return partitionCache.updateGroupIdToReplicaSetMap(req.getTimestamp(),
req.getRegionRouteMap());
}
+ public List<TRegionReplicaSet> getRegionReplicaSet(List<TConsensusGroupId>
consensusGroupIds) {
+ return partitionCache.getRegionReplicaSet(consensusGroupIds);
+ }
+
@Override
public void invalidAllCache() {
partitionCache.invalidAllCache();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 58164159183..44be3e69afb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -1435,6 +1435,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
case TEXT:
case STRING:
case BLOB:
+ case OBJECT:
previousFill[i] =
filter == null
? new BinaryPreviousFill()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index d7d4a951cb8..c29ceaff04f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -268,6 +268,7 @@ import
org.apache.tsfile.read.common.block.column.LongColumn;
import org.apache.tsfile.read.common.type.BinaryType;
import org.apache.tsfile.read.common.type.BlobType;
import org.apache.tsfile.read.common.type.BooleanType;
+import org.apache.tsfile.read.common.type.ObjectType;
import org.apache.tsfile.read.common.type.Type;
import org.apache.tsfile.read.common.type.TypeFactory;
import org.apache.tsfile.read.filter.basic.Filter;
@@ -3825,6 +3826,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
case MAX:
case MIN:
if (BlobType.BLOB.equals(argumentType)
+ || ObjectType.OBJECT.equals(argumentType)
|| BinaryType.TEXT.equals(argumentType)
|| BooleanType.BOOLEAN.equals(argumentType)) {
canUseStatistic = false;
@@ -3840,8 +3842,8 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
descendingCount++;
}
- // first/last/first_by/last_by aggregation with BLOB type can not
use statistics
- if (BlobType.BLOB.equals(argumentType)) {
+ // first/last/first_by/last_by aggregation with BLOB or OBJECT type
can not use statistics
+ if (BlobType.BLOB.equals(argumentType) ||
ObjectType.OBJECT.equals(argumentType)) {
canUseStatistic = false;
break;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
index 83f6bbec63e..2297ddcebdd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
@@ -207,6 +207,9 @@ public class RelationalInsertRowsNode extends
InsertRowsNode {
for (int j = 0; j < insertRowNode.getDataTypes().length; j++) {
if (insertRowNode.getDataTypes()[j] == TSDataType.OBJECT) {
Object[] values = insertRowNode.getValues();
+ if (values[j] == null) {
+ continue;
+ }
byte[] binary = ((Binary) values[j]).getValues();
ByteBuffer buffer = ByteBuffer.wrap(binary);
boolean isEoF = buffer.get() == 1;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
index 672168b4d19..9260ec8de03 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
@@ -267,13 +267,15 @@ public class TableMetadataImpl implements Metadata {
return STRING;
} else if
(TableBuiltinScalarFunction.LENGTH.getFunctionName().equalsIgnoreCase(functionName))
{
if (!(argumentTypes.size() == 1
- && (isCharType(argumentTypes.get(0)) ||
isBlobType(argumentTypes.get(0))))) {
+ && (isCharType(argumentTypes.get(0))
+ || isBlobType(argumentTypes.get(0))
+ || isObjectType(argumentTypes.get(0))))) {
throw new SemanticException(
"Scalar function "
+ functionName.toLowerCase(Locale.ENGLISH)
- + " only accepts one argument and it must be text, string, or
blob data type.");
+ + " only accepts one argument and it must be text or string or
blob or object data type.");
}
- return INT32;
+ return INT64;
} else if
(TableBuiltinScalarFunction.UPPER.getFunctionName().equalsIgnoreCase(functionName))
{
if (!(argumentTypes.size() == 1 && isCharType(argumentTypes.get(0)))) {
throw new SemanticException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
index 279e3c06fd4..47fd40ed73f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.queryengine.transformation.dag.column.udf;
-import org.apache.iotdb.commons.udf.access.RecordIterator;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.RecordIterator;
import
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
import
org.apache.iotdb.db.queryengine.transformation.dag.column.multi.MultiColumnTransformer;
import org.apache.iotdb.udf.api.relational.ScalarFunction;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractCastFunctionColumnTransformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractCastFunctionColumnTransformer.java
index ee40a3ec6f8..9ed1bb33075 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractCastFunctionColumnTransformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractCastFunctionColumnTransformer.java
@@ -341,6 +341,7 @@ public abstract class AbstractCastFunctionColumnTransformer
extends UnaryColumnT
case TEXT:
case STRING:
case BLOB:
+ case OBJECT:
returnType.writeBinary(columnBuilder, value);
break;
default:
@@ -393,4 +394,13 @@ public abstract class
AbstractCastFunctionColumnTransformer extends UnaryColumnT
String.format("Cannot cast %s to %s type", stringValue,
returnType.getDisplayName()));
}
}
+
+ protected void castObject(ColumnBuilder columnBuilder, Binary value) {
+ String stringValue =
BytesUtils.parseObjectByteArrayToString(value.getValues());
+ if (returnType.getTypeEnum() == TypeEnum.STRING) {
+ returnType.writeBinary(columnBuilder,
BytesUtils.valueOf(String.valueOf(stringValue)));
+ } else {
+ throw new UnsupportedOperationException(String.format(ERROR_MSG,
returnType.getTypeEnum()));
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractLengthColumnTransformer.java
similarity index 77%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractLengthColumnTransformer.java
index 00448c6f575..08eb4769166 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractLengthColumnTransformer.java
@@ -24,12 +24,13 @@ import
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.UnaryColu
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
-import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.Binary;
-public class LengthColumnTransformer extends UnaryColumnTransformer {
+public abstract class AbstractLengthColumnTransformer extends
UnaryColumnTransformer {
- public LengthColumnTransformer(Type returnType, ColumnTransformer
childColumnTransformer) {
+ public AbstractLengthColumnTransformer(
+ Type returnType, ColumnTransformer childColumnTransformer) {
super(returnType, childColumnTransformer);
}
@@ -37,8 +38,7 @@ public class LengthColumnTransformer extends
UnaryColumnTransformer {
protected void doTransform(Column column, ColumnBuilder columnBuilder) {
for (int i = 0, n = column.getPositionCount(); i < n; i++) {
if (!column.isNull(i)) {
- String currentValue =
column.getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET);
- columnBuilder.writeInt(currentValue.length());
+ columnBuilder.writeLong(transformNonNullValue(column.getBinary(i)));
} else {
columnBuilder.appendNull();
}
@@ -49,11 +49,12 @@ public class LengthColumnTransformer extends
UnaryColumnTransformer {
protected void doTransform(Column column, ColumnBuilder columnBuilder,
boolean[] selection) {
for (int i = 0, n = column.getPositionCount(); i < n; i++) {
if (selection[i] && !column.isNull(i)) {
- String currentValue =
column.getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET);
- columnBuilder.writeInt(currentValue.length());
+ columnBuilder.writeLong(transformNonNullValue(column.getBinary(i)));
} else {
columnBuilder.appendNull();
}
}
}
+
+ protected abstract long transformNonNullValue(Binary binary);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/BlobLengthColumnTransformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/BlobLengthColumnTransformer.java
index e94d9ad3907..bce18fb0597 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/BlobLengthColumnTransformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/BlobLengthColumnTransformer.java
@@ -20,37 +20,18 @@
package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar;
import
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
-import
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.UnaryColumnTransformer;
-import org.apache.tsfile.block.column.Column;
-import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.read.common.type.Type;
import org.apache.tsfile.utils.Binary;
-public class BlobLengthColumnTransformer extends UnaryColumnTransformer {
+public class BlobLengthColumnTransformer extends
AbstractLengthColumnTransformer {
public BlobLengthColumnTransformer(Type returnType, ColumnTransformer
childColumnTransformer) {
super(returnType, childColumnTransformer);
}
@Override
- protected void doTransform(Column column, ColumnBuilder columnBuilder) {
- doTransform(column, columnBuilder, null);
- }
-
- @Override
- protected void doTransform(Column column, ColumnBuilder columnBuilder,
boolean[] selection) {
-
- int positionCount = column.getPositionCount();
- for (int i = 0; i < positionCount; i++) {
- if ((selection != null && !selection[i]) || column.isNull(i)) {
- columnBuilder.appendNull();
- continue;
- }
-
- Binary value = column.getBinary(i);
- int length = value.getValues().length;
- columnBuilder.writeInt(length);
- }
+ protected long transformNonNullValue(Binary binary) {
+ return binary.getValues().length;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/CastFunctionColumnTransformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/CastFunctionColumnTransformer.java
index b9c8e31b1ab..624eabadc62 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/CastFunctionColumnTransformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/CastFunctionColumnTransformer.java
@@ -67,6 +67,9 @@ public class CastFunctionColumnTransformer extends
AbstractCastFunctionColumnTra
case BLOB:
castBlob(columnBuilder, childType.getBinary(column, i));
break;
+ case OBJECT:
+ castObject(columnBuilder, childType.getBinary(column, i));
+ break;
default:
throw new UnsupportedOperationException(
String.format(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java
index 00448c6f575..c94530c83d4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java
@@ -20,40 +20,18 @@
package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar;
import
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
-import
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.UnaryColumnTransformer;
-import org.apache.tsfile.block.column.Column;
-import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.Binary;
-public class LengthColumnTransformer extends UnaryColumnTransformer {
-
+public class LengthColumnTransformer extends AbstractLengthColumnTransformer {
public LengthColumnTransformer(Type returnType, ColumnTransformer
childColumnTransformer) {
super(returnType, childColumnTransformer);
}
@Override
- protected void doTransform(Column column, ColumnBuilder columnBuilder) {
- for (int i = 0, n = column.getPositionCount(); i < n; i++) {
- if (!column.isNull(i)) {
- String currentValue =
column.getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET);
- columnBuilder.writeInt(currentValue.length());
- } else {
- columnBuilder.appendNull();
- }
- }
- }
-
- @Override
- protected void doTransform(Column column, ColumnBuilder columnBuilder,
boolean[] selection) {
- for (int i = 0, n = column.getPositionCount(); i < n; i++) {
- if (selection[i] && !column.isNull(i)) {
- String currentValue =
column.getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET);
- columnBuilder.writeInt(currentValue.length());
- } else {
- columnBuilder.appendNull();
- }
- }
+ protected long transformNonNullValue(Binary binary) {
+ return binary.getStringValue(TSFileConfig.STRING_CHARSET).length();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectLengthColumnTransformer.java
similarity index 57%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectLengthColumnTransformer.java
index 5237fcfd12e..5d39c6f6af3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectLengthColumnTransformer.java
@@ -17,26 +17,21 @@
* under the License.
*/
-package
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar;
-import org.apache.iotdb.commons.udf.access.RecordIterator;
+import
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
-import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.Binary;
-import java.util.List;
-
-public class MaskedRecordIterator extends RecordIterator {
- private final int[] selectedPositions;
-
- public MaskedRecordIterator(
- List<Column> childrenColumns, List<Type> dataTypes, AggregationMask
mask) {
- super(childrenColumns, dataTypes, mask.getSelectedPositionCount());
- this.selectedPositions = mask.getSelectedPositions();
+public class ObjectLengthColumnTransformer extends
AbstractLengthColumnTransformer {
+ public ObjectLengthColumnTransformer(Type returnType, ColumnTransformer
childColumnTransformer) {
+ super(returnType, childColumnTransformer);
}
@Override
- protected int getCurrentIndex() {
- return selectedPositions[currentIndex++];
+ protected long transformNonNullValue(Binary binary) {
+ return ObjectTypeUtils.getObjectLength(binary);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
index 9504c6c2282..3049a9bc441 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
@@ -19,25 +19,18 @@
package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar;
-import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
-import org.apache.iotdb.db.exception.sql.SemanticException;
import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
import
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.UnaryColumnTransformer;
import org.apache.iotdb.db.utils.ObjectTypeUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.type.Type;
import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
import java.util.Optional;
public class ReadObjectColumnTransformer extends UnaryColumnTransformer {
@@ -107,35 +100,14 @@ public class ReadObjectColumnTransformer extends
UnaryColumnTransformer {
}
private Binary readObject(Binary binary) {
- File file = ObjectTypeUtils.getObjectPathFromBinary(binary);
- long actualReadSize = getActualReadSize(file);
+ Pair<Long, String> objectLengthPathPair =
ObjectTypeUtils.parseObjectBinary(binary);
+ long fileLength = objectLengthPathPair.getLeft();
+ String relativePath = objectLengthPathPair.getRight();
+ int actualReadSize =
+ ObjectTypeUtils.getActualReadSize(relativePath, fileLength, offset,
length);
fragmentInstanceContext.ifPresent(
context ->
context.getMemoryReservationContext().reserveMemoryCumulatively(actualReadSize));
- byte[] bytes = new byte[(int) actualReadSize];
- ByteBuffer buffer = ByteBuffer.wrap(bytes);
- try (FileChannel fileChannel = FileChannel.open(file.toPath(),
StandardOpenOption.READ)) {
- fileChannel.read(buffer, offset);
- } catch (IOException e) {
- throw new IoTDBRuntimeException(e,
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
- }
- return new Binary(bytes);
- }
-
- private long getActualReadSize(File file) {
- long fileSize = file.length();
- if (offset >= fileSize) {
- throw new SemanticException(
- String.format(
- "offset %d is greater than object size %d, file path is %s",
- offset, fileSize, file.getAbsolutePath()));
- }
- long actualReadSize = Math.min(length < 0 ? fileSize : length, fileSize -
offset);
- if (actualReadSize > Integer.MAX_VALUE) {
- throw new SemanticException(
- String.format(
- "Read object size %s is too large (size > 2G), file path is %s",
- actualReadSize, file.getAbsolutePath()));
- }
- return actualReadSize;
+ return new Binary(
+ ObjectTypeUtils.readObjectContent(relativePath, offset,
actualReadSize, true).array());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/TryCastFunctionColumnTransformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/TryCastFunctionColumnTransformer.java
index c25fd321c34..419d1bbabf3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/TryCastFunctionColumnTransformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/TryCastFunctionColumnTransformer.java
@@ -69,6 +69,9 @@ public class TryCastFunctionColumnTransformer extends
AbstractCastFunctionColumn
case BLOB:
castBlob(columnBuilder, childType.getBinary(column, i));
break;
+ case OBJECT:
+ castObject(columnBuilder, childType.getBinary(column, i));
+ break;
default:
throw new UnsupportedOperationException(
String.format(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
index b0575837828..0262eb36170 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
@@ -528,6 +528,7 @@ public class CompactionUtils {
}
}
+ @SuppressWarnings("java:S3776")
public static void removeDeletedObjectFiles(
TsFileSequenceReader reader,
List<AbstractAlignedChunkMetadata> alignedChunkMetadataList,
@@ -581,6 +582,7 @@ public class CompactionUtils {
}
}
+ @SuppressWarnings("java:S3776")
private static void removeDeletedObjectFiles(
TsFileSequenceReader reader,
AbstractAlignedChunkMetadata alignedChunkMetadata,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java
index c153061a90d..44ba3235911 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java
@@ -19,18 +19,36 @@
package org.apache.iotdb.db.utils;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.exception.ObjectFileNotExist;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.service.metrics.FileMetrics;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
+import org.apache.iotdb.mpp.rpc.thrift.TReadObjectReq;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+import java.util.List;
import java.util.Optional;
public class ObjectTypeUtils {
@@ -40,6 +58,127 @@ public class ObjectTypeUtils {
private ObjectTypeUtils() {}
+ public static ByteBuffer readObjectContent(
+ Binary binary, long offset, int length, boolean mayNotInCurrentNode) {
+ Pair<Long, String> objectLengthPathPair =
ObjectTypeUtils.parseObjectBinary(binary);
+ long fileLength = objectLengthPathPair.getLeft();
+ String relativePath = objectLengthPathPair.getRight();
+ int actualReadSize =
+ ObjectTypeUtils.getActualReadSize(
+ relativePath, fileLength, offset, length < 0 ? fileLength :
length);
+ return ObjectTypeUtils.readObjectContent(
+ relativePath, offset, actualReadSize, mayNotInCurrentNode);
+ }
+
+ public static ByteBuffer readObjectContent(
+ String relativePath, long offset, int readSize, boolean
mayNotInCurrentNode) {
+ Optional<File> objectFile =
TIER_MANAGER.getAbsoluteObjectFilePath(relativePath, false);
+ if (objectFile.isPresent()) {
+ return readObjectContentFromLocalFile(objectFile.get(), offset,
readSize);
+ }
+ if (mayNotInCurrentNode) {
+ return readObjectContentFromRemoteFile(relativePath, offset, readSize);
+ }
+ throw new ObjectFileNotExist(relativePath);
+ }
+
+ private static ByteBuffer readObjectContentFromLocalFile(File file, long
offset, long readSize) {
+ byte[] bytes = new byte[(int) readSize];
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ try (FileChannel fileChannel = FileChannel.open(file.toPath(),
StandardOpenOption.READ)) {
+ fileChannel.read(buffer, offset);
+ } catch (IOException e) {
+ throw new IoTDBRuntimeException(e,
TSStatusCode.OBJECT_READ_ERROR.getStatusCode());
+ }
+ buffer.flip();
+ return buffer;
+ }
+
+ private static ByteBuffer readObjectContentFromRemoteFile(
+ final String relativePath, final long offset, final int readSize) {
+ int regionId;
+ try {
+ regionId =
Integer.parseInt(Paths.get(relativePath).getName(0).toString());
+ } catch (NumberFormatException e) {
+ throw new IoTDBRuntimeException(
+ "wrong object file path: " + relativePath,
+ TSStatusCode.OBJECT_READ_ERROR.getStatusCode());
+ }
+ TConsensusGroupId consensusGroupId =
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, regionId);
+ List<TRegionReplicaSet> regionReplicaSetList =
+ ClusterPartitionFetcher.getInstance()
+ .getRegionReplicaSet(Collections.singletonList(consensusGroupId));
+ if (regionReplicaSetList.isEmpty()) {
+ throw new ObjectFileNotExist(relativePath);
+ }
+ TRegionReplicaSet regionReplicaSet =
regionReplicaSetList.iterator().next();
+ if (regionReplicaSet.getDataNodeLocations().isEmpty()) {
+ throw new ObjectFileNotExist(relativePath);
+ }
+ final int batchSize = 1024 * 1024;
+ final TReadObjectReq req = new TReadObjectReq();
+ req.setRelativePath(relativePath);
+ ByteBuffer buffer = ByteBuffer.allocate(readSize);
+ for (int i = 0; i < regionReplicaSet.getDataNodeLocations().size(); i++) {
+ TDataNodeLocation dataNodeLocation =
regionReplicaSet.getDataNodeLocations().get(i);
+ int toReadSizeInCurrentDataNode = readSize;
+ try (SyncDataNodeInternalServiceClient client =
+ Coordinator.getInstance()
+ .getInternalServiceClientManager()
+ .borrowClient(dataNodeLocation.getInternalEndPoint())) {
+ while (toReadSizeInCurrentDataNode > 0) {
+ req.setOffset(offset + buffer.position());
+ req.setSize(Math.min(toReadSizeInCurrentDataNode, batchSize));
+ toReadSizeInCurrentDataNode -= req.getSize();
+ ByteBuffer partial = client.readObject(req);
+ buffer.put(partial);
+ }
+ } catch (Exception e) {
+ logger.warn("Failed to read object from datanode: {}",
dataNodeLocation, e);
+ if (i == regionReplicaSet.getDataNodeLocations().size() - 1) {
+ throw new IoTDBRuntimeException(e,
TSStatusCode.OBJECT_READ_ERROR.getStatusCode());
+ }
+ continue;
+ }
+ break;
+ }
+ buffer.flip();
+ return buffer;
+ }
+
+ public static int getActualReadSize(String filePath, long fileSize, long
offset, long length) {
+ if (offset >= fileSize) {
+ throw new SemanticException(
+ String.format(
+ "offset %d is greater than or equal to object size %d, file path
is %s",
+ offset, fileSize, filePath));
+ }
+ long actualReadSize = Math.min(length < 0 ? fileSize : length, fileSize -
offset);
+ if (actualReadSize > Integer.MAX_VALUE) {
+ throw new SemanticException(
+ String.format(
+ "Read object size %s is too large (size > 2G), file path is %s",
+ actualReadSize, filePath));
+ }
+ return (int) actualReadSize;
+ }
+
+ public static Pair<Long, String> parseObjectBinary(Binary binary) {
+ byte[] bytes = binary.getValues();
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ long length = buffer.getLong();
+ String relativeObjectFilePath =
+ new String(bytes, 8, bytes.length - 8, TSFileConfig.STRING_CHARSET);
+ return new Pair<>(length, relativeObjectFilePath);
+ }
+
+ public static long getObjectLength(Binary binary) {
+ byte[] bytes = binary.getValues();
+ ByteBuffer wrap = ByteBuffer.wrap(bytes);
+ return wrap.getLong();
+ }
+
public static File getObjectPathFromBinary(Binary binary) {
byte[] bytes = binary.getValues();
String relativeObjectFilePath =
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java
new file mode 100644
index 00000000000..e0a6ca88825
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.function;
+
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.Slice;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.RecordIterator;
+import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
+import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.type.Type;
+
+import org.apache.tsfile.block.TsBlockBuilderStatus;
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilderStatus;
+import org.apache.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.tsfile.read.common.type.ObjectType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.Pair;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.Iterator;
+
+public class RecordObjectTypeTest {
+
+ private File objectDir;
+
+ @Before
+ public void setup() {
+ try {
+ objectDir = new
File(TierManager.getInstance().getNextFolderForObjectFile());
+ } catch (DiskSpaceInsufficientException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ File[] files = objectDir.listFiles();
+ if (files != null) {
+ for (File file : files) {
+ Files.delete(file.toPath());
+ }
+ }
+ }
+
+ @Test
+ public void test() throws IOException {
+ BinaryColumnBuilder columnBuilder =
+ new BinaryColumnBuilder(new ColumnBuilderStatus(new
TsBlockBuilderStatus(1024)), 1);
+ columnBuilder.writeBinary(new Binary(createObjectBinary().array()));
+ RecordIterator recordIterator =
+ new RecordIterator(
+ Collections.singletonList(columnBuilder.build()),
+ Collections.singletonList(ObjectType.OBJECT),
+ 1);
+ Slice slice =
+ new Slice(
+ 0,
+ 1,
+ new Column[] {columnBuilder.build()},
+ Collections.singletonList(0),
+ Collections.emptyList(),
+ Collections.singletonList(Type.OBJECT));
+ testRecordIterator(recordIterator);
+ testRecordIterator(slice.getRequiredRecordIterator(false));
+ }
+
+ private void testRecordIterator(Iterator<Record> recordIterator) {
+ Assert.assertTrue(recordIterator.hasNext());
+ Record record = recordIterator.next();
+
+ Binary result = record.readObject(0);
+ Assert.assertEquals(100, result.getLength());
+ for (int j = 0; j < 100; j++) {
+ Assert.assertEquals(j, result.getValues()[j]);
+ }
+
+ result = record.readObject(0, 10, 2);
+ Assert.assertArrayEquals(new byte[] {(byte) 10, (byte) 11},
result.getValues());
+
+ Object object = record.getObject(0);
+ Assert.assertTrue(object instanceof Binary);
+ Pair<Long, String> pair = ObjectTypeUtils.parseObjectBinary((Binary)
object);
+ Assert.assertEquals(Long.valueOf(100L), pair.getLeft());
+ Assert.assertTrue(pair.getRight().startsWith("test_") &&
pair.getRight().endsWith(".bin"));
+
+ Assert.assertArrayEquals(((Binary) object).getValues(),
record.getBinary(0).getValues());
+
+ Assert.assertEquals("(Object) 100 B", record.getString(0));
+ Assert.assertFalse(recordIterator.hasNext());
+ }
+
+ private ByteBuffer createObjectBinary() throws IOException {
+ Path testFile1 = Files.createTempFile(objectDir.toPath(), "test_", ".bin");
+ byte[] content = new byte[100];
+ for (int i = 0; i < 100; i++) {
+ content[i] = (byte) i;
+ }
+ Files.write(testFile1, content);
+ String relativePath = testFile1.toFile().getName();
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES +
relativePath.length());
+ buffer.putLong(100L);
+ buffer.put(BytesUtils.stringToBytes(relativePath));
+ buffer.flip();
+ return buffer;
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/BlobLengthColumnTransformerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/BlobLengthColumnTransformerTest.java
index ffb00505d14..7fbbd6da2fa 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/BlobLengthColumnTransformerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/BlobLengthColumnTransformerTest.java
@@ -31,7 +31,7 @@ import org.mockito.Mockito;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
-import static org.apache.tsfile.read.common.type.IntType.INT32;
+import static org.apache.tsfile.read.common.type.LongType.INT64;
public class BlobLengthColumnTransformerTest {
@@ -69,13 +69,13 @@ public class BlobLengthColumnTransformerTest {
ColumnTransformer childColumnTransformer =
mockChildColumnTransformer(binaryColumn);
BlobLengthColumnTransformer blobLengthColumnTransformer =
- new BlobLengthColumnTransformer(INT32, childColumnTransformer);
+ new BlobLengthColumnTransformer(INT64, childColumnTransformer);
blobLengthColumnTransformer.addReferenceCount();
blobLengthColumnTransformer.evaluate();
Column result = blobLengthColumnTransformer.getColumn();
int expectedLength = input.getBytes(StandardCharsets.UTF_8).length;
- Assert.assertEquals(expectedLength, result.getInt(0));
+ Assert.assertEquals(expectedLength, result.getLong(0));
}
@Test
@@ -87,13 +87,13 @@ public class BlobLengthColumnTransformerTest {
ColumnTransformer childColumnTransformer =
mockChildColumnTransformer(binaryColumn);
BlobLengthColumnTransformer blobLengthColumnTransformer =
- new BlobLengthColumnTransformer(INT32, childColumnTransformer);
+ new BlobLengthColumnTransformer(INT64, childColumnTransformer);
blobLengthColumnTransformer.addReferenceCount();
blobLengthColumnTransformer.evaluate();
Column result = blobLengthColumnTransformer.getColumn();
int expectedLength = inputBytes.length;
- Assert.assertEquals(expectedLength, result.getInt(0));
+ Assert.assertEquals(expectedLength, result.getLong(0));
}
@Test
@@ -109,13 +109,13 @@ public class BlobLengthColumnTransformerTest {
ColumnTransformer childColumnTransformer =
mockChildColumnTransformer(binaryColumn);
BlobLengthColumnTransformer blobLengthColumnTransformer =
- new BlobLengthColumnTransformer(INT32, childColumnTransformer);
+ new BlobLengthColumnTransformer(INT64, childColumnTransformer);
blobLengthColumnTransformer.addReferenceCount();
blobLengthColumnTransformer.evaluate();
Column result = blobLengthColumnTransformer.getColumn();
- Assert.assertEquals(inputBytes1.length, result.getInt(0));
+ Assert.assertEquals(inputBytes1.length, result.getLong(0));
Assert.assertTrue(result.isNull(1));
- Assert.assertEquals(inputBytes2.length, result.getInt(2));
+ Assert.assertEquals(inputBytes2.length, result.getLong(2));
}
@Test
@@ -133,7 +133,7 @@ public class BlobLengthColumnTransformerTest {
ColumnTransformer child =
mockChildColumnTransformer(new BinaryColumn(values.length,
Optional.empty(), values));
BlobLengthColumnTransformer blobLengthColumnTransformer =
- new BlobLengthColumnTransformer(INT32, child);
+ new BlobLengthColumnTransformer(INT64, child);
blobLengthColumnTransformer.addReferenceCount();
blobLengthColumnTransformer.evaluateWithSelection(booleans);
Column result = blobLengthColumnTransformer.getColumn();
@@ -142,7 +142,7 @@ public class BlobLengthColumnTransformerTest {
int expectedValue3 = bytes3.length;
Assert.assertTrue(result.isNull(0));
- Assert.assertEquals(expectedValue2, result.getInt(1));
- Assert.assertEquals(expectedValue3, result.getInt(2));
+ Assert.assertEquals(expectedValue2, result.getLong(1));
+ Assert.assertEquals(expectedValue3, result.getLong(2));
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectTypeFunctionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectTypeFunctionTest.java
new file mode 100644
index 00000000000..0a0d73d2ac3
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectTypeFunctionTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar;
+
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
+
+import org.apache.tsfile.block.TsBlockBuilderStatus;
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilderStatus;
+import org.apache.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.tsfile.read.common.block.column.LongColumn;
+import org.apache.tsfile.read.common.type.BlobType;
+import org.apache.tsfile.read.common.type.LongType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+
+public class ObjectTypeFunctionTest {
+
+ private File objectDir;
+
+ @Before
+ public void setup() {
+ try {
+ objectDir = new
File(TierManager.getInstance().getNextFolderForObjectFile());
+ } catch (DiskSpaceInsufficientException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ File[] files = objectDir.listFiles();
+ if (files != null) {
+ for (File file : files) {
+ Files.delete(file.toPath());
+ }
+ }
+ }
+
+ @Test
+ public void testLength() throws IOException {
+ BinaryColumnBuilder columnBuilder =
+ new BinaryColumnBuilder(new ColumnBuilderStatus(new
TsBlockBuilderStatus(1024)), 1);
+ columnBuilder.writeBinary(new Binary(createObjectBinary().array()));
+ ColumnTransformer childColumnTransformer =
mockChildColumnTransformer(columnBuilder.build());
+ ObjectLengthColumnTransformer transformer =
+ new ObjectLengthColumnTransformer(LongType.INT64,
childColumnTransformer);
+ transformer.addReferenceCount();
+ transformer.evaluate();
+ Column result = transformer.getColumn();
+ Assert.assertTrue(result instanceof LongColumn);
+ Assert.assertEquals(LongType.INT64, transformer.getType());
+ Assert.assertEquals(1, result.getPositionCount());
+ for (int i = 0; i < result.getPositionCount(); i++) {
+ Assert.assertEquals(100, result.getLong(i));
+ }
+ }
+
+ @Test
+ public void testReadObject1() throws IOException {
+ BinaryColumnBuilder columnBuilder =
+ new BinaryColumnBuilder(new ColumnBuilderStatus(new
TsBlockBuilderStatus(1024)), 1);
+ columnBuilder.writeBinary(new Binary(createObjectBinary().array()));
+ ColumnTransformer childColumnTransformer =
mockChildColumnTransformer(columnBuilder.build());
+ ReadObjectColumnTransformer transformer =
+ new ReadObjectColumnTransformer(BlobType.BLOB, childColumnTransformer,
Optional.empty());
+ transformer.addReferenceCount();
+ transformer.evaluate();
+ Column result = transformer.getColumn();
+ Assert.assertTrue(result instanceof BinaryColumn);
+ Assert.assertEquals(BlobType.BLOB, transformer.getType());
+ Assert.assertEquals(1, result.getPositionCount());
+ for (int i = 0; i < result.getPositionCount(); i++) {
+ Assert.assertEquals(100, result.getBinary(i).getLength());
+ for (int j = 0; j < 100; j++) {
+ Assert.assertEquals(j, result.getBinary(i).getValues()[j]);
+ }
+ }
+ }
+
+ @Test
+ public void testReadObject2() throws IOException {
+ BinaryColumnBuilder columnBuilder =
+ new BinaryColumnBuilder(new ColumnBuilderStatus(new
TsBlockBuilderStatus(1024)), 1);
+ columnBuilder.writeBinary(new Binary(createObjectBinary().array()));
+ ColumnTransformer childColumnTransformer =
mockChildColumnTransformer(columnBuilder.build());
+ ReadObjectColumnTransformer transformer =
+ new ReadObjectColumnTransformer(
+ BlobType.BLOB, 10, childColumnTransformer, Optional.empty());
+ transformer.addReferenceCount();
+ transformer.evaluate();
+ Column result = transformer.getColumn();
+ Assert.assertTrue(result instanceof BinaryColumn);
+ Assert.assertEquals(BlobType.BLOB, transformer.getType());
+ Assert.assertEquals(1, result.getPositionCount());
+ for (int i = 0; i < result.getPositionCount(); i++) {
+ Assert.assertEquals(90, result.getBinary(i).getLength());
+ for (int j = 10; j < 100; j++) {
+ Assert.assertEquals(j, result.getBinary(i).getValues()[j - 10]);
+ }
+ }
+ }
+
+ @Test
+ public void testReadObject3() throws IOException {
+ BinaryColumnBuilder columnBuilder =
+ new BinaryColumnBuilder(new ColumnBuilderStatus(new
TsBlockBuilderStatus(1024)), 1);
+ columnBuilder.writeBinary(new Binary(createObjectBinary().array()));
+ ColumnTransformer childColumnTransformer =
mockChildColumnTransformer(columnBuilder.build());
+ ReadObjectColumnTransformer transformer =
+ new ReadObjectColumnTransformer(
+ BlobType.BLOB, 10, 2, childColumnTransformer, Optional.empty());
+ transformer.addReferenceCount();
+ transformer.evaluate();
+ Column result = transformer.getColumn();
+ Assert.assertTrue(result instanceof BinaryColumn);
+ Assert.assertEquals(BlobType.BLOB, transformer.getType());
+ Assert.assertEquals(1, result.getPositionCount());
+ for (int i = 0; i < result.getPositionCount(); i++) {
+ Assert.assertEquals(2, result.getBinary(i).getLength());
+ Assert.assertArrayEquals(new byte[] {(byte) 10, (byte) 11},
result.getBinary(i).getValues());
+ }
+ }
+
+ private ByteBuffer createObjectBinary() throws IOException {
+ Path testFile1 = Files.createTempFile(objectDir.toPath(), "test_", ".bin");
+ byte[] content = new byte[100];
+ for (int i = 0; i < 100; i++) {
+ content[i] = (byte) i;
+ }
+ Files.write(testFile1, content);
+ String relativePath = testFile1.toFile().getName();
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES +
relativePath.length());
+ buffer.putLong(100L);
+ buffer.put(BytesUtils.stringToBytes(relativePath));
+ buffer.flip();
+ return buffer;
+ }
+
+ private ColumnTransformer mockChildColumnTransformer(Column column) {
+ ColumnTransformer mockColumnTransformer =
Mockito.mock(ColumnTransformer.class);
+ Mockito.when(mockColumnTransformer.getColumn()).thenReturn(column);
+ Mockito.doNothing().when(mockColumnTransformer).tryEvaluate();
+ Mockito.doNothing().when(mockColumnTransformer).clearCache();
+
Mockito.doNothing().when(mockColumnTransformer).evaluateWithSelection(Mockito.any());
+ return mockColumnTransformer;
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java
new file mode 100644
index 00000000000..4bc11fbec57
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.compaction.object;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.schema.table.TsTable;
+import org.apache.iotdb.commons.schema.table.column.FieldColumnSchema;
+import org.apache.iotdb.commons.schema.table.column.TagColumnSchema;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CrossSpaceCompactionTask;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.SettleCompactionTask;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
+
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.ColumnSchema;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+
+public class ObjectTypeCompactionTest extends AbstractCompactionTest {
+
+ private static final TableSchema tableSchema =
+ new TableSchema(
+ "t1",
+ Arrays.asList(
+ new ColumnSchema("device", TSDataType.STRING,
ColumnCategory.TAG),
+ new ColumnSchema("s1", TSDataType.OBJECT,
ColumnCategory.FIELD)));
+
+ private String threadName;
+ private File objectDir;
+
+ @Before
+ @Override
+ public void setUp()
+ throws IOException, WriteProcessException, MetadataException,
InterruptedException {
+ this.threadName = Thread.currentThread().getName();
+ Thread.currentThread().setName("pool-1-IoTDB-Compaction-Worker-1");
+ DataNodeTableCache.getInstance().invalid(this.COMPACTION_TEST_SG);
+ createTable("t1", 1);
+ super.setUp();
+ try {
+ objectDir = new
File(TierManager.getInstance().getNextFolderForObjectFile());
+ } catch (DiskSpaceInsufficientException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @After
+ @Override
+ public void tearDown() throws IOException, StorageEngineException {
+ super.tearDown();
+ Thread.currentThread().setName(threadName);
+ DataNodeTableCache.getInstance().invalid(this.COMPACTION_TEST_SG);
+ File[] files = objectDir.listFiles();
+ if (files != null) {
+ for (File file : files) {
+ Files.delete(file.toPath());
+ }
+ }
+ }
+
+ public void createTable(String tableName, long ttl) {
+ TsTable tsTable = new TsTable(tableName);
+ tsTable.addColumnSchema(new TagColumnSchema("device", TSDataType.STRING));
+ tsTable.addColumnSchema(
+ new FieldColumnSchema("s1", TSDataType.OBJECT, TSEncoding.PLAIN,
CompressionType.LZ4));
+ tsTable.addProp(TsTable.TTL_PROPERTY, ttl + "");
+ DataNodeTableCache.getInstance().preUpdateTable(this.COMPACTION_TEST_SG,
tsTable, null);
+
DataNodeTableCache.getInstance().commitUpdateTable(this.COMPACTION_TEST_SG,
tableName, null);
+ }
+
+ @Test
+ public void testSeqCompactionWithTTL() throws IOException,
WriteProcessException {
+ Pair<TsFileResource, File> pair1 =
+ generateTsFileAndObject(true, System.currentTimeMillis() - 10000);
+ Pair<TsFileResource, File> pair2 =
+ generateTsFileAndObject(true, System.currentTimeMillis() + 1000000);
+ tsFileManager.add(pair1.getLeft(), true);
+ tsFileManager.add(pair2.getLeft(), true);
+ InnerSpaceCompactionTask task =
+ new InnerSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ tsFileManager.getTsFileList(true),
+ true,
+ new ReadChunkCompactionPerformer(),
+ 0);
+ Assert.assertTrue(task.start());
+ Assert.assertFalse(pair1.getRight().exists());
+ Assert.assertTrue(pair2.getRight().exists());
+ }
+
+ @Test
+ public void testUnseqCompactionWithTTL() throws IOException,
WriteProcessException {
+ Pair<TsFileResource, File> pair1 =
+ generateTsFileAndObject(false, System.currentTimeMillis() + 100000);
+ Pair<TsFileResource, File> pair2 =
+ generateTsFileAndObject(false, System.currentTimeMillis() - 1000000);
+ tsFileManager.add(pair1.getLeft(), false);
+ tsFileManager.add(pair2.getLeft(), false);
+ InnerSpaceCompactionTask task =
+ new InnerSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ tsFileManager.getTsFileList(false),
+ false,
+ new FastCompactionPerformer(false),
+ 0);
+ Assert.assertTrue(task.start());
+ Assert.assertFalse(pair2.getRight().exists());
+ Assert.assertTrue(pair1.getRight().exists());
+ }
+
+ @Test
+ public void testUnseqCompactionWithReadPointWithTTL() throws IOException,
WriteProcessException {
+ Pair<TsFileResource, File> pair1 =
+ generateTsFileAndObject(false, System.currentTimeMillis() + 100000);
+ Pair<TsFileResource, File> pair2 =
+ generateTsFileAndObject(false, System.currentTimeMillis() - 1000000);
+ tsFileManager.add(pair1.getLeft(), false);
+ tsFileManager.add(pair2.getLeft(), false);
+ InnerSpaceCompactionTask task =
+ new InnerSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ tsFileManager.getTsFileList(false),
+ false,
+ new ReadPointCompactionPerformer(),
+ 0);
+ Assert.assertTrue(task.start());
+ Assert.assertTrue(pair1.getRight().exists());
+ Assert.assertFalse(pair2.getRight().exists());
+ }
+
+ @Test
+ public void testCrossCompactionWithTTL() throws IOException,
WriteProcessException {
+ Pair<TsFileResource, File> pair1 =
+ generateTsFileAndObject(true, System.currentTimeMillis() + 100000);
+ Pair<TsFileResource, File> pair2 =
+ generateTsFileAndObject(false, System.currentTimeMillis() - 1000000);
+ tsFileManager.add(pair1.getLeft(), true);
+ tsFileManager.add(pair2.getLeft(), false);
+ CrossSpaceCompactionTask task =
+ new CrossSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ tsFileManager.getTsFileList(true),
+ tsFileManager.getTsFileList(false),
+ new FastCompactionPerformer(true),
+ 1,
+ 0);
+ Assert.assertTrue(task.start());
+ Assert.assertFalse(pair2.getRight().exists());
+ Assert.assertTrue(pair1.getRight().exists());
+ }
+
+ @Test
+ public void testSettleCompaction() throws IOException, WriteProcessException
{
+ Pair<TsFileResource, File> pair1 =
+ generateTsFileAndObject(true, System.currentTimeMillis() - 10000);
+ Pair<TsFileResource, File> pair2 =
+ generateTsFileAndObject(true, System.currentTimeMillis() + 1000000);
+ tsFileManager.add(pair1.getLeft(), true);
+ tsFileManager.add(pair2.getLeft(), true);
+ SettleCompactionTask task =
+ new SettleCompactionTask(
+ 0,
+ tsFileManager,
+ tsFileManager.getTsFileList(true),
+ Collections.emptyList(),
+ true,
+ new FastCompactionPerformer(true),
+ 0);
+ Assert.assertTrue(task.start());
+ Assert.assertFalse(pair1.getRight().exists());
+ Assert.assertTrue(pair2.getRight().exists());
+ }
+
+ private Pair<TsFileResource, File> generateTsFileAndObject(boolean seq, long
timestamp)
+ throws IOException, WriteProcessException {
+ TsFileResource resource = createEmptyFileAndResource(seq);
+ Path testFile1 = Files.createTempFile(objectDir.toPath(), "test_", ".bin");
+ byte[] content = new byte[100];
+ for (int i = 0; i < 100; i++) {
+ content[i] = (byte) i;
+ }
+ Files.write(testFile1, content);
+ String relativePath = testFile1.toFile().getName();
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES +
relativePath.length());
+ buffer.putLong(100L);
+ buffer.put(BytesUtils.stringToBytes(relativePath));
+ buffer.flip();
+ IDeviceID deviceID = new StringArrayDeviceID("t1", "d1");
+ try (TsFileIOWriter writer = new TsFileIOWriter(resource.getTsFile())) {
+ writer.getSchema().registerTableSchema(tableSchema);
+ writer.startChunkGroup(deviceID);
+ AlignedChunkWriterImpl alignedChunkWriter =
+ new AlignedChunkWriterImpl(Arrays.asList(new MeasurementSchema("s1",
TSDataType.OBJECT)));
+ alignedChunkWriter.write(timestamp);
+ alignedChunkWriter.write(timestamp, new Binary(buffer.array()), false);
+ alignedChunkWriter.sealCurrentPage();
+ alignedChunkWriter.writeToFileWriter(writer);
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ resource.updateStartTime(deviceID, 1);
+ resource.updateEndTime(deviceID, 1);
+ resource.serialize();
+ resource.deserialize();
+ resource.setStatus(TsFileResourceStatus.NORMAL);
+ return new Pair<>(resource, testFile1.toFile());
+ }
+}
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index caaf44c16a7..469d2bb006a 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -773,6 +773,12 @@ struct TKillQueryInstanceReq {
2: optional string allowedUsername
}
+struct TReadObjectReq {
+ 1: string relativePath
+ 2: i64 offset
+ 3: i32 size
+}
+
/**
* END: Used for EXPLAIN ANALYZE
**/
@@ -1257,6 +1263,8 @@ service IDataNodeRPCService {
* Write an audit log entry to the DataNode's AuditEventLogger
*/
common.TSStatus writeAuditLog(TAuditLogReq req);
+
+ binary readObject(TReadObjectReq req);
}
service MPPDataExchangeService {