This is an automated email from the ASF dual-hosted git repository.
lancelly pushed a commit to branch support_uncorrelated_in_predicate
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to
refs/heads/support_uncorrelated_in_predicate by this push:
new b2223903f27 add some ITs and UTs
b2223903f27 is described below
commit b2223903f27fde555434c1fdf848487dc852f839
Author: lancelly <[email protected]>
AuthorDate: Tue Dec 10 16:08:07 2024 +0800
add some ITs and UTs
---
.../recent/subquery/SubqueryDataSetUtils.java | 10 +-
.../IoTDBUncorrelatedInPredicateSubqueryIT.java | 280 +++++++++++++++++++++
.../IoTDBUncorrelatedScalarSubqueryIT.java | 2 +-
.../relational/AbstractMergeSortJoinOperator.java | 9 +-
.../relational/MergeSortSemiJoinOperator.java | 52 ++--
.../relational/analyzer/ExpressionAnalyzer.java | 17 +-
.../plan/relational/planner/SubqueryPlanner.java | 2 +-
.../optimizations/LogicalOptimizeFactory.java | 10 +-
.../optimizations/PushPredicateIntoTableScan.java | 50 ++--
.../plan/relational/planner/SubqueryTest.java | 98 ++++++++
.../planner/assertions/PlanMatchPattern.java | 11 +
.../planner/assertions/SemiJoinMatcher.java | 79 ++++++
12 files changed, 563 insertions(+), 57 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/subquery/SubqueryDataSetUtils.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/subquery/SubqueryDataSetUtils.java
index 1bd779aac65..36d66eb4b24 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/subquery/SubqueryDataSetUtils.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/subquery/SubqueryDataSetUtils.java
@@ -20,9 +20,9 @@
package org.apache.iotdb.relational.it.query.recent.subquery;
public class SubqueryDataSetUtils {
- protected static final String DATABASE_NAME = "subqueryTest";
- protected static final String[] NUMERIC_MEASUREMENTS = new String[] {"s1",
"s2", "s3", "s4"};
- protected static final String[] CREATE_SQLS =
+ public static final String DATABASE_NAME = "subqueryTest";
+ public static final String[] NUMERIC_MEASUREMENTS = new String[] {"s1",
"s2", "s3", "s4"};
+ public static final String[] CREATE_SQLS =
new String[] {
"CREATE DATABASE " + DATABASE_NAME,
"USE " + DATABASE_NAME,
@@ -104,6 +104,10 @@ public class SubqueryDataSetUtils {
+ " values(4, 'd1', 'text4', 'string4', X'cafebabe04', 4,
'2024-10-04')",
"INSERT INTO table2(time,device_id,s1,s2,s3,s4,s5) "
+ " values(5, 'd1', 5, 55, 5.5, 55.5, false)",
+ // table3
+ "CREATE TABLE table3(device_id STRING ID, s1 INT32 MEASUREMENT, s2
INT64 MEASUREMENT, s3 FLOAT MEASUREMENT, s4 DOUBLE MEASUREMENT, s5 BOOLEAN
MEASUREMENT, s6 TEXT MEASUREMENT, s7 STRING MEASUREMENT, s8 BLOB MEASUREMENT,
s9 TIMESTAMP MEASUREMENT, s10 DATE MEASUREMENT)",
+ "INSERT INTO table3(time,device_id,s1,s2,s3,s4,s5,s6,s7,s8,s9,s10)
values
(2024-09-24T06:13:30.000+00:00,'d01',30,30,30.0,30.0,true,'shanghai_huangpu_red_A_d01_30','shanghai_huangpu_red_A_d01_30',X'cafebabe30',2024-09-24T06:13:00.000+00:00,'2024-09-23')",
+ "INSERT INTO table3(time,device_id,s1,s2,s3,s4,s5,s6,s7,s8,s9,s10)
values
(2024-09-24T06:14:30.000+00:00,'d01',40,40,40.0,40.0,false,'shanghai_huangpu_red_A_d01_40','shanghai_huangpu_red_A_d01_40',X'cafebabe40',2024-09-24T06:14:00.000+00:00,'2024-09-24')",
"FLUSH",
"CLEAR ATTRIBUTE CACHE",
};
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/subquery/uncorrelated/IoTDBUncorrelatedInPredicateSubqueryIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/subquery/uncorrelated/IoTDBUncorrelatedInPredicateSubqueryIT.java
new file mode 100644
index 00000000000..988cc0c6041
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/subquery/uncorrelated/IoTDBUncorrelatedInPredicateSubqueryIT.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.query.recent.subquery.uncorrelated;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareTableData;
+import static org.apache.iotdb.db.it.utils.TestUtils.tableAssertTestFail;
+import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest;
+import static
org.apache.iotdb.relational.it.query.recent.subquery.SubqueryDataSetUtils.CREATE_SQLS;
+import static
org.apache.iotdb.relational.it.query.recent.subquery.SubqueryDataSetUtils.DATABASE_NAME;
+import static
org.apache.iotdb.relational.it.query.recent.subquery.SubqueryDataSetUtils.NUMERIC_MEASUREMENTS;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBUncorrelatedInPredicateSubqueryIT {
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv().getConfig().getCommonConfig().setSortBufferSize(128 *
1024);
+
EnvFactory.getEnv().getConfig().getCommonConfig().setMaxTsBlockSizeInByte(4 *
1024);
+ EnvFactory.getEnv().initClusterEnvironment();
+ prepareTableData(CREATE_SQLS);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testInPredicateSubqueryInWhereClause() {
+ String sql;
+ String[] expectedHeader;
+ String[] retArray;
+
+ // Test case: where s in (subquery)
+ sql =
+ "SELECT cast(%s AS INT32) as %s FROM table1 WHERE device_id = 'd01'
and %s in (SELECT (%s) from table3 WHERE device_id = 'd01')";
+ retArray = new String[] {"30,", "40,"};
+ for (String measurement : NUMERIC_MEASUREMENTS) {
+ expectedHeader = new String[] {measurement};
+ tableResultSetEqualTest(
+ String.format(sql, measurement, measurement, measurement,
measurement),
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ // Test case: where s not in (subquery)
+ sql =
+ "SELECT cast(%s AS INT32) as %s FROM table1 WHERE device_id = 'd01'
and %s not in (SELECT (%s) FROM table3 WHERE device_id = 'd01')";
+ retArray = new String[] {"50,", "60,", "70,"};
+ for (String measurement : NUMERIC_MEASUREMENTS) {
+ expectedHeader = new String[] {measurement};
+ tableResultSetEqualTest(
+ String.format(sql, measurement, measurement, measurement,
measurement),
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ // Test case: where s in (subquery), subquery returns empty set
+ sql =
+ "SELECT cast(%s AS INT32) as %s FROM table1 WHERE device_id = 'd01'
and %s in (SELECT (%s) FROM table3 WHERE device_id = 'd_empty')";
+ retArray = new String[] {};
+ for (String measurement : NUMERIC_MEASUREMENTS) {
+ expectedHeader = new String[] {measurement};
+ tableResultSetEqualTest(
+ String.format(sql, measurement, measurement, measurement,
measurement),
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ // Test case: where s not in (subquery), subquery returns empty set.
Should return all rows
+ sql =
+ "SELECT cast(%s AS INT32) as %s FROM table1 WHERE device_id = 'd01'
and %s not in (SELECT (%s) FROM table3 WHERE device_id = 'd_empty')";
+ retArray = new String[] {"30,", "40,", "50,", "60,", "70,"};
+ for (String measurement : NUMERIC_MEASUREMENTS) {
+ expectedHeader = new String[] {measurement};
+ tableResultSetEqualTest(
+ String.format(sql, measurement, measurement, measurement,
measurement),
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ // Test case: where s in (subquery), subquery contains scalar subquery.
+ sql =
+ "SELECT cast(%s AS INT32) as %s FROM table1 WHERE device_id = 'd01'
and %s in (SELECT min(%s) from table3 WHERE device_id = 'd01')";
+ retArray = new String[] {"30,"};
+ for (String measurement : NUMERIC_MEASUREMENTS) {
+ expectedHeader = new String[] {measurement};
+ tableResultSetEqualTest(
+ String.format(sql, measurement, measurement, measurement,
measurement),
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ // Test case: where s in (subquery), subquery contains scalar subquery.
+ sql =
+ "SELECT cast(%s AS INT32) as %s FROM table1 WHERE device_id = 'd01'
and %s in (SELECT (%s) from table3 WHERE device_id = 'd01' and (%s) > (select
avg(%s) from table3 where device_id = 'd01'))";
+ retArray = new String[] {"40,"};
+ for (String measurement : NUMERIC_MEASUREMENTS) {
+ expectedHeader = new String[] {measurement};
+ tableResultSetEqualTest(
+ String.format(
+ sql, measurement, measurement, measurement, measurement,
measurement, measurement),
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ // Test case: where s in (subquery), subquery contains expression.
+ sql =
+ "SELECT cast(%s AS INT32) as %s FROM table1 WHERE device_id = 'd01'
and cast(%s AS INT32) in (SELECT cast(((%s) + 30) AS INT32) from table3 WHERE
device_id = 'd01')";
+ retArray = new String[] {"60,", "70,"};
+ for (String measurement : NUMERIC_MEASUREMENTS) {
+ expectedHeader = new String[] {measurement};
+ tableResultSetEqualTest(
+ String.format(sql, measurement, measurement, measurement,
measurement),
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+ }
+
+ @Test
+ public void testInPredicateSubqueryInHavingClause() {
+ String sql;
+ String[] expectedHeader;
+ String[] retArray;
+
+ // Test case: having s in (subquery)
+ sql =
+ "SELECT device_id, count(*) from table1 group by device_id having
count(*) + 25 in (SELECT cast(s1 as INT64) from table3 where device_id =
'd01')";
+ expectedHeader = new String[] {"device_id", "_col1"};
+ retArray =
+ new String[] {
+ "d01,5,", "d03,5,", "d05,5,", "d07,5,", "d09,5,", "d11,5,",
"d13,5,", "d15,5,"
+ };
+ for (String measurement : NUMERIC_MEASUREMENTS) {
+ tableResultSetEqualTest(
+ String.format(sql, measurement, measurement, measurement,
measurement),
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ // Test case: having s not in (subquery)
+ sql =
+ "SELECT device_id, count(*) from table1 group by device_id having
count(*) + 30 not in (SELECT cast(s1 as INT64) from table3 where device_id =
'd01') and count(*) > 3";
+ expectedHeader = new String[] {"device_id", "_col1"};
+ retArray =
+ new String[] {
+ "d01,5,", "d03,5,", "d05,5,", "d07,5,", "d09,5,", "d11,5,",
"d13,5,", "d15,5,"
+ };
+ for (String measurement : NUMERIC_MEASUREMENTS) {
+ tableResultSetEqualTest(
+ String.format(sql, measurement, measurement, measurement,
measurement),
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ // Test case: having s in (subquery), subquery returns empty set
+ sql =
+ "SELECT device_id, count(*) from table1 group by device_id having
count(*) + 25 in (SELECT cast(s1 as INT64) from table3 where device_id =
'd010')";
+ expectedHeader = new String[] {"device_id", "_col1"};
+ retArray = new String[] {};
+ for (String measurement : NUMERIC_MEASUREMENTS) {
+ tableResultSetEqualTest(
+ String.format(sql, measurement, measurement, measurement,
measurement),
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ // Test case: having s not in (subquery), subquery returns empty set,
should return all rows
+ sql =
+ "SELECT device_id, count(*) from table1 group by device_id having
count(*) + 25 not in (SELECT cast(s1 as INT64) from table3 where device_id =
'd11') and count(*) > 3";
+ expectedHeader = new String[] {"device_id", "_col1"};
+ retArray =
+ new String[] {
+ "d01,5,", "d03,5,", "d05,5,", "d07,5,", "d09,5,", "d11,5,",
"d13,5,", "d15,5,"
+ };
+ for (String measurement : NUMERIC_MEASUREMENTS) {
+ tableResultSetEqualTest(
+ String.format(sql, measurement, measurement, measurement,
measurement),
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+ }
+
+ @Test
+ public void testInPredicateSubqueryInSelectClause() {
+ String sql;
+ String[] expectedHeader;
+ String[] retArray;
+
+ // Test case: select s in (subquery)
+ sql =
+ "SELECT %s in (SELECT (%s) from table3 WHERE device_id = 'd01')
from table1 where device_id = 'd01'";
+ expectedHeader = new String[] {"_col0"};
+ retArray = new String[] {"true,", "true,", "false,","false,","false,"};
+ for (String measurement : NUMERIC_MEASUREMENTS) {
+ tableResultSetEqualTest(
+ String.format(sql, measurement, measurement),
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ // Test case: select s not in (subquery)
+ sql =
+ "SELECT %s not in (SELECT (%s) from table3 WHERE device_id =
'd01') from table1 where device_id = 'd01'";
+ expectedHeader = new String[] {"_col0"};
+ retArray = new String[] {"false,", "false,", "true,","true,","true,"};
+ for (String measurement : NUMERIC_MEASUREMENTS) {
+ tableResultSetEqualTest(
+ String.format(sql, measurement, measurement),
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+ }
+
+ @Test
+ public void testInPredicateSubqueryLegalityCheck() {
+ // Legality check: Multiple parentheses around subquery, this behaves the
same as Trino
+ tableAssertTestFail(
+ "select s1 from table1 where device_id = 'd01' and s1 not in ((select
s1 from table3 where device_id = 'd01'))",
+ "301: Scalar sub-query has returned multiple rows.",
+ DATABASE_NAME);
+
+ // Legality check: Join key type mismatch.(left key is int and right key
is double)
+ tableAssertTestFail(
+ "select s1 from table1 where device_id = 'd01' and s1 in (select s1 +
30.0 from table3 where device_id = 'd01')",
+ "301: Join key type mismatch.",
+ DATABASE_NAME);
+
+ // Legality check: subquery can not be parsed(without parentheses)
+ tableAssertTestFail(
+ "select s1 from table1 where s1 in select s1 from table1",
+ "mismatched input",
+ DATABASE_NAME);
+
+ // Legality check: subquery can not be parsed
+ tableAssertTestFail(
+ "select s1 from table1 where s1 in (select s1 from)", "mismatched
input", DATABASE_NAME);
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/subquery/IoTDBUncorrelatedScalarSubqueryIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/subquery/uncorrelated/IoTDBUncorrelatedScalarSubqueryIT.java
similarity index 99%
rename from
integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/subquery/IoTDBUncorrelatedScalarSubqueryIT.java
rename to
integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/subquery/uncorrelated/IoTDBUncorrelatedScalarSubqueryIT.java
index 1d93e36197e..b715b5cfeb9 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/subquery/IoTDBUncorrelatedScalarSubqueryIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/subquery/uncorrelated/IoTDBUncorrelatedScalarSubqueryIT.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.relational.it.query.recent.subquery;
+package org.apache.iotdb.relational.it.query.recent.subquery.uncorrelated;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractMergeSortJoinOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractMergeSortJoinOperator.java
index 8a10e881211..0529f673817 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractMergeSortJoinOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractMergeSortJoinOperator.java
@@ -268,7 +268,7 @@ public abstract class AbstractMergeSortJoinOperator extends
AbstractOperator {
if (!leftBlockNotEmpty()) {
if (leftChild.hasNextWithTimer()) {
TsBlock block = leftChild.nextWithTimer();
- pruneLastNullValuesOfBlock(block);
+ pruneLastNullValuesOfBlock(block, leftJoinKeyPosition);
leftBlock = block;
leftIndex = 0;
} else {
@@ -284,7 +284,7 @@ public abstract class AbstractMergeSortJoinOperator extends
AbstractOperator {
} else {
if (rightChild.hasNextWithTimer()) {
TsBlock block = rightChild.nextWithTimer();
- pruneLastNullValuesOfBlock(block);
+ pruneLastNullValuesOfBlock(block, rightJoinKeyPosition);
if (block != null && !block.isEmpty()) {
addRightBlockWithMemoryReservation(block);
}
@@ -299,13 +299,12 @@ public abstract class AbstractMergeSortJoinOperator
extends AbstractOperator {
}
}
- protected void pruneLastNullValuesOfBlock(TsBlock block) {
+ protected void pruneLastNullValuesOfBlock(TsBlock block, int columnIndex) {
if (block == null) {
return;
}
int lastNonNullIndex = block.getPositionCount() - 1;
- while (lastNonNullIndex >= 0
- && block.getColumn(rightJoinKeyPosition).isNull(lastNonNullIndex)) {
+ while (lastNonNullIndex >= 0 &&
block.getColumn(columnIndex).isNull(lastNonNullIndex)) {
lastNonNullIndex--;
}
block.setPositionCount(lastNonNullIndex + 1);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MergeSortSemiJoinOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MergeSortSemiJoinOperator.java
index 34ef6d6cd3f..824355c8c57 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MergeSortSemiJoinOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MergeSortSemiJoinOperator.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.comparator.JoinKeyComparator;
+import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.type.Type;
import org.apache.tsfile.utils.RamUsageEstimator;
@@ -34,6 +35,8 @@ public class MergeSortSemiJoinOperator extends
AbstractMergeSortJoinOperator {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(MergeSortSemiJoinOperator.class);
+ private final int outputColumnNum;
+
public MergeSortSemiJoinOperator(
OperatorContext operatorContext,
Operator leftChild,
@@ -55,6 +58,7 @@ public class MergeSortSemiJoinOperator extends
AbstractMergeSortJoinOperator {
joinKeyComparator,
dataTypes,
joinKeyType);
+ outputColumnNum = dataTypes.size();
}
@Override
@@ -63,17 +67,24 @@ public class MergeSortSemiJoinOperator extends
AbstractMergeSortJoinOperator {
return true;
}
- return !leftFinished && !rightFinished;
+ return !leftFinished;
}
@Override
protected boolean prepareInput() throws Exception {
gotCandidateBlocks();
+ if (rightFinished) {
+ return leftBlockNotEmpty();
+ }
return leftBlockNotEmpty() && rightBlockNotEmpty() && gotNextRightBlock();
}
@Override
protected boolean processFinished() {
+ if (rightFinished) {
+ buildUseRemainingBlocks();
+ return true;
+ }
// all the join keys in rightTsBlock are less than leftTsBlock, just skip
right
if (allRightLessThanLeft()) {
resetRightBlockList();
@@ -110,6 +121,8 @@ public class MergeSortSemiJoinOperator extends
AbstractMergeSortJoinOperator {
rightBlockList.get(rightBlockListIdx),
rightJoinKeyPosition,
rightIndex)) {
+ // current left won't match any right, append left with false SemiJoin
result
+ appendValueToResult(false);
leftIndex++;
if (leftIndex >= leftBlock.getPositionCount()) {
resetLeftBlock();
@@ -134,25 +147,36 @@ public class MergeSortSemiJoinOperator extends
AbstractMergeSortJoinOperator {
@Override
protected boolean hasMatchedRightValueToProbeLeft() {
- if (comparator.equalsTo(
- leftBlock,
- leftJoinKeyPosition,
- leftIndex,
- rightBlockList.get(rightBlockListIdx),
- rightJoinKeyPosition,
- rightIndex)) {
- recordsWhenDataMatches();
- appendValueToResultWhenMatches();
- return true;
- }
- return false;
+ boolean matches =
+ comparator.equalsTo(
+ leftBlock,
+ leftJoinKeyPosition,
+ leftIndex,
+ rightBlockList.get(rightBlockListIdx),
+ rightJoinKeyPosition,
+ rightIndex);
+ appendValueToResult(matches);
+ return matches;
}
- protected void appendValueToResultWhenMatches() {
+ private void appendValueToResult(boolean matches) {
appendLeftBlockData(leftOutputSymbolIdx, resultBuilder, leftBlock,
leftIndex);
+ appendSemiJoinOutput(matches);
resultBuilder.declarePosition();
}
+ private void appendSemiJoinOutput(boolean value) {
+ ColumnBuilder columnBuilder =
resultBuilder.getColumnBuilder(outputColumnNum - 1);
+ columnBuilder.writeBoolean(value);
+ }
+
+ private void buildUseRemainingBlocks() {
+ while (leftBlockNotEmpty()) {
+ appendValueToResult(false);
+ leftIndex++;
+ }
+ }
+
@Override
protected void recordsWhenDataMatches() {
// do nothing
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ExpressionAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ExpressionAnalyzer.java
index 404c88b2e1e..dd54a54bc73 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ExpressionAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ExpressionAnalyzer.java
@@ -106,6 +106,7 @@ import java.util.function.Function;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.Iterators.getOnlyElement;
import static java.lang.String.format;
import static java.util.Collections.unmodifiableMap;
import static java.util.Collections.unmodifiableSet;
@@ -996,21 +997,15 @@ public class ExpressionAnalyzer {
Type declaredValueType,
SubqueryExpression subquery,
StackableAstVisitorContext<Context> context) {
+ // For now, we only support one column in subqueries, we have checked
this before.
Type valueRowType = declaredValueType;
- if (!(declaredValueType instanceof RowType) && !(declaredValueType
instanceof UnknownType)) {
+ /*if (!(declaredValueType instanceof RowType) && !(declaredValueType
instanceof UnknownType)) {
valueRowType = RowType.anonymous(ImmutableList.of(declaredValueType));
- }
+ }*/
Type subqueryType = analyzeSubquery(subquery, context);
setExpressionType(subquery, subqueryType);
- if (subqueryType.equals(valueRowType)) {
- throw new SemanticException(
- String.format(
- "Value expression and result of subquery must be of the same
type: %s vs %s",
- valueRowType, subqueryType));
- }
-
Optional<Type> valueCoercion = Optional.empty();
// if (!valueRowType.equals(commonType.get())) {
// valueCoercion = commonType;
@@ -1048,7 +1043,9 @@ public class ExpressionAnalyzer {
}
sourceFields.addAll(queryScope.getRelationType().getVisibleFields());
- return RowType.from(fields.build());
+ // return RowType.from(fields.build());
+ // For now, we only support one column in subqueries, we have checked
this before.
+ return getOnlyElement(fields.build().stream().iterator()).getType();
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryPlanner.java
index 631218b4bf8..222590f5d7d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryPlanner.java
@@ -572,7 +572,7 @@ class SubqueryPlanner {
}
List<Expression> fieldsList = fields.build();
- checkArgument(fieldsList.size() <= 1, "For now, only single column
subqueries are supported");
+ checkArgument(fieldsList.size() == 1, "For now, only single column
subqueries are supported");
/*subqueryPlan =
subqueryPlan.withNewRoot(
new ProjectNode(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
index cac5f28ab78..ce18b64c291 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
@@ -58,7 +58,6 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Re
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.RemoveTrivialFilters;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.RemoveUnreferencedScalarSubqueries;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.SimplifyExpressions;
-import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.TransformFilteringSemiJoinToInnerJoin;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.TransformUncorrelatedInPredicateSubqueryToSemiJoin;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.TransformUncorrelatedSubqueryToJoin;
@@ -218,10 +217,11 @@ public class LogicalOptimizeFactory {
new CheckSubqueryNodesAreRewritten(),
simplifyOptimizer,
new PushPredicateIntoTableScan(),
- new IterativeOptimizer(
- plannerContext,
- ruleStats,
- ImmutableSet.of(new TransformFilteringSemiJoinToInnerJoin())),
+ // Currently, Distinct is not supported, so we cant use this rule for
now.
+ // new IterativeOptimizer(
+ // plannerContext,
+ // ruleStats,
+ // ImmutableSet.of(new
TransformFilteringSemiJoinToInnerJoin())),
// redo columnPrune and inlineProjections after
pushPredicateIntoTableScan
columnPruningOptimizer,
inlineProjectionLimitFiltersOptimizer,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
index fff7f4df740..7ba4d2514fd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
@@ -72,6 +72,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -832,6 +833,8 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
Expression inheritedPredicate =
context.inheritedPredicate != null ? context.inheritedPredicate :
TRUE_LITERAL;
Expression deterministicInheritedPredicate =
filterDeterministicConjuncts(inheritedPredicate);
+ Expression sourceEffectivePredicate = TRUE_LITERAL;
+ Expression filteringSourceEffectivePredicate = TRUE_LITERAL;
// Expression sourceEffectivePredicate =
//
filterDeterministicConjuncts(effectivePredicateExtractor.extract(session,
node.getSource(),
// types, typeAnalyzer));
@@ -853,15 +856,21 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
// Generate equality inferences
EqualityInference allInference =
- new EqualityInference(metadata, deterministicInheritedPredicate,
joinExpression);
- // EqualityInference allInference = new EqualityInference(metadata,
- // deterministicInheritedPredicate, sourceEffectivePredicate,
- // filteringSourceEffectivePredicate, joinExpression);
- // EqualityInference allInferenceWithoutSourceInferred = new
EqualityInference(metadata,
- // deterministicInheritedPredicate, filteringSourceEffectivePredicate,
joinExpression);
- // EqualityInference allInferenceWithoutFilteringSourceInferred = new
- // EqualityInference(metadata, deterministicInheritedPredicate,
sourceEffectivePredicate,
- // joinExpression);
+ new EqualityInference(
+ metadata,
+ deterministicInheritedPredicate,
+ sourceEffectivePredicate,
+ filteringSourceEffectivePredicate,
+ joinExpression);
+ EqualityInference allInferenceWithoutSourceInferred =
+ new EqualityInference(
+ metadata,
+ deterministicInheritedPredicate,
+ filteringSourceEffectivePredicate,
+ joinExpression);
+ EqualityInference allInferenceWithoutFilteringSourceInferred =
+ new EqualityInference(
+ metadata, deterministicInheritedPredicate,
sourceEffectivePredicate, joinExpression);
// Push inheritedPredicates down to the source if they don't involve the
semi join output
Set<Symbol> sourceScope = ImmutableSet.copyOf(sourceSymbols);
@@ -892,23 +901,28 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
}
});
- /*
// move effective predicate conjuncts source <-> filter
// See if we can push the filtering source effective predicate to the
source side
EqualityInference.nonInferrableConjuncts(metadata,
filteringSourceEffectivePredicate)
- .map(conjunct -> allInference.rewrite(conjunct, sourceScope))
- .filter(Objects::nonNull)
- .forEach(sourceConjuncts::add);
+ .map(conjunct -> allInference.rewrite(conjunct, sourceScope))
+ .filter(Objects::nonNull)
+ .forEach(sourceConjuncts::add);
// See if we can push the source effective predicate to the filtering
source side
EqualityInference.nonInferrableConjuncts(metadata,
sourceEffectivePredicate)
- .map(conjunct -> allInference.rewrite(conjunct, filterScope))
- .filter(Objects::nonNull)
- .forEach(filteringSourceConjuncts::add);
+ .map(conjunct -> allInference.rewrite(conjunct, filterScope))
+ .filter(Objects::nonNull)
+ .forEach(filteringSourceConjuncts::add);
// Add equalities from the inference back in
-
sourceConjuncts.addAll(allInferenceWithoutSourceInferred.generateEqualitiesPartitionedBy(sourceScope).getScopeEqualities());
-
filteringSourceConjuncts.addAll(allInferenceWithoutFilteringSourceInferred.generateEqualitiesPartitionedBy(filterScope).getScopeEqualities());*/
+ sourceConjuncts.addAll(
+ allInferenceWithoutSourceInferred
+ .generateEqualitiesPartitionedBy(sourceScope)
+ .getScopeEqualities());
+ filteringSourceConjuncts.addAll(
+ allInferenceWithoutFilteringSourceInferred
+ .generateEqualitiesPartitionedBy(filterScope)
+ .getScopeEqualities());
PlanNode rewrittenSource =
node.getSource().accept(this, new
RewriteContext(combineConjuncts(sourceConjuncts)));
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryTest.java
index 652472b0591..a45dfb686f9 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryTest.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMa
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NotExpression;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference;
import com.google.common.collect.ImmutableList;
@@ -45,9 +46,12 @@ import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.exchange;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.filter;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.join;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.mergeSort;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.project;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.semiJoin;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.singleGroupingSet;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.sort;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.tableScan;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Step.FINAL;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Step.INTERMEDIATE;
@@ -223,4 +227,98 @@ public class SubqueryTest {
JoinNode.JoinType.INNER,
builder ->
builder.left(tableScan1).right(enforceSingleRow(any())))))));
}
+
+ @Test
+ public void testUncorrelatedInPredicateSubquery() {
+ PlanTester planTester = new PlanTester();
+
+ String sql = "SELECT s1 FROM table1 where s1 in (select s1 from table1)";
+
+ LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+
+ Expression filterPredicate = new SymbolReference("expr");
+
+ PlanMatchPattern tableScan1 =
+ tableScan("testdb.table1", ImmutableList.of("s1"),
ImmutableSet.of("s1"));
+
+ PlanMatchPattern tableScan2 = tableScan("testdb.table1",
ImmutableMap.of("s1_6", "s1"));
+
+ // Verify full LogicalPlan
+ /*
+ * └──OutputNode
+ * └──ProjectNode
+ * └──FilterNode
+ * └──SemiJoinNode
+ * |──SortNode
+ * | └──TableScanNode
+ * ├──SortNode
+ * │ └──TableScanNode
+
+ */
+ assertPlan(
+ logicalQueryPlan,
+ output(
+ project(
+ filter(
+ filterPredicate,
+ semiJoin("s1", "s1_6", "expr", sort(tableScan1),
sort(tableScan2))))));
+
+ // Verify DistributionPlan
+ assertPlan(
+ planTester.getFragmentPlan(0),
+ output(
+ project(
+ filter(
+ filterPredicate,
+ semiJoin(
+ "s1",
+ "s1_6",
+ "expr",
+ mergeSort(exchange(), sort(tableScan1), exchange()),
+ mergeSort(exchange(), sort(tableScan2),
exchange()))))));
+
+ assertPlan(planTester.getFragmentPlan(1), sort(tableScan1));
+
+ assertPlan(planTester.getFragmentPlan(2), sort(tableScan1));
+
+ assertPlan(planTester.getFragmentPlan(3), sort(tableScan2));
+
+ assertPlan(planTester.getFragmentPlan(4), sort(tableScan2));
+ }
+
+ @Test
+ public void testUncorrelatedNotInPredicateSubquery() {
+ PlanTester planTester = new PlanTester();
+
+ String sql = "SELECT s1 FROM table1 where s1 not in (select s1 from
table1)";
+
+ LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+
+ Expression filterPredicate = new NotExpression(new
SymbolReference("expr"));
+
+ PlanMatchPattern tableScan1 =
+ tableScan("testdb.table1", ImmutableList.of("s1"),
ImmutableSet.of("s1"));
+
+ PlanMatchPattern tableScan2 = tableScan("testdb.table1",
ImmutableMap.of("s1_6", "s1"));
+
+ // Verify full LogicalPlan
+ /*
+ * └──OutputNode
+ * └──ProjectNode
+ * └──FilterNode
+ * └──SemiJoinNode
+ * |──SortNode
+ * | └──TableScanNode
+ * ├──SortNode
+ * │ └──TableScanNode
+
+ */
+ assertPlan(
+ logicalQueryPlan,
+ output(
+ project(
+ filter(
+ filterPredicate,
+ semiJoin("s1", "s1_6", "expr", sort(tableScan1),
sort(tableScan2))))));
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
index 09740bdbed3..30580bf0f4b 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
@@ -33,6 +33,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNod
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DataType;
@@ -375,6 +376,16 @@ public final class PlanMatchPattern {
return builder.build();
}
+ public static PlanMatchPattern semiJoin(
+ String sourceSymbolAlias,
+ String filteringSymbolAlias,
+ String outputAlias,
+ PlanMatchPattern source,
+ PlanMatchPattern filtering) {
+ return node(SemiJoinNode.class, source, filtering)
+ .with(new SemiJoinMatcher(sourceSymbolAlias, filteringSymbolAlias,
outputAlias));
+ }
+
public static PlanMatchPattern sort(PlanMatchPattern source) {
return node(SortNode.class, source);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/SemiJoinMatcher.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/SemiJoinMatcher.java
new file mode 100644
index 00000000000..7ee9192254e
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/SemiJoinMatcher.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.assertions;
+
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.MatchResult.NO_MATCH;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.MatchResult.match;
+
+final class SemiJoinMatcher implements Matcher {
+ private final String sourceSymbolAlias;
+ private final String filteringSymbolAlias;
+ private final String outputAlias;
+
+ SemiJoinMatcher(String sourceSymbolAlias, String filteringSymbolAlias,
String outputAlias) {
+ this.sourceSymbolAlias = requireNonNull(sourceSymbolAlias,
"sourceSymbolAlias is null");
+ this.filteringSymbolAlias =
+ requireNonNull(filteringSymbolAlias, "filteringSymbolAlias is null");
+ this.outputAlias = requireNonNull(outputAlias, "outputAlias is null");
+ }
+
+ @Override
+ public boolean shapeMatches(PlanNode node) {
+ return node instanceof SemiJoinNode;
+ }
+
+ @Override
+ public MatchResult detailMatches(
+ PlanNode node, SessionInfo sessionInfo, Metadata metadata, SymbolAliases
symbolAliases) {
+ checkState(
+ shapeMatches(node),
+ "Plan testing framework error: shapeMatches returned false in
detailMatches in %s",
+ this.getClass().getName());
+
+ SemiJoinNode semiJoinNode = (SemiJoinNode) node;
+ if (!(symbolAliases
+ .get(sourceSymbolAlias)
+ .equals(semiJoinNode.getSourceJoinSymbol().toSymbolReference())
+ && symbolAliases
+ .get(filteringSymbolAlias)
+
.equals(semiJoinNode.getFilteringSourceJoinSymbol().toSymbolReference()))) {
+ return NO_MATCH;
+ }
+
+ return match(outputAlias,
semiJoinNode.getSemiJoinOutput().toSymbolReference());
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("filteringSymbolAlias", filteringSymbolAlias)
+ .add("sourceSymbolAlias", sourceSymbolAlias)
+ .add("outputAlias", outputAlias)
+ .toString();
+ }
+}