This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch force_ci/object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/force_ci/object_type by this
push:
new 391429b6192 [To force_ci/object_type] Support insert object by sql &
add IT (#16683)
391429b6192 is described below
commit 391429b61926b44d65b05f98f3fccd0181253698
Author: Haonan <[email protected]>
AuthorDate: Thu Nov 6 19:32:36 2025 +0800
[To force_ci/object_type] Support insert object by sql & add IT (#16683)
---
.../it/query/old/IoTDBSimpleQueryTableIT.java | 63 ++++++-
.../it/session/IoTDBSessionRelationalIT.java | 183 +++++++++++++++++++++
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
iotdb-core/datanode/pom.xml | 5 -
.../dataregion/DataExecutionVisitor.java | 2 +-
.../planner/plan/node/write/InsertRowNode.java | 11 +-
.../planner/plan/node/write/InsertTabletNode.java | 2 +-
.../plan/planner/plan/node/write/ObjectNode.java | 25 ---
.../plan/node/write/RelationalInsertRowsNode.java | 45 ++++-
.../node/write/RelationalInsertTabletNode.java | 2 -
.../plan/relational/sql/util/AstUtil.java | 30 ++++
.../scheduler/FragmentInstanceDispatcherImpl.java | 2 +-
.../compaction/execute/utils/CompactionUtils.java | 9 +-
.../org/apache/iotdb/db/utils/TabletDecoder.java | 1 +
.../compaction/CompactionDeleteObjectFileTest.java | 122 --------------
15 files changed, 338 insertions(+), 165 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/old/IoTDBSimpleQueryTableIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/old/IoTDBSimpleQueryTableIT.java
index e1571b00129..d89c7a6b3e4 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/old/IoTDBSimpleQueryTableIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/old/IoTDBSimpleQueryTableIT.java
@@ -657,19 +657,26 @@ public class IoTDBSimpleQueryTableIT {
statement.execute("CREATE DATABASE test");
statement.execute("USE " + DATABASE_NAME);
statement.execute(
- "CREATE TABLE table1(device STRING TAG, s4 DATE FIELD, s5 TIMESTAMP
FIELD, s6 BLOB FIELD, s7 STRING FIELD)");
+ "CREATE TABLE table1(device STRING TAG, "
+ + "s4 DATE FIELD, s5 TIMESTAMP FIELD, s6 BLOB FIELD, s7 STRING
FIELD, s8 OBJECT FIELD)");
for (int i = 1; i <= 10; i++) {
statement.execute(
String.format(
- "insert into table1(time, device, s4, s5, s6, s7) values(%d,
'd1', '%s', %d, %s, '%s')",
- i, LocalDate.of(2024, 5, i % 31 + 1), i, "X'cafebabe'", i));
+ "insert into table1(time, device, s4, s5, s6, s7, s8) "
+ + "values(%d, 'd1', '%s', %d, %s, '%s', %s)",
+ i,
+ LocalDate.of(2024, 5, i % 31 + 1),
+ i,
+ "X'cafebabe'",
+ i,
+ "to_object(true, 0, X'cafebabe')"));
}
try (ResultSet resultSet = statement.executeQuery("select * from
table1")) {
final ResultSetMetaData metaData = resultSet.getMetaData();
final int columnCount = metaData.getColumnCount();
- assertEquals(6, columnCount);
+ assertEquals(7, columnCount);
HashMap<Integer, TSDataType> columnType = new HashMap<>();
for (int i = 3; i <= columnCount; i++) {
if (metaData.getColumnLabel(i).equals("s4")) {
@@ -680,6 +687,8 @@ public class IoTDBSimpleQueryTableIT {
columnType.put(i, TSDataType.BLOB);
} else if (metaData.getColumnLabel(i).equals("s7")) {
columnType.put(i, TSDataType.TEXT);
+ } else if (metaData.getColumnLabel(i).equals("s8")) {
+ columnType.put(i, TSDataType.OBJECT);
}
}
byte[] byteArray = new byte[] {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA,
(byte) 0xBE};
@@ -689,12 +698,58 @@ public class IoTDBSimpleQueryTableIT {
long timestamp = resultSet.getLong(4);
byte[] blob = resultSet.getBytes(5);
String text = resultSet.getString(6);
+ String objectSizeString = resultSet.getString(7);
assertEquals(2024 - 1900, date.getYear());
assertEquals(5 - 1, date.getMonth());
assertEquals(time % 31 + 1, date.getDate());
assertEquals(time, timestamp);
assertArrayEquals(byteArray, blob);
assertEquals(String.valueOf(time), text);
+ assertEquals("(Object) 4 B", objectSizeString);
+ }
+ }
+ try (ResultSet resultSet = statement.executeQuery("select
read_object(s8) from table1")) {
+ final ResultSetMetaData metaData = resultSet.getMetaData();
+ final int columnCount = metaData.getColumnCount();
+ assertEquals(1, columnCount);
+ byte[] byteArray = new byte[] {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA,
(byte) 0xBE};
+ while (resultSet.next()) {
+ byte[] blob = resultSet.getBytes(1);
+ assertArrayEquals(byteArray, blob);
+ }
+ }
+
+ } catch (SQLException e) {
+ fail();
+ }
+ }
+
+ @Test
+ public void testObjectDataType() {
+ try (Connection connection =
EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ Statement statement = connection.createStatement()) {
+ statement.execute("CREATE DATABASE test");
+ statement.execute("USE " + DATABASE_NAME);
+ statement.execute("CREATE TABLE table1(device STRING TAG, s8 OBJECT
FIELD)");
+ statement.execute(
+ "insert into table1(time, device, s8) values(1, 'd1',
to_object(false, 0, X'cafe'))");
+ statement.execute(
+ "insert into table1(time, device, s8) values(1, 'd1',
to_object(true, 2, X'babe'))");
+
+ try (ResultSet resultSet = statement.executeQuery("select * from
table1")) {
+ while (resultSet.next()) {
+ String objectSizeString = resultSet.getString(3);
+ assertEquals("(Object) 4 B", objectSizeString);
+ }
+ }
+ try (ResultSet resultSet = statement.executeQuery("select
read_object(s8) from table1")) {
+ final ResultSetMetaData metaData = resultSet.getMetaData();
+ final int columnCount = metaData.getColumnCount();
+ assertEquals(1, columnCount);
+ byte[] byteArray = new byte[] {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA,
(byte) 0xBE};
+ while (resultSet.next()) {
+ byte[] blob = resultSet.getBytes(1);
+ assertArrayEquals(byteArray, blob);
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
index 8f76adb3be8..bba5681b58b 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.itbase.category.TableClusterIT;
import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.session.TableSessionBuilder;
import org.apache.tsfile.enums.ColumnCategory;
@@ -43,6 +44,7 @@ import
org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -60,6 +62,8 @@ import org.junit.runner.RunWith;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
@@ -847,6 +851,183 @@ public class IoTDBSessionRelationalIT {
}
}
+ @Test
+ public void insertObjectTest()
+ throws IoTDBConnectionException, StatementExecutionException,
IOException {
+ String testObject =
+ System.getProperty("user.dir")
+ + File.separator
+ + "target"
+ + File.separator
+ + "test-classes"
+ + File.separator
+ + "ainode-example"
+ + File.separator
+ + "model.pt";
+ File object = new File(testObject);
+
+ try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnection()) {
+ session.executeNonQueryStatement("USE \"db1\"");
+ // insert table data by tablet
+ List<String> columnNameList =
+ Arrays.asList("region_id", "plant_id", "device_id", "temperature",
"file");
+ List<TSDataType> dataTypeList =
+ Arrays.asList(
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.FLOAT,
+ TSDataType.OBJECT);
+ List<ColumnCategory> columnTypeList =
+ new ArrayList<>(
+ Arrays.asList(
+ ColumnCategory.TAG,
+ ColumnCategory.TAG,
+ ColumnCategory.TAG,
+ ColumnCategory.FIELD,
+ ColumnCategory.FIELD));
+ Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList,
columnTypeList, 1);
+ int rowIndex = tablet.getRowSize();
+ tablet.addTimestamp(rowIndex, 1);
+ tablet.addValue(rowIndex, 0, "1");
+ tablet.addValue(rowIndex, 1, "5");
+ tablet.addValue(rowIndex, 2, "3");
+ tablet.addValue(rowIndex, 3, 37.6F);
+ tablet.addValue(rowIndex, 4, true, 0,
Files.readAllBytes(Paths.get(testObject)));
+ session.insert(tablet);
+ tablet.reset();
+
+ try (SessionDataSet dataSet =
+ session.executeQueryStatement("select file from object_table where
time = 1")) {
+ SessionDataSet.DataIterator iterator = dataSet.iterator();
+ while (iterator.next()) {
+ Assert.assertEquals(
+
BytesUtils.parseObjectByteArrayToString(BytesUtils.longToBytes(object.length())),
+ iterator.getString(1));
+ }
+ }
+
+ try (SessionDataSet dataSet =
+ session.executeQueryStatement(
+ "select READ_OBJECT(file) from object_table where time = 1")) {
+ SessionDataSet.DataIterator iterator = dataSet.iterator();
+ while (iterator.next()) {
+ Binary binary = iterator.getBlob(1);
+ Assert.assertArrayEquals(Files.readAllBytes(Paths.get(testObject)),
binary.getValues());
+ }
+ }
+ }
+ }
+
+ @Test
+ public void insertObjectSegmentsTest()
+ throws IoTDBConnectionException, StatementExecutionException,
IOException {
+ String testObject =
+ System.getProperty("user.dir")
+ + File.separator
+ + "target"
+ + File.separator
+ + "test-classes"
+ + File.separator
+ + "ainode-example"
+ + File.separator
+ + "model.pt";
+ byte[] objectBytes = Files.readAllBytes(Paths.get(testObject));
+ List<byte[]> objectSegments = new ArrayList<>();
+ for (int i = 0; i < objectBytes.length; i += 512) {
+ objectSegments.add(Arrays.copyOfRange(objectBytes, i, Math.min(i + 512,
objectBytes.length)));
+ }
+
+ try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnection()) {
+ session.executeNonQueryStatement("USE \"db1\"");
+ // insert table data by tablet
+ List<String> columnNameList =
+ Arrays.asList("region_id", "plant_id", "device_id", "temperature",
"file");
+ List<TSDataType> dataTypeList =
+ Arrays.asList(
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.FLOAT,
+ TSDataType.OBJECT);
+ List<ColumnCategory> columnTypeList =
+ new ArrayList<>(
+ Arrays.asList(
+ ColumnCategory.TAG,
+ ColumnCategory.TAG,
+ ColumnCategory.TAG,
+ ColumnCategory.FIELD,
+ ColumnCategory.FIELD));
+ Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList,
columnTypeList, 1);
+ for (int i = 0; i < objectSegments.size() - 1; i++) {
+ int rowIndex = tablet.getRowSize();
+ tablet.addTimestamp(rowIndex, 1);
+ tablet.addValue(rowIndex, 0, "1");
+ tablet.addValue(rowIndex, 1, "5");
+ tablet.addValue(rowIndex, 2, "3");
+ tablet.addValue(rowIndex, 3, 37.6F);
+ tablet.addValue(rowIndex, 4, false, i * 512L, objectSegments.get(i));
+ session.insert(tablet);
+ tablet.reset();
+ }
+
+ try (SessionDataSet dataSet =
+ session.executeQueryStatement("select file from object_table where
time = 1")) {
+ SessionDataSet.DataIterator iterator = dataSet.iterator();
+ while (iterator.next()) {
+ assertNull(iterator.getString(1));
+ }
+ }
+
+ // insert segment with wrong offset
+ try {
+ int rowIndex = tablet.getRowSize();
+ tablet.addTimestamp(rowIndex, 1);
+ tablet.addValue(rowIndex, 0, "1");
+ tablet.addValue(rowIndex, 1, "5");
+ tablet.addValue(rowIndex, 2, "3");
+ tablet.addValue(rowIndex, 3, 37.6F);
+ tablet.addValue(rowIndex, 4, false, 512L, objectSegments.get(1));
+ session.insert(tablet);
+ } catch (StatementExecutionException e) {
+ Assert.assertEquals(TSStatusCode.OBJECT_INSERT_ERROR.getStatusCode(),
e.getStatusCode());
+ Assert.assertEquals(
+ String.format(
+ "741: The file length %d is not equal to the offset %d",
+ ((objectSegments.size() - 1) * 512), 512L),
+ e.getMessage());
+ } finally {
+ tablet.reset();
+ }
+
+ // last segment
+ int rowIndex = tablet.getRowSize();
+ tablet.addTimestamp(rowIndex, 1);
+ tablet.addValue(rowIndex, 0, "1");
+ tablet.addValue(rowIndex, 1, "5");
+ tablet.addValue(rowIndex, 2, "3");
+ tablet.addValue(rowIndex, 3, 37.6F);
+ tablet.addValue(
+ rowIndex,
+ 4,
+ true,
+ (objectSegments.size() - 1) * 512L,
+ objectSegments.get(objectSegments.size() - 1));
+ session.insert(tablet);
+ tablet.reset();
+
+ try (SessionDataSet dataSet =
+ session.executeQueryStatement(
+ "select READ_OBJECT(file) from object_table where time = 1")) {
+ SessionDataSet.DataIterator iterator = dataSet.iterator();
+ while (iterator.next()) {
+ Binary binary = iterator.getBlob(1);
+ Assert.assertArrayEquals(Files.readAllBytes(Paths.get(testObject)),
binary.getValues());
+ }
+ }
+ }
+ }
+
@Test
public void autoCreateNontagColumnTest()
throws IoTDBConnectionException, StatementExecutionException {
@@ -1628,6 +1809,7 @@ public class IoTDBSessionRelationalIT {
int testNum = 14;
Set<TSDataType> dataTypes = new HashSet<>();
Collections.addAll(dataTypes, TSDataType.values());
+ dataTypes.remove(TSDataType.OBJECT);
dataTypes.remove(TSDataType.VECTOR);
dataTypes.remove(TSDataType.UNKNOWN);
@@ -1718,6 +1900,7 @@ public class IoTDBSessionRelationalIT {
int testNum = 17;
Set<TSDataType> dataTypes = new HashSet<>();
Collections.addAll(dataTypes, TSDataType.values());
+ dataTypes.remove(TSDataType.OBJECT);
dataTypes.remove(TSDataType.VECTOR);
dataTypes.remove(TSDataType.UNKNOWN);
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index c9ab41e9090..84534adfadb 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -148,6 +148,7 @@ public enum TSStatusCode {
// OBJECT
OBJECT_NOT_EXISTS(740),
+ OBJECT_INSERT_ERROR(741),
// Arithmetic
NUMERIC_VALUE_OUT_OF_RANGE(750),
diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index e8409508a03..626d8265eed 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -371,11 +371,6 @@
<version>1.3.0</version>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.gdal</groupId>
- <artifactId>gdal</artifactId>
- <version>3.11.0</version>
- </dependency>
</dependencies>
<build>
<plugins>
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index 643f62830c2..0ae796f3913 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -302,7 +302,7 @@ public class DataExecutionVisitor extends
PlanVisitor<TSStatus, DataRegion> {
return StatusUtils.OK;
} catch (final Exception e) {
LOGGER.error("Error in executing plan node: {}", node, e);
- return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ return
RpcUtils.getStatus(TSStatusCode.OBJECT_INSERT_ERROR.getStatusCode(),
e.getMessage());
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
index 04bef0b577c..760d36f1f5c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
@@ -367,6 +367,7 @@ public class InsertRowNode extends InsertNode implements
WALEntryValue {
case TEXT:
case STRING:
case BLOB:
+ case OBJECT:
ReadWriteIOUtils.write((Binary) values[i], buffer);
break;
default:
@@ -426,6 +427,7 @@ public class InsertRowNode extends InsertNode implements
WALEntryValue {
case TEXT:
case STRING:
case BLOB:
+ case OBJECT:
ReadWriteIOUtils.write((Binary) values[i], stream);
break;
default:
@@ -520,6 +522,7 @@ public class InsertRowNode extends InsertNode implements
WALEntryValue {
case TEXT:
case STRING:
case BLOB:
+ case OBJECT:
values[i] = ReadWriteIOUtils.readBinary(buffer);
break;
default:
@@ -589,6 +592,7 @@ public class InsertRowNode extends InsertNode implements
WALEntryValue {
case TEXT:
case STRING:
case BLOB:
+ case OBJECT:
size += ReadWriteIOUtils.sizeToWrite((Binary) values[i]);
break;
default:
@@ -668,6 +672,7 @@ public class InsertRowNode extends InsertNode implements
WALEntryValue {
case TEXT:
case BLOB:
case STRING:
+ case OBJECT:
WALWriteUtils.write((Binary) values[i], buffer);
break;
default:
@@ -759,6 +764,7 @@ public class InsertRowNode extends InsertNode implements
WALEntryValue {
case TEXT:
case STRING:
case BLOB:
+ case OBJECT:
values[i] = ReadWriteIOUtils.readBinary(stream);
break;
default:
@@ -849,6 +855,7 @@ public class InsertRowNode extends InsertNode implements
WALEntryValue {
case TEXT:
case STRING:
case BLOB:
+ case OBJECT:
values[i] = ReadWriteIOUtils.readBinary(buffer);
break;
default:
@@ -889,7 +896,9 @@ public class InsertRowNode extends InsertNode implements
WALEntryValue {
}
public TimeValuePair composeTimeValuePair(int columnIndex) {
- if (columnIndex >= values.length ||
Objects.isNull(dataTypes[columnIndex])) {
+ if (columnIndex >= values.length
+ || Objects.isNull(dataTypes[columnIndex])
+ || dataTypes[columnIndex] == TSDataType.OBJECT) {
return null;
}
Object value = values[columnIndex];
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index 52386b7e077..3d3c25c0613 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -106,7 +106,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
}
shouldCheckTTL = true;
for (MeasurementSchema measurementSchema : measurementSchemas) {
- if (measurementSchema.getType() == TSDataType.OBJECT) {
+ if (measurementSchema != null && measurementSchema.getType() ==
TSDataType.OBJECT) {
shouldCheckTTL = false;
break;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
index ebe839bbc0c..10fb9bc3443 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
@@ -68,10 +68,6 @@ public class ObjectNode extends SearchNode implements
WALEntryValue {
private boolean isGeneratedByRemoteConsensusLeader;
- private Long time = null;
-
- private String table = null;
-
public ObjectNode(boolean isEOF, long offset, byte[] content, String
filePath) {
super(new PlanNodeId(""));
this.isEOF = isEOF;
@@ -89,27 +85,6 @@ public class ObjectNode extends SearchNode implements
WALEntryValue {
this.contentLength = contentLength;
}
- public long getTimestamp() {
- calculateTimeAndTableName();
- return time;
- }
-
- public String getTable() {
- calculateTimeAndTableName();
- return table;
- }
-
- private void calculateTimeAndTableName() {
- if (time != null && table != null) {
- return;
- }
- File file = new File(filePath);
- String fileName = new File(filePath).getName();
- String timeStr = fileName.substring(0, fileName.length() -
".bin".length());
- time = Long.parseLong(timeStr);
- table =
file.getParentFile().getParentFile().getParentFile().getParentFile().getName();
- }
-
public boolean isEOF() {
return isEOF;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
index 77020d9220d..83f6bbec63e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
@@ -27,13 +27,19 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
+import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.IDeviceID.Factory;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -159,6 +165,7 @@ public class RelationalInsertRowsNode extends
InsertRowsNode {
@Override
public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+ List<WritePlanNode> writePlanNodeList = new ArrayList<>();
Map<TRegionReplicaSet, RelationalInsertRowsNode> splitMap = new
HashMap<>();
List<TEndPoint> redirectInfo = new ArrayList<>();
for (int i = 0; i < getInsertRowNodeList().size(); i++) {
@@ -172,6 +179,9 @@ public class RelationalInsertRowsNode extends
InsertRowsNode {
insertRowNode.getDeviceID(),
TimePartitionUtils.getTimePartitionSlot(insertRowNode.getTime()),
analysis.getDatabaseName());
+ // handle object type
+ handleObjectValue(insertRowNode, dataRegionReplicaSet,
writePlanNodeList);
+
// Collect redirectInfo
redirectInfo.add(dataRegionReplicaSet.getDataNodeLocations().get(0).getClientRpcEndPoint());
RelationalInsertRowsNode tmpNode = splitMap.get(dataRegionReplicaSet);
@@ -185,8 +195,41 @@ public class RelationalInsertRowsNode extends
InsertRowsNode {
}
}
analysis.setRedirectNodeList(redirectInfo);
+ writePlanNodeList.addAll(splitMap.values());
+
+ return writePlanNodeList;
+ }
- return new ArrayList<>(splitMap.values());
+ private void handleObjectValue(
+ InsertRowNode insertRowNode,
+ TRegionReplicaSet dataRegionReplicaSet,
+ List<WritePlanNode> writePlanNodeList) {
+ for (int j = 0; j < insertRowNode.getDataTypes().length; j++) {
+ if (insertRowNode.getDataTypes()[j] == TSDataType.OBJECT) {
+ Object[] values = insertRowNode.getValues();
+ byte[] binary = ((Binary) values[j]).getValues();
+ ByteBuffer buffer = ByteBuffer.wrap(binary);
+ boolean isEoF = buffer.get() == 1;
+ long offset = buffer.getLong();
+ byte[] content = ReadWriteIOUtils.readBytes(buffer,
buffer.remaining());
+ String relativePath =
+ TsFileNameGenerator.generateObjectFilePath(
+ dataRegionReplicaSet.getRegionId().getId(),
+ insertRowNode.getTime(),
+ insertRowNode.getDeviceID(),
+ insertRowNode.getMeasurements()[j]);
+ ObjectNode objectNode = new ObjectNode(isEoF, offset, content,
relativePath);
+ objectNode.setDataRegionReplicaSet(dataRegionReplicaSet);
+ byte[] filePathBytes = relativePath.getBytes(StandardCharsets.UTF_8);
+ byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES];
+ System.arraycopy(
+ BytesUtils.longToBytes(offset + content.length), 0, valueBytes, 0,
Long.BYTES);
+ System.arraycopy(filePathBytes, 0, valueBytes, Long.BYTES,
filePathBytes.length);
+ ((Binary) values[j]).setValues(valueBytes);
+ insertRowNode.setValues(values);
+ writePlanNodeList.add(objectNode);
+ }
+ }
}
public RelationalInsertRowsNode emptyClone() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index ee93712e77d..e3a114211e1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -64,8 +64,6 @@ public class RelationalInsertTabletNode extends
InsertTabletNode {
private boolean singleDevice;
- private Object[] convertedColumns;
-
public RelationalInsertTabletNode(
PlanNodeId id,
PartialPath devicePath,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/AstUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/AstUtil.java
index cb17a90c04f..700d5e40beb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/AstUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/AstUtil.java
@@ -21,16 +21,22 @@ package
org.apache.iotdb.db.queryengine.plan.relational.sql.util;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BinaryLiteral;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Identifier;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Literal;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LongLiteral;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullLiteral;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.StringLiteral;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.TableExpressionType;
import com.google.common.graph.SuccessorsFunction;
import com.google.common.graph.Traverser;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
import java.time.ZoneId;
import java.util.List;
@@ -113,6 +119,30 @@ public final class AstUtil {
throw new SemanticException(
String.format("Cannot insert identifier %s, please use string
literal", expression));
}
+ if (expression instanceof FunctionCall
+ && "to_object".equals(((FunctionCall)
expression).getName().toString())) {
+ List<Expression> arguments = ((FunctionCall) expression).getArguments();
+ if (arguments.size() == 3
+ && arguments.get(0).getExpressionType() ==
TableExpressionType.BOOLEAN_LITERAL
+ && arguments.get(1).getExpressionType() ==
TableExpressionType.LONG_LITERAL
+ && arguments.get(2).getExpressionType() ==
TableExpressionType.BINARY_LITERAL) {
+ boolean isEOF =
+ (boolean)
+ ((BooleanLiteral) ((FunctionCall)
expression).getArguments().get(0)).getTsValue();
+ long offset =
+ (long) ((LongLiteral) ((FunctionCall)
expression).getArguments().get(1)).getTsValue();
+ byte[] content =
+ ((Binary)
+ ((BinaryLiteral) ((FunctionCall)
expression).getArguments().get(2))
+ .getTsValue())
+ .getValues();
+ byte[] val = new byte[content.length + 9];
+ val[0] = (byte) (isEOF ? 1 : 0);
+ System.arraycopy(BytesUtils.longToBytes(offset), 0, val, 1, 8);
+ System.arraycopy(content, 0, val, 9, content.length);
+ return new Binary(val);
+ }
+ }
throw new SemanticException("Unsupported expression: " + expression);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 7e61bea8a4a..18ab1cdc275 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -328,7 +328,7 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
if (dispatchFailures.isEmpty()) {
return immediateFuture(new FragInstanceDispatchResult(true));
}
- if (instances.size() == 1) {
+ if (instances.size() == 1 || dispatchFailures.size() == 1) {
return immediateFuture(new
FragInstanceDispatchResult(dispatchFailures.get(0)));
} else {
List<TSStatus> failureStatusList = new ArrayList<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
index 1668355db31..b0575837828 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
@@ -512,14 +512,19 @@ public class CompactionUtils {
}
}
- public static void removeDeletedObjectFiles(TsFileResource resource)
- throws IOException, IllegalPathException {
+ public static void removeDeletedObjectFiles(TsFileResource resource) {
+ // check for compaction recovery
+ if (!resource.tsFileExists()) {
+ return;
+ }
try (MultiTsFileDeviceIterator deviceIterator =
new MultiTsFileDeviceIterator(Collections.singletonList(resource))) {
while (deviceIterator.hasNextDevice()) {
deviceIterator.nextDevice();
deviceIterator.getReaderAndChunkMetadataForCurrentAlignedSeries();
}
+ } catch (Exception e) {
+ logger.warn("Failed to remove object files from file {}",
resource.getTsFilePath(), e);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TabletDecoder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TabletDecoder.java
index 335d7a23b42..bd1eba9cfa8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TabletDecoder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TabletDecoder.java
@@ -192,6 +192,7 @@ public class TabletDecoder {
case STRING:
case BLOB:
case TEXT:
+ case OBJECT:
Binary[] binaryCol = new Binary[rowSize];
if (encoding == TSEncoding.PLAIN) {
// PlainEncoder uses var int, which may cause compatibility problem
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDeleteObjectFileTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDeleteObjectFileTest.java
deleted file mode 100644
index a4b35c01da6..00000000000
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDeleteObjectFileTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.storageengine.dataregion.compaction;
-
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.schema.table.TsTable;
-import org.apache.iotdb.commons.schema.table.column.FieldColumnSchema;
-import org.apache.iotdb.commons.schema.table.column.TagColumnSchema;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.SettleCompactionTask;
-import
org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate;
-import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate;
-import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
-import
org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-
-import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.exception.write.WriteProcessException;
-import org.apache.tsfile.file.metadata.StringArrayDeviceID;
-import org.apache.tsfile.file.metadata.enums.CompressionType;
-import org.apache.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.tsfile.read.common.TimeRange;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-public class CompactionDeleteObjectFileTest extends AbstractCompactionTest {
- @Before
- public void setUp()
- throws IOException, WriteProcessException, MetadataException,
InterruptedException {
- super.setUp();
- }
-
- @After
- public void tearDown() throws IOException, StorageEngineException {
- super.tearDown();
- }
-
- @Test
- public void test1() throws IOException {
- createTable("tsfile_table", 100);
- File dir = new File("/Users/shuww/Downloads/0708/1_副本");
- List<TsFileResource> resources = new ArrayList<>();
- for (File file : dir.listFiles()) {
- if (!file.getName().endsWith(".tsfile")) {
- continue;
- }
- TsFileResource resource = new TsFileResource(file);
-
- try (ModificationFile modificationFile = resource.getExclusiveModFile())
{
- modificationFile.write(
- new TableDeletionEntry(
- new DeletionPredicate(
- "tsfile_table",
- new IDPredicate.FullExactMatch(
- new StringArrayDeviceID(new String[] {"tsfile_table",
"1", "5", "3"})),
- Arrays.asList("file")),
- new TimeRange(-1, 0)));
- modificationFile.write(
- new TableDeletionEntry(
- new DeletionPredicate(
- "tsfile_table",
- new IDPredicate.FullExactMatch(
- new StringArrayDeviceID(new String[] {"tsfile_table",
"1", "5", "3"})),
- Arrays.asList("file")),
- new TimeRange(2, 2)));
- }
- resource.deserialize();
- resources.add(resource);
- }
-
- // InnerSpaceCompactionTask task =
- // new InnerSpaceCompactionTask(
- // 0, tsFileManager, resources, true, new
ReadChunkCompactionPerformer(), 0);
- SettleCompactionTask task =
- new SettleCompactionTask(
- 0,
- tsFileManager,
- resources,
- Collections.emptyList(),
- true,
- new FastCompactionPerformer(false),
- 0);
- task.start();
- }
-
- public void createTable(String tableName, long ttl) {
- TsTable tsTable = new TsTable(tableName);
- tsTable.addColumnSchema(new TagColumnSchema("id_column",
TSDataType.STRING));
- tsTable.addColumnSchema(
- new FieldColumnSchema("s1", TSDataType.STRING, TSEncoding.PLAIN,
CompressionType.LZ4));
- tsTable.addProp(TsTable.TTL_PROPERTY, ttl + "");
- DataNodeTableCache.getInstance().preUpdateTable("Downloads", tsTable,
null);
- DataNodeTableCache.getInstance().commitUpdateTable("Downloads", tableName,
null);
- }
-}