This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch TableModelIngestion2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 15b428e648c052202c896b53e01e095fe5b33ce4 Author: Tian Jiang <[email protected]> AuthorDate: Mon Jul 22 17:36:38 2024 +0800 fit IT & incorrect tableSchema during validation & get TsTable from & fix npe in partial insertion --- .../iotdb/session/it/IoTDBSessionRelationalIT.java | 126 +++++++++++++++++---- .../fetcher/TableHeaderSchemaValidator.java | 2 +- .../relational/sql/ast/WrappedInsertStatement.java | 27 +++-- 3 files changed, 119 insertions(+), 36 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java index 3a06c9fe433..1fcae69a1e7 100644 --- a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java @@ -29,7 +29,6 @@ import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.Session; import org.apache.tsfile.enums.TSDataType; -import org.apache.tsfile.file.metadata.ChunkMetadata; import org.apache.tsfile.read.common.RowRecord; import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.record.Tablet.ColumnType; @@ -43,9 +42,7 @@ import org.junit.runner.RunWith; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; import static org.apache.iotdb.itbase.env.BaseEnv.TABLE_SQL_DIALECT; @@ -75,7 +72,77 @@ public class IoTDBSessionRelationalIT { // for manual debugging public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException { - insertRelationalTabletPerformanceTest(); + try (ISession session = + new Session.Builder().host("127.0.0.1").port(6667).sqlDialect(TABLE_SQL_DIALECT).build()) { + session.open(); + + session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS \"db1\""); + + session.executeNonQueryStatement("USE \"db1\""); + // only one column in this table, and others should be auto-created + session.executeNonQueryStatement("CREATE TABLE IF NOT EXISTS table1 (id1 string id)"); + + List<IMeasurementSchema> schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("id2", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("attr1", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("m1", TSDataType.DOUBLE)); + final List<ColumnType> columnTypes = + Arrays.asList(ColumnType.ID, ColumnType.ATTRIBUTE, ColumnType.MEASUREMENT); + + long timestamp = 0; + Tablet tablet = new Tablet("table1", schemaList, columnTypes, 15); + + for (long row = 0; row < 15; row++) { + int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, timestamp + row); + tablet.addValue("id2", rowIndex, "id:" + row); + tablet.addValue("attr1", rowIndex, "attr:" + row); + tablet.addValue("m1", rowIndex, row * 1.0); + if (tablet.rowSize == tablet.getMaxRowNumber()) { + session.insertRelationalTablet(tablet, true); + tablet.reset(); + } + } + + if (tablet.rowSize != 0) { + session.insertRelationalTablet(tablet); + tablet.reset(); + } + + session.executeNonQueryStatement("FLush"); + + for (long row = 15; row < 30; row++) { + int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, timestamp + row); + tablet.addValue("id2", rowIndex, "id:" + row); + tablet.addValue("attr1", rowIndex, "attr:" + row); + tablet.addValue("m1", rowIndex, row * 1.0); + if (tablet.rowSize == tablet.getMaxRowNumber()) { + session.insertRelationalTablet(tablet, true); + tablet.reset(); + } + } + + if (tablet.rowSize != 0) { + session.insertRelationalTablet(tablet); + tablet.reset(); + } + + SessionDataSet dataSet = session.executeQueryStatement("select * from table1 order by time"); + int cnt = 0; + while (dataSet.hasNext()) { + RowRecord rowRecord = dataSet.next(); + timestamp = rowRecord.getFields().get(0).getLongV(); + // id 1 should be null + assertNull(rowRecord.getFields().get(1).getDataType()); + assertEquals("id:" + timestamp, rowRecord.getFields().get(2).getBinaryV().toString()); + assertEquals("attr:" + timestamp, rowRecord.getFields().get(3).getBinaryV().toString()); + assertEquals(timestamp * 1.0, rowRecord.getFields().get(4).getDoubleV(), 0.0001); + cnt++; + } + assertEquals(30, cnt); + } + // insertRelationalTabletPerformanceTest(); } private static void insertRelationalTabletPerformanceTest() @@ -162,8 +229,8 @@ public class IoTDBSessionRelationalIT { for (long row = 0; row < 15; row++) { session.executeNonQueryStatement( String.format( - "INSERT INTO table1 (id1, attr1, m1) VALUES ('%s', '%s', %f)", - "id:" + row, "attr:" + row, row * 1.0)); + "INSERT INTO table1 (time, id1, attr1, m1) VALUES (%d, '%s', '%s', %f)", + row, "id:" + row, "attr:" + row, row * 1.0)); } session.executeNonQueryStatement("FLush"); @@ -171,18 +238,21 @@ public class IoTDBSessionRelationalIT { for (long row = 15; row < 30; row++) { session.executeNonQueryStatement( String.format( - "INSERT INTO table1 (id1, attr1, m1) VALUES ('%s', '%s', %f)", - "id:" + row, "attr:" + row, row * 1.0)); + "INSERT INTO table1 (time, id1, attr1, m1) VALUES (%d, '%s', '%s', %f)", + row, "id:" + row, "attr:" + row, row * 1.0)); } SessionDataSet dataSet = session.executeQueryStatement("select * from table1 order by time"); + int cnt = 0; while (dataSet.hasNext()) { RowRecord rowRecord = dataSet.next(); timestamp = rowRecord.getFields().get(0).getLongV(); assertEquals("id:" + timestamp, rowRecord.getFields().get(1).getBinaryV().toString()); assertEquals("attr:" + timestamp, rowRecord.getFields().get(2).getBinaryV().toString()); assertEquals(timestamp * 1.0, rowRecord.getFields().get(3).getDoubleV(), 0.0001); + cnt++; } + assertEquals(30, cnt); // sql cannot create column assertThrows( @@ -193,14 +263,6 @@ public class IoTDBSessionRelationalIT { "INSERT INTO table1 (id1, id2, attr1, m1) VALUES ('%s', '%s', '%s', %f)", "id:" + 100, "id:" + 100, "attr:" + 100, 100 * 1.0))); } - Map<String, ChunkMetadata> chunkMetadataMap = new HashMap<>(); - List<ChunkMetadata> valueChunkMetadataList = new ArrayList<>(); - chunkMetadataMap.computeIfPresent( - "", - (k, v) -> { - valueChunkMetadataList.add(v); - return v; - }); } @Test @@ -269,6 +331,7 @@ public class IoTDBSessionRelationalIT { } SessionDataSet dataSet = session.executeQueryStatement("select * from table1 order by time"); + int cnt = 0; while (dataSet.hasNext()) { RowRecord rowRecord = dataSet.next(); timestamp = rowRecord.getFields().get(0).getLongV(); @@ -277,9 +340,9 @@ public class IoTDBSessionRelationalIT { assertEquals(timestamp * 1.0, rowRecord.getFields().get(3).getDoubleV(), 0.0001); // "m2" should not be present assertEquals(4, rowRecord.getFields().size()); - timestamp++; - // System.out.println(rowRecord); + cnt++; } + assertEquals(30, cnt); } finally { EnvFactory.getEnv().getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); } @@ -326,13 +389,16 @@ public class IoTDBSessionRelationalIT { } SessionDataSet dataSet = session.executeQueryStatement("select * from table1 order by time"); + int cnt = 0; while (dataSet.hasNext()) { RowRecord rowRecord = dataSet.next(); timestamp = rowRecord.getFields().get(0).getLongV(); assertEquals("id:" + timestamp, rowRecord.getFields().get(1).getBinaryV().toString()); assertEquals("attr:" + timestamp, rowRecord.getFields().get(2).getBinaryV().toString()); assertEquals(timestamp * 1.0, rowRecord.getFields().get(3).getDoubleV(), 0.0001); + cnt++; } + assertEquals(30, cnt); } } @@ -438,6 +504,7 @@ public class IoTDBSessionRelationalIT { } SessionDataSet dataSet = session.executeQueryStatement("select * from table1 order by time"); + int cnt = 0; while (dataSet.hasNext()) { RowRecord rowRecord = dataSet.next(); timestamp = rowRecord.getFields().get(0).getLongV(); @@ -446,9 +513,9 @@ public class IoTDBSessionRelationalIT { assertEquals(timestamp * 1.0, rowRecord.getFields().get(3).getDoubleV(), 0.0001); // "m2" should not be present assertEquals(4, rowRecord.getFields().size()); - timestamp++; - // System.out.println(rowRecord); + cnt++; } + assertEquals(30, cnt); } finally { EnvFactory.getEnv().getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); } @@ -511,6 +578,7 @@ public class IoTDBSessionRelationalIT { tablet.reset(); } + int cnt = 0; SessionDataSet dataSet = session.executeQueryStatement("select * from table1 order by time"); while (dataSet.hasNext()) { RowRecord rowRecord = dataSet.next(); @@ -518,7 +586,9 @@ public class IoTDBSessionRelationalIT { assertEquals("id:" + timestamp, rowRecord.getFields().get(1).getBinaryV().toString()); assertEquals("attr:" + timestamp, rowRecord.getFields().get(2).getBinaryV().toString()); assertEquals(timestamp * 1.0, rowRecord.getFields().get(3).getDoubleV(), 0.0001); + cnt++; } + assertEquals(30, cnt); } } @@ -558,6 +628,7 @@ public class IoTDBSessionRelationalIT { "table1", timestamp + row, measurementIds, dataTypes, columnTypes, values); } + int cnt = 0; SessionDataSet dataSet = session.executeQueryStatement("select * from table1 order by time"); while (dataSet.hasNext()) { RowRecord rowRecord = dataSet.next(); @@ -565,7 +636,9 @@ public class IoTDBSessionRelationalIT { assertEquals("id:" + timestamp, rowRecord.getFields().get(1).getBinaryV().toString()); assertEquals("attr:" + timestamp, rowRecord.getFields().get(2).getBinaryV().toString()); assertEquals(timestamp * 1.0, rowRecord.getFields().get(3).getDoubleV(), 0.0001); + cnt++; } + assertEquals(30, cnt); } } @@ -625,15 +698,16 @@ public class IoTDBSessionRelationalIT { } SessionDataSet dataSet = session.executeQueryStatement("select * from table1 order by time"); + int cnt = 0; while (dataSet.hasNext()) { RowRecord rowRecord = dataSet.next(); timestamp = rowRecord.getFields().get(0).getLongV(); assertEquals("id:" + timestamp, rowRecord.getFields().get(1).getBinaryV().toString()); assertEquals("attr:" + timestamp, rowRecord.getFields().get(2).getBinaryV().toString()); assertEquals(timestamp * 1.0, rowRecord.getFields().get(3).getDoubleV(), 0.0001); - timestamp++; - // System.out.println(rowRecord); + cnt++; } + assertEquals(30, cnt); } } @@ -693,6 +767,7 @@ public class IoTDBSessionRelationalIT { } SessionDataSet dataSet = session.executeQueryStatement("select * from table1 order by time"); + int cnt = 0; while (dataSet.hasNext()) { RowRecord rowRecord = dataSet.next(); timestamp = rowRecord.getFields().get(0).getLongV(); @@ -701,9 +776,9 @@ public class IoTDBSessionRelationalIT { assertEquals("id:" + timestamp, rowRecord.getFields().get(2).getBinaryV().toString()); assertEquals("attr:" + timestamp, rowRecord.getFields().get(3).getBinaryV().toString()); assertEquals(timestamp * 1.0, rowRecord.getFields().get(4).getDoubleV(), 0.0001); - timestamp++; - // System.out.println(rowRecord); + cnt++; } + assertEquals(30, cnt); } } @@ -752,6 +827,7 @@ public class IoTDBSessionRelationalIT { } SessionDataSet dataSet = session.executeQueryStatement("select * from table1 order by time"); + int cnt = 0; while (dataSet.hasNext()) { RowRecord rowRecord = dataSet.next(); timestamp = rowRecord.getFields().get(0).getLongV(); @@ -759,7 +835,9 @@ public class IoTDBSessionRelationalIT { assertEquals("id2:" + timestamp, rowRecord.getFields().get(2).getBinaryV().toString()); assertEquals("attr1:" + timestamp, rowRecord.getFields().get(3).getBinaryV().toString()); assertEquals(timestamp * 1.0, rowRecord.getFields().get(4).getDoubleV(), 0.0001); + cnt++; } + assertEquals(30, cnt); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java index b6faadf5ebb..3c51cd4d195 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java @@ -134,7 +134,7 @@ public class TableHeaderSchemaValidator { // TODO table metadata: authority check for table alter // check id or attribute column data type in this method autoCreateColumn(database, tableSchema.getTableName(), missingColumnList, context); - resultColumnList.addAll(missingColumnList); + table = DataNodeTableCache.getInstance().getTable(database, tableSchema.getTableName()); } table diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java index a609d5f5218..b78830a1dab 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java @@ -74,16 +74,21 @@ public abstract class WrappedInsertStatement extends WrappedStatement List<ColumnSchema> columnSchemas = new ArrayList<>(insertBaseStatement.getMeasurements().length); for (int i = 0; i < insertBaseStatement.getMeasurements().length; i++) { - columnSchemas.add( - new ColumnSchema( - insertBaseStatement.getMeasurements()[i], - insertBaseStatement.getDataTypes() != null - ? TypeFactory.getType(insertBaseStatement.getDataTypes()[i]) - : null, - false, - insertBaseStatement.getColumnCategories() != null - ? insertBaseStatement.getColumnCategories()[i] - : null)); + if (insertBaseStatement.getMeasurements()[i] != null) { + columnSchemas.add( + new ColumnSchema( + insertBaseStatement.getMeasurements()[i], + insertBaseStatement.getDataTypes() != null + && insertBaseStatement.getDataTypes()[i] != null + ? TypeFactory.getType(insertBaseStatement.getDataTypes()[i]) + : null, + false, + insertBaseStatement.getColumnCategories() != null + ? insertBaseStatement.getColumnCategories()[i] + : null)); + } else { + columnSchemas.add(null); + } } return new TableSchema(tableName, columnSchemas); } @@ -122,7 +127,7 @@ public abstract class WrappedInsertStatement extends WrappedStatement */ public void adjustIdColumns( List<ColumnSchema> realIdColumnSchemas, final InsertBaseStatement baseStatement) { - List<ColumnSchema> incomingColumnSchemas = getTableSchema().getColumns(); + List<ColumnSchema> incomingColumnSchemas = toTableSchema(baseStatement).getColumns(); for (int realIdColPos = 0; realIdColPos < realIdColumnSchemas.size(); realIdColPos++) { ColumnSchema realColumn = realIdColumnSchemas.get(realIdColPos); int incomingIdColPos = incomingColumnSchemas.indexOf(realColumn);
