This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch force_ci/object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9dd78387fd4fef499e3269339aaf5e61cde57ed1 Author: Haonan <[email protected]> AuthorDate: Thu Nov 6 19:32:36 2025 +0800 [To force_ci/object_type] Support insert object by sql & add IT (#16683) --- .../it/query/old/IoTDBSimpleQueryTableIT.java | 63 ++++++- .../it/session/IoTDBSessionRelationalIT.java | 182 ++++++++++++++++++++- .../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 + iotdb-core/datanode/pom.xml | 5 - .../dataregion/DataExecutionVisitor.java | 2 +- .../planner/plan/node/write/InsertRowNode.java | 11 +- .../planner/plan/node/write/InsertTabletNode.java | 2 +- .../plan/planner/plan/node/write/ObjectNode.java | 25 --- .../plan/node/write/RelationalInsertRowsNode.java | 45 ++++- .../node/write/RelationalInsertTabletNode.java | 2 - .../plan/relational/sql/util/AstUtil.java | 30 ++++ .../scheduler/FragmentInstanceDispatcherImpl.java | 2 +- .../compaction/execute/utils/CompactionUtils.java | 9 +- .../org/apache/iotdb/db/utils/TabletDecoder.java | 1 + .../compaction/CompactionDeleteObjectFileTest.java | 122 -------------- 15 files changed, 336 insertions(+), 166 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/old/IoTDBSimpleQueryTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/old/IoTDBSimpleQueryTableIT.java index 57a187950e2..b97dc4915a1 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/old/IoTDBSimpleQueryTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/old/IoTDBSimpleQueryTableIT.java @@ -657,19 +657,26 @@ public class IoTDBSimpleQueryTableIT { statement.execute("CREATE DATABASE test"); statement.execute("USE " + DATABASE_NAME); statement.execute( - "CREATE TABLE table1(device STRING TAG, s4 DATE FIELD, s5 TIMESTAMP FIELD, s6 BLOB FIELD, s7 STRING FIELD)"); + "CREATE TABLE table1(device STRING TAG, " + + "s4 DATE FIELD, s5 TIMESTAMP FIELD, s6 BLOB FIELD, s7 STRING FIELD, s8 OBJECT FIELD)"); for (int i = 1; i <= 10; i++) { statement.execute( String.format( - "insert into table1(time, device, s4, s5, s6, s7) values(%d, 'd1', '%s', %d, %s, '%s')", - i, LocalDate.of(2024, 5, i % 31 + 1), i, "X'cafebabe'", i)); + "insert into table1(time, device, s4, s5, s6, s7, s8) " + + "values(%d, 'd1', '%s', %d, %s, '%s', %s)", + i, + LocalDate.of(2024, 5, i % 31 + 1), + i, + "X'cafebabe'", + i, + "to_object(true, 0, X'cafebabe')")); } try (ResultSet resultSet = statement.executeQuery("select * from table1")) { final ResultSetMetaData metaData = resultSet.getMetaData(); final int columnCount = metaData.getColumnCount(); - assertEquals(6, columnCount); + assertEquals(7, columnCount); HashMap<Integer, TSDataType> columnType = new HashMap<>(); for (int i = 3; i <= columnCount; i++) { if (metaData.getColumnLabel(i).equals("s4")) { @@ -680,6 +687,8 @@ public class IoTDBSimpleQueryTableIT { columnType.put(i, TSDataType.BLOB); } else if (metaData.getColumnLabel(i).equals("s7")) { columnType.put(i, TSDataType.TEXT); + } else if (metaData.getColumnLabel(i).equals("s8")) { + columnType.put(i, TSDataType.OBJECT); } } byte[] byteArray = new byte[] {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE}; @@ -689,12 +698,58 @@ public class IoTDBSimpleQueryTableIT { long timestamp = resultSet.getLong(4); byte[] blob = resultSet.getBytes(5); String text = resultSet.getString(6); + String objectSizeString = resultSet.getString(7); assertEquals(2024 - 1900, date.getYear()); assertEquals(5 - 1, date.getMonth()); assertEquals(time % 31 + 1, date.getDate()); assertEquals(time, timestamp); assertArrayEquals(byteArray, blob); assertEquals(String.valueOf(time), text); + assertEquals("(Object) 4 B", objectSizeString); + } + } + try (ResultSet resultSet = statement.executeQuery("select read_object(s8) from table1")) { + final ResultSetMetaData metaData = resultSet.getMetaData(); + final int columnCount = metaData.getColumnCount(); + assertEquals(1, columnCount); + byte[] byteArray = new byte[] {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE}; + while (resultSet.next()) { + byte[] blob = resultSet.getBytes(1); + assertArrayEquals(byteArray, blob); + } + } + + } catch (SQLException e) { + fail(); + } + } + + @Test + public void testObjectDataType() { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + statement.execute("CREATE DATABASE test"); + statement.execute("USE " + DATABASE_NAME); + statement.execute("CREATE TABLE table1(device STRING TAG, s8 OBJECT FIELD)"); + statement.execute( + "insert into table1(time, device, s8) values(1, 'd1', to_object(false, 0, X'cafe'))"); + statement.execute( + "insert into table1(time, device, s8) values(1, 'd1', to_object(true, 2, X'babe'))"); + + try (ResultSet resultSet = statement.executeQuery("select * from table1")) { + while (resultSet.next()) { + String objectSizeString = resultSet.getString(3); + assertEquals("(Object) 4 B", objectSizeString); + } + } + try (ResultSet resultSet = statement.executeQuery("select read_object(s8) from table1")) { + final ResultSetMetaData metaData = resultSet.getMetaData(); + final int columnCount = metaData.getColumnCount(); + assertEquals(1, columnCount); + byte[] byteArray = new byte[] {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE}; + while (resultSet.next()) { + byte[] blob = resultSet.getBytes(1); + assertArrayEquals(byteArray, blob); } } diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java index fa24f595a80..7f63d190302 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java @@ -36,6 +36,7 @@ import org.apache.iotdb.itbase.category.TableClusterIT; import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.session.TableSessionBuilder; import org.apache.tsfile.enums.ColumnCategory; @@ -44,6 +45,7 @@ import org.apache.tsfile.exception.write.WriteProcessException; import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.read.common.RowRecord; import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BytesUtils; import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; @@ -61,6 +63,8 @@ import org.junit.runner.RunWith; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; import java.time.LocalDate; import java.util.ArrayList; import java.util.Arrays; @@ -847,6 +851,183 @@ public class IoTDBSessionRelationalIT { } } + @Test + public void insertObjectTest() + throws IoTDBConnectionException, StatementExecutionException, IOException { + String testObject = + System.getProperty("user.dir") + + File.separator + + "target" + + File.separator + + "test-classes" + + File.separator + + "ainode-example" + + File.separator + + "model.pt"; + File object = new File(testObject); + + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("USE \"db1\""); + // insert table data by tablet + List<String> columnNameList = + Arrays.asList("region_id", "plant_id", "device_id", "temperature", "file"); + List<TSDataType> dataTypeList = + Arrays.asList( + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.FLOAT, + TSDataType.OBJECT); + List<ColumnCategory> columnTypeList = + new ArrayList<>( + Arrays.asList( + ColumnCategory.TAG, + ColumnCategory.TAG, + ColumnCategory.TAG, + ColumnCategory.FIELD, + ColumnCategory.FIELD)); + Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList, columnTypeList, 1); + int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, 1); + tablet.addValue(rowIndex, 0, "1"); + tablet.addValue(rowIndex, 1, "5"); + tablet.addValue(rowIndex, 2, "3"); + tablet.addValue(rowIndex, 3, 37.6F); + tablet.addValue(rowIndex, 4, true, 0, Files.readAllBytes(Paths.get(testObject))); + session.insert(tablet); + tablet.reset(); + + try (SessionDataSet dataSet = + session.executeQueryStatement("select file from object_table where time = 1")) { + SessionDataSet.DataIterator iterator = dataSet.iterator(); + while (iterator.next()) { + Assert.assertEquals( + BytesUtils.parseObjectByteArrayToString(BytesUtils.longToBytes(object.length())), + iterator.getString(1)); + } + } + + try (SessionDataSet dataSet = + session.executeQueryStatement( + "select READ_OBJECT(file) from object_table where time = 1")) { + SessionDataSet.DataIterator iterator = dataSet.iterator(); + while (iterator.next()) { + Binary binary = iterator.getBlob(1); + Assert.assertArrayEquals(Files.readAllBytes(Paths.get(testObject)), binary.getValues()); + } + } + } + } + + @Test + public void insertObjectSegmentsTest() + throws IoTDBConnectionException, StatementExecutionException, IOException { + String testObject = + System.getProperty("user.dir") + + File.separator + + "target" + + File.separator + + "test-classes" + + File.separator + + "ainode-example" + + File.separator + + "model.pt"; + byte[] objectBytes = Files.readAllBytes(Paths.get(testObject)); + List<byte[]> objectSegments = new ArrayList<>(); + for (int i = 0; i < objectBytes.length; i += 512) { + objectSegments.add(Arrays.copyOfRange(objectBytes, i, Math.min(i + 512, objectBytes.length))); + } + + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("USE \"db1\""); + // insert table data by tablet + List<String> columnNameList = + Arrays.asList("region_id", "plant_id", "device_id", "temperature", "file"); + List<TSDataType> dataTypeList = + Arrays.asList( + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.FLOAT, + TSDataType.OBJECT); + List<ColumnCategory> columnTypeList = + new ArrayList<>( + Arrays.asList( + ColumnCategory.TAG, + ColumnCategory.TAG, + ColumnCategory.TAG, + ColumnCategory.FIELD, + ColumnCategory.FIELD)); + Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList, columnTypeList, 1); + for (int i = 0; i < objectSegments.size() - 1; i++) { + int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, 1); + tablet.addValue(rowIndex, 0, "1"); + tablet.addValue(rowIndex, 1, "5"); + tablet.addValue(rowIndex, 2, "3"); + tablet.addValue(rowIndex, 3, 37.6F); + tablet.addValue(rowIndex, 4, false, i * 512L, objectSegments.get(i)); + session.insert(tablet); + tablet.reset(); + } + + try (SessionDataSet dataSet = + session.executeQueryStatement("select file from object_table where time = 1")) { + SessionDataSet.DataIterator iterator = dataSet.iterator(); + while (iterator.next()) { + assertNull(iterator.getString(1)); + } + } + + // insert segment with wrong offset + try { + int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, 1); + tablet.addValue(rowIndex, 0, "1"); + tablet.addValue(rowIndex, 1, "5"); + tablet.addValue(rowIndex, 2, "3"); + tablet.addValue(rowIndex, 3, 37.6F); + tablet.addValue(rowIndex, 4, false, 512L, objectSegments.get(1)); + session.insert(tablet); + } catch (StatementExecutionException e) { + Assert.assertEquals(TSStatusCode.OBJECT_INSERT_ERROR.getStatusCode(), e.getStatusCode()); + Assert.assertEquals( + String.format( + "741: The file length %d is not equal to the offset %d", + ((objectSegments.size() - 1) * 512), 512L), + e.getMessage()); + } finally { + tablet.reset(); + } + + // last segment + int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, 1); + tablet.addValue(rowIndex, 0, "1"); + tablet.addValue(rowIndex, 1, "5"); + tablet.addValue(rowIndex, 2, "3"); + tablet.addValue(rowIndex, 3, 37.6F); + tablet.addValue( + rowIndex, + 4, + true, + (objectSegments.size() - 1) * 512L, + objectSegments.get(objectSegments.size() - 1)); + session.insert(tablet); + tablet.reset(); + + try (SessionDataSet dataSet = + session.executeQueryStatement( + "select READ_OBJECT(file) from object_table where time = 1")) { + SessionDataSet.DataIterator iterator = dataSet.iterator(); + while (iterator.next()) { + Binary binary = iterator.getBlob(1); + Assert.assertArrayEquals(Files.readAllBytes(Paths.get(testObject)), binary.getValues()); + } + } + } + } + @Test public void autoCreateNontagColumnTest() throws IoTDBConnectionException, StatementExecutionException { @@ -1627,7 +1808,6 @@ public class IoTDBSessionRelationalIT { throws IoTDBConnectionException, StatementExecutionException { int testNum = 14; Set<TSDataType> dataTypes = TSDataTypeTestUtils.getSupportedTypes(); - try { for (TSDataType from : dataTypes) { for (TSDataType to : dataTypes) { diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index ddf178889b4..3b6a08d6475 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -147,6 +147,7 @@ public enum TSStatusCode { // OBJECT OBJECT_NOT_EXISTS(740), + OBJECT_INSERT_ERROR(741), // Arithmetic NUMERIC_VALUE_OUT_OF_RANGE(750), diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml index c660b3aa853..80bebe90f77 100644 --- a/iotdb-core/datanode/pom.xml +++ b/iotdb-core/datanode/pom.xml @@ -371,11 +371,6 @@ <version>1.3.0</version> <scope>test</scope> </dependency> - <dependency> - <groupId>org.gdal</groupId> - <artifactId>gdal</artifactId> - <version>3.11.0</version> - </dependency> </dependencies> <build> <plugins> diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java index 643f62830c2..0ae796f3913 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java @@ -302,7 +302,7 @@ public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> { return StatusUtils.OK; } catch (final Exception e) { LOGGER.error("Error in executing plan node: {}", node, e); - return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); + return RpcUtils.getStatus(TSStatusCode.OBJECT_INSERT_ERROR.getStatusCode(), e.getMessage()); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java index 04bef0b577c..760d36f1f5c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java @@ -367,6 +367,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { case TEXT: case STRING: case BLOB: + case OBJECT: ReadWriteIOUtils.write((Binary) values[i], buffer); break; default: @@ -426,6 +427,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { case TEXT: case STRING: case BLOB: + case OBJECT: ReadWriteIOUtils.write((Binary) values[i], stream); break; default: @@ -520,6 +522,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { case TEXT: case STRING: case BLOB: + case OBJECT: values[i] = ReadWriteIOUtils.readBinary(buffer); break; default: @@ -589,6 +592,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { case TEXT: case STRING: case BLOB: + case OBJECT: size += ReadWriteIOUtils.sizeToWrite((Binary) values[i]); break; default: @@ -668,6 +672,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { case TEXT: case BLOB: case STRING: + case OBJECT: WALWriteUtils.write((Binary) values[i], buffer); break; default: @@ -759,6 +764,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { case TEXT: case STRING: case BLOB: + case OBJECT: values[i] = ReadWriteIOUtils.readBinary(stream); break; default: @@ -849,6 +855,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { case TEXT: case STRING: case BLOB: + case OBJECT: values[i] = ReadWriteIOUtils.readBinary(buffer); break; default: @@ -889,7 +896,9 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { } public TimeValuePair composeTimeValuePair(int columnIndex) { - if (columnIndex >= values.length || Objects.isNull(dataTypes[columnIndex])) { + if (columnIndex >= values.length + || Objects.isNull(dataTypes[columnIndex]) + || dataTypes[columnIndex] == TSDataType.OBJECT) { return null; } Object value = values[columnIndex]; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java index 52386b7e077..3d3c25c0613 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java @@ -106,7 +106,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { } shouldCheckTTL = true; for (MeasurementSchema measurementSchema : measurementSchemas) { - if (measurementSchema.getType() == TSDataType.OBJECT) { + if (measurementSchema != null && measurementSchema.getType() == TSDataType.OBJECT) { shouldCheckTTL = false; break; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java index ebe839bbc0c..10fb9bc3443 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java @@ -68,10 +68,6 @@ public class ObjectNode extends SearchNode implements WALEntryValue { private boolean isGeneratedByRemoteConsensusLeader; - private Long time = null; - - private String table = null; - public ObjectNode(boolean isEOF, long offset, byte[] content, String filePath) { super(new PlanNodeId("")); this.isEOF = isEOF; @@ -89,27 +85,6 @@ public class ObjectNode extends SearchNode implements WALEntryValue { this.contentLength = contentLength; } - public long getTimestamp() { - calculateTimeAndTableName(); - return time; - } - - public String getTable() { - calculateTimeAndTableName(); - return table; - } - - private void calculateTimeAndTableName() { - if (time != null && table != null) { - return; - } - File file = new File(filePath); - String fileName = new File(filePath).getName(); - String timeStr = fileName.substring(0, fileName.length() - ".bin".length()); - time = Long.parseLong(timeStr); - table = file.getParentFile().getParentFile().getParentFile().getParentFile().getName(); - } - public boolean isEOF() { return isEOF; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java index 77020d9220d..83f6bbec63e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java @@ -27,13 +27,19 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.IDeviceID.Factory; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BytesUtils; +import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -159,6 +165,7 @@ public class RelationalInsertRowsNode extends InsertRowsNode { @Override public List<WritePlanNode> splitByPartition(IAnalysis analysis) { + List<WritePlanNode> writePlanNodeList = new ArrayList<>(); Map<TRegionReplicaSet, RelationalInsertRowsNode> splitMap = new HashMap<>(); List<TEndPoint> redirectInfo = new ArrayList<>(); for (int i = 0; i < getInsertRowNodeList().size(); i++) { @@ -172,6 +179,9 @@ public class RelationalInsertRowsNode extends InsertRowsNode { insertRowNode.getDeviceID(), TimePartitionUtils.getTimePartitionSlot(insertRowNode.getTime()), analysis.getDatabaseName()); + // handle object type + handleObjectValue(insertRowNode, dataRegionReplicaSet, writePlanNodeList); + // Collect redirectInfo redirectInfo.add(dataRegionReplicaSet.getDataNodeLocations().get(0).getClientRpcEndPoint()); RelationalInsertRowsNode tmpNode = splitMap.get(dataRegionReplicaSet); @@ -185,8 +195,41 @@ public class RelationalInsertRowsNode extends InsertRowsNode { } } analysis.setRedirectNodeList(redirectInfo); + writePlanNodeList.addAll(splitMap.values()); + + return writePlanNodeList; + } - return new ArrayList<>(splitMap.values()); + private void handleObjectValue( + InsertRowNode insertRowNode, + TRegionReplicaSet dataRegionReplicaSet, + List<WritePlanNode> writePlanNodeList) { + for (int j = 0; j < insertRowNode.getDataTypes().length; j++) { + if (insertRowNode.getDataTypes()[j] == TSDataType.OBJECT) { + Object[] values = insertRowNode.getValues(); + byte[] binary = ((Binary) values[j]).getValues(); + ByteBuffer buffer = ByteBuffer.wrap(binary); + boolean isEoF = buffer.get() == 1; + long offset = buffer.getLong(); + byte[] content = ReadWriteIOUtils.readBytes(buffer, buffer.remaining()); + String relativePath = + TsFileNameGenerator.generateObjectFilePath( + dataRegionReplicaSet.getRegionId().getId(), + insertRowNode.getTime(), + insertRowNode.getDeviceID(), + insertRowNode.getMeasurements()[j]); + ObjectNode objectNode = new ObjectNode(isEoF, offset, content, relativePath); + objectNode.setDataRegionReplicaSet(dataRegionReplicaSet); + byte[] filePathBytes = relativePath.getBytes(StandardCharsets.UTF_8); + byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES]; + System.arraycopy( + BytesUtils.longToBytes(offset + content.length), 0, valueBytes, 0, Long.BYTES); + System.arraycopy(filePathBytes, 0, valueBytes, Long.BYTES, filePathBytes.length); + ((Binary) values[j]).setValues(valueBytes); + insertRowNode.setValues(values); + writePlanNodeList.add(objectNode); + } + } } public RelationalInsertRowsNode emptyClone() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java index ee93712e77d..e3a114211e1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java @@ -64,8 +64,6 @@ public class RelationalInsertTabletNode extends InsertTabletNode { private boolean singleDevice; - private Object[] convertedColumns; - public RelationalInsertTabletNode( PlanNodeId id, PartialPath devicePath, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/AstUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/AstUtil.java index cb17a90c04f..700d5e40beb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/AstUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/AstUtil.java @@ -21,16 +21,22 @@ package org.apache.iotdb.db.queryengine.plan.relational.sql.util; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BinaryLiteral; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Identifier; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Literal; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LongLiteral; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullLiteral; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.StringLiteral; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.TableExpressionType; import com.google.common.graph.SuccessorsFunction; import com.google.common.graph.Traverser; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BytesUtils; import java.time.ZoneId; import java.util.List; @@ -113,6 +119,30 @@ public final class AstUtil { throw new SemanticException( String.format("Cannot insert identifier %s, please use string literal", expression)); } + if (expression instanceof FunctionCall + && "to_object".equals(((FunctionCall) expression).getName().toString())) { + List<Expression> arguments = ((FunctionCall) expression).getArguments(); + if (arguments.size() == 3 + && arguments.get(0).getExpressionType() == TableExpressionType.BOOLEAN_LITERAL + && arguments.get(1).getExpressionType() == TableExpressionType.LONG_LITERAL + && arguments.get(2).getExpressionType() == TableExpressionType.BINARY_LITERAL) { + boolean isEOF = + (boolean) + ((BooleanLiteral) ((FunctionCall) expression).getArguments().get(0)).getTsValue(); + long offset = + (long) ((LongLiteral) ((FunctionCall) expression).getArguments().get(1)).getTsValue(); + byte[] content = + ((Binary) + ((BinaryLiteral) ((FunctionCall) expression).getArguments().get(2)) + .getTsValue()) + .getValues(); + byte[] val = new byte[content.length + 9]; + val[0] = (byte) (isEOF ? 1 : 0); + System.arraycopy(BytesUtils.longToBytes(offset), 0, val, 1, 8); + System.arraycopy(content, 0, val, 9, content.length); + return new Binary(val); + } + } throw new SemanticException("Unsupported expression: " + expression); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index 7e61bea8a4a..18ab1cdc275 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -328,7 +328,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { if (dispatchFailures.isEmpty()) { return immediateFuture(new FragInstanceDispatchResult(true)); } - if (instances.size() == 1) { + if (instances.size() == 1 || dispatchFailures.size() == 1) { return immediateFuture(new FragInstanceDispatchResult(dispatchFailures.get(0))); } else { List<TSStatus> failureStatusList = new ArrayList<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java index 1668355db31..b0575837828 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java @@ -512,14 +512,19 @@ public class CompactionUtils { } } - public static void removeDeletedObjectFiles(TsFileResource resource) - throws IOException, IllegalPathException { + public static void removeDeletedObjectFiles(TsFileResource resource) { + // check for compaction recovery + if (!resource.tsFileExists()) { + return; + } try (MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(Collections.singletonList(resource))) { while (deviceIterator.hasNextDevice()) { deviceIterator.nextDevice(); deviceIterator.getReaderAndChunkMetadataForCurrentAlignedSeries(); } + } catch (Exception e) { + logger.warn("Failed to remove object files from file {}", resource.getTsFilePath(), e); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TabletDecoder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TabletDecoder.java index 335d7a23b42..bd1eba9cfa8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TabletDecoder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TabletDecoder.java @@ -192,6 +192,7 @@ public class TabletDecoder { case STRING: case BLOB: case TEXT: + case OBJECT: Binary[] binaryCol = new Binary[rowSize]; if (encoding == TSEncoding.PLAIN) { // PlainEncoder uses var int, which may cause compatibility problem diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDeleteObjectFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDeleteObjectFileTest.java deleted file mode 100644 index a4b35c01da6..00000000000 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDeleteObjectFileTest.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.storageengine.dataregion.compaction; - -import org.apache.iotdb.commons.exception.MetadataException; -import org.apache.iotdb.commons.schema.table.TsTable; -import org.apache.iotdb.commons.schema.table.column.FieldColumnSchema; -import org.apache.iotdb.commons.schema.table.column.TagColumnSchema; -import org.apache.iotdb.db.exception.StorageEngineException; -import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.SettleCompactionTask; -import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate; -import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; -import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; - -import org.apache.tsfile.enums.TSDataType; -import org.apache.tsfile.exception.write.WriteProcessException; -import org.apache.tsfile.file.metadata.StringArrayDeviceID; -import org.apache.tsfile.file.metadata.enums.CompressionType; -import org.apache.tsfile.file.metadata.enums.TSEncoding; -import org.apache.tsfile.read.common.TimeRange; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -public class CompactionDeleteObjectFileTest extends AbstractCompactionTest { - @Before - public void setUp() - throws IOException, WriteProcessException, MetadataException, InterruptedException { - super.setUp(); - } - - @After - public void tearDown() throws IOException, StorageEngineException { - super.tearDown(); - } - - @Test - public void test1() throws IOException { - createTable("tsfile_table", 100); - File dir = new File("/Users/shuww/Downloads/0708/1_副本"); - List<TsFileResource> resources = new ArrayList<>(); - for (File file : dir.listFiles()) { - if (!file.getName().endsWith(".tsfile")) { - continue; - } - TsFileResource resource = new TsFileResource(file); - - try (ModificationFile modificationFile = resource.getExclusiveModFile()) { - modificationFile.write( - new TableDeletionEntry( - new DeletionPredicate( - "tsfile_table", - new IDPredicate.FullExactMatch( - new StringArrayDeviceID(new String[] {"tsfile_table", "1", "5", "3"})), - Arrays.asList("file")), - new TimeRange(-1, 0))); - modificationFile.write( - new TableDeletionEntry( - new DeletionPredicate( - "tsfile_table", - new IDPredicate.FullExactMatch( - new StringArrayDeviceID(new String[] {"tsfile_table", "1", "5", "3"})), - Arrays.asList("file")), - new TimeRange(2, 2))); - } - resource.deserialize(); - resources.add(resource); - } - - // InnerSpaceCompactionTask task = - // new InnerSpaceCompactionTask( - // 0, tsFileManager, resources, true, new ReadChunkCompactionPerformer(), 0); - SettleCompactionTask task = - new SettleCompactionTask( - 0, - tsFileManager, - resources, - Collections.emptyList(), - true, - new FastCompactionPerformer(false), - 0); - task.start(); - } - - public void createTable(String tableName, long ttl) { - TsTable tsTable = new TsTable(tableName); - tsTable.addColumnSchema(new TagColumnSchema("id_column", TSDataType.STRING)); - tsTable.addColumnSchema( - new FieldColumnSchema("s1", TSDataType.STRING, TSEncoding.PLAIN, CompressionType.LZ4)); - tsTable.addProp(TsTable.TTL_PROPERTY, ttl + ""); - DataNodeTableCache.getInstance().preUpdateTable("Downloads", tsTable, null); - DataNodeTableCache.getInstance().commitUpdateTable("Downloads", tableName, null); - } -}
