This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch fix_empty_datasource_error in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fd23d4fe98312ed0c0da76bb76f9fc974d632717 Author: Beyyes <[email protected]> AuthorDate: Fri Nov 22 10:28:29 2024 +0800 add four table join test --- .../db/it/IoTDBMultiIDsWithAttributesTableIT.java | 80 +++++-- .../main/java/org/apache/iotdb/db/TableTest.java | 258 +++++++++++++++++++++ .../TableModelStatementMemorySourceVisitor.java | 6 +- .../plan/relational/analyzer/Analysis.java | 4 +- .../planner/optimizations/SortElimination.java | 2 +- .../optimizations/TransformSortToStreamSort.java | 2 +- 6 files changed, 325 insertions(+), 27 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java index 1f414e41e0c..797ae6f3f60 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java @@ -32,6 +32,7 @@ import org.junit.runner.RunWith; import java.sql.Connection; import java.sql.Statement; +import java.util.Arrays; import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest; import static org.junit.Assert.fail; @@ -86,15 +87,6 @@ public class IoTDBMultiIDsWithAttributesTableIT { "insert into table0(device,level,time,num,bigNum,floatNum,str,bool,date) values('d2','l5',51536000000,15,3147483648,235.213,'watermelon',TRUE,'2023-01-01')" }; - // public static void main(String[] args) { - // for (String sql : sql1) { - // System.out.println(sql+";"); - // } - // for (String sql : sql2) { - // System.out.println(sql+";"); - // } - // } - private static final String[] sql3 = new String[] { "CREATE TABLE table1 (device string id, level string id, attr1 string attribute, attr2 string attribute, num int32 measurement, bigNum int64 measurement, " @@ -120,6 +112,35 @@ public class IoTDBMultiIDsWithAttributesTableIT { "insert into table1(time, device, level, attr1, attr2, num,bigNum,floatNum,str,bool) values(40, 'd11', 'l11', 'c', 'd', 3, 2947483648, 231.2121, 'coconut', FALSE)" }; + private static final String[] sql4 = + new String[] { + "create table students(region STRING ID, student_id INT32 MEASUREMENT, name STRING MEASUREMENT, genders text MEASUREMENT, date_of_birth DATE MEASUREMENT)", + "create table teachers(region STRING ID, teacher_id INT32 MEASUREMENT, course_id INT32 MEASUREMENT, age INT32 MEASUREMENT)", + "create table courses(course_id STRING ID, course_name STRING MEASUREMENT, teacher_id INT32 MEASUREMENT)", + "create table grades(grade_id STRING ID, course_id INT32 MEASUREMENT, student_id INT32 MEASUREMENT, score INT32 MEASUREMENT)", + "insert into students(time,region,student_id,name,genders,date_of_birth) values" + + "(1,'haidian',1,'Lucy','女','2015-10-10'),(2,'haidian',2,'Jack','男','2015-09-24'),(3,'chaoyang',3,'Sam','男','2014-07-20'),(4,'chaoyang',4,'Lily','女','2015-03-28')," + + "(5,'xicheng',5,'Helen','女','2016-01-22'),(6,'changping',6,'Nancy','女','2017-12-20'),(7,'changping',7,'Mike','男','2016-11-22'),(8,'shunyi',8,'Bob','男','2016-05-12')", + "insert into teachers(time,region,teacher_id,course_id,age) values" + + "(1,'haidian',1001,10000001,25),(2,'haidian',1002,10000002,26),(3,'chaoyang',1003,10000003,28)," + + "(4,'chaoyang',1004,10000004,27),(5,'xicheng',1005,10000005,26)", + "insert into courses(time,course_id,course_name,teacher_id) values" + + "(1,10000001,'数学',1001),(2,10000002,'语文',1002),(3,10000003,'英语',1003)," + + "(4,10000004,'体育',1004),(5,10000005,'历史',1005)", + "insert into grades(time,grade_id,course_id,student_id,score) values" + + "(1,1111,10000001,1,99),(2,1112,10000002,2,90),(3,1113,10000003,3,85),(4,1114,10000004,4,89),(5,1115,10000005,5,98)," + + "(6,1113,10000003,6,55),(7,1114,10000004,7,60),(8,1115,10000005,8,100),(9,1114,10000001,2,99),(10,1115,10000002,1,95)" + }; + + // public static void main(String[] args) { + // for (String sql : sql1) { + // System.out.println(sql+";"); + // } + // for (String sql : sql2) { + // System.out.println(sql+";"); + // } + // } + String[] expectedHeader; String[] retArray; String sql; @@ -144,15 +165,10 @@ public class IoTDBMultiIDsWithAttributesTableIT { private static void insertData() { try (Connection connection = EnvFactory.getEnv().getTableConnection(); Statement statement = connection.createStatement()) { - - for (String sql : sql1) { - statement.execute(sql); - } - for (String sql : sql2) { - statement.execute(sql); - } - for (String sql : sql3) { - statement.execute(sql); + for (String[] sqlList : Arrays.asList(sql1, sql2, sql3, sql4)) { + for (String sql : sqlList) { + statement.execute(sql); + } } } catch (Exception e) { e.printStackTrace(); @@ -1579,9 +1595,9 @@ public class IoTDBMultiIDsWithAttributesTableIT { // has filter @Test public void fullOuterJoinTest2() { - String[] expectedHeader = + expectedHeader = new String[] {"time", "device", "level", "t1_num_add", "device", "attr2", "num", "str"}; - String[] retArray = + retArray = new String[] { "1970-01-01T00:00:00.000Z,null,null,null,d1,d,3,coconut,", "1970-01-01T00:00:00.000Z,null,null,null,d2,c,3,coconut,", @@ -1636,6 +1652,30 @@ public class IoTDBMultiIDsWithAttributesTableIT { tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); } + @Test + public void fourTableJoinTest() { + expectedHeader = + new String[] { + "time", "s_id", "s_name", "s_birth", "t_id", "t_c_id", "c_name", "g_id", "score" + }; + retArray = new String[] { + "1970-01-01T00:00:00.001Z,1,Lucy,2015-10-10,1001,10000001,数学,1111,99,", + "1970-01-01T00:00:00.002Z,2,Jack,2015-09-24,1002,10000002,语文,1112,90,", + "1970-01-01T00:00:00.003Z,3,Sam,2014-07-20,1003,10000003,英语,1113,85,", + "1970-01-01T00:00:00.004Z,4,Lily,2015-03-28,1004,10000004,体育,1114,89,", + }; + sql = + "select s.time," + + " s.student_id as s_id, s.name as s_name, s.date_of_birth as s_birth," + + " t.teacher_id as t_id, t.course_id as t_c_id," + + " c.course_name as c_name," + + " g.grade_id as g_id, g.score as score " + + "from students s, teachers t, courses c, grades g " + + "where s.time=t.time AND c.time=g.time AND s.time=c.time " + + "order by s.student_id, t.teacher_id, c.course_id,g.grade_id"; + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + } + public static String[] buildHeaders(int length) { String[] expectedHeader = new String[length]; for (int i = 0; i < length; i++) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/TableTest.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/TableTest.java new file mode 100644 index 00000000000..352cb3547dc --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/TableTest.java @@ -0,0 +1,258 @@ +package org.apache.iotdb.db; + +import org.apache.iotdb.isession.IPooledSession; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.pool.SessionPool; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +// tree 127.0.0.1 1 root.treedb 10000 +// table 127.0.0.1 10 tabledb 10000 +public class TableTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(TableTest.class); + + private static final String TABLE_SQL_DIALECT = "table"; + + private static final AtomicInteger deviceIdGenerator = new AtomicInteger(0); + + private static final List<IMeasurementSchema> TABLE_SCHEMA_LIST = new ArrayList<>(); + private static final List<Tablet.ColumnType> TABLE_COLUMN_TYPES = + Arrays.asList( + Tablet.ColumnType.ID, + Tablet.ColumnType.ID, + Tablet.ColumnType.ID, + // Tablet.ColumnType.ATTRIBUTE, + Tablet.ColumnType.MEASUREMENT, + Tablet.ColumnType.MEASUREMENT, + Tablet.ColumnType.MEASUREMENT); + + private static final String COLUMN_NAME_1 = "city"; + private static final String COLUMN_NAME_2 = "region"; + private static final String COLUMN_NAME_3 = "device_id"; + private static final String COLUMN_NAME_4 = "color"; + private static final String COLUMN_NAME_5 = "s1"; + private static final String COLUMN_NAME_6 = "s2"; + private static final String COLUMN_NAME_7 = "s3"; + + private static final List<IMeasurementSchema> TREE_SCHEMA_LIST = new ArrayList<>(); + + static { + TABLE_SCHEMA_LIST.add(new MeasurementSchema(COLUMN_NAME_1, TSDataType.STRING)); + TABLE_SCHEMA_LIST.add(new MeasurementSchema(COLUMN_NAME_2, TSDataType.STRING)); + TABLE_SCHEMA_LIST.add(new MeasurementSchema(COLUMN_NAME_3, TSDataType.STRING)); + // TABLE_SCHEMA_LIST.add(new MeasurementSchema(COLUMN_NAME_4, TSDataType.STRING)); + TABLE_SCHEMA_LIST.add(new MeasurementSchema(COLUMN_NAME_5, TSDataType.DOUBLE)); + TABLE_SCHEMA_LIST.add(new MeasurementSchema(COLUMN_NAME_6, TSDataType.DOUBLE)); + TABLE_SCHEMA_LIST.add(new MeasurementSchema(COLUMN_NAME_7, TSDataType.DOUBLE)); + + TREE_SCHEMA_LIST.add(new MeasurementSchema(COLUMN_NAME_5, TSDataType.DOUBLE)); + TREE_SCHEMA_LIST.add(new MeasurementSchema(COLUMN_NAME_6, TSDataType.DOUBLE)); + TREE_SCHEMA_LIST.add(new MeasurementSchema(COLUMN_NAME_7, TSDataType.DOUBLE)); + } + + // 2024-10-01T00:00:00+08:00 + private static final long START_TIME = 1727712000000L; + + public static void main(String[] args) { + + // table + String sqlDialect = args[0]; + // 127.0.0.1 + String ip = args[1]; + // 10 + int maxSize = Integer.parseInt(args[2]); + + String database = args[3]; + + final int deviceNum = Integer.parseInt(args[4]); + + SessionPool sessionPool = + new SessionPool.Builder() + .host(ip) + .port(6667) + .user("root") + .password("root") + .maxSize(maxSize) + .sqlDialect(sqlDialect) + .database(database) + .build(); + + long startTime = System.nanoTime(); + List<Thread> subThreads = new ArrayList<>(maxSize); + if (TABLE_SQL_DIALECT.equalsIgnoreCase(sqlDialect)) { + // CREATE TABLE table1(city STRING ID, region STRING ID, device_id STRING ID, color STRING + // ATTRIBUTE, s1 DOUBLE MEASUREMENT, s2 DOUBLE MEASUREMENT, s3 DOUBLE MEASUREMENT) + for (int i = 0; i < maxSize; i++) { + Thread t = new Thread(() -> writeTable(sessionPool, deviceNum)); + subThreads.add(t); + t.start(); + } + } else { + for (int i = 0; i < maxSize; i++) { + Thread t = new Thread(() -> writeTree(sessionPool, deviceNum, database)); + subThreads.add(t); + t.start(); + } + } + subThreads.forEach( + t -> { + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + LOGGER.info( + "{} writing {} devices costs {}s", + sqlDialect, + deviceNum, + (System.nanoTime() - startTime) / 1_000_000_000L); + } + + private static void writeTable(final SessionPool sessionPool, final int deviceNum) { + + try (IPooledSession session = sessionPool.getPooledSession()) { + while (true) { + long startTime = System.nanoTime(); + int device = deviceIdGenerator.getAndIncrement(); + if (device >= deviceNum) { + break; + } + int city = device % 10; + int region = device % 100; + int color = device % 5; + + Tablet tablet = new Tablet("table1", TABLE_SCHEMA_LIST, TABLE_COLUMN_TYPES, 10_000); + String cityId = "city_" + city; + String regionId = "region_" + region; + String deviceId = "d_" + device; + String colorId = "color_" + color; + long roundStartTime = System.nanoTime(); + for (int i = 0; i < 6 * 60 * 24 * 3; i++) { + int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, START_TIME + i * 10_000L); + tablet.addValue(COLUMN_NAME_1, rowIndex, cityId); + tablet.addValue(COLUMN_NAME_2, rowIndex, regionId); + tablet.addValue(COLUMN_NAME_3, rowIndex, deviceId); + // tablet.addValue(COLUMN_NAME_4, rowIndex, colorId); + tablet.addValue(COLUMN_NAME_5, rowIndex, i * 1.0d); + tablet.addValue(COLUMN_NAME_6, rowIndex, i * 1.0d); + tablet.addValue(COLUMN_NAME_7, rowIndex, i * 1.0d); + + if (tablet.rowSize == tablet.getMaxRowNumber()) { + session.insertTablet(tablet); + tablet.reset(); + long writtenRows = i + 1L; + if (writtenRows % 10000 == 0) { + LOGGER.info( + "Device {} has written {} rows, time cost is {}ms", + deviceId, + writtenRows, + (System.nanoTime() - roundStartTime) / 1_000_000); + roundStartTime = System.nanoTime(); + } + } + } + + if (tablet.rowSize != 0) { + session.insertTablet(tablet); + tablet.reset(); + } + session.executeNonQueryStatement( + String.format( + "UPDATE table1 SET %s='%s' WHERE %s='%s' AND %s='%s' AND %s='%s'", + COLUMN_NAME_4, + colorId, + COLUMN_NAME_1, + cityId, + COLUMN_NAME_2, + regionId, + COLUMN_NAME_3, + deviceId)); + LOGGER.info( + "Device {} finished, total time cost is {}ms", + deviceId, + (System.nanoTime() - startTime) / 1_000_000); + } + + } catch (IoTDBConnectionException e) { + LOGGER.error("Connection error", e); + throw new RuntimeException(e); + } catch (StatementExecutionException e) { + LOGGER.error("Execute error", e); + throw new RuntimeException(e); + } + } + + private static void writeTree( + final SessionPool sessionPool, final int deviceNum, final String database) { + + try { + while (true) { + long startTime = System.nanoTime(); + int device = deviceIdGenerator.getAndIncrement(); + if (device >= deviceNum) { + break; + } + int city = device % 10; + int region = device % 100; + // int color = device % 5; + + String deviceId = + String.format( + "%s.%s.%s.%s", database, "city_" + city, "region_" + region, "d_" + device); + Tablet tablet = new Tablet(deviceId, TREE_SCHEMA_LIST, 10000); + long roundStartTime = System.nanoTime(); + for (int i = 0; i < 6 * 60 * 24 * 3; i++) { + int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, START_TIME + i * 10_000L); + tablet.addValue(COLUMN_NAME_5, rowIndex, i * 1.0d); + tablet.addValue(COLUMN_NAME_6, rowIndex, i * 1.0d); + tablet.addValue(COLUMN_NAME_7, rowIndex, i * 1.0d); + if (tablet.rowSize == tablet.getMaxRowNumber()) { + sessionPool.insertAlignedTablet(tablet); + tablet.reset(); + long writtenRows = i + 1L; + if (writtenRows % 10000 == 0) { + LOGGER.info( + "Device {} has written {} rows, time cost is {}ms", + deviceId, + writtenRows, + (System.nanoTime() - roundStartTime) / 1_000_000); + roundStartTime = System.nanoTime(); + } + } + } + + if (tablet.rowSize != 0) { + sessionPool.insertAlignedTablet(tablet); + tablet.reset(); + } + + LOGGER.info( + "Device {} finished, total time cost is {}ms", + deviceId, + (System.nanoTime() - startTime) / 1_000_000); + } + + } catch (IoTDBConnectionException e) { + LOGGER.error("Connection error", e); + throw new RuntimeException(e); + } catch (StatementExecutionException e) { + LOGGER.error("Execute error", e); + throw new RuntimeException(e); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/TableModelStatementMemorySourceVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/TableModelStatementMemorySourceVisitor.java index d4c5c19a426..f15fb94b3c9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/TableModelStatementMemorySourceVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/TableModelStatementMemorySourceVisitor.java @@ -75,9 +75,9 @@ public class TableModelStatementMemorySourceVisitor symbolAllocator, NOOP) .plan(context.getAnalysis()); - // if (context.getAnalysis().isEmptyDataSource()) { - // return new StatementMemorySource(new TsBlock(0), header); - // } + if (context.getAnalysis().isEmptyDataSource()) { + return new StatementMemorySource(new TsBlock(0), header); + } // Generate table model distributed plan final TableDistributedPlanGenerator.PlanContext planContext = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java index 51351f8bea5..74773d53e2a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java @@ -684,8 +684,8 @@ public class Analysis implements IAnalysis { this.hasValueFilter = hasValueFilter; } - public boolean hasSortNode() { - return hasSortNode; + public boolean noSortNode() { + return !hasSortNode; } public void setSortNode(boolean hasSortNode) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java index d1f249010d9..fa3b279058b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java @@ -45,7 +45,7 @@ public class SortElimination implements PlanOptimizer { @Override public PlanNode optimize(PlanNode plan, PlanOptimizer.Context context) { - if (!context.getAnalysis().hasSortNode()) { + if (context.getAnalysis().noSortNode()) { return plan; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java index 95f8004d8f8..4ec849eee47 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java @@ -49,7 +49,7 @@ public class TransformSortToStreamSort implements PlanOptimizer { @Override public PlanNode optimize(PlanNode plan, PlanOptimizer.Context context) { - if (!context.getAnalysis().hasSortNode()) { + if (context.getAnalysis().noSortNode()) { return plan; }
