This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch fix_empty_datasource_error
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fd23d4fe98312ed0c0da76bb76f9fc974d632717
Author: Beyyes <[email protected]>
AuthorDate: Fri Nov 22 10:28:29 2024 +0800

    add four table join test
---
 .../db/it/IoTDBMultiIDsWithAttributesTableIT.java  |  80 +++++--
 .../main/java/org/apache/iotdb/db/TableTest.java   | 258 +++++++++++++++++++++
 .../TableModelStatementMemorySourceVisitor.java    |   6 +-
 .../plan/relational/analyzer/Analysis.java         |   4 +-
 .../planner/optimizations/SortElimination.java     |   2 +-
 .../optimizations/TransformSortToStreamSort.java   |   2 +-
 6 files changed, 325 insertions(+), 27 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
index 1f414e41e0c..797ae6f3f60 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
@@ -32,6 +32,7 @@ import org.junit.runner.RunWith;
 
 import java.sql.Connection;
 import java.sql.Statement;
+import java.util.Arrays;
 
 import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest;
 import static org.junit.Assert.fail;
@@ -86,15 +87,6 @@ public class IoTDBMultiIDsWithAttributesTableIT {
         "insert into 
table0(device,level,time,num,bigNum,floatNum,str,bool,date) 
values('d2','l5',51536000000,15,3147483648,235.213,'watermelon',TRUE,'2023-01-01')"
       };
 
-  //  public static void main(String[] args) {
-  //    for (String sql : sql1) {
-  //      System.out.println(sql+";");
-  //    }
-  //    for (String sql : sql2) {
-  //      System.out.println(sql+";");
-  //    }
-  //  }
-
   private static final String[] sql3 =
       new String[] {
         "CREATE TABLE table1 (device string id, level string id, attr1 string 
attribute, attr2 string attribute, num int32 measurement, bigNum int64 
measurement, "
@@ -120,6 +112,35 @@ public class IoTDBMultiIDsWithAttributesTableIT {
         "insert into table1(time, device, level, attr1, attr2, 
num,bigNum,floatNum,str,bool) values(40, 'd11', 'l11', 'c', 'd', 3, 2947483648, 
231.2121, 'coconut', FALSE)"
       };
 
+  // Fixture for fourTableJoinTest: creates the students, teachers, courses and grades tables,
+  // then inserts rows at timestamps 1..8, 1..5, 1..5 and 1..10 respectively.
+  // Executed by insertData() together with sql1, sql2 and sql3.
+  private static final String[] sql4 =
+      new String[] {
+        "create table students(region STRING ID, student_id INT32 MEASUREMENT, 
name STRING MEASUREMENT, genders text MEASUREMENT, date_of_birth DATE 
MEASUREMENT)",
+        "create table teachers(region STRING ID, teacher_id INT32 MEASUREMENT, 
course_id INT32 MEASUREMENT, age INT32 MEASUREMENT)",
+        "create table courses(course_id STRING ID, course_name STRING 
MEASUREMENT, teacher_id INT32 MEASUREMENT)",
+        "create table grades(grade_id STRING ID, course_id INT32 MEASUREMENT, 
student_id INT32 MEASUREMENT, score INT32 MEASUREMENT)",
+        "insert into 
students(time,region,student_id,name,genders,date_of_birth) values"
+            + 
"(1,'haidian',1,'Lucy','女','2015-10-10'),(2,'haidian',2,'Jack','男','2015-09-24'),(3,'chaoyang',3,'Sam','男','2014-07-20'),(4,'chaoyang',4,'Lily','女','2015-03-28'),"
+            + 
"(5,'xicheng',5,'Helen','女','2016-01-22'),(6,'changping',6,'Nancy','女','2017-12-20'),(7,'changping',7,'Mike','男','2016-11-22'),(8,'shunyi',8,'Bob','男','2016-05-12')",
+        "insert into teachers(time,region,teacher_id,course_id,age) values"
+            + 
"(1,'haidian',1001,10000001,25),(2,'haidian',1002,10000002,26),(3,'chaoyang',1003,10000003,28),"
+            + "(4,'chaoyang',1004,10000004,27),(5,'xicheng',1005,10000005,26)",
+        "insert into courses(time,course_id,course_name,teacher_id) values"
+            + 
"(1,10000001,'数学',1001),(2,10000002,'语文',1002),(3,10000003,'英语',1003),"
+            + "(4,10000004,'体育',1004),(5,10000005,'历史',1005)",
+        "insert into grades(time,grade_id,course_id,student_id,score) values"
+            + 
"(1,1111,10000001,1,99),(2,1112,10000002,2,90),(3,1113,10000003,3,85),(4,1114,10000004,4,89),(5,1115,10000005,5,98),"
+            + 
"(6,1113,10000003,6,55),(7,1114,10000004,7,60),(8,1115,10000005,8,100),(9,1114,10000001,2,99),(10,1115,10000002,1,95)"
+      };
+
+  //  public static void main(String[] args) {
+  //    for (String sql : sql1) {
+  //      System.out.println(sql+";");
+  //    }
+  //    for (String sql : sql2) {
+  //      System.out.println(sql+";");
+  //    }
+  //  }
+
   String[] expectedHeader;
   String[] retArray;
   String sql;
@@ -144,15 +165,10 @@ public class IoTDBMultiIDsWithAttributesTableIT {
   private static void insertData() {
     try (Connection connection = EnvFactory.getEnv().getTableConnection();
         Statement statement = connection.createStatement()) {
-
-      for (String sql : sql1) {
-        statement.execute(sql);
-      }
-      for (String sql : sql2) {
-        statement.execute(sql);
-      }
-      for (String sql : sql3) {
-        statement.execute(sql);
+      for (String[] sqlList : Arrays.asList(sql1, sql2, sql3, sql4)) {
+        for (String sql : sqlList) {
+          statement.execute(sql);
+        }
       }
     } catch (Exception e) {
       e.printStackTrace();
@@ -1579,9 +1595,9 @@ public class IoTDBMultiIDsWithAttributesTableIT {
   // has filter
   @Test
   public void fullOuterJoinTest2() {
-    String[] expectedHeader =
+    expectedHeader =
         new String[] {"time", "device", "level", "t1_num_add", "device", 
"attr2", "num", "str"};
-    String[] retArray =
+    retArray =
         new String[] {
           "1970-01-01T00:00:00.000Z,null,null,null,d1,d,3,coconut,",
           "1970-01-01T00:00:00.000Z,null,null,null,d2,c,3,coconut,",
@@ -1636,6 +1652,30 @@ public class IoTDBMultiIDsWithAttributesTableIT {
     tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
   }
 
+  /**
+   * Joins students, teachers, courses and grades on equal timestamps (implicit inner join) and
+   * verifies the projected columns, ordered by student, teacher, course and grade id.
+   *
+   * <p>All four fixtures in {@code sql4} contain rows at timestamps 1..5 (teachers and courses
+   * stop at 5), so the join must produce exactly five rows, one per shared timestamp.
+   */
+  @Test
+  public void fourTableJoinTest() {
+    expectedHeader =
+        new String[] {
+          "time", "s_id", "s_name", "s_birth", "t_id", "t_c_id", "c_name", "g_id", "score"
+        };
+    retArray =
+        new String[] {
+          "1970-01-01T00:00:00.001Z,1,Lucy,2015-10-10,1001,10000001,数学,1111,99,",
+          "1970-01-01T00:00:00.002Z,2,Jack,2015-09-24,1002,10000002,语文,1112,90,",
+          "1970-01-01T00:00:00.003Z,3,Sam,2014-07-20,1003,10000003,英语,1113,85,",
+          "1970-01-01T00:00:00.004Z,4,Lily,2015-03-28,1004,10000004,体育,1114,89,",
+          // Timestamp 5 is present in all four tables as well and must not be dropped.
+          "1970-01-01T00:00:00.005Z,5,Helen,2016-01-22,1005,10000005,历史,1115,98,",
+        };
+    sql =
+        "select s.time,"
+            + "         s.student_id as s_id, s.name as s_name, s.date_of_birth as s_birth,"
+            + "         t.teacher_id as t_id, t.course_id as t_c_id,"
+            + "         c.course_name as c_name,"
+            + "         g.grade_id as g_id, g.score as score "
+            + "from students s, teachers t, courses c, grades g "
+            + "where s.time=t.time AND c.time=g.time AND s.time=c.time "
+            + "order by s.student_id, t.teacher_id, c.course_id,g.grade_id";
+    tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
+  }
+
   public static String[] buildHeaders(int length) {
     String[] expectedHeader = new String[length];
     for (int i = 0; i < length; i++) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/TableTest.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/TableTest.java
new file mode 100644
index 00000000000..352cb3547dc
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/TableTest.java
@@ -0,0 +1,258 @@
+package org.apache.iotdb.db;
+
+import org.apache.iotdb.isession.IPooledSession;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.SessionPool;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+// tree 127.0.0.1 1 root.treedb 10000
+// table 127.0.0.1 10 tabledb 10000
+public class TableTest {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TableTest.class);
+
+  // Dialect value that makes main() start table-model writers (compared case-insensitively).
+  private static final String TABLE_SQL_DIALECT = "table";
+
+  // Shared counter from which every writer thread claims the next device index.
+  private static final AtomicInteger deviceIdGenerator = new AtomicInteger(0);
+
+  // Column schemas of the table-model tablet; filled in the static initializer below.
+  private static final List<IMeasurementSchema> TABLE_SCHEMA_LIST = new 
ArrayList<>();
+  // Column kinds in the same order as TABLE_SCHEMA_LIST: three ID columns, then three
+  // MEASUREMENT columns. The ATTRIBUTE entry is commented out.
+  private static final List<Tablet.ColumnType> TABLE_COLUMN_TYPES =
+      Arrays.asList(
+          Tablet.ColumnType.ID,
+          Tablet.ColumnType.ID,
+          Tablet.ColumnType.ID,
+          //          Tablet.ColumnType.ATTRIBUTE,
+          Tablet.ColumnType.MEASUREMENT,
+          Tablet.ColumnType.MEASUREMENT,
+          Tablet.ColumnType.MEASUREMENT);
+
+  // Column names. COLUMN_NAME_4 ("color") is not part of the tablet schema; writeTable() sets
+  // it afterwards through an UPDATE statement.
+  private static final String COLUMN_NAME_1 = "city";
+  private static final String COLUMN_NAME_2 = "region";
+  private static final String COLUMN_NAME_3 = "device_id";
+  private static final String COLUMN_NAME_4 = "color";
+  private static final String COLUMN_NAME_5 = "s1";
+  private static final String COLUMN_NAME_6 = "s2";
+  private static final String COLUMN_NAME_7 = "s3";
+
+  // Measurement schemas of the tree-model tablet (s1, s2 and s3 only).
+  private static final List<IMeasurementSchema> TREE_SCHEMA_LIST = new 
ArrayList<>();
+
+  // Populates both schema lists once, when the class is loaded.
+  static {
+    TABLE_SCHEMA_LIST.add(new MeasurementSchema(COLUMN_NAME_1, 
TSDataType.STRING));
+    TABLE_SCHEMA_LIST.add(new MeasurementSchema(COLUMN_NAME_2, 
TSDataType.STRING));
+    TABLE_SCHEMA_LIST.add(new MeasurementSchema(COLUMN_NAME_3, 
TSDataType.STRING));
+    //    TABLE_SCHEMA_LIST.add(new MeasurementSchema(COLUMN_NAME_4, 
TSDataType.STRING));
+    TABLE_SCHEMA_LIST.add(new MeasurementSchema(COLUMN_NAME_5, 
TSDataType.DOUBLE));
+    TABLE_SCHEMA_LIST.add(new MeasurementSchema(COLUMN_NAME_6, 
TSDataType.DOUBLE));
+    TABLE_SCHEMA_LIST.add(new MeasurementSchema(COLUMN_NAME_7, 
TSDataType.DOUBLE));
+
+    TREE_SCHEMA_LIST.add(new MeasurementSchema(COLUMN_NAME_5, 
TSDataType.DOUBLE));
+    TREE_SCHEMA_LIST.add(new MeasurementSchema(COLUMN_NAME_6, 
TSDataType.DOUBLE));
+    TREE_SCHEMA_LIST.add(new MeasurementSchema(COLUMN_NAME_7, 
TSDataType.DOUBLE));
+  }
+
+  // Timestamp of the first row written for every device; later rows add i * 10_000 to it.
+  // 2024-10-01T00:00:00+08:00
+  private static final long START_TIME = 1727712000000L;
+
+  /**
+   * Runs the write benchmark: builds a session pool, starts {@code maxSize} writer threads,
+   * waits for all of them, closes the pool and logs the elapsed time in whole seconds.
+   *
+   * <p>Positional arguments: {@code args[0]} SQL dialect ("table", case-insensitive, selects the
+   * table-model writer; any other value selects the tree-model writer), {@code args[1]} server
+   * host, {@code args[2]} pool size and thread count, {@code args[3]} database, {@code args[4]}
+   * number of devices to write.
+   *
+   * @param args the five positional arguments described above
+   * @throws IllegalArgumentException if fewer than five arguments are given
+   * @throws NumberFormatException if {@code args[2]} or {@code args[4]} is not an integer
+   */
+  public static void main(String[] args) {
+    // Fail with a usage hint instead of a bare ArrayIndexOutOfBoundsException.
+    if (args == null || args.length < 5) {
+      throw new IllegalArgumentException(
+          "Usage: TableTest <sqlDialect> <ip> <maxSize> <database> <deviceNum>");
+    }
+
+    // table
+    String sqlDialect = args[0];
+    // 127.0.0.1
+    String ip = args[1];
+    // 10
+    int maxSize = Integer.parseInt(args[2]);
+
+    String database = args[3];
+
+    final int deviceNum = Integer.parseInt(args[4]);
+
+    SessionPool sessionPool =
+        new SessionPool.Builder()
+            .host(ip)
+            .port(6667)
+            .user("root")
+            .password("root")
+            .maxSize(maxSize)
+            .sqlDialect(sqlDialect)
+            .database(database)
+            .build();
+
+    long startTime = System.nanoTime();
+    List<Thread> subThreads = new ArrayList<>(maxSize);
+    if (TABLE_SQL_DIALECT.equalsIgnoreCase(sqlDialect)) {
+      // CREATE TABLE table1(city STRING ID, region STRING ID, device_id STRING ID, color STRING
+      // ATTRIBUTE, s1 DOUBLE MEASUREMENT, s2 DOUBLE MEASUREMENT, s3 DOUBLE MEASUREMENT)
+      for (int i = 0; i < maxSize; i++) {
+        Thread t = new Thread(() -> writeTable(sessionPool, deviceNum));
+        subThreads.add(t);
+        t.start();
+      }
+    } else {
+      for (int i = 0; i < maxSize; i++) {
+        Thread t = new Thread(() -> writeTree(sessionPool, deviceNum, database));
+        subThreads.add(t);
+        t.start();
+      }
+    }
+    for (Thread t : subThreads) {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag before propagating so the interruption is not lost.
+        // The pool is deliberately left open here: writer threads may still be using it.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+    // Every writer has finished, so the pooled connections can be released.
+    sessionPool.close();
+    LOGGER.info(
+        "{} writing {} devices costs {}s",
+        sqlDialect,
+        deviceNum,
+        (System.nanoTime() - startTime) / 1_000_000_000L);
+  }
+
+  /**
+   * Table-model writer executed by each worker thread. Borrows one session, then repeatedly
+   * claims the next device index from the shared counter until {@code deviceNum} is reached.
+   * For every device it writes 6 * 60 * 24 * 3 rows (timestamps START_TIME + i * 10_000) into
+   * table1 in tablets of 10,000 rows and finally sets the device's color column via UPDATE.
+   *
+   * @param sessionPool pool from which the session is borrowed
+   * @param deviceNum exclusive upper bound of the device indexes to write
+   * @throws RuntimeException wrapping any connection or statement execution failure
+   */
+  private static void writeTable(final SessionPool sessionPool, final int deviceNum) {
+    final int rowsPerDevice = 6 * 60 * 24 * 3;
+
+    try (IPooledSession pooledSession = sessionPool.getPooledSession()) {
+      for (; ; ) {
+        final long deviceBegin = System.nanoTime();
+        final int deviceIndex = deviceIdGenerator.getAndIncrement();
+        if (deviceIndex >= deviceNum) {
+          // Nothing left to claim; try-with-resources hands the session back.
+          return;
+        }
+
+        final Tablet tablet =
+            new Tablet("table1", TABLE_SCHEMA_LIST, TABLE_COLUMN_TYPES, 10_000);
+        final String cityId = "city_" + (deviceIndex % 10);
+        final String regionId = "region_" + (deviceIndex % 100);
+        final String deviceId = "d_" + deviceIndex;
+        final String colorId = "color_" + (deviceIndex % 5);
+
+        long batchBegin = System.nanoTime();
+        for (int row = 0; row < rowsPerDevice; row++) {
+          final int slot = tablet.rowSize++;
+          final double reading = row * 1.0d;
+          tablet.addTimestamp(slot, START_TIME + row * 10_000L);
+          tablet.addValue(COLUMN_NAME_1, slot, cityId);
+          tablet.addValue(COLUMN_NAME_2, slot, regionId);
+          tablet.addValue(COLUMN_NAME_3, slot, deviceId);
+          tablet.addValue(COLUMN_NAME_5, slot, reading);
+          tablet.addValue(COLUMN_NAME_6, slot, reading);
+          tablet.addValue(COLUMN_NAME_7, slot, reading);
+
+          if (tablet.rowSize != tablet.getMaxRowNumber()) {
+            // Tablet still has room; keep filling it.
+            continue;
+          }
+
+          pooledSession.insertTablet(tablet);
+          tablet.reset();
+          final long rowsSoFar = row + 1L;
+          if (rowsSoFar % 10000 == 0) {
+            LOGGER.info(
+                "Device {} has written {} rows, time cost is {}ms",
+                deviceId,
+                rowsSoFar,
+                (System.nanoTime() - batchBegin) / 1_000_000);
+            batchBegin = System.nanoTime();
+          }
+        }
+
+        // Flush the final, partially filled tablet.
+        if (tablet.rowSize != 0) {
+          pooledSession.insertTablet(tablet);
+          tablet.reset();
+        }
+
+        final String updateColor =
+            String.format(
+                "UPDATE table1 SET %s='%s' WHERE %s='%s' AND %s='%s' AND %s='%s'",
+                COLUMN_NAME_4,
+                colorId,
+                COLUMN_NAME_1,
+                cityId,
+                COLUMN_NAME_2,
+                regionId,
+                COLUMN_NAME_3,
+                deviceId);
+        pooledSession.executeNonQueryStatement(updateColor);
+        LOGGER.info(
+            "Device {} finished, total time cost is {}ms",
+            deviceId,
+            (System.nanoTime() - deviceBegin) / 1_000_000);
+      }
+    } catch (IoTDBConnectionException e) {
+      LOGGER.error("Connection error", e);
+      throw new RuntimeException(e);
+    } catch (StatementExecutionException e) {
+      LOGGER.error("Execute error", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Tree-model writer executed by each worker thread. Repeatedly claims the next device index
+   * from the shared counter until {@code deviceNum} is reached and, for every device, writes
+   * 6 * 60 * 24 * 3 aligned rows (timestamps START_TIME + i * 10_000) in tablets of 10,000
+   * rows directly through the pool.
+   *
+   * @param sessionPool pool used for the aligned tablet inserts
+   * @param deviceNum exclusive upper bound of the device indexes to write
+   * @param database prefix of the device path: database.city_X.region_Y.d_Z
+   * @throws RuntimeException wrapping any connection or statement execution failure
+   */
+  private static void writeTree(
+      final SessionPool sessionPool, final int deviceNum, final String database) {
+    final int rowsPerDevice = 6 * 60 * 24 * 3;
+
+    try {
+      for (; ; ) {
+        final long deviceBegin = System.nanoTime();
+        final int deviceIndex = deviceIdGenerator.getAndIncrement();
+        if (deviceIndex >= deviceNum) {
+          // Every device index has been claimed by some thread.
+          return;
+        }
+
+        final String cityPart = "city_" + (deviceIndex % 10);
+        final String regionPart = "region_" + (deviceIndex % 100);
+        final String devicePart = "d_" + deviceIndex;
+        final String deviceId =
+            String.format("%s.%s.%s.%s", database, cityPart, regionPart, devicePart);
+
+        final Tablet tablet = new Tablet(deviceId, TREE_SCHEMA_LIST, 10000);
+        long batchBegin = System.nanoTime();
+        for (int row = 0; row < rowsPerDevice; row++) {
+          final int slot = tablet.rowSize++;
+          final double reading = row * 1.0d;
+          tablet.addTimestamp(slot, START_TIME + row * 10_000L);
+          tablet.addValue(COLUMN_NAME_5, slot, reading);
+          tablet.addValue(COLUMN_NAME_6, slot, reading);
+          tablet.addValue(COLUMN_NAME_7, slot, reading);
+
+          if (tablet.rowSize != tablet.getMaxRowNumber()) {
+            // Tablet still has room; keep filling it.
+            continue;
+          }
+
+          sessionPool.insertAlignedTablet(tablet);
+          tablet.reset();
+          final long rowsSoFar = row + 1L;
+          if (rowsSoFar % 10000 == 0) {
+            LOGGER.info(
+                "Device {} has written {} rows, time cost is {}ms",
+                deviceId,
+                rowsSoFar,
+                (System.nanoTime() - batchBegin) / 1_000_000);
+            batchBegin = System.nanoTime();
+          }
+        }
+
+        // Flush the final, partially filled tablet.
+        if (tablet.rowSize != 0) {
+          sessionPool.insertAlignedTablet(tablet);
+          tablet.reset();
+        }
+
+        LOGGER.info(
+            "Device {} finished, total time cost is {}ms",
+            deviceId,
+            (System.nanoTime() - deviceBegin) / 1_000_000);
+      }
+    } catch (IoTDBConnectionException e) {
+      LOGGER.error("Connection error", e);
+      throw new RuntimeException(e);
+    } catch (StatementExecutionException e) {
+      LOGGER.error("Execute error", e);
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/TableModelStatementMemorySourceVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/TableModelStatementMemorySourceVisitor.java
index d4c5c19a426..f15fb94b3c9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/TableModelStatementMemorySourceVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/TableModelStatementMemorySourceVisitor.java
@@ -75,9 +75,9 @@ public class TableModelStatementMemorySourceVisitor
                 symbolAllocator,
                 NOOP)
             .plan(context.getAnalysis());
-    //    if (context.getAnalysis().isEmptyDataSource()) {
-    //      return new StatementMemorySource(new TsBlock(0), header);
-    //    }
+    if (context.getAnalysis().isEmptyDataSource()) {
+      return new StatementMemorySource(new TsBlock(0), header);
+    }
 
     // Generate table model distributed plan
     final TableDistributedPlanGenerator.PlanContext planContext =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
index 51351f8bea5..74773d53e2a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
@@ -684,8 +684,8 @@ public class Analysis implements IAnalysis {
     this.hasValueFilter = hasValueFilter;
   }
 
-  public boolean hasSortNode() {
-    return hasSortNode;
+  /** Returns {@code true} when the {@code hasSortNode} flag is not set. */
+  public boolean noSortNode() {
+    return !hasSortNode;
   }
 
   public void setSortNode(boolean hasSortNode) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
index d1f249010d9..fa3b279058b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
@@ -45,7 +45,7 @@ public class SortElimination implements PlanOptimizer {
 
   @Override
   public PlanNode optimize(PlanNode plan, PlanOptimizer.Context context) {
-    if (!context.getAnalysis().hasSortNode()) {
+    if (context.getAnalysis().noSortNode()) {
       return plan;
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
index 95f8004d8f8..4ec849eee47 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
@@ -49,7 +49,7 @@ public class TransformSortToStreamSort implements 
PlanOptimizer {
 
   @Override
   public PlanNode optimize(PlanNode plan, PlanOptimizer.Context context) {
-    if (!context.getAnalysis().hasSortNode()) {
+    if (context.getAnalysis().noSortNode()) {
       return plan;
     }
 

Reply via email to