This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 325b8d8fd01 Fix error in MergeSortJoin when some columns exist null
values & Support cross join with filters
325b8d8fd01 is described below
commit 325b8d8fd01f6cb172855eb19d8bf31f35b93054
Author: Beyyes <[email protected]>
AuthorDate: Mon Dec 16 09:22:04 2024 +0800
Fix error in MergeSortJoin when some columns exist null values & Support
cross join with filters
---
.../db/it/IoTDBMultiIDsWithAttributesTableIT.java | 245 ++++++++++++++++++-
.../it/query/recent/IoTDBTableAggregationIT.java | 1 -
.../comparator/AscBinaryTypeJoinKeyComparator.java | 48 ++--
.../AscBooleanTypeJoinKeyComparator.java | 38 ++-
.../comparator/AscDoubleTypeJoinKeyComparator.java | 38 ++-
.../comparator/AscFloatTypeJoinKeyComparator.java | 38 ++-
.../comparator/AscIntTypeJoinKeyComparator.java | 38 ++-
.../comparator/AscLongTypeJoinKeyComparator.java | 38 ++-
.../DescBinaryTypeJoinKeyComparator.java | 48 ++--
.../DescBooleanTypeJoinKeyComparator.java | 38 ++-
.../DescDoubleTypeJoinKeyComparator.java | 38 ++-
.../comparator/DescFloatTypeJoinKeyComparator.java | 38 ++-
.../comparator/DescIntTypeJoinKeyComparator.java | 38 ++-
.../comparator/DescLongTypeJoinKeyComparator.java | 38 ++-
.../join/merge/comparator/JoinKeyComparator.java | 8 +-
.../merge/comparator/JoinKeyComparatorFactory.java | 15 +-
.../relational/AbstractMergeSortJoinOperator.java | 160 +++++++++---
.../relational/MergeSortFullOuterJoinOperator.java | 99 ++++----
.../relational/MergeSortInnerJoinOperator.java | 71 +++---
.../plan/planner/TableOperatorGenerator.java | 109 ++++-----
.../relational/analyzer/StatementAnalyzer.java | 46 +---
.../plan/relational/planner/node/JoinNode.java | 2 -
.../planner/optimizations/JoinUtils.java | 12 +-
.../optimizations/PushPredicateIntoTableScan.java | 135 +++++------
.../optimizations/QueryCardinalityUtil.java | 17 ++
.../plan/relational/analyzer/JoinTest.java | 268 +++++++++++++++++----
.../plan/relational/planner/SubqueryTest.java | 1 +
.../planner/assertions/EquiJoinClauseProvider.java | 4 +-
.../planner/assertions/PlanMatchPattern.java | 2 +-
.../relational/planner/assertions/SymbolAlias.java | 4 +-
30 files changed, 1212 insertions(+), 463 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
index a02889a4777..8bf4728fd18 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
@@ -36,6 +36,7 @@ import java.util.Arrays;
import static org.apache.iotdb.db.it.utils.TestUtils.tableAssertTestFail;
import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.JoinUtils.FULL_JOIN_ONLY_SUPPORT_EQUI_JOIN;
import static org.junit.Assert.fail;
/** In this IT, table has more than one IDs and Attributes. */
@@ -1383,7 +1384,7 @@ public class IoTDBMultiIDsWithAttributesTableIT {
// ==================================================================
// no filter
@Test
- public void innerJoinTest1() {
+ public void selfTimeColumnInnerJoinTest1() {
String[] expectedHeader =
new String[] {"time", "device", "level", "num", "device", "attr2",
"num", "str"};
String[] retArray =
@@ -1420,7 +1421,7 @@ public class IoTDBMultiIDsWithAttributesTableIT {
// has filter
@Test
- public void innerJoinTest2() {
+ public void selfTimeColumnInnerJoinTest2() {
String[] expectedHeader =
new String[] {"time", "device", "level", "t1_num_add", "device",
"attr2", "num", "str"};
String[] retArray =
@@ -1510,7 +1511,7 @@ public class IoTDBMultiIDsWithAttributesTableIT {
// no filter
@Test
- public void fullOuterJoinTest1() {
+ public void timeColumnFullOuterJoinTest1() {
expectedHeader =
new String[] {"time", "device", "level", "num", "device", "attr2",
"num", "str"};
retArray =
@@ -1682,7 +1683,7 @@ public class IoTDBMultiIDsWithAttributesTableIT {
// has filter
@Test
- public void fullOuterJoinTest2() {
+ public void timeColumnFullOuterJoinTest2() {
expectedHeader =
new String[] {"time", "device", "level", "t1_num_add", "device",
"attr2", "num", "str"};
retArray =
@@ -1785,7 +1786,7 @@ public class IoTDBMultiIDsWithAttributesTableIT {
}
@Test
- public void innerJoinTest() {
+ public void twoTableTimeColumnInnerJoinTest() {
expectedHeader = new String[] {"time", "device1", "value1", "device2",
"value2"};
sql =
"SELECT "
@@ -1807,7 +1808,7 @@ public class IoTDBMultiIDsWithAttributesTableIT {
}
@Test
- public void innerJoinOnTwoColumns() {
+ public void innerJoinOnMultiColumns() {
expectedHeader = new String[] {"time", "device1", "value1", "device2",
"value2"};
sql =
"SELECT "
@@ -1826,9 +1827,237 @@ public class IoTDBMultiIDsWithAttributesTableIT {
tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
sql =
- "select table1.s1 from table1 t1 join table2 t2 on t1.time = t2.time
and t1.device = t2.device";
+ "SELECT "
+ + " t1.time, "
+ + " t1.device as device1, "
+ + " t1.value as value1, "
+ + " t2.device as device2, "
+ + " t2.value as value2 "
+ + "FROM "
+ + " tableA t1 cross join tableB t2 "
+ + "where t1.time = t2.time and t1.device = t2.device order by
t1.time";
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
+
+ sql =
+ "SELECT "
+ + " t1.time, "
+ + " t1.device as device1, "
+ + " t1.value as value1, "
+ + " t2.device as device2, "
+ + " t2.value as value2 "
+ + "FROM "
+ + " tableA t1 JOIN tableB t2 "
+ + "ON t1.time = t2.time and t1.device = t2.device order by
t1.time";
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
+
+ expectedHeader = new String[] {"time", "device", "value1", "value2"};
+ sql =
+ "SELECT "
+ + " time, device, "
+ + " t1.value as value1, "
+ + " t2.value as value2 "
+ + "FROM "
+ + " tableA t1 JOIN tableB t2 "
+ + "USING(time, device) ORDER BY time";
+ retArray =
+ new String[] {
+ "2020-01-01T00:00:03.000Z,d1,3,30,",
"2020-01-01T00:00:05.000Z,d2,5,50,",
+ };
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
+
+ expectedHeader = new String[] {"device", "device", "num", "num",
"floatnum", "floatnum"};
+ sql =
+ "select t0.device, t1.device, t0.num, t1.num, t0.floatnum, t1.floatnum
from table0 t0 join table1 t1 on t0.device=t1.device AND t0.attr2=t1.attr2 AND
t0.num>t1.num AND t0.floatnum>t1.floatnum ORDER BY
t0.device,t1.device,t0.num,t1.num";
+ retArray =
+ new String[] {
+ "d1,d1,4,1,213.1,12.123,", "d1,d1,6,3,1231.21,231.2121,",
"d1,d1,14,1,231.34,12.123,",
+ };
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
+
+ expectedHeader = new String[] {"time1", "time2", "device1", "device2"};
+ sql =
+ "select t1.time as time1, t2.time as time2, t1.device as device1,
t2.device as device2 from tablea t1 join tableb t2 "
+ + "on cast(substring(t1.device,2) as int32) =
cast(substring(t2.device,2) as int32)+1 order by time1,time2,device1,device2";
+ retArray =
+ new String[] {
+ "2020-01-01T00:00:05.000Z,2020-01-01T00:00:02.000Z,d2,d1,",
+ "2020-01-01T00:00:05.000Z,2020-01-01T00:00:03.000Z,d2,d1,",
+ "2020-01-01T00:00:07.000Z,2020-01-01T00:00:02.000Z,d2,d1,",
+ "2020-01-01T00:00:07.000Z,2020-01-01T00:00:03.000Z,d2,d1,",
+ };
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
+
+ expectedHeader = new String[] {"attr1", "attr2"};
+ sql =
+ "select t0.attr1,t0.attr2 from table0 t0 join table1 t1 on
t0.attr1=t1.attr1 AND t0.attr2=t1.attr2";
+ retArray =
+ new String[] {
+ "c,d,", "c,d,", "c,d,", "c,d,", "c,d,", "c,d,", "c,d,", "c,d,",
"c,d,", "c,d,", "c,d,",
+ "c,d,", "c,d,", "c,d,", "c,d,", "c,d,", "c,d,", "c,d,", "c,d,",
"c,d,", "c,d,", "t,a,",
+ "t,a,", "t,a,",
+ };
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
+
+ // expectedHeader = new String[] {"device", "device", "attr1", "attr1",
"date", "date"};
+ // sql =
+ // "select t0.device, t1.device, t0.attr1, t1.attr1, t0.date,
t1.date from table0 t0 join
+ // table1 t1 on t0.device=t1.device and t0.attr1=t1.attr1 OR
t0.date=t1.date";
+ // retArray =
+ // new String[] {
+ // "d1,d1,c,c,null,2023-01-01,",
+ // "d1,d1,c,c,null,2023-01-01,",
+ // "d1,d1,c,c,null,2023-01-01,",
+ // "d1,d1,c,c,null,null,",
+ // "d1,d1,c,c,null,null,",
+ // "d1,d1,c,c,null,null,",
+ // "d1,d1,t,t,null,null,",
+ // "d1,d1,t,t,null,null,",
+ // "d1,d1,t,t,null,null,",
+ // "d2,d1,null,c,2023-01-01,2023-01-01,",
+ // };
+ // tableResultSetEqualTest(sql, expectedHeader, retArray,
DATABASE_NAME);
+ }
+
+ @Test
+ public void fullJoinTest() {
+ expectedHeader = new String[] {"date", "date"};
+ sql =
+ "select t0.date, t1.date from table0 t0 full join table1 t1 on
t0.date=t1.date order by t0.date, t1.date";
+ retArray =
+ new String[] {
+ "2022-01-01,null,",
+ "2023-01-01,2023-01-01,",
+ "null,2023-05-01,",
+ "null,2023-10-01,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ "null,null,",
+ };
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
+
+ expectedHeader = new String[] {"attr1", "attr2", "attr1", "attr2"};
+ sql =
+ "select t0.attr1,t0.attr2,t1.attr1,t1.attr2 from table0 t0 full join
table1 t1 on t0.attr1=t1.attr1 AND t0.attr2=t1.attr2 order by
t0.attr1,t0.attr2,t1.attr1,t1.attr2";
+ retArray =
+ new String[] {
+ "c,d,c,d,",
+ "c,d,c,d,",
+ "c,d,c,d,",
+ "c,d,c,d,",
+ "c,d,c,d,",
+ "c,d,c,d,",
+ "c,d,c,d,",
+ "c,d,c,d,",
+ "c,d,c,d,",
+ "c,d,c,d,",
+ "c,d,c,d,",
+ "c,d,c,d,",
+ "c,d,c,d,",
+ "c,d,c,d,",
+ "c,d,c,d,",
+ "c,d,c,d,",
+ "c,d,c,d,",
+ "c,d,c,d,",
+ "c,d,c,d,",
+ "c,d,c,d,",
+ "c,d,c,d,",
+ "d,c,null,null,",
+ "d,c,null,null,",
+ "d,c,null,null,",
+ "t,a,t,a,",
+ "t,a,t,a,",
+ "t,a,t,a,",
+ "vv,null,null,null,",
+ "vv,null,null,null,",
+ "vv,null,null,null,",
+ "yy,zz,null,null,",
+ "yy,zz,null,null,",
+ "yy,zz,null,null,",
+ "null,null,y,z,",
+ "null,null,null,null,",
+ "null,null,null,null,",
+ "null,null,null,null,",
+ "null,null,null,null,",
+ "null,null,null,null,",
+ "null,null,null,null,",
+ "null,null,null,null,",
+ "null,null,null,null,",
+ "null,null,null,null,",
+ "null,null,null,null,",
+ "null,null,null,null,",
+ "null,null,null,null,",
+ "null,null,null,null,",
+ "null,null,null,null,",
+ "null,null,null,null,",
+ "null,null,null,null,",
+ "null,null,null,null,",
+ "null,null,null,null,",
+ "null,null,null,null,",
+ };
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
+ }
+
+ @Test
+ public void exceptionTest() {
+ tableAssertTestFail(
+ "select * from table0 t0 full join table1 t1 on t0.num>t1.num",
+ FULL_JOIN_ONLY_SUPPORT_EQUI_JOIN,
+ DATABASE_NAME);
+
+ tableAssertTestFail(
+ "select * from table0 t0 full join table1 t1 on t0.num!=t1.num",
+ FULL_JOIN_ONLY_SUPPORT_EQUI_JOIN,
+ DATABASE_NAME);
+
+ tableAssertTestFail(
+ "select * from table0 t0 full join table1 t1 on t0.device=t1.device
AND t0.num>t1.num",
+ FULL_JOIN_ONLY_SUPPORT_EQUI_JOIN,
+ DATABASE_NAME);
+
tableAssertTestFail(
- sql, "701: Only support time column equi-join in current version",
DATABASE_NAME);
+ "select * from table0 t0 full join table1 t1 on t0.device=t1.device OR
t0.num>t1.num",
+ FULL_JOIN_ONLY_SUPPORT_EQUI_JOIN,
+ DATABASE_NAME);
+
+ tableAssertTestFail(
+ "select * from table0 t0 full join table1 t1 on t0.device=t1.device OR
t0.time=t1.time",
+ FULL_JOIN_ONLY_SUPPORT_EQUI_JOIN,
+ DATABASE_NAME);
}
public static String[] buildHeaders(int length) {
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
index 09ead40a6b6..ce46b910e9d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
@@ -3718,7 +3718,6 @@ public class IoTDBTableAggregationIT {
"select s1 from table1 where s2 in (select s2 from table1)",
"Not a valid IR expression",
DATABASE_NAME);
-
tableAssertTestFail(
"select avg() from table1",
"701: Aggregate functions [avg] should only have one argument",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscBinaryTypeJoinKeyComparator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscBinaryTypeJoinKeyComparator.java
index 9507bec8669..9a0f9714261 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscBinaryTypeJoinKeyComparator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscBinaryTypeJoinKeyComparator.java
@@ -21,6 +21,8 @@ package
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.co
import org.apache.tsfile.read.common.block.TsBlock;
+import java.util.Optional;
+
public class AscBinaryTypeJoinKeyComparator implements JoinKeyComparator {
private static final AscBinaryTypeJoinKeyComparator INSTANCE =
@@ -35,43 +37,61 @@ public class AscBinaryTypeJoinKeyComparator implements
JoinKeyComparator {
}
@Override
- public boolean lessThan(
+ public Optional<Boolean> lessThan(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex)
- .getBinary(leftRowIndex)
-
.compareTo(right.getColumn(rightColumnIndex).getBinary(rightRowIndex))
- < 0;
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex)
+ .getBinary(leftRowIndex)
+
.compareTo(right.getColumn(rightColumnIndex).getBinary(rightRowIndex))
+ < 0);
}
@Override
- public boolean equalsTo(
+ public Optional<Boolean> equalsTo(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex)
- .getBinary(leftRowIndex)
- .equals(right.getColumn(rightColumnIndex).getBinary(rightRowIndex));
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex)
+ .getBinary(leftRowIndex)
+
.equals(right.getColumn(rightColumnIndex).getBinary(rightRowIndex)));
}
@Override
- public boolean lessThanOrEqual(
+ public Optional<Boolean> lessThanOrEqual(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex)
- .getBinary(leftRowIndex)
-
.compareTo(right.getColumn(rightColumnIndex).getBinary(rightRowIndex))
- <= 0;
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex)
+ .getBinary(leftRowIndex)
+
.compareTo(right.getColumn(rightColumnIndex).getBinary(rightRowIndex))
+ <= 0);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscBooleanTypeJoinKeyComparator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscBooleanTypeJoinKeyComparator.java
index 29edcca65d6..9203625c69a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscBooleanTypeJoinKeyComparator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscBooleanTypeJoinKeyComparator.java
@@ -21,6 +21,8 @@ package
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.co
import org.apache.tsfile.read.common.block.TsBlock;
+import java.util.Optional;
+
// Not sure about whether we need this type of Comparator
public class AscBooleanTypeJoinKeyComparator implements JoinKeyComparator {
@@ -34,39 +36,57 @@ public class AscBooleanTypeJoinKeyComparator implements
JoinKeyComparator {
}
@Override
- public boolean lessThan(
+ public Optional<Boolean> lessThan(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return
transformBooleanToInt(left.getColumn(leftColumnIndex).getBoolean(leftRowIndex))
- <
transformBooleanToInt(right.getColumn(rightColumnIndex).getBoolean(rightRowIndex));
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+
transformBooleanToInt(left.getColumn(leftColumnIndex).getBoolean(leftRowIndex))
+ <
transformBooleanToInt(right.getColumn(rightColumnIndex).getBoolean(rightRowIndex)));
}
@Override
- public boolean equalsTo(
+ public Optional<Boolean> equalsTo(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getBoolean(leftRowIndex)
- == right.getColumn(rightColumnIndex).getBoolean(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getBoolean(leftRowIndex)
+ == right.getColumn(rightColumnIndex).getBoolean(rightRowIndex));
}
@Override
- public boolean lessThanOrEqual(
+ public Optional<Boolean> lessThanOrEqual(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return
transformBooleanToInt(left.getColumn(leftColumnIndex).getBoolean(leftRowIndex))
- <=
transformBooleanToInt(right.getColumn(rightColumnIndex).getBoolean(rightRowIndex));
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+
transformBooleanToInt(left.getColumn(leftColumnIndex).getBoolean(leftRowIndex))
+ <=
transformBooleanToInt(right.getColumn(rightColumnIndex).getBoolean(rightRowIndex)));
}
private int transformBooleanToInt(boolean value) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscDoubleTypeJoinKeyComparator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscDoubleTypeJoinKeyComparator.java
index e464a14580e..0586fa6a53d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscDoubleTypeJoinKeyComparator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscDoubleTypeJoinKeyComparator.java
@@ -21,6 +21,8 @@ package
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.co
import org.apache.tsfile.read.common.block.TsBlock;
+import java.util.Optional;
+
public class AscDoubleTypeJoinKeyComparator implements JoinKeyComparator {
private static final AscDoubleTypeJoinKeyComparator INSTANCE =
@@ -35,38 +37,56 @@ public class AscDoubleTypeJoinKeyComparator implements
JoinKeyComparator {
}
@Override
- public boolean lessThan(
+ public Optional<Boolean> lessThan(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getDouble(leftRowIndex)
- < right.getColumn(rightColumnIndex).getDouble(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getDouble(leftRowIndex)
+ < right.getColumn(rightColumnIndex).getDouble(rightRowIndex));
}
@Override
- public boolean equalsTo(
+ public Optional<Boolean> equalsTo(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getDouble(leftRowIndex)
- == right.getColumn(rightColumnIndex).getDouble(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getDouble(leftRowIndex)
+ == right.getColumn(rightColumnIndex).getDouble(rightRowIndex));
}
@Override
- public boolean lessThanOrEqual(
+ public Optional<Boolean> lessThanOrEqual(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getDouble(leftRowIndex)
- <= right.getColumn(rightColumnIndex).getDouble(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getDouble(leftRowIndex)
+ <= right.getColumn(rightColumnIndex).getDouble(rightRowIndex));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscFloatTypeJoinKeyComparator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscFloatTypeJoinKeyComparator.java
index 3d39d0cff16..351cb0599db 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscFloatTypeJoinKeyComparator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscFloatTypeJoinKeyComparator.java
@@ -21,6 +21,8 @@ package
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.co
import org.apache.tsfile.read.common.block.TsBlock;
+import java.util.Optional;
+
public class AscFloatTypeJoinKeyComparator implements JoinKeyComparator {
private static final AscFloatTypeJoinKeyComparator INSTANCE = new
AscFloatTypeJoinKeyComparator();
@@ -34,38 +36,56 @@ public class AscFloatTypeJoinKeyComparator implements
JoinKeyComparator {
}
@Override
- public boolean lessThan(
+ public Optional<Boolean> lessThan(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getFloat(leftRowIndex)
- < right.getColumn(rightColumnIndex).getFloat(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getFloat(leftRowIndex)
+ < right.getColumn(rightColumnIndex).getFloat(rightRowIndex));
}
@Override
- public boolean equalsTo(
+ public Optional<Boolean> equalsTo(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getFloat(leftRowIndex)
- == right.getColumn(rightColumnIndex).getFloat(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getFloat(leftRowIndex)
+ == right.getColumn(rightColumnIndex).getFloat(rightRowIndex));
}
@Override
- public boolean lessThanOrEqual(
+ public Optional<Boolean> lessThanOrEqual(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getFloat(leftRowIndex)
- <= right.getColumn(rightColumnIndex).getFloat(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getFloat(leftRowIndex)
+ <= right.getColumn(rightColumnIndex).getFloat(rightRowIndex));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscIntTypeJoinKeyComparator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscIntTypeJoinKeyComparator.java
index 50f87d7cffd..99b9d23dd94 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscIntTypeJoinKeyComparator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscIntTypeJoinKeyComparator.java
@@ -21,6 +21,8 @@ package
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.co
import org.apache.tsfile.read.common.block.TsBlock;
+import java.util.Optional;
+
public class AscIntTypeJoinKeyComparator implements JoinKeyComparator {
private static final AscIntTypeJoinKeyComparator INSTANCE = new
AscIntTypeJoinKeyComparator();
@@ -34,38 +36,56 @@ public class AscIntTypeJoinKeyComparator implements
JoinKeyComparator {
}
@Override
- public boolean lessThan(
+ public Optional<Boolean> lessThan(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getInt(leftRowIndex)
- < right.getColumn(rightColumnIndex).getInt(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getInt(leftRowIndex)
+ < right.getColumn(rightColumnIndex).getInt(rightRowIndex));
}
@Override
- public boolean equalsTo(
+ public Optional<Boolean> equalsTo(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getInt(leftRowIndex)
- == right.getColumn(rightColumnIndex).getInt(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getInt(leftRowIndex)
+ == right.getColumn(rightColumnIndex).getInt(rightRowIndex));
}
@Override
- public boolean lessThanOrEqual(
+ public Optional<Boolean> lessThanOrEqual(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getInt(leftRowIndex)
- <= right.getColumn(rightColumnIndex).getInt(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getInt(leftRowIndex)
+ <= right.getColumn(rightColumnIndex).getInt(rightRowIndex));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscLongTypeJoinKeyComparator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscLongTypeJoinKeyComparator.java
index 935d4fe61d1..793c3f2a1e6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscLongTypeJoinKeyComparator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscLongTypeJoinKeyComparator.java
@@ -21,6 +21,8 @@ package
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.co
import org.apache.tsfile.read.common.block.TsBlock;
+import java.util.Optional;
+
public class AscLongTypeJoinKeyComparator implements JoinKeyComparator {
private static final AscLongTypeJoinKeyComparator INSTANCE = new
AscLongTypeJoinKeyComparator();
@@ -34,38 +36,56 @@ public class AscLongTypeJoinKeyComparator implements
JoinKeyComparator {
}
@Override
- public boolean lessThan(
+ public Optional<Boolean> lessThan(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getLong(leftRowIndex)
- < right.getColumn(rightColumnIndex).getLong(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getLong(leftRowIndex)
+ < right.getColumn(rightColumnIndex).getLong(rightRowIndex));
}
@Override
- public boolean equalsTo(
+ public Optional<Boolean> equalsTo(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getLong(leftRowIndex)
- == right.getColumn(rightColumnIndex).getLong(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getLong(leftRowIndex)
+ == right.getColumn(rightColumnIndex).getLong(rightRowIndex));
}
@Override
- public boolean lessThanOrEqual(
+ public Optional<Boolean> lessThanOrEqual(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getLong(leftRowIndex)
- <= right.getColumn(rightColumnIndex).getLong(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getLong(leftRowIndex)
+ <= right.getColumn(rightColumnIndex).getLong(rightRowIndex));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescBinaryTypeJoinKeyComparator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescBinaryTypeJoinKeyComparator.java
index effc243137a..b1ee6d5c4af 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescBinaryTypeJoinKeyComparator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescBinaryTypeJoinKeyComparator.java
@@ -21,6 +21,8 @@ package
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.co
import org.apache.tsfile.read.common.block.TsBlock;
+import java.util.Optional;
+
public class DescBinaryTypeJoinKeyComparator implements JoinKeyComparator {
private static final DescBinaryTypeJoinKeyComparator INSTANCE =
@@ -35,43 +37,61 @@ public class DescBinaryTypeJoinKeyComparator implements
JoinKeyComparator {
}
@Override
- public boolean lessThan(
+ public Optional<Boolean> lessThan(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex)
- .getBinary(leftRowIndex)
-
.compareTo(right.getColumn(rightColumnIndex).getBinary(rightRowIndex))
- > 0;
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex)
+ .getBinary(leftRowIndex)
+
.compareTo(right.getColumn(rightColumnIndex).getBinary(rightRowIndex))
+ > 0);
}
@Override
- public boolean equalsTo(
+ public Optional<Boolean> equalsTo(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex)
- .getBinary(leftRowIndex)
- .equals(right.getColumn(rightColumnIndex).getBinary(rightRowIndex));
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex)
+ .getBinary(leftRowIndex)
+
.equals(right.getColumn(rightColumnIndex).getBinary(rightRowIndex)));
}
@Override
- public boolean lessThanOrEqual(
+ public Optional<Boolean> lessThanOrEqual(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex)
- .getBinary(leftRowIndex)
-
.compareTo(right.getColumn(rightColumnIndex).getBinary(rightRowIndex))
- >= 0;
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex)
+ .getBinary(leftRowIndex)
+
.compareTo(right.getColumn(rightColumnIndex).getBinary(rightRowIndex))
+ >= 0);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescBooleanTypeJoinKeyComparator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescBooleanTypeJoinKeyComparator.java
index aeb047177fd..2402574265c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescBooleanTypeJoinKeyComparator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescBooleanTypeJoinKeyComparator.java
@@ -21,6 +21,8 @@ package
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.co
import org.apache.tsfile.read.common.block.TsBlock;
+import java.util.Optional;
+
public class DescBooleanTypeJoinKeyComparator implements JoinKeyComparator {
private static final DescBooleanTypeJoinKeyComparator INSTANCE =
@@ -33,39 +35,57 @@ public class DescBooleanTypeJoinKeyComparator implements
JoinKeyComparator {
}
@Override
- public boolean lessThan(
+ public Optional<Boolean> lessThan(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return
transformBooleanToInt(left.getColumn(leftColumnIndex).getBoolean(leftRowIndex))
- >
transformBooleanToInt(right.getColumn(rightColumnIndex).getBoolean(rightRowIndex));
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+
transformBooleanToInt(left.getColumn(leftColumnIndex).getBoolean(leftRowIndex))
+ >
transformBooleanToInt(right.getColumn(rightColumnIndex).getBoolean(rightRowIndex)));
}
@Override
- public boolean equalsTo(
+ public Optional<Boolean> equalsTo(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getBoolean(leftRowIndex)
- == right.getColumn(rightColumnIndex).getBoolean(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getBoolean(leftRowIndex)
+ == right.getColumn(rightColumnIndex).getBoolean(rightRowIndex));
}
@Override
- public boolean lessThanOrEqual(
+ public Optional<Boolean> lessThanOrEqual(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return
transformBooleanToInt(left.getColumn(leftColumnIndex).getBoolean(leftRowIndex))
- >=
transformBooleanToInt(right.getColumn(rightColumnIndex).getBoolean(rightRowIndex));
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+
transformBooleanToInt(left.getColumn(leftColumnIndex).getBoolean(leftRowIndex))
+ >=
transformBooleanToInt(right.getColumn(rightColumnIndex).getBoolean(rightRowIndex)));
}
private int transformBooleanToInt(boolean value) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescDoubleTypeJoinKeyComparator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescDoubleTypeJoinKeyComparator.java
index b841810f7df..0c20b7d45f8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescDoubleTypeJoinKeyComparator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescDoubleTypeJoinKeyComparator.java
@@ -21,6 +21,8 @@ package
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.co
import org.apache.tsfile.read.common.block.TsBlock;
+import java.util.Optional;
+
public class DescDoubleTypeJoinKeyComparator implements JoinKeyComparator {
private static final DescDoubleTypeJoinKeyComparator INSTANCE =
@@ -35,38 +37,56 @@ public class DescDoubleTypeJoinKeyComparator implements
JoinKeyComparator {
}
@Override
- public boolean lessThan(
+ public Optional<Boolean> lessThan(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getDouble(leftRowIndex)
- > right.getColumn(rightColumnIndex).getDouble(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getDouble(leftRowIndex)
+ > right.getColumn(rightColumnIndex).getDouble(rightRowIndex));
}
@Override
- public boolean equalsTo(
+ public Optional<Boolean> equalsTo(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getDouble(leftRowIndex)
- == right.getColumn(rightColumnIndex).getDouble(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getDouble(leftRowIndex)
+ == right.getColumn(rightColumnIndex).getDouble(rightRowIndex));
}
@Override
- public boolean lessThanOrEqual(
+ public Optional<Boolean> lessThanOrEqual(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getDouble(leftRowIndex)
- >= right.getColumn(rightColumnIndex).getDouble(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getDouble(leftRowIndex)
+ >= right.getColumn(rightColumnIndex).getDouble(rightRowIndex));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescFloatTypeJoinKeyComparator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescFloatTypeJoinKeyComparator.java
index 6f7d9be22de..de835133f00 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescFloatTypeJoinKeyComparator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescFloatTypeJoinKeyComparator.java
@@ -21,6 +21,8 @@ package
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.co
import org.apache.tsfile.read.common.block.TsBlock;
+import java.util.Optional;
+
public class DescFloatTypeJoinKeyComparator implements JoinKeyComparator {
private static final DescFloatTypeJoinKeyComparator INSTANCE =
@@ -35,38 +37,56 @@ public class DescFloatTypeJoinKeyComparator implements
JoinKeyComparator {
}
@Override
- public boolean lessThan(
+ public Optional<Boolean> lessThan(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getFloat(leftRowIndex)
- > right.getColumn(rightColumnIndex).getFloat(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getFloat(leftRowIndex)
+ > right.getColumn(rightColumnIndex).getFloat(rightRowIndex));
}
@Override
- public boolean equalsTo(
+ public Optional<Boolean> equalsTo(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getFloat(leftRowIndex)
- == right.getColumn(rightColumnIndex).getFloat(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getFloat(leftRowIndex)
+ == right.getColumn(rightColumnIndex).getFloat(rightRowIndex));
}
@Override
- public boolean lessThanOrEqual(
+ public Optional<Boolean> lessThanOrEqual(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getFloat(leftRowIndex)
- >= right.getColumn(rightColumnIndex).getFloat(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getFloat(leftRowIndex)
+ >= right.getColumn(rightColumnIndex).getFloat(rightRowIndex));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescIntTypeJoinKeyComparator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescIntTypeJoinKeyComparator.java
index 246af1e5ca6..f3d5fbadd29 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescIntTypeJoinKeyComparator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescIntTypeJoinKeyComparator.java
@@ -21,6 +21,8 @@ package
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.co
import org.apache.tsfile.read.common.block.TsBlock;
+import java.util.Optional;
+
public class DescIntTypeJoinKeyComparator implements JoinKeyComparator {
private static final DescIntTypeJoinKeyComparator INSTANCE = new
DescIntTypeJoinKeyComparator();
@@ -34,38 +36,56 @@ public class DescIntTypeJoinKeyComparator implements
JoinKeyComparator {
}
@Override
- public boolean lessThan(
+ public Optional<Boolean> lessThan(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getInt(leftRowIndex)
- > right.getColumn(rightColumnIndex).getInt(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getInt(leftRowIndex)
+ > right.getColumn(rightColumnIndex).getInt(rightRowIndex));
}
@Override
- public boolean equalsTo(
+ public Optional<Boolean> equalsTo(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getInt(leftRowIndex)
- == right.getColumn(rightColumnIndex).getInt(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getInt(leftRowIndex)
+ == right.getColumn(rightColumnIndex).getInt(rightRowIndex));
}
@Override
- public boolean lessThanOrEqual(
+ public Optional<Boolean> lessThanOrEqual(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getInt(leftRowIndex)
- >= right.getColumn(rightColumnIndex).getInt(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getInt(leftRowIndex)
+ >= right.getColumn(rightColumnIndex).getInt(rightRowIndex));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescLongTypeJoinKeyComparator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescLongTypeJoinKeyComparator.java
index 7ea67c147ac..44849b9a154 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescLongTypeJoinKeyComparator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescLongTypeJoinKeyComparator.java
@@ -21,6 +21,8 @@ package
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.co
import org.apache.tsfile.read.common.block.TsBlock;
+import java.util.Optional;
+
public class DescLongTypeJoinKeyComparator implements JoinKeyComparator {
private static final DescLongTypeJoinKeyComparator INSTANCE = new
DescLongTypeJoinKeyComparator();
@@ -34,38 +36,56 @@ public class DescLongTypeJoinKeyComparator implements
JoinKeyComparator {
}
@Override
- public boolean lessThan(
+ public Optional<Boolean> lessThan(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getLong(leftRowIndex)
- > right.getColumn(rightColumnIndex).getLong(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getLong(leftRowIndex)
+ > right.getColumn(rightColumnIndex).getLong(rightRowIndex));
}
@Override
- public boolean equalsTo(
+ public Optional<Boolean> equalsTo(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getLong(leftRowIndex)
- == right.getColumn(rightColumnIndex).getLong(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getLong(leftRowIndex)
+ == right.getColumn(rightColumnIndex).getLong(rightRowIndex));
}
@Override
- public boolean lessThanOrEqual(
+ public Optional<Boolean> lessThanOrEqual(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
TsBlock right,
int rightColumnIndex,
int rightRowIndex) {
- return left.getColumn(leftColumnIndex).getLong(leftRowIndex)
- >= right.getColumn(rightColumnIndex).getLong(rightRowIndex);
+ if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
+ || right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ left.getColumn(leftColumnIndex).getLong(leftRowIndex)
+ >= right.getColumn(rightColumnIndex).getLong(rightRowIndex));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/JoinKeyComparator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/JoinKeyComparator.java
index cc47419376a..2857f545e1a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/JoinKeyComparator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/JoinKeyComparator.java
@@ -21,13 +21,15 @@ package
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.co
import org.apache.tsfile.read.common.block.TsBlock;
+import java.util.Optional;
+
public interface JoinKeyComparator {
/**
* Get values at the given position from the TsBlocks and then compare these
two values. Return
* true if the left value is less than the right value.
*/
- boolean lessThan(
+ Optional<Boolean> lessThan(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
@@ -39,7 +41,7 @@ public interface JoinKeyComparator {
* Get values at the given position from the TsBlocks and then compare these
two values. Return
* true if the left value equals to the right value.
*/
- boolean equalsTo(
+ Optional<Boolean> equalsTo(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
@@ -51,7 +53,7 @@ public interface JoinKeyComparator {
* Get values at the given position from the TsBlocks and then compare these
two values. Return
* true if the left value is less than or equals to the right value.
*/
- boolean lessThanOrEqual(
+ Optional<Boolean> lessThanOrEqual(
TsBlock left,
int leftColumnIndex,
int leftRowIndex,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/JoinKeyComparatorFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/JoinKeyComparatorFactory.java
index 995d568802c..a64ff35257d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/JoinKeyComparatorFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/JoinKeyComparatorFactory.java
@@ -21,9 +21,22 @@ package
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.co
import org.apache.tsfile.read.common.type.Type;
+import java.util.ArrayList;
+import java.util.List;
+
public class JoinKeyComparatorFactory {
+ private JoinKeyComparatorFactory() {}
+
+ public static List<JoinKeyComparator> getComparators(
+ List<Type> joinKeyTypes, boolean isAscending) {
+ List<JoinKeyComparator> comparators = new ArrayList<>(joinKeyTypes.size());
+ for (Type joinKeyType : joinKeyTypes) {
+ comparators.add(getComparator(joinKeyType, isAscending));
+ }
+ return comparators;
+ }
- public static JoinKeyComparator getComparator(Type type, boolean
isAscending) {
+ private static JoinKeyComparator getComparator(Type type, boolean
isAscending) {
switch (type.getTypeEnum()) {
case INT32:
case DATE:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractMergeSortJoinOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractMergeSortJoinOperator.java
index fed9c0b6501..aad19a4a6a1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractMergeSortJoinOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractMergeSortJoinOperator.java
@@ -31,7 +31,6 @@ import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.TsBlockBuilder;
import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
-import org.apache.tsfile.read.common.type.Type;
import java.util.ArrayList;
import java.util.List;
@@ -46,24 +45,22 @@ public abstract class AbstractMergeSortJoinOperator extends
AbstractOperator {
protected final Operator leftChild;
protected TsBlock leftBlock;
protected int leftIndex; // start index of leftTsBlock
- protected final int leftJoinKeyPosition;
+ protected final int[] leftJoinKeyPositions;
protected final int[] leftOutputSymbolIdx;
protected boolean rightFinished;
protected final Operator rightChild;
protected List<TsBlock> rightBlockList = new ArrayList<>();
- protected final int rightJoinKeyPosition;
+ protected final int[] rightJoinKeyPositions;
protected int rightBlockListIdx;
protected int rightIndex; // start index of rightTsBlock
protected final int[] rightOutputSymbolIdx;
protected TsBlock cachedNextRightBlock; // next candidate right block after
rightBlockList
protected boolean rightConsumedUp = false; // if all data of right child are
consumed up
- protected final JoinKeyComparator comparator;
+ protected final List<JoinKeyComparator> comparators;
protected final TsBlockBuilder resultBuilder;
- protected final Type joinKeyType;
-
protected final MemoryReservationManager memoryReservationManager;
protected long maxUsedMemory;
@@ -72,23 +69,21 @@ public abstract class AbstractMergeSortJoinOperator extends
AbstractOperator {
protected AbstractMergeSortJoinOperator(
OperatorContext operatorContext,
Operator leftChild,
- int leftJoinKeyPosition,
+ int[] leftJoinKeyPositions,
int[] leftOutputSymbolIdx,
Operator rightChild,
- int rightJoinKeyPosition,
+ int[] rightJoinKeyPositions,
int[] rightOutputSymbolIdx,
- JoinKeyComparator comparator,
- List<TSDataType> dataTypes,
- Type joinKeyType) {
+ List<JoinKeyComparator> comparators,
+ List<TSDataType> dataTypes) {
this.operatorContext = operatorContext;
this.leftChild = leftChild;
- this.leftJoinKeyPosition = leftJoinKeyPosition;
+ this.leftJoinKeyPositions = leftJoinKeyPositions;
this.leftOutputSymbolIdx = leftOutputSymbolIdx;
this.rightChild = rightChild;
- this.rightJoinKeyPosition = rightJoinKeyPosition;
+ this.rightJoinKeyPositions = rightJoinKeyPositions;
this.rightOutputSymbolIdx = rightOutputSymbolIdx;
- this.comparator = comparator;
- this.joinKeyType = joinKeyType;
+ this.comparators = comparators;
this.memoryReservationManager =
operatorContext
@@ -192,45 +187,69 @@ public abstract class AbstractMergeSortJoinOperator
extends AbstractOperator {
rightIndex = 0;
}
+ protected boolean currentLeftHasNullValue() {
+ for (int leftJoinKeyPosition : leftJoinKeyPositions) {
+ if (leftBlock.getColumn(leftJoinKeyPosition).isNull(leftIndex)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ protected boolean currentRightHasNullValue() {
+ for (int rightJoinKeyPosition : rightJoinKeyPositions) {
+ if (rightBlockList
+ .get(rightBlockListIdx)
+ .getColumn(rightJoinKeyPosition)
+ .isNull(rightIndex)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // check if the last value of the right is less than left
protected boolean allRightLessThanLeft() {
- // check if the last value of the right is less than left
- return comparator.lessThan(
+ return lessThan(
rightBlockList.get(rightBlockList.size() - 1),
- rightJoinKeyPosition,
+ rightJoinKeyPositions,
rightBlockList.get(rightBlockList.size() - 1).getPositionCount() - 1,
leftBlock,
- leftJoinKeyPosition,
+ leftJoinKeyPositions,
leftIndex);
}
+ // check if the last value of the left is less than right
protected boolean allLeftLessThanRight() {
- return comparator.lessThan(
+ return lessThan(
leftBlock,
- leftJoinKeyPosition,
+ leftJoinKeyPositions,
leftBlock.getPositionCount() - 1,
rightBlockList.get(rightBlockListIdx),
- rightJoinKeyPosition,
+ rightJoinKeyPositions,
rightIndex);
}
/**
+ * Examine if stop this round and rebuild rightBlockLists.
+ *
* @return true if last value of rightBlockList.get(0) is less than current
left or current right
- * value. Need stop the round and rebuild rightBlockLists.
+ * value.
*/
protected boolean currentRoundNeedStop() {
- if (comparator.lessThan(
+ if (lessThan(
rightBlockList.get(0),
- rightJoinKeyPosition,
+ rightJoinKeyPositions,
rightBlockList.get(0).getPositionCount() - 1,
rightBlockList.get(rightBlockListIdx),
- rightJoinKeyPosition,
+ rightJoinKeyPositions,
rightIndex)
- || comparator.lessThan(
+ || lessThan(
rightBlockList.get(0),
- rightJoinKeyPosition,
+ rightJoinKeyPositions,
rightBlockList.get(0).getPositionCount() - 1,
leftBlock,
- leftJoinKeyPosition,
+ leftJoinKeyPositions,
leftIndex)) {
for (int i = 0; i < rightBlockListIdx; i++) {
long size = rightBlockList.get(i).getRetainedSizeInBytes();
@@ -246,7 +265,19 @@ public abstract class AbstractMergeSortJoinOperator
extends AbstractOperator {
}
/**
- * @return true if right block is consumed up
+ * @return true if current left block is consumed up
+ */
+ protected boolean leftFinishedWithIncIndex() {
+ leftIndex++;
+ if (leftIndex >= leftBlock.getPositionCount()) {
+ resetLeftBlock();
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * @return true if current right block is consumed up
*/
protected boolean rightFinishedWithIncIndex() {
rightIndex++;
@@ -303,12 +334,12 @@ public abstract class AbstractMergeSortJoinOperator
extends AbstractOperator {
// if first value of block equals to last value of
rightBlockList.get(0), append this block
// to
// rightBlockList
- if (comparator.equalsTo(
+ if (equalsTo(
block,
- rightJoinKeyPosition,
+ rightJoinKeyPositions,
0,
rightBlockList.get(0),
- rightJoinKeyPosition,
+ rightJoinKeyPositions,
rightBlockList.get(0).getPositionCount() - 1)) {
addRightBlockWithMemoryReservation(block);
} else {
@@ -353,12 +384,12 @@ public abstract class AbstractMergeSortJoinOperator
extends AbstractOperator {
int tmpBlockIdx = rightBlockListIdx;
int tmpIdx = rightIndex;
boolean hasMatched = false;
- while (comparator.equalsTo(
+ while (equalsTo(
leftBlock,
- leftJoinKeyPosition,
+ leftJoinKeyPositions,
leftIndex,
rightBlockList.get(tmpBlockIdx),
- rightJoinKeyPosition,
+ rightJoinKeyPositions,
tmpIdx)) {
hasMatched = true;
recordsWhenDataMatches();
@@ -377,6 +408,63 @@ public abstract class AbstractMergeSortJoinOperator
extends AbstractOperator {
return hasMatched;
}
+ protected boolean lessThan(
+ TsBlock leftBlock,
+ int[] leftPositions,
+ int lIndex,
+ TsBlock rightBlock,
+ int[] rightPositions,
+ int rIndex) {
+ return examineLessThan(leftBlock, leftPositions, lIndex, rightBlock,
rightPositions, rIndex);
+ }
+
+ // examine lessThan( L: [a, b], R[a', b'])
+ // if a < a' ==> L < R
+ // else
+ // if a == a', continue examine if b < b'
+ // else ==> L > R
+ protected boolean examineLessThan(
+ TsBlock leftBlock,
+ int[] leftPositions,
+ int lIndex,
+ TsBlock rightBlock,
+ int[] rightPositions,
+ int rIndex) {
+ for (int i = 0; i < comparators.size(); i++) {
+ if (comparators
+ .get(i)
+ .lessThan(leftBlock, leftPositions[i], lIndex, rightBlock,
rightPositions[i], rIndex)
+ .orElse(false)) {
+ return true;
+ } else if (!comparators
+ .get(i)
+ .equalsTo(leftBlock, leftPositions[i], lIndex, rightBlock,
rightPositions[i], rIndex)
+ .orElse(false)) {
+ return false;
+ }
+ }
+
+ return false;
+ }
+
+ protected boolean equalsTo(
+ TsBlock leftBlock,
+ int[] leftPositions,
+ int lIndex,
+ TsBlock rightBlock,
+ int[] rightPositions,
+ int rIndex) {
+ for (int i = 0; i < comparators.size(); i++) {
+ if (!comparators
+ .get(i)
+ .equalsTo(leftBlock, leftPositions[i], lIndex, rightBlock,
rightPositions[i], rIndex)
+ .orElse(false)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
protected void appendLeftBlockData(
int[] leftOutputSymbolIdx, TsBlockBuilder resultBuilder, TsBlock
leftBlock, int leftIndex) {
for (int i = 0; i < leftOutputSymbolIdx.length; i++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MergeSortFullOuterJoinOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MergeSortFullOuterJoinOperator.java
index 3433eb1c81a..337d90eebac 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MergeSortFullOuterJoinOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MergeSortFullOuterJoinOperator.java
@@ -28,45 +28,48 @@ import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
-import org.apache.tsfile.read.common.type.Type;
import org.apache.tsfile.utils.RamUsageEstimator;
import java.util.List;
import java.util.function.BiFunction;
+import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;
+
public class MergeSortFullOuterJoinOperator extends
AbstractMergeSortJoinOperator {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(MergeSortFullOuterJoinOperator.class);
- private final BiFunction<Column, Integer, TsBlock>
updateLastMatchedRowFunction;
-
// stores last row matched join criteria, only used in outer join
private TsBlock lastMatchedRightBlock = null;
+ private final int[] lastMatchedBlockPositions;
+ private final List<BiFunction<Column, Integer, Column>>
updateLastMatchedRowFunctions;
public MergeSortFullOuterJoinOperator(
OperatorContext operatorContext,
Operator leftChild,
- int leftJoinKeyPosition,
+ int[] leftJoinKeyPositions,
int[] leftOutputSymbolIdx,
Operator rightChild,
- int rightJoinKeyPosition,
+ int[] rightJoinKeyPositions,
int[] rightOutputSymbolIdx,
- JoinKeyComparator joinKeyComparator,
+ List<JoinKeyComparator> joinKeyComparators,
List<TSDataType> dataTypes,
- Type joinKeyType,
- BiFunction<Column, Integer, TsBlock> updateLastMatchedRowFunction) {
+ List<BiFunction<Column, Integer, Column>> updateLastMatchedRowFunctions)
{
super(
operatorContext,
leftChild,
- leftJoinKeyPosition,
+ leftJoinKeyPositions,
leftOutputSymbolIdx,
rightChild,
- rightJoinKeyPosition,
+ rightJoinKeyPositions,
rightOutputSymbolIdx,
- joinKeyComparator,
- dataTypes,
- joinKeyType);
- this.updateLastMatchedRowFunction = updateLastMatchedRowFunction;
+ joinKeyComparators,
+ dataTypes);
+ lastMatchedBlockPositions = new int[joinKeyComparators.size()];
+ for (int i = 0; i < lastMatchedBlockPositions.length; i++) {
+ lastMatchedBlockPositions[i] = i;
+ }
+ this.updateLastMatchedRowFunctions = updateLastMatchedRowFunctions;
}
@Override
@@ -112,24 +115,31 @@ public class MergeSortFullOuterJoinOperator extends
AbstractMergeSortJoinOperato
return true;
}
+ // if exist NULL values in right, just output this row with empty left
+ while (currentRightHasNullValue()) {
+ appendOneRightRowWithEmptyLeft();
+ if (rightFinishedWithIncIndex()) {
+ return true;
+ }
+ }
// continue right < left, until right >= left
- while (comparator.lessThan(
+ while (lessThan(
rightBlockList.get(rightBlockListIdx),
- rightJoinKeyPosition,
+ rightJoinKeyPositions,
rightIndex,
leftBlock,
- leftJoinKeyPosition,
+ leftJoinKeyPositions,
leftIndex)) {
if (lastMatchedRightBlock == null) {
appendOneRightRowWithEmptyLeft();
} else {
// CurrentRight can only be greater than or equal to lastMatchedRight.
- if (comparator.lessThan(
+ if (!equalsTo(
lastMatchedRightBlock,
- 0,
+ lastMatchedBlockPositions,
0,
rightBlockList.get(rightBlockListIdx),
- rightJoinKeyPosition,
+ rightJoinKeyPositions,
rightIndex)) {
appendOneRightRowWithEmptyLeft();
}
@@ -143,18 +153,23 @@ public class MergeSortFullOuterJoinOperator extends
AbstractMergeSortJoinOperato
return true;
}
+ // if exist NULL values in left, just output this row with empty right
+ while (currentLeftHasNullValue()) {
+ appendOneLeftRowWithEmptyRight();
+ if (leftFinishedWithIncIndex()) {
+ return true;
+ }
+ }
// continue left < right, until left >= right
- while (comparator.lessThan(
+ while (lessThan(
leftBlock,
- leftJoinKeyPosition,
+ leftJoinKeyPositions,
leftIndex,
rightBlockList.get(rightBlockListIdx),
- rightJoinKeyPosition,
+ rightJoinKeyPositions,
rightIndex)) {
appendOneLeftRowWithEmptyRight();
- leftIndex++;
- if (leftIndex >= leftBlock.getPositionCount()) {
- resetLeftBlock();
+ if (leftFinishedWithIncIndex()) {
return true;
}
}
@@ -163,22 +178,19 @@ public class MergeSortFullOuterJoinOperator extends
AbstractMergeSortJoinOperato
}
// has right value equals to current left, append to join result, inc
leftIndex
- if (hasMatchedRightValueToProbeLeft()) {
- leftIndex++;
-
- if (leftIndex >= leftBlock.getPositionCount()) {
- resetLeftBlock();
- return true;
- }
- }
-
- return false;
+ return hasMatchedRightValueToProbeLeft() && leftFinishedWithIncIndex();
}
@Override
protected void recordsWhenDataMatches() {
- lastMatchedRightBlock =
-
updateLastMatchedRowFunction.apply(leftBlock.getColumn(leftJoinKeyPosition),
leftIndex);
+ Column[] valueColumns = new Column[leftJoinKeyPositions.length];
+ for (int i = 0; i < leftJoinKeyPositions.length; i++) {
+ valueColumns[i] =
+ updateLastMatchedRowFunctions
+ .get(i)
+ .apply(leftBlock.getColumn(leftJoinKeyPositions[i]), leftIndex);
+ }
+ lastMatchedRightBlock = new TsBlock(1, TIME_COLUMN_TEMPLATE, valueColumns);
}
private void buildUseRemainingBlocks() {
@@ -191,16 +203,21 @@ public class MergeSortFullOuterJoinOperator extends
AbstractMergeSortJoinOperato
}
}
+ /** This method will be invoked only when `allRightLessThanLeft` or
`leftFinished`. */
private void appendRightWithEmptyLeft() {
while (rightBlockListIdx < rightBlockList.size()) {
+ // if `lastMatchedRightBlock` is not null, the value in
`lastMatchedRightBlock` must not be
+ // NULL,
+ // if current right value is null, the right row with empty left will be
appended in the join
+ // result.
if (lastMatchedRightBlock == null
- || comparator.lessThan(
+ || !equalsTo(
lastMatchedRightBlock,
- 0,
+ lastMatchedBlockPositions,
0,
rightBlockList.get(rightBlockListIdx),
- rightJoinKeyPosition,
+ rightJoinKeyPositions,
rightIndex)) {
for (int i = 0; i < leftOutputSymbolIdx.length; i++) {
ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MergeSortInnerJoinOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MergeSortInnerJoinOperator.java
index a1c4ceaeca5..3b7e17fae11 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MergeSortInnerJoinOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MergeSortInnerJoinOperator.java
@@ -25,7 +25,7 @@ import
org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.comparator.JoinKeyComparator;
import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.utils.RamUsageEstimator;
import java.util.List;
@@ -37,25 +37,23 @@ public class MergeSortInnerJoinOperator extends
AbstractMergeSortJoinOperator {
public MergeSortInnerJoinOperator(
OperatorContext operatorContext,
Operator leftChild,
- int leftJoinKeyPosition,
+ int[] leftJoinKeyPositions,
int[] leftOutputSymbolIdx,
Operator rightChild,
- int rightJoinKeyPosition,
+ int[] rightJoinKeyPositions,
int[] rightOutputSymbolIdx,
- JoinKeyComparator joinKeyComparator,
- List<TSDataType> dataTypes,
- Type joinKeyType) {
+ List<JoinKeyComparator> joinKeyComparators,
+ List<TSDataType> dataTypes) {
super(
operatorContext,
leftChild,
- leftJoinKeyPosition,
+ leftJoinKeyPositions,
leftOutputSymbolIdx,
rightChild,
- rightJoinKeyPosition,
+ rightJoinKeyPositions,
rightOutputSymbolIdx,
- joinKeyComparator,
- dataTypes,
- joinKeyType);
+ joinKeyComparators,
+ dataTypes);
}
@Override
@@ -87,13 +85,19 @@ public class MergeSortInnerJoinOperator extends
AbstractMergeSortJoinOperator {
return true;
}
+ // skip all NULL values in right, because NULL value can not appear in the
inner join result
+ while (currentRightHasNullValue()) {
+ if (rightFinishedWithIncIndex()) {
+ return true;
+ }
+ }
// continue right < left, until right >= left
- while (comparator.lessThan(
+ while (lessThan(
rightBlockList.get(rightBlockListIdx),
- rightJoinKeyPosition,
+ rightJoinKeyPositions,
rightIndex,
leftBlock,
- leftJoinKeyPosition,
+ leftJoinKeyPositions,
leftIndex)) {
if (rightFinishedWithIncIndex()) {
return true;
@@ -103,17 +107,21 @@ public class MergeSortInnerJoinOperator extends
AbstractMergeSortJoinOperator {
return true;
}
+ // skip all NULL values in left, because NULL value can not appear in the
inner join result
+ while (currentLeftHasNullValue()) {
+ if (leftFinishedWithIncIndex()) {
+ return true;
+ }
+ }
// continue left < right, until left >= right
- while (comparator.lessThan(
+ while (lessThan(
leftBlock,
- leftJoinKeyPosition,
+ leftJoinKeyPositions,
leftIndex,
rightBlockList.get(rightBlockListIdx),
- rightJoinKeyPosition,
+ rightJoinKeyPositions,
rightIndex)) {
- leftIndex++;
- if (leftIndex >= leftBlock.getPositionCount()) {
- resetLeftBlock();
+ if (leftFinishedWithIncIndex()) {
return true;
}
}
@@ -122,15 +130,24 @@ public class MergeSortInnerJoinOperator extends
AbstractMergeSortJoinOperator {
}
// has right values equal to current left, append to join result, inc
leftIndex
- if (hasMatchedRightValueToProbeLeft()) {
- leftIndex++;
- if (leftIndex >= leftBlock.getPositionCount()) {
- resetLeftBlock();
- return true;
- }
+ return hasMatchedRightValueToProbeLeft() && leftFinishedWithIncIndex();
+ }
+
+ @Override
+ protected boolean lessThan(
+ TsBlock leftBlock,
+ int[] leftPositions,
+ int lIndex,
+ TsBlock rightBlock,
+ int[] rightPositions,
+ int rIndex) {
+
+ // if join key size equals to 1, can return true in inner join
+ if (rightPositions.length == 1 &&
rightBlock.getColumn(rightPositions[0]).isNull(rIndex)) {
+ return true;
}
- return false;
+ return examineLessThan(leftBlock, leftPositions, lIndex, rightBlock,
rightPositions, rIndex);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 7728bf7895a..7dcdbe04c95 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -154,7 +154,6 @@ import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.TimeRange;
-import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.column.BinaryColumn;
import org.apache.tsfile.read.common.block.column.BooleanColumn;
import org.apache.tsfile.read.common.block.column.DoubleColumn;
@@ -165,7 +164,6 @@ import org.apache.tsfile.read.common.type.BinaryType;
import org.apache.tsfile.read.common.type.BlobType;
import org.apache.tsfile.read.common.type.BooleanType;
import org.apache.tsfile.read.common.type.Type;
-import org.apache.tsfile.read.common.type.TypeEnum;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.write.schema.IMeasurementSchema;
@@ -198,7 +196,6 @@ import static
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggreg
import static
org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode;
import static
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MergeSortComparator.getComparatorForTable;
import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.InformationSchemaContentSupplierFactory.getSupplier;
-import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;
import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath;
import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AccumulatorFactory.createAccumulator;
import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AccumulatorFactory.createGroupedAccumulator;
@@ -1267,24 +1264,34 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
dataTypes);
}
- Integer leftJoinKeyPosition =
leftColumnNamesMap.get(node.getCriteria().get(0).getLeft());
- if (leftJoinKeyPosition == null) {
- throw new IllegalStateException("Left child of JoinNode doesn't contain
left join key.");
+ int size = node.getCriteria().size();
+ int[] leftJoinKeyPositions = new int[size];
+ for (int i = 0; i < size; i++) {
+ Integer leftJoinKeyPosition =
leftColumnNamesMap.get(node.getCriteria().get(i).getLeft());
+ if (leftJoinKeyPosition == null) {
+ throw new IllegalStateException("Left child of JoinNode doesn't
contain left join key.");
+ }
+ leftJoinKeyPositions[i] = leftJoinKeyPosition;
}
- Integer rightJoinKeyPosition =
rightColumnNamesMap.get(node.getCriteria().get(0).getRight());
- if (rightJoinKeyPosition == null) {
- throw new IllegalStateException("Right child of JoinNode doesn't contain
right join key.");
+ List<Type> joinKeyTypes = new ArrayList<>(size);
+ int[] rightJoinKeyPositions = new int[size];
+ for (int i = 0; i < size; i++) {
+ Integer rightJoinKeyPosition =
rightColumnNamesMap.get(node.getCriteria().get(i).getRight());
+ if (rightJoinKeyPosition == null) {
+ throw new IllegalStateException("Right child of JoinNode doesn't
contain right join key.");
+ }
+ rightJoinKeyPositions[i] = rightJoinKeyPosition;
+
+ Type leftJoinKeyType =
+
context.getTypeProvider().getTableModelType(node.getCriteria().get(i).getLeft());
+ checkArgument(
+ leftJoinKeyType
+ ==
context.getTypeProvider().getTableModelType(node.getCriteria().get(i).getRight()),
+ "Join key type mismatch.");
+ joinKeyTypes.add(leftJoinKeyType);
}
- Type leftJoinKeyType =
-
context.getTypeProvider().getTableModelType(node.getCriteria().get(0).getLeft());
-
- checkArgument(
- leftJoinKeyType
- ==
context.getTypeProvider().getTableModelType(node.getCriteria().get(0).getRight()),
- "Join key type mismatch.");
-
if (requireNonNull(node.getJoinType()) == JoinNode.JoinType.INNER) {
OperatorContext operatorContext =
context
@@ -1296,14 +1303,13 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
return new MergeSortInnerJoinOperator(
operatorContext,
leftChild,
- leftJoinKeyPosition,
+ leftJoinKeyPositions,
leftOutputSymbolIdx,
rightChild,
- rightJoinKeyPosition,
+ rightJoinKeyPositions,
rightOutputSymbolIdx,
- JoinKeyComparatorFactory.getComparator(leftJoinKeyType, true),
- dataTypes,
- leftJoinKeyType);
+ JoinKeyComparatorFactory.getComparators(joinKeyTypes, true),
+ dataTypes);
} else if (requireNonNull(node.getJoinType()) == JoinNode.JoinType.FULL) {
OperatorContext operatorContext =
context
@@ -1315,65 +1321,46 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
return new MergeSortFullOuterJoinOperator(
operatorContext,
leftChild,
- leftJoinKeyPosition,
+ leftJoinKeyPositions,
leftOutputSymbolIdx,
rightChild,
- rightJoinKeyPosition,
+ rightJoinKeyPositions,
rightOutputSymbolIdx,
- JoinKeyComparatorFactory.getComparator(leftJoinKeyType, true),
+ JoinKeyComparatorFactory.getComparators(joinKeyTypes, true),
dataTypes,
- leftJoinKeyType,
- buildUpdateLastRowFunction(leftJoinKeyType.getTypeEnum()));
+
joinKeyTypes.stream().map(this::buildUpdateLastRowFunction).collect(Collectors.toList()));
}
throw new IllegalStateException("Unsupported join type: " +
node.getJoinType());
}
- private BiFunction<Column, Integer, TsBlock>
buildUpdateLastRowFunction(TypeEnum type) {
- switch (type) {
+ private BiFunction<Column, Integer, Column> buildUpdateLastRowFunction(Type
joinKeyType) {
+ switch (joinKeyType.getTypeEnum()) {
case INT32:
case DATE:
- return (column, rowIndex) ->
- new TsBlock(
- 1,
- TIME_COLUMN_TEMPLATE,
- new IntColumn(1, Optional.empty(), new int[]
{column.getInt(rowIndex)}));
+ return (inputColumn, rowIndex) ->
+ new IntColumn(1, Optional.empty(), new int[]
{inputColumn.getInt(rowIndex)});
case INT64:
case TIMESTAMP:
- return (column, rowIndex) ->
- new TsBlock(
- 1,
- TIME_COLUMN_TEMPLATE,
- new LongColumn(1, Optional.empty(), new long[]
{column.getLong(rowIndex)}));
+ return (inputColumn, rowIndex) ->
+ new LongColumn(1, Optional.empty(), new long[]
{inputColumn.getLong(rowIndex)});
case FLOAT:
- return (column, rowIndex) ->
- new TsBlock(
- 1,
- TIME_COLUMN_TEMPLATE,
- new FloatColumn(1, Optional.empty(), new float[]
{column.getFloat(rowIndex)}));
+ return (inputColumn, rowIndex) ->
+ new FloatColumn(1, Optional.empty(), new float[]
{inputColumn.getFloat(rowIndex)});
case DOUBLE:
- return (column, rowIndex) ->
- new TsBlock(
- 1,
- TIME_COLUMN_TEMPLATE,
- new DoubleColumn(1, Optional.empty(), new double[]
{column.getDouble(rowIndex)}));
+ return (inputColumn, rowIndex) ->
+ new DoubleColumn(1, Optional.empty(), new double[]
{inputColumn.getDouble(rowIndex)});
case BOOLEAN:
- return (column, rowIndex) ->
- new TsBlock(
- 1,
- TIME_COLUMN_TEMPLATE,
- new BooleanColumn(
- 1, Optional.empty(), new boolean[]
{column.getBoolean(rowIndex)}));
+ return (inputColumn, rowIndex) ->
+ new BooleanColumn(
+ 1, Optional.empty(), new boolean[]
{inputColumn.getBoolean(rowIndex)});
case STRING:
case TEXT:
case BLOB:
- return (column, rowIndex) ->
- new TsBlock(
- 1,
- TIME_COLUMN_TEMPLATE,
- new BinaryColumn(1, Optional.empty(), new Binary[]
{column.getBinary(rowIndex)}));
+ return (inputColumn, rowIndex) ->
+ new BinaryColumn(1, Optional.empty(), new Binary[]
{inputColumn.getBinary(rowIndex)});
default:
- throw new UnsupportedOperationException("Unsupported data type: " +
type);
+ throw new UnsupportedOperationException("Unsupported data type: " +
joinKeyType);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
index 3ea5f1b7768..0412ede205a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
@@ -51,7 +51,6 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AllRows;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterDB;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterPipe;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor;
-import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CountDevice;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateDB;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateIndex;
@@ -187,7 +186,6 @@ import static java.util.Collections.emptyList;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static
org.apache.iotdb.commons.schema.table.TsTable.TABLE_ALLOWED_PROPERTIES;
-import static org.apache.iotdb.commons.schema.table.TsTable.TIME_COLUMN_NAME;
import static
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction.DATE_BIN;
import static
org.apache.iotdb.db.queryengine.execution.warnings.StandardWarningCode.REDUNDANT_ORDER_BY;
import static
org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor.DATABASE_NOT_SPECIFIED;
@@ -227,12 +225,6 @@ public class StatementAnalyzer {
private final CorrelationSupport correlationSupport;
- public static final String ONLY_SUPPORT_TIME_COLUMN_EQUI_JOIN =
- "Only support time column equi-join in current version";
-
- public static final String ONLY_SUPPORT_TIME_COLUMN_IN_USING_CLAUSE =
- "Only support time column as the parameter in JOIN USING";
-
public StatementAnalyzer(
StatementAnalyzerFactory statementAnalyzerFactory,
Analysis analysis,
@@ -2050,12 +2042,12 @@ public class StatementAnalyzer {
createAndAssignScope(
node, scope,
left.getRelationType().joinWith(right.getRelationType()));
- if (node.getType() == Join.Type.CROSS || node.getType() == LEFT ||
node.getType() == RIGHT) {
+ if (node.getType() == LEFT || node.getType() == RIGHT) {
throw new SemanticException(
String.format(
"%s JOIN is not supported, only support INNER JOIN in current
version.",
node.getType()));
- } else if (node.getType() == Join.Type.IMPLICIT) {
+ } else if (node.getType() == Join.Type.CROSS || node.getType() ==
Join.Type.IMPLICIT) {
return output;
}
if (criteria instanceof JoinOn) {
@@ -2102,40 +2094,6 @@ public class StatementAnalyzer {
if (criteria instanceof NaturalJoin) {
throw new SemanticException("Natural join not supported");
}
-
- if (criteria instanceof JoinOn) {
- JoinOn joinOn = (JoinOn) criteria;
- Expression expression = joinOn.getExpression();
- if (!(expression instanceof ComparisonExpression)) {
- throw new SemanticException(ONLY_SUPPORT_TIME_COLUMN_EQUI_JOIN);
- }
- ComparisonExpression comparisonExpression = (ComparisonExpression)
expression;
- if (comparisonExpression.getOperator() !=
ComparisonExpression.Operator.EQUAL) {
- throw new SemanticException(ONLY_SUPPORT_TIME_COLUMN_EQUI_JOIN);
- }
- checkArgument(
- comparisonExpression.getLeft() instanceof DereferenceExpression,
- ONLY_SUPPORT_TIME_COLUMN_EQUI_JOIN);
- checkArgument(
- comparisonExpression.getRight() instanceof DereferenceExpression,
- ONLY_SUPPORT_TIME_COLUMN_EQUI_JOIN);
- DereferenceExpression left = (DereferenceExpression)
comparisonExpression.getLeft();
- if (!left.getField().isPresent()
- || !left.getField().get().equals(new
Identifier(TIME_COLUMN_NAME))) {
- throw new SemanticException(ONLY_SUPPORT_TIME_COLUMN_EQUI_JOIN);
- }
- DereferenceExpression right = (DereferenceExpression)
comparisonExpression.getLeft();
- if (!right.getField().isPresent()
- || !right.getField().get().equals(new
Identifier(TIME_COLUMN_NAME))) {
- throw new SemanticException(ONLY_SUPPORT_TIME_COLUMN_EQUI_JOIN);
- }
- } else if (criteria instanceof JoinUsing) {
- List<Identifier> identifiers = ((JoinUsing) criteria).getColumns();
- if (identifiers.size() != 1
- || !identifiers.get(0).equals(new Identifier(TIME_COLUMN_NAME))) {
- throw new
SemanticException(ONLY_SUPPORT_TIME_COLUMN_IN_USING_CLAUSE);
- }
- }
}
private Scope analyzeJoinUsing(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/JoinNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/JoinNode.java
index b2e54309400..0326eb71a60 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/JoinNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/JoinNode.java
@@ -121,8 +121,6 @@ public class JoinNode extends TwoChildProcessNode {
// !(criteria.isEmpty() && rightHashSymbol.isPresent()),
// "Right hash symbol is only valid in an equijoin");
- // todo: Remove this check after supporting join on multiple columns.
- checkArgument(criteria.size() <= 1, "Only support Join on one column for
now.");
criteria.forEach(
equiJoinClause ->
checkArgument(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/JoinUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/JoinUtils.java
index 1d0abed3ae9..83ac24ff9b6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/JoinUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/JoinUtils.java
@@ -48,6 +48,10 @@ import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinN
import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL;
public class JoinUtils {
+ public static final String FULL_JOIN_ONLY_SUPPORT_EQUI_JOIN =
+ "Full outer join only support equiJoinClauses";
+
+ private JoinUtils() {}
static Expression extractJoinPredicate(JoinNode joinNode) {
ImmutableList.Builder<Expression> builder = ImmutableList.builder();
@@ -58,6 +62,12 @@ public class JoinUtils {
return combineConjuncts(builder.build());
}
+ /**
+ * If the expression is EQUAL ComparisonExpression
+ *
+ * @return true if the expression is EQUAL ComparisonExpression and
leftSymbols contains
+ * expression.getLeft() and rightSymbols contains expression.getRight().
+ */
static boolean joinEqualityExpression(
Expression expression, Collection<Symbol> leftSymbols,
Collection<Symbol> rightSymbols) {
return joinComparisonExpression(
@@ -67,7 +77,7 @@ public class JoinUtils {
ImmutableSet.of(ComparisonExpression.Operator.EQUAL));
}
- static boolean joinComparisonExpression(
+ private static boolean joinComparisonExpression(
Expression expression,
Collection<Symbol> leftSymbols,
Collection<Symbol> rightSymbols,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
index feeda5047eb..26dd85498f8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -42,6 +41,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.EqualityInference;
import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.ReplaceSymbolInExpression;
@@ -67,6 +67,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -91,10 +92,13 @@ import static
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.GlobalT
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.combineConjuncts;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.extractConjuncts;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.filterDeterministicConjuncts;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode.JoinType.FULL;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode.JoinType.INNER;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.JoinUtils.FULL_JOIN_ONLY_SUPPORT_EQUI_JOIN;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.JoinUtils.extractJoinPredicate;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.JoinUtils.joinEqualityExpression;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.JoinUtils.processInnerJoin;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.QueryCardinalityUtil.extractCardinality;
import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL;
/**
@@ -535,7 +539,8 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
newJoinPredicate = joinPredicate;
break;
default:
- throw new IllegalStateException("Only support INNER JOIN in current
version");
+ throw new IllegalStateException(
+ "Only support INNER JOIN and FULL OUTER JOIN in current
version");
}
// newJoinPredicate = simplifyExpression(newJoinPredicate);
@@ -554,15 +559,13 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
// Create new projections for the new join clauses
List<JoinNode.EquiJoinClause> equiJoinClauses = new ArrayList<>();
ImmutableList.Builder<Expression> joinFilterBuilder =
ImmutableList.builder();
- boolean hasFilter = false;
- Expression lastEquiJoinConjunct = null;
for (Expression conjunct : extractConjuncts(newJoinPredicate)) {
- if (joinEqualityExpressionOnTimeColumn(conjunct, node)) {
- lastEquiJoinConjunct = conjunct;
+ if (joinEqualityExpressionOnOneColumn(conjunct, node)) {
ComparisonExpression equality = (ComparisonExpression) conjunct;
boolean alignedComparison =
-
node.getLeftChild().getOutputSymbols().containsAll(extractUnique(equality.getLeft()));
+ new HashSet<>(node.getLeftChild().getOutputSymbols())
+ .containsAll(extractUnique(equality.getLeft()));
Expression leftExpression = alignedComparison ? equality.getLeft() :
equality.getRight();
Expression rightExpression = alignedComparison ? equality.getRight()
: equality.getLeft();
@@ -578,26 +581,13 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
equiJoinClauses.add(new JoinNode.EquiJoinClause(leftSymbol,
rightSymbol));
} else {
+ if (node.getJoinType() == FULL) {
+ throw new
UnsupportedOperationException(FULL_JOIN_ONLY_SUPPORT_EQUI_JOIN);
+ }
joinFilterBuilder.add(conjunct);
- hasFilter = true;
}
}
- // todo: Remove this check after supporting join on multiple columns.
- checkArgument(equiJoinClauses.size() <= 1, "Only support Join on one
column for now.");
- if (!equiJoinClauses.isEmpty() && hasFilter) {
- equiJoinClauses.clear();
- joinFilterBuilder.add(lastEquiJoinConjunct);
- }
-
- List<Expression> joinFilter = joinFilterBuilder.build();
- // DynamicFiltersResult dynamicFiltersResult =
createDynamicFilters(node,
- // equiJoinClauses, joinFilter, session, idAllocator);
- // Map<DynamicFilterId, Symbol> dynamicFilters =
- // dynamicFiltersResult.getDynamicFilters();
- // leftPredicate = combineConjuncts(metadata, leftPredicate,
combineConjuncts(metadata,
- // dynamicFiltersResult.getPredicates()));
-
PlanNode leftSource;
PlanNode rightSource;
boolean equiJoinClausesUnmodified =
@@ -614,12 +604,24 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
rightSource = node.getRightChild().accept(this, new
RewriteContext(rightPredicate));
}
+ Cardinality leftCardinality = extractCardinality(leftSource);
+ Cardinality rightCardinality = extractCardinality(rightSource);
+ if (leftCardinality.isAtMostScalar() ||
rightCardinality.isAtMostScalar()) {
+ // if cardinality of left or right equals to 1, use NestedLoopJoin
+ equiJoinClauses.forEach(
+ equiJoinClause ->
joinFilterBuilder.add(equiJoinClause.toExpression()));
+ equiJoinClauses.clear();
+ }
+
+ List<Expression> joinFilter = joinFilterBuilder.build();
Optional<Expression> newJoinFilter =
Optional.of(combineConjuncts(joinFilter));
- if (newJoinFilter.get().equals(TRUE_LITERAL)) {
+ if (TRUE_LITERAL.equals(newJoinFilter.get())) {
newJoinFilter = Optional.empty();
}
- if (node.getJoinType() == INNER && newJoinFilter.isPresent() &&
equiJoinClauses.isEmpty()) {
+ if (node.getJoinType() == INNER && newJoinFilter.isPresent()
+ // && equiJoinClauses.isEmpty()
+ ) {
// if we do not have any equi conjunct we do not pushdown non-equality
condition into
// inner join, so we plan execution as nested-loops-join followed by
filter instead
// hash join.
@@ -628,15 +630,17 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
}
boolean filtersEquivalent =
- newJoinFilter.isPresent() == node.getFilter().isPresent() &&
(!newJoinFilter.isPresent());
- // areExpressionsEquivalent(newJoinFilter.get(), node.getFilter().get());
+ newJoinFilter.isPresent() == node.getFilter().isPresent()
+ && (!newJoinFilter.isPresent()
+ // || areExpressionsEquivalent(newJoinFilter.get(),
node.getFilter().get());
+ );
PlanNode output = node;
if (leftSource != node.getLeftChild()
|| rightSource != node.getRightChild()
|| !filtersEquivalent
- // !dynamicFilters.equals(node.getDynamicFilters()) ||
|| !equiJoinClausesUnmodified) {
+ // this branch is always executed in current version
leftSource =
new ProjectNode(
queryContext.getQueryId().genPlanNodeId(), leftSource,
leftProjections.build());
@@ -657,38 +661,16 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
node.isSpillable());
}
- // sort the left and right child of join node if it is not a cross join
+ JoinNode outputJoinNode = (JoinNode) output;
if (!((JoinNode) output).isCrossJoin()) {
- JoinNode.EquiJoinClause joinCriteria = ((JoinNode)
output).getCriteria().get(0);
- OrderingScheme leftOrderingScheme =
- new OrderingScheme(
- Collections.singletonList(joinCriteria.getLeft()),
- Collections.singletonMap(joinCriteria.getLeft(),
ASC_NULLS_LAST));
- OrderingScheme rightOrderingScheme =
- new OrderingScheme(
- Collections.singletonList(joinCriteria.getRight()),
- Collections.singletonMap(joinCriteria.getRight(),
ASC_NULLS_LAST));
- SortNode leftSortNode =
- new SortNode(
- queryId.genPlanNodeId(),
- ((JoinNode) output).getLeftChild(),
- leftOrderingScheme,
- false,
- false);
- SortNode rightSortNode =
- new SortNode(
- queryId.genPlanNodeId(),
- ((JoinNode) output).getRightChild(),
- rightOrderingScheme,
- false,
- false);
- ((JoinNode) output).setLeftChild(leftSortNode);
- ((JoinNode) output).setRightChild(rightSortNode);
- }
-
- if (!postJoinPredicate.equals(TRUE_LITERAL)) {
+ // inner join or full join, use MergeSortJoinNode
+ appendSortNodeForMergeSortJoin(outputJoinNode);
+ }
+
+ if (!TRUE_LITERAL.equals(postJoinPredicate)) {
output =
- new FilterNode(queryContext.getQueryId().genPlanNodeId(), output,
postJoinPredicate);
+ new FilterNode(
+ queryContext.getQueryId().genPlanNodeId(), outputJoinNode,
postJoinPredicate);
}
if (!node.getOutputSymbols().equals(output.getOutputSymbols())) {
@@ -702,15 +684,17 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
return output;
}
- private boolean joinEqualityExpressionOnTimeColumn(Expression conjunct,
JoinNode node) {
+ private boolean joinEqualityExpressionOnOneColumn(Expression conjunct,
JoinNode node) {
if (!joinEqualityExpression(
conjunct,
node.getLeftChild().getOutputSymbols(),
node.getRightChild().getOutputSymbols())) {
return false;
}
+
// conjunct must be a comparison expression
ComparisonExpression equality = (ComparisonExpression) conjunct;
+
// After Optimization, some subqueries are transformed into Join.
// For now, Users can only use join on time. And the join is implemented
using MergeSortJoin.
// However, it's assumed that use Filter + NestedLoopJoin is better than
MergeSortJoin
@@ -719,15 +703,9 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
// will use Filter + NestedLoopJoin instead.
// Attention: For now, join on time column is assumed to hold the
following condition: left
// and right both contains the substring time.
- // todo: after supporting join on other columns for the user, we need to
remove the following
- // code, since the condition does not hold anymore.
- // This is temporary workaround.
Expression left = equality.getLeft();
Expression right = equality.getRight();
- return (left instanceof SymbolReference
- && ((SymbolReference)
left).getName().contains(IoTDBConstant.TIME))
- && (right instanceof SymbolReference
- && ((SymbolReference)
right).getName().contains(IoTDBConstant.TIME));
+ return (left instanceof SymbolReference && right instanceof
SymbolReference);
}
private Symbol symbolForExpression(Expression expression) {
@@ -735,10 +713,33 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
return Symbol.from(expression);
}
- // TODO(beyyes) verify the rightness of type
return symbolAllocator.newSymbol(expression,
analysis.getType(expression));
}
+ private void appendSortNodeForMergeSortJoin(JoinNode joinNode) {
+ int size = joinNode.getCriteria().size();
+ List<Symbol> leftOrderBy = new ArrayList<>(size);
+ List<Symbol> rightOrderBy = new ArrayList<>(size);
+ Map<Symbol, SortOrder> leftOrderings = new HashMap<>(size);
+ Map<Symbol, SortOrder> rightOrderings = new HashMap<>(size);
+ for (JoinNode.EquiJoinClause equiJoinClause : joinNode.getCriteria()) {
+ leftOrderBy.add(equiJoinClause.getLeft());
+ leftOrderings.put(equiJoinClause.getLeft(), ASC_NULLS_LAST);
+ rightOrderBy.add(equiJoinClause.getRight());
+ rightOrderings.put(equiJoinClause.getRight(), ASC_NULLS_LAST);
+ }
+ OrderingScheme leftOrderingScheme = new OrderingScheme(leftOrderBy,
leftOrderings);
+ OrderingScheme rightOrderingScheme = new OrderingScheme(rightOrderBy,
rightOrderings);
+ SortNode leftSortNode =
+ new SortNode(
+ queryId.genPlanNodeId(), joinNode.getLeftChild(),
leftOrderingScheme, false, false);
+ SortNode rightSortNode =
+ new SortNode(
+ queryId.genPlanNodeId(), joinNode.getRightChild(),
rightOrderingScheme, false, false);
+ joinNode.setLeftChild(leftSortNode);
+ joinNode.setRightChild(rightSortNode);
+ }
+
@Override
public PlanNode visitInsertTablet(InsertTabletNode node, RewriteContext
context) {
return node;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/QueryCardinalityUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/QueryCardinalityUtil.java
index d92764c6a3d..7c81f196c69 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/QueryCardinalityUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/QueryCardinalityUtil.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNo
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.GroupReference;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Lookup;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
@@ -207,6 +208,22 @@ public final class QueryCardinalityUtil {
// return node.getSource().accept(this, null);
// }
+ @Override
+ public Range<Long> visitAggregationTableScan(AggregationTableScanNode
node, Void context) {
+
+ if (!node.getGroupingKeys().isEmpty()) { // exist group by
+
+ if (node.getProjection() != null
+ && !node.getProjection().getMap().isEmpty()) { // also exist
date_bin
+ return Range.atLeast(0L);
+ } else {
+ return Range.atMost((long) node.getDeviceEntries().size());
+ }
+ } else {
+ return Range.singleton((long) node.getDeviceEntries().size());
+ }
+ }
+
private Range<Long> applyLimit(PlanNode source, long limit) {
Range<Long> sourceCardinalityRange = source.accept(this, null);
if (sourceCardinalityRange.hasUpperBound()) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
index a6928e0cf49..a7996c09b89 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
@@ -23,9 +23,11 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.PlanTester;
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.TableLogicalPlanner;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributedPlanner;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode;
@@ -37,17 +39,22 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import org.junit.Ignore;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.AnalyzerTest.analyzeSQL;
-import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.StatementAnalyzer.ONLY_SUPPORT_TIME_COLUMN_EQUI_JOIN;
-import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.StatementAnalyzer.ONLY_SUPPORT_TIME_COLUMN_IN_USING_CLAUSE;
import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.ALL_DEVICE_ENTRIES;
import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.BEIJING_A1_DEVICE_ENTRY;
import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.DEFAULT_WARNING;
@@ -63,7 +70,22 @@ import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils
import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.assertTableScan;
import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.buildSymbols;
import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.getChildrenNode;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.aggregation;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.aggregationFunction;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.aggregationTableScan;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.filter;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.join;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.project;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.singleGroupingSet;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.sort;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.tableScan;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Step.FINAL;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Step.PARTIAL;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode.JoinType.INNER;
+import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression.Operator.EQUAL;
+import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression.Operator.GREATER_THAN;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -77,6 +99,7 @@ public class JoinTest {
MergeSortNode mergeSortNode;
DistributedQueryPlan distributedQueryPlan;
DeviceTableScanNode deviceTableScanNode;
+ String sql;
// ========== table1 join table1 ===============
@@ -340,78 +363,227 @@ public class JoinTest {
assertTableScan(deviceTableScanNode, BEIJING_A1_DEVICE_ENTRY,
Ordering.ASC, 0, 0, true, "");
}
- // has filter which can be push down, inner limit, test if inner limit can
be pushed down
- @Ignore
+ // TableScan join AggTableScan (whose cardinality is at most one)
@Test
public void innerJoinTest3() {
- assertInnerJoinTest2(
- "SELECT t1.time, t1.tag1, t1.tag2, t1.attr2, t1.s1+1 as add_s1, t1.s2,"
- + "t2.tag1, t2.tag3, t2.attr2, t2.s1, t2.s3 "
- + "FROM (SELECT * FROM table1 t1 WHERE tag1='beijing' AND
tag2='A1' AND s1>1 LIMIT 111) t1 JOIN (SELECT * FROM table1 WHERE
tag1='shenzhen' AND s2>1 LIMIT 222) t2 "
- + "ON t1.time = t2.time ORDER BY t1.tag1 OFFSET 3 LIMIT 6",
- false);
+ PlanTester planTester = new PlanTester();
+ Expression filterPredicate =
+ new ComparisonExpression(EQUAL, new SymbolReference("s1"), new
SymbolReference("max"));
+
+ PlanMatchPattern tableScan =
+ tableScan("testdb.table1", ImmutableList.of("s1"),
ImmutableSet.of("s1"));
+
+ // Verify full LogicalPlan
+ /*
+ * └──OutputNode
+ * └──ProjectNode
+ * └──FilterNode
+ * └──JoinNode
+ * |──TableScanNode
+ * ├──AggregationNode
+ * │ └──AggregationTableScanNode
+ */
+ assertPlan(
+ planTester.createPlan(
+ "SELECT s1 FROM table1 t1 JOIN (select max(s1) as agg from table1)
t2 ON t1.s1=t2.agg"),
+ output(
+ project(
+ filter(
+ filterPredicate,
+ join(
+ JoinNode.JoinType.INNER,
+ builder ->
+ builder
+ .left(tableScan)
+ .right(
+ aggregation(
+ singleGroupingSet(),
+ ImmutableMap.of(
+ Optional.of("max"),
+ aggregationFunction("max",
ImmutableList.of("max_9"))),
+ Collections.emptyList(),
+ Optional.empty(),
+ FINAL,
+ aggregationTableScan(
+ singleGroupingSet(),
+ Collections.emptyList(),
+ Optional.empty(),
+ PARTIAL,
+ "testdb.table1",
+ ImmutableList.of("max_9"),
+ ImmutableSet.of("s1_6")))))))));
+
+ filterPredicate =
+ new ComparisonExpression(EQUAL, new SymbolReference("s1"), new
SymbolReference("sum"));
+ assertPlan(
+ planTester.createPlan(
+ "SELECT s1 FROM table1 t1 JOIN (select sum(s1) as agg from table1)
t2 ON t1.s1=t2.agg"),
+ output(
+ project(
+ filter(
+ filterPredicate,
+ join(
+ JoinNode.JoinType.INNER,
+ builder ->
+ builder
+ .left(tableScan)
+ .right(
+ aggregation(
+ singleGroupingSet(),
+ ImmutableMap.of(
+ Optional.of("sum"),
+ aggregationFunction("sum",
ImmutableList.of("sum_9"))),
+ Collections.emptyList(),
+ Optional.empty(),
+ FINAL,
+ aggregationTableScan(
+ singleGroupingSet(),
+ Collections.emptyList(),
+ Optional.empty(),
+ PARTIAL,
+ "testdb.table1",
+ ImmutableList.of("sum_9"),
+ ImmutableSet.of("s1_6")))))))));
}
- // has filter which can be push down, inner limit and sort, test if inner
limit can be pushed down
- @Ignore
@Test
public void innerJoinTest4() {
+ PlanTester planTester = new PlanTester();
+
+ Expression filterPredicate =
+ new ComparisonExpression(
+ GREATER_THAN, new SymbolReference("s1"), new
SymbolReference("s1_6"));
+
+ // equiClause with non equiClause
+ sql =
+ "SELECT t1.s1 FROM table1 t1 JOIN table1 t2 ON t1.tag1=t2.tag1 AND
t1.time=t2.time AND t1.s1>t2.s1";
+ logicalQueryPlan = planTester.createPlan(sql);
+ PlanMatchPattern tableScan1 =
+ tableScan(
+ "testdb.table1",
+ ImmutableList.of("time", "tag1", "s1"),
+ ImmutableSet.of("time", "tag1", "s1"));
+ PlanMatchPattern tableScan2 =
+ tableScan(
+ "testdb.table1", ImmutableMap.of("time_0", "time", "tag1_1",
"tag1", "s1_6", "s1"));
+ // Verify full LogicalPlan
+ /*
+ * └──OutputNode
+ * └──Project
+ * └──Filter (t1.s1>t2.s1)
+ * └──JoinNode (t1.tag1=t2.tag1 AND t1.time=t2.time)
+ * |──SortNode
+ * │ └──TableScanNode
+ * ├──SortNode
+ * │ └──TableScanNode
+ */
+ assertPlan(
+ logicalQueryPlan,
+ output(
+ project(
+ filter(
+ filterPredicate,
+ join(
+ JoinNode.JoinType.INNER,
+ builder ->
+ builder
+ .left(sort(tableScan1))
+ .right(sort(tableScan2))
+ .ignoreEquiCriteria())))));
+
+ sql = "SELECT t1.s1 FROM table1 t1 JOIN table1 t2 ON t1.s1>t2.s1";
+ logicalQueryPlan = planTester.createPlan(sql);
+ // Verify full LogicalPlan
+ /*
+ * └──OutputNode
+ * └──Project
+ * └──Filter
+ * └──JoinNode
+ * │ └──TableScanNode
+ * │ └──TableScanNode
+ */
+ assertPlan(
+ logicalQueryPlan,
+ output(
+ project(
+ filter(
+ join(
+ JoinNode.JoinType.INNER,
+ builder ->
+ builder
+ .left(
+ tableScan(
+ "testdb.table1",
+ ImmutableList.of("s1"),
+ ImmutableSet.of("s1")))
+ .right(tableScan("testdb.table1",
ImmutableMap.of("s1_6", "s1")))
+ .ignoreEquiCriteria())))));
+ }
+
+ @Test
+ public void fullJoinTest() {
+ PlanTester planTester = new PlanTester();
+ sql =
+ "SELECT t1.time FROM table1 t1 FULL JOIN table1 t2 ON t1.tag1=t2.tag1
AND t1.time=t2.time";
+ logicalQueryPlan = planTester.createPlan(sql);
+ PlanMatchPattern tableScan1 =
+ tableScan(
+ "testdb.table1", ImmutableList.of("time", "tag1"),
ImmutableSet.of("time", "tag1"));
+ PlanMatchPattern tableScan2 =
+ tableScan("testdb.table1", ImmutableMap.of("time_0", "time", "tag1_1",
"tag1"));
+ // Verify full LogicalPlan
+ /*
+ * └──OutputNode
+ * └──JoinNode (t1.tag1=t2.tag1 AND t1.time=t2.time)
+ * |──SortNode
+ * │ └──TableScanNode
+ * ├──SortNode
+ * │ └──TableScanNode
+ */
+ assertPlan(
+ logicalQueryPlan,
+ output(
+ join(
+ JoinNode.JoinType.FULL,
+ builder ->
+
builder.left(sort(tableScan1)).right(sort(tableScan2)).ignoreEquiCriteria())));
+ }
+
+ @Ignore
+ @Test
+ public void otherInnerJoinTests() {
assertInnerJoinTest2(
"SELECT t1.time, t1.tag1, t1.tag2, t1.attr2, t1.s1+1 as add_s1, t1.s2,"
+ "t2.tag1, t2.tag3, t2.attr2, t2.s1, t2.s3 "
+ "FROM (SELECT * FROM table1 t1 WHERE tag1='beijing' AND
tag2='A1' AND s1>1 ORDER BY tag1 LIMIT 111) t1 JOIN (SELECT * FROM table1 WHERE
tag1='shenzhen' AND s2>1 LIMIT 222) t2 "
+ "ON t1.time = t2.time ORDER BY t1.tag1 OFFSET 3 LIMIT 6",
false);
- }
- @Ignore
- @Test
- public void innerJoinTest5() {
// 1. has logical or in subquery filter, outer query filter
- // 2. where t1.value1 > t2.value2
+ // has filter which can be push down, inner limit and sort, test if inner
limit can be pushed
+ // down
+
+ // has filter which can be push down, inner limit, test if inner limit can
be pushed down
+ assertInnerJoinTest2(
+ "SELECT t1.time, t1.tag1, t1.tag2, t1.attr2, t1.s1+1 as add_s1, t1.s2,"
+ + "t2.tag1, t2.tag3, t2.attr2, t2.s1, t2.s3 "
+ + "FROM (SELECT * FROM table1 t1 WHERE tag1='beijing' AND
tag2='A1' AND s1>1 LIMIT 111) t1 JOIN (SELECT * FROM table1 WHERE
tag1='shenzhen' AND s2>1 LIMIT 222) t2 "
+ + "ON t1.time = t2.time ORDER BY t1.tag1 OFFSET 3 LIMIT 6",
+ false);
}
// ========== unsupported test ===============
@Test
public void unsupportedJoinTest() {
- assertAnalyzeSemanticException(
- "SELECT * FROM table1 t1 INNER JOIN table1 t2 ON t1.time>t2.time",
- ONLY_SUPPORT_TIME_COLUMN_EQUI_JOIN);
-
- assertAnalyzeSemanticException(
- "SELECT * FROM table1 t1 INNER JOIN table1 t2 ON t1.tag1=t2.tag2",
- ONLY_SUPPORT_TIME_COLUMN_EQUI_JOIN);
-
- assertAnalyzeSemanticException(
- "SELECT * FROM table1 t1 INNER JOIN table1 t2 ON t1.time>t2.time AND
t1.tag1=t2.tag2",
- ONLY_SUPPORT_TIME_COLUMN_EQUI_JOIN);
-
- assertAnalyzeSemanticException(
- "SELECT * FROM table1 t1 INNER JOIN table1 t2 ON t1.time>t2.time OR
t1.tag1=t2.tag2",
- ONLY_SUPPORT_TIME_COLUMN_EQUI_JOIN);
-
- assertAnalyzeSemanticException(
- "SELECT * FROM table1 t1 INNER JOIN table1 t2 USING(tag1)",
- ONLY_SUPPORT_TIME_COLUMN_IN_USING_CLAUSE);
-
- assertAnalyzeSemanticException(
- "SELECT * FROM table1 t1 INNER JOIN table1 t2 USING(tag1, time)",
- ONLY_SUPPORT_TIME_COLUMN_IN_USING_CLAUSE);
-
- // LEFT, RIGHT JOIN
+ // LEFT JOIN
assertAnalyzeSemanticException(
"SELECT * FROM table1 t1 LEFT JOIN table1 t2 ON t1.time=t2.time",
"LEFT JOIN is not supported, only support INNER JOIN in current
version");
+ // RIGHT JOIN
assertAnalyzeSemanticException(
"SELECT * FROM table1 t1 RIGHT JOIN table1 t2 ON t1.time=t2.time",
"RIGHT JOIN is not supported, only support INNER JOIN in current
version");
-
- assertAnalyzeSemanticException(
- "SELECT * FROM table1 t1 CROSS JOIN table1 t2",
- "CROSS JOIN is not supported, only support INNER JOIN in current
version");
-
- // TODO(beyyes) has non time equal join criteria;
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryTest.java
index 652472b0591..0e1c0ad8f99 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryTest.java
@@ -55,6 +55,7 @@ import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Aggre
import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression.Operator.EQUAL;
public class SubqueryTest {
+
@Test
public void testUncorrelatedScalarSubqueryInWhereClause() {
PlanTester planTester = new PlanTester();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/EquiJoinClauseProvider.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/EquiJoinClauseProvider.java
index de1fcecc2ca..c151b6874a6 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/EquiJoinClauseProvider.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/EquiJoinClauseProvider.java
@@ -23,11 +23,11 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
import static java.util.Objects.requireNonNull;
-class EquiJoinClauseProvider implements
ExpectedValueProvider<JoinNode.EquiJoinClause> {
+public class EquiJoinClauseProvider implements
ExpectedValueProvider<JoinNode.EquiJoinClause> {
private final SymbolAlias left;
private final SymbolAlias right;
- EquiJoinClauseProvider(SymbolAlias left, SymbolAlias right) {
+ public EquiJoinClauseProvider(SymbolAlias left, SymbolAlias right) {
this.left = requireNonNull(left, "left is null");
this.right = requireNonNull(right, "right is null");
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
index 98e1fbe6f95..c5171ab927b 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
@@ -834,7 +834,7 @@ public final class PlanMatchPattern {
public static GroupingSetDescriptor singleGroupingSet(List<String>
groupingKeys) {
Set<Integer> globalGroupingSets;
- if (groupingKeys.size() == 0) {
+ if (groupingKeys.isEmpty()) {
globalGroupingSets = ImmutableSet.of(0);
} else {
globalGroupingSets = ImmutableSet.of();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/SymbolAlias.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/SymbolAlias.java
index 6776809415a..0e01d178b35 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/SymbolAlias.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/SymbolAlias.java
@@ -23,10 +23,10 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
import static java.util.Objects.requireNonNull;
-class SymbolAlias implements PlanTestSymbol {
+public class SymbolAlias implements PlanTestSymbol {
private final String alias;
- SymbolAlias(String alias) {
+ public SymbolAlias(String alias) {
this.alias = requireNonNull(alias, "alias is null");
}