This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e94c2c1ff71 Add subquery UT and IT, Remove some wrong implementation
code
e94c2c1ff71 is described below
commit e94c2c1ff717cf8e18d45e764460f829e4e9411d
Author: Beyyes <[email protected]>
AuthorDate: Mon Aug 26 11:02:14 2024 +0800
Add subquery UT and IT, Remove some wrong implementation code
---
.../db/it/IoTDBMultiIDsWithAttributesTableIT.java | 99 +++--
.../plan/relational/planner/PredicateUtils.java | 217 ----------
.../iterative/rule/PushLimitThroughProject.java | 9 +
.../PushLimitOffsetIntoTableScan.java | 188 +++-----
.../planner/optimizations/SortElimination.java | 7 +-
.../optimizations/TransformSortToStreamSort.java | 18 +
.../analyzer/LimitOffsetPushDownTest.java | 16 +
.../plan/relational/analyzer/SortTest.java | 94 ++--
.../plan/relational/analyzer/SubQueryTest.java | 477 +++++++++++++++++++++
.../planner/assertions/OffsetMatcher.java | 6 +
.../relational/planner/assertions/PlanAssert.java | 3 +-
.../planner/assertions/PlanMatchPattern.java | 5 +
12 files changed, 706 insertions(+), 433 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
index 270012a9b83..4457055bd3e 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
@@ -59,6 +59,8 @@ public class IoTDBMultiIDsWithAttributesTableIT {
"insert into table0(device, level, time,num,bigNum,floatNum,str,bool)
values('d1', 'l4',31536001000,5,2149783648,56.32,'orange',FALSE)",
"insert into table0(device, level, time,num,bigNum,floatNum,str,bool)
values('d1', 'l5',31536010000,7,2147983648,213.112,'lemon',TRUE)",
"insert into table0(device, level, time,num,bigNum,floatNum,str,bool)
values('d1', 'l1',31536100000,11,2147468648,54.121,'pitaya',FALSE)",
+ "insert into
table0(device,level,attr1,attr2,time,num,bigNum,floatNum,str,bool)
values('d2','l1','d','c',0,3,2947483648,231.2121,'coconut',FALSE)",
+ "insert into
table0(device,level,attr1,time,num,bigNum,floatNum,str,bool) values('d2','l2',
'vv', 31536000100,10,3147483648,231.55,'pumelo',FALSE)",
"insert into table0(device, level, attr1, attr2,
time,num,bigNum,floatNum,str,bool) values('d1', 'l2', 'yy', 'zz',
41536000000,12,2146483648,45.231,'strawberry',FALSE)",
"insert into table0(device, level, time,num,bigNum,floatNum,str,bool)
values('d1', 'l3',41536000020,14,2907483648,231.34,'cherry',FALSE)",
"insert into table0(device, level, time,num,bigNum,floatNum,str,bool)
values('d1', 'l4',41536900000,13,2107483648,54.12,'lychee',TRUE)",
@@ -67,13 +69,11 @@ public class IoTDBMultiIDsWithAttributesTableIT {
private static final String[] sql2 =
new String[] {
- "insert into
table0(device,level,attr1,attr2,time,num,bigNum,floatNum,str,bool)
values('d2','l1','d','c',0,3,2947483648,231.2121,'coconut',FALSE)",
"insert into table0(device,level,time,num,bigNum,floatNum,str,bool)
values('d2','l2',20,2,2147483648,434.12,'pineapple',TRUE)",
"insert into table0(device,level,time,num,bigNum,floatNum,str,bool)
values('d2','l3',40,1,2247483648,12.123,'apricot',TRUE)",
"insert into table0(device,level,time,num,bigNum,floatNum,str,bool)
values('d2','l4',80,9,2147483646,43.12,'apple',FALSE)",
"insert into table0(device,level,time,num,bigNum,floatNum,str,bool)
values('d2','l5',100,8,2147483964,4654.231,'papaya',TRUE)",
"insert into table0(device,level,time,num,bigNum,floatNum,str,bool)
values('d2','l1',31536000000,6,2147483650,1231.21,'banana',TRUE)",
- "insert into
table0(device,level,attr1,time,num,bigNum,floatNum,str,bool) values('d2','l2',
'vv', 31536000100,10,3147483648,231.55,'pumelo',FALSE)",
"insert into table0(device,level,time,num,bigNum,floatNum,str,bool)
values('d2','l3',31536000500,4,2147493648,213.1,'peach',FALSE)",
"insert into table0(device,level,time,num,bigNum,floatNum,str,bool)
values('d2','l4',31536001000,5,2149783648,56.32,'orange',FALSE)",
"insert into table0(device,level,time,num,bigNum,floatNum,str,bool)
values('d2','l5',31536010000,7,2147983648,213.112,'lemon',TRUE)",
@@ -88,6 +88,14 @@ public class IoTDBMultiIDsWithAttributesTableIT {
public static void setUp() throws Exception {
EnvFactory.getEnv().getConfig().getDataNodeCommonConfig().setSortBufferSize(1024
* 1024L);
EnvFactory.getEnv().initClusterEnvironment();
+ EnvFactory.getEnv()
+ .getConfig()
+ .getCommonConfig()
+ .setEnableSeqSpaceCompaction(false)
+ .setEnableUnseqSpaceCompaction(false)
+ .setEnableCrossSpaceCompaction(false)
+ .setMaxTsBlockLineNumber(1)
+ .setMaxNumberOfPointsInPage(1);
insertData();
}
@@ -159,43 +167,43 @@ public class IoTDBMultiIDsWithAttributesTableIT {
expectedHeader =
new String[] {
- "time", "num", "str",
+ "num", "str", "attr2",
};
retArray =
new String[] {
- "1970-01-01T00:00:00.040Z,1,apricot,",
- "1971-01-01T00:00:00.500Z,4,peach,",
- "1971-04-26T17:46:40.020Z,14,cherry,",
- "1970-01-01T00:00:00.000Z,3,coconut,",
- "1971-01-01T00:00:00.000Z,6,banana,",
- "1971-01-01T00:01:40.000Z,11,pitaya,",
- "1970-01-01T00:00:00.000Z,3,coconut,",
- "1971-01-01T00:00:00.000Z,6,banana,",
- "1971-01-01T00:01:40.000Z,11,pitaya,",
- "1970-01-01T00:00:00.020Z,2,pineapple,",
- "1971-01-01T00:00:00.100Z,10,pumelo,",
- "1971-04-26T17:46:40.000Z,12,strawberry,",
- "1970-01-01T00:00:00.100Z,8,papaya,",
- "1970-01-01T00:00:00.080Z,9,apple,",
- "1971-01-01T00:00:10.000Z,7,lemon,",
- "1971-01-01T00:00:01.000Z,5,orange,",
- "1971-08-20T11:33:20.000Z,15,watermelon,",
- "1971-04-26T18:01:40.000Z,13,lychee,",
- "1970-01-01T00:00:00.020Z,2,pineapple,",
- "1970-01-01T00:00:00.040Z,1,apricot,",
- "1971-01-01T00:00:00.100Z,10,pumelo,",
- "1971-01-01T00:00:00.500Z,4,peach,",
- "1971-04-26T17:46:40.000Z,12,strawberry,",
- "1971-04-26T17:46:40.020Z,14,cherry,",
- "1970-01-01T00:00:00.080Z,9,apple,",
- "1970-01-01T00:00:00.100Z,8,papaya,",
- "1971-01-01T00:00:01.000Z,5,orange,",
- "1971-01-01T00:00:10.000Z,7,lemon,",
- "1971-04-26T18:01:40.000Z,13,lychee,",
- "1971-08-20T11:33:20.000Z,15,watermelon,",
+ "3,coconut,d,",
+ "6,banana,d,",
+ "11,pitaya,d,",
+ "2,pineapple,zz,",
+ "10,pumelo,zz,",
+ "12,strawberry,zz,",
+ "1,apricot,a,",
+ "4,peach,a,",
+ "14,cherry,a,",
+ "9,apple,null,",
+ "5,orange,null,",
+ "13,lychee,null,",
+ "8,papaya,null,",
+ "7,lemon,null,",
+ "15,watermelon,null,",
+ "3,coconut,c,",
+ "6,banana,c,",
+ "11,pitaya,c,",
+ "2,pineapple,null,",
+ "10,pumelo,null,",
+ "12,strawberry,null,",
+ "1,apricot,null,",
+ "4,peach,null,",
+ "14,cherry,null,",
+ "9,apple,null,",
+ "5,orange,null,",
+ "13,lychee,null,",
+ "8,papaya,null,",
+ "7,lemon,null,",
+ "15,watermelon,null,",
};
tableResultSetEqualTest(
- "select time, num, str from table0 order by attr2, device",
+ "select num, str, attr2 from table0 order by device, level, time",
expectedHeader,
retArray,
DATABASE_NAME);
@@ -381,4 +389,27 @@ public class IoTDBMultiIDsWithAttributesTableIT {
retArray,
DATABASE_NAME);
}
+
+ @Test
+ public void subQueryTest1() {
+ String[] expectedHeader = new String[] {"time", "level", "device",
"add_num"};
+ String[] retArray =
+ new String[] {
+ "1970-01-01T00:00:00.100Z,l5,d1,9.0,",
+ "1971-01-01T00:00:01.000Z,l4,d1,6.0,",
+ "1971-01-01T00:00:10.000Z,l5,d1,8.0,",
+ "1971-04-26T18:01:40.000Z,l4,d1,14.0,",
+ "1971-08-20T11:33:20.000Z,l5,d1,16.0,",
+ "1970-01-01T00:00:00.080Z,l4,d2,10.0,",
+ };
+
+ expectedHeader = new String[] {"time", "level", "device", "add_num"};
+ tableResultSetEqualTest(
+ "SELECT time, level, device, add_num FROM (\n"
+ + "SELECT time, level, device, substring(str, 2) as cast_str,
attr2, bignum, num+1 as add_num FROM table0 WHERE num>1 ORDER BY level DESC,
time, device LIMIT 12\n"
+ + ") ORDER BY DEVICE,time OFFSET 1 LIMIT 6",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/PredicateUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/PredicateUtils.java
index 1c07128d62c..62531541c4e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/PredicateUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/PredicateUtils.java
@@ -29,11 +29,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LongLiteral;
import org.apache.tsfile.utils.Pair;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
import static org.apache.iotdb.commons.conf.IoTDBConstant.TIME;
import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL;
@@ -199,217 +195,4 @@ public class PredicateUtils {
&& ((Identifier) timeExpression).getValue().equalsIgnoreCase(TIME)
&& valueExpression instanceof LongLiteral;
}
-
- // private static boolean checkBetweenConstantSatisfy(Expression e1,
Expression e2) {
- // return e1.isConstantOperand()
- // && e2.isConstantOperand()
- // && ((ConstantOperand) e1).getDataType() == TSDataType.INT64
- // && ((ConstantOperand) e2).getDataType() == TSDataType.INT64
- // && (Long.parseLong(((ConstantOperand) e1).getValueString())
- // <= Long.parseLong(((ConstantOperand) e2).getValueString()));
- // }
-
- /**
- * Check if the given expression contains time filter.
- *
- * @param predicate given expression
- * @return true if the given expression contains time filter
- */
- // public static boolean checkIfTimeFilterExist(Expression predicate) {
- // return new TimeFilterExistChecker().process(predicate, null);
- // }
-
- /**
- * Recursively removes all use of the not() operator in a predicate by
replacing all instances of
- * not(x) with the inverse(x),
- *
- * <p>eg: not(and(eq(), not(eq(y))) -> or(notEq(), eq(y))
- *
- * <p>The returned predicate should have the same meaning as the original,
but without the use of
- * the not() operator.
- *
- * <p>See also {@link PredicateUtils#reversePredicate(Expression)}, which is
used to do the
- * inversion.
- *
- * @param predicate the predicate to remove not() from
- * @return the predicate with all not() operators removed
- */
- // public static Expression predicateRemoveNot(Expression predicate) {
- // if (predicate.getExpressionType().equals(ExpressionType.LOGIC_AND)) {
- // return ExpressionFactory.and(
- // predicateRemoveNot(((BinaryExpression)
predicate).getLeftExpression()),
- // predicateRemoveNot(((BinaryExpression)
predicate).getRightExpression()));
- // } else if
(predicate.getExpressionType().equals(ExpressionType.LOGIC_OR)) {
- // return ExpressionFactory.or(
- // predicateRemoveNot(((BinaryExpression)
predicate).getLeftExpression()),
- // predicateRemoveNot(((BinaryExpression)
predicate).getRightExpression()));
- // } else if
(predicate.getExpressionType().equals(ExpressionType.LOGIC_NOT)) {
- // return reversePredicate(((LogicNotExpression)
predicate).getExpression());
- // }
- // return predicate;
- // }
-
- /**
- * Converts a predicate to its logical inverse. The returned predicate
should be equivalent to
- * not(p), but without the use of a not() operator.
- *
- * <p>See also {@link PredicateUtils#predicateRemoveNot(Expression)}, which
can remove the use of
- * all not() operators without inverting the overall predicate.
- *
- * @param predicate given predicate
- * @return the predicate after reversing
- */
- // public static Expression reversePredicate(Expression predicate) {
- // return new ReversePredicateVisitor().process(predicate, null);
- // }
-
- /**
- * Simplify the given predicate (Remove the NULL and TRUE/FALSE expression).
- *
- * @param predicate given predicate
- * @return the predicate after simplifying
- */
- // public static Expression simplifyPredicate(Expression predicate) {
- // return new PredicateSimplifier().process(predicate, null);
- // }
-
- /**
- * Convert the given predicate to time filter.
- *
- * <p>Note: the supplied predicate must not contain any instances of the
not() operator as this is
- * not supported by this filter. The supplied predicate should first be run
through {@link
- * PredicateUtils#predicateRemoveNot(Expression)} to rewrite it in a form
that doesn't make use of
- * the not() operator.
- *
- * @param predicate given predicate
- * @return the time filter converted from the given predicate
- */
- // public static Filter convertPredicateToTimeFilter(Expression predicate) {
- // if (predicate == null) {
- // return null;
- // }
- // return predicate.accept(new ConvertPredicateToTimeFilterVisitor(),
null);
- // }
-
- // public static Filter convertPredicateToFilter(
- // Expression predicate,
- // List<String> allMeasurements,
- // boolean isBuildPlanUseTemplate,
- // TypeProvider typeProvider) {
- // if (predicate == null) {
- // return null;
- // }
- // return predicate.accept(
- // new ConvertPredicateToFilterVisitor(),
- // new ConvertPredicateToFilterVisitor.Context(
- // allMeasurements, isBuildPlanUseTemplate, typeProvider));
- // }
-
- /**
- * Combine the given conjuncts into a single expression using "and".
- *
- * @param conjuncts given conjuncts
- * @return the expression combined by the given conjuncts
- */
- public static Expression combineConjuncts(List<Expression> conjuncts) {
- if (conjuncts.size() == 1) {
- return conjuncts.get(0);
- }
- return constructRightDeepTreeWithAnd(conjuncts);
- }
-
- private static Expression constructRightDeepTreeWithAnd(List<Expression>
conjuncts) {
- // TODO: consider other structures of tree
- if (conjuncts.size() == 2) {
- return and(conjuncts.get(0), conjuncts.get(1));
- } else {
- return and(
- conjuncts.get(0), constructRightDeepTreeWithAnd(conjuncts.subList(1,
conjuncts.size())));
- }
- }
-
- public static Expression removeDuplicateConjunct(Expression predicate) {
- if (predicate == null) {
- return null;
- }
- Set<Expression> conjuncts = new HashSet<>();
- extractConjuncts(predicate, conjuncts);
- return combineConjuncts(new ArrayList<>(conjuncts));
- }
-
- public static List<Expression> extractConjuncts(Expression predicate) {
- Set<Expression> conjuncts = new HashSet<>();
- extractConjuncts(predicate, conjuncts);
- return new ArrayList<>(conjuncts);
- }
-
- private static void extractConjuncts(Expression predicate, Set<Expression>
conjuncts) {
- if (predicate instanceof LogicalExpression
- && ((LogicalExpression) predicate).getOperator() == AND) {
- extractConjuncts(((LogicalExpression) predicate).getTerms().get(0),
conjuncts);
- extractConjuncts(((LogicalExpression) predicate).getTerms().get(1),
conjuncts);
- } else {
- conjuncts.add(predicate);
- }
- }
-
- /**
- * Extract the source symbol (full path for non-aligned path, device path
for aligned path) from
- * the given predicate. If the predicate contains multiple source symbols,
return null.
- *
- * @param predicate given predicate
- * @return the source symbol extracted from the given predicate
- */
- // public static PartialPath extractPredicateSourceSymbol(Expression
predicate) {
- // List<Expression> sourceExpressions =
ExpressionAnalyzer.searchSourceExpressions(predicate);
- // Set<PartialPath> sourcePaths =
- // sourceExpressions.stream()
- // .map(expression -> ((TimeSeriesOperand) expression).getPath())
- // .collect(Collectors.toSet());
- // Iterator<PartialPath> pathIterator = sourcePaths.iterator();
- // MeasurementPath firstPath = (MeasurementPath) pathIterator.next();
- //
- // if (sourcePaths.size() == 1) {
- // // only contain one source path, can be push down
- // return firstPath.isUnderAlignedEntity() ? firstPath.getDevicePath()
: firstPath;
- // }
- //
- // // sourcePaths contain more than one path, can be push down when
- // // these paths under on aligned device
- // if (!firstPath.isUnderAlignedEntity()) {
- // return null;
- // }
- // PartialPath checkedDevice = firstPath.getDevicePath();
- // while (pathIterator.hasNext()) {
- // MeasurementPath path = (MeasurementPath) pathIterator.next();
- // if (!path.isUnderAlignedEntity() ||
!path.getDevicePath().equals(checkedDevice)) {
- // return null;
- // }
- // }
- // return checkedDevice;
- // }
-
- /**
- * Check if the given predicate can be pushed down from FilterNode to
ScanNode.
- *
- * <p>The predicate <b>cannot</b> be pushed down if it satisfies the
following conditions:
- * <li>predicate contains IS_NULL
- *
- * @param predicate given predicate
- * @return true if the given predicate can be pushed down to source
- */
- // public static boolean predicateCanPushDownToSource(Expression predicate)
{
- // return new PredicateCanPushDownToSourceChecker().process(predicate,
null);
- // }
-
- /**
- * Check if the given predicate can be pushed into ScanOperator and execute
using the {@link
- * Filter} interface.
- *
- * @param predicate given predicate
- * @return true if the given predicate can be pushed into ScanOperator
- */
- // public static boolean predicateCanPushIntoScan(Expression predicate) {
- // return new PredicatePushIntoScanChecker().process(predicate, null);
- // }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushLimitThroughProject.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushLimitThroughProject.java
index 26ecef14a4c..aece5057546 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushLimitThroughProject.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushLimitThroughProject.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture;
import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
@@ -29,6 +30,7 @@ import static
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.limit;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.project;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.source;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan.containsDiffFunction;
import static
org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture.newCapture;
public class PushLimitThroughProject implements Rule<LimitNode> {
@@ -53,6 +55,13 @@ public class PushLimitThroughProject implements
Rule<LimitNode> {
@Override
public Result apply(LimitNode parent, Captures captures, Context context) {
ProjectNode projectNode = captures.get(CHILD);
+
+ for (Expression expression :
projectNode.getAssignments().getMap().values()) {
+ if (containsDiffFunction(expression)) {
+ return Result.empty();
+ }
+ }
+
return Result.ofPlanNode(transpose(parent, projectNode));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
index 3b0ee37034f..54a14a10f7a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
@@ -21,11 +21,8 @@ import
org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
-import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
-import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
-import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
@@ -59,10 +56,15 @@ public class PushLimitOffsetIntoTableScan implements
PlanOptimizer {
return plan;
}
- return plan.accept(new Rewriter(), new Context(context.getAnalysis()));
+ return plan.accept(new Rewriter(context.getAnalysis()), new Context());
}
private static class Rewriter extends PlanVisitor<PlanNode, Context> {
+ private final Analysis analysis;
+
+ public Rewriter(Analysis analysis) {
+ this.analysis = analysis;
+ }
@Override
public PlanNode visitPlan(PlanNode node, Context context) {
@@ -74,66 +76,67 @@ public class PushLimitOffsetIntoTableScan implements
PlanOptimizer {
}
@Override
- public PlanNode visitOutput(OutputNode node, Context context) {
- return visitPlan(node, context);
- }
-
- @Override
- public PlanNode visitLimit(LimitNode node, Context context) {
- context.setLimit(node.getCount());
+ public PlanNode visitFilter(FilterNode node, Context context) {
+ // In the Filter-TableScan and Filter-Project-TableScan cases, limit cannot
be pushed down.
+ // Later, we need to consider other cases such as Filter-Values.
+ // A FilterNode in the outer query cannot be pushed down.
+ if (node.getChild() instanceof TableScanNode
+ || (node.getChild() instanceof ProjectNode
+ && ((ProjectNode) node.getChild()).getChild() instanceof
TableScanNode)) {
+ context.enablePushDown = false;
+ return node;
+ }
node.setChild(node.getChild().accept(this, context));
return node;
}
@Override
- public PlanNode visitOffset(OffsetNode node, Context context) {
- context.setOffset(node.getCount());
- // already use rule {@link PushLimitThroughOffset}
- // if (context.getLimit() > 0) {
- // context.setLimit(context.getLimit() + context.getOffset());
- // }
+ public PlanNode visitProject(ProjectNode node, Context context) {
+ for (Expression expression : node.getAssignments().getMap().values()) {
+ if (containsDiffFunction(expression)) {
+ context.enablePushDown = false;
+ return node;
+ }
+ }
node.setChild(node.getChild().accept(this, context));
return node;
}
@Override
- public PlanNode visitCollect(CollectNode node, Context context) {
- PlanNode newNode = node.clone();
- for (PlanNode child : node.getChildren()) {
- newNode.addChild(child.accept(this, context));
- }
- return newNode;
- }
-
- @Override
- public PlanNode visitProject(ProjectNode node, Context context) {
- for (Expression expression : node.getAssignments().getMap().values()) {
- if (containsDiffFunction(expression)) {
- context.setEnablePushDown(false);
- return node;
+ public PlanNode visitLimit(LimitNode node, Context context) {
+ Context subContext = new Context();
+ node.setChild(node.getChild().accept(this, subContext));
+ context.existLimitNode = true;
+ if (!subContext.enablePushDown || subContext.existLimitNode) {
+ context.enablePushDown = false;
+ return node;
+ } else {
+ TableScanNode tableScanNode = subContext.tableScanNode;
+ context.tableScanNode = tableScanNode;
+ if (tableScanNode != null) {
+ tableScanNode.setPushDownLimit(node.getCount());
}
+ return node;
}
- return visitPlan(node, context);
}
@Override
public PlanNode visitSort(SortNode node, Context context) {
- Context newContext =
- new Context(
- context.analysis,
- context.getLimit(),
- context.getOffset(),
- context.isEnablePushDown(),
- context.canPushLimitToEachDevice());
- PlanNode child = node.getChild().accept(this, newContext);
- if (!newContext.isEnablePushDown()) {
+ Context subContext = new Context();
+ node.setChild(node.getChild().accept(this, subContext));
+ context.existSortNode = true;
+ // If the subtree below this SortNode contains a SortNode or LimitNode, set
enablePushDown = false.
+ // Later, more node types will need to be handled to refine this check.
+ if (!subContext.enablePushDown || subContext.existSortNode ||
subContext.existLimitNode) {
+ context.enablePushDown = false;
return node;
}
+ TableScanNode tableScanNode = subContext.tableScanNode;
+ context.tableScanNode = tableScanNode;
OrderingScheme orderingScheme = node.getOrderingScheme();
- TableScanNode tableScanNode = newContext.getTableScanNode();
Map<Symbol, ColumnSchema> tableColumnSchema =
-
context.getAnalysis().getTableColumnSchema(tableScanNode.getQualifiedObjectName());
+
analysis.getTableColumnSchema(tableScanNode.getQualifiedObjectName());
Set<Symbol> sortSymbols = new HashSet<>();
for (Symbol orderBy : orderingScheme.getOrderBy()) {
if (TIMESTAMP_STR.equalsIgnoreCase(orderBy.getName())) {
@@ -144,8 +147,7 @@ public class PushLimitOffsetIntoTableScan implements
PlanOptimizer {
if (!tableColumnSchema.containsKey(orderBy)
|| tableColumnSchema.get(orderBy).getColumnCategory()
== TsTableColumnCategory.MEASUREMENT) {
- tableScanNode.setPushDownLimit(0);
- tableScanNode.setPushDownOffset(0);
+ context.enablePushDown = false;
return node;
}
@@ -161,112 +163,32 @@ public class PushLimitOffsetIntoTableScan implements
PlanOptimizer {
}
}
tableScanNode.setPushLimitToEachDevice(pushLimitToEachDevice);
- node.setChild(child);
return node;
}
- @Override
- public PlanNode visitTopK(TopKNode node, Context context) {
- throw new IllegalStateException(
- "TopKNode must be appeared after PushLimitOffsetIntoTableScan");
- }
-
@Override
public PlanNode visitStreamSort(StreamSortNode node, Context context) {
return visitSort(node, context);
}
@Override
- public PlanNode visitFilter(FilterNode node, Context context) {
- // If there is still a FilterNode here, it means that there are read
filter conditions that
- // cannot be pushed
- // down to TableScan.
- context.setEnablePushDown(false);
+ public PlanNode visitTableScan(TableScanNode node, Context context) {
+ context.tableScanNode = node;
return node;
}
@Override
- public PlanNode visitTableScan(TableScanNode node, Context context) {
- context.setTableScanNode(node);
- if (context.isEnablePushDown()) {
- if (context.getLimit() > 0) {
- node.setPushDownLimit(context.getLimit());
- }
- // TODO only one data region, pushDownOffset can be set
- // if (context.getOffset() > 0) {
- // node.setPushDownOffset(context.getOffset());
- // }
- if (context.canPushLimitToEachDevice()) {
- node.setPushLimitToEachDevice(true);
- }
- }
- return node;
+ public PlanNode visitTopK(TopKNode node, Context context) {
+ throw new IllegalStateException(
+ "TopKNode must be appeared after PushLimitOffsetIntoTableScan");
}
}
private static class Context {
- private final Analysis analysis;
- private long limit;
- private long offset;
+ // whether limit and offset can be pushed down into TableScanNode
private boolean enablePushDown = true;
- private boolean pushLimitToEachDevice = false;
private TableScanNode tableScanNode;
-
- public Context(Analysis analysis) {
- this.analysis = analysis;
- }
-
- public Context(
- Analysis analysis,
- long limit,
- long offset,
- boolean enablePushDown,
- boolean pushLimitToEachDevice) {
- this.analysis = analysis;
- this.limit = limit;
- this.offset = offset;
- this.enablePushDown = enablePushDown;
- this.pushLimitToEachDevice = pushLimitToEachDevice;
- }
-
- public Analysis getAnalysis() {
- return analysis;
- }
-
- public long getLimit() {
- return limit;
- }
-
- public void setLimit(long limit) {
- this.limit = limit;
- }
-
- public long getOffset() {
- return offset;
- }
-
- public void setOffset(long offset) {
- this.offset = offset;
- }
-
- public boolean isEnablePushDown() {
- return enablePushDown;
- }
-
- public void setEnablePushDown(boolean enablePushDown) {
- this.enablePushDown = enablePushDown;
- }
-
- public boolean canPushLimitToEachDevice() {
- return pushLimitToEachDevice;
- }
-
- public TableScanNode getTableScanNode() {
- return tableScanNode;
- }
-
- public void setTableScanNode(TableScanNode tableScanNode) {
- this.tableScanNode = tableScanNode;
- }
+ private boolean existSortNode = false;
+ private boolean existLimitNode = false;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
index 41231ff7f27..e85d7623592 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
@@ -30,6 +30,7 @@ import static
org.apache.iotdb.db.utils.constant.TestConstant.TIMESTAMP_STR;
* <li>When order by time and there is only one device entry in TableScanNode
below, the SortNode
* can be eliminated.
* <li>When order by all IDColumns and time, the SortNode can be eliminated.
+ * <li>When streamCompareKeyEndIndex == orderBy.size() - 1, the StreamSortNode can be eliminated.
*/
public class SortElimination implements PlanOptimizer {
@@ -68,7 +69,11 @@ public class SortElimination implements PlanOptimizer {
@Override
public PlanNode visitStreamSort(StreamSortNode node, Context context) {
PlanNode child = node.getChild().accept(this, context);
- return node.isOrderByAllIdsAndTime() ? child : node;
+ return node.isOrderByAllIdsAndTime()
+ || node.getStreamCompareKeyEndIndex()
+ == node.getOrderingScheme().getOrderBy().size() - 1
+ ? child
+ : node;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
index 683c35d1207..7650b8681df 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
@@ -75,7 +75,16 @@ public class TransformSortToStreamSort implements
PlanOptimizer {
@Override
public PlanNode visitSort(SortNode node, Context context) {
+
PlanNode child = node.getChild().accept(this, context);
+
+ // sort in outer query cannot use StreamSort
+ if (context.isExistSortNodeInSubQuery()) {
+ node.setChild(child);
+ return node;
+ }
+ context.setExistSortNodeInSubQuery(true);
+
TableScanNode tableScanNode = context.getTableScanNode();
Map<Symbol, ColumnSchema> tableColumnSchema =
analysis.getTableColumnSchema(tableScanNode.getQualifiedObjectName());
@@ -133,6 +142,7 @@ public class TransformSortToStreamSort implements
PlanOptimizer {
private static class Context {
private TableScanNode tableScanNode;
+ private boolean existSortNodeInSubQuery = false;
public TableScanNode getTableScanNode() {
return tableScanNode;
@@ -141,5 +151,13 @@ public class TransformSortToStreamSort implements
PlanOptimizer {
public void setTableScanNode(TableScanNode tableScanNode) {
this.tableScanNode = tableScanNode;
}
+
+ public boolean isExistSortNodeInSubQuery() {
+ return existSortNodeInSubQuery;
+ }
+
+ public void setExistSortNodeInSubQuery(boolean existSortNodeInSubQuery) {
+ this.existSortNodeInSubQuery = existSortNodeInSubQuery;
+ }
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
index 6791070b16c..2e6f3bca6df 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
@@ -297,6 +297,22 @@ public class LimitOffsetPushDownTest {
assertTrue(tableScanNode.getPushDownLimit() == 0 &&
tableScanNode.getPushDownOffset() == 0);
}
+ /** Actually with diff function, LimitNode should be above ProjectNode. */
+ @Test
+ public void limitDiffProjectTest() {
+ sql = "SELECT time, diff(s1) FROM table1 limit 10";
+ context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
+ actualAnalysis = analyzeSQL(sql, metadata, context);
+ logicalQueryPlan =
+ new LogicalPlanner(context, metadata, sessionInfo,
WarningCollector.NOOP)
+ .plan(actualAnalysis);
+ // LogicalPlan: `Output - Project - Limit - TableScan`
+ rootNode = logicalQueryPlan.getRootNode();
+ assertTrue(getChildrenNode(rootNode, 1) instanceof ProjectNode);
+ assertTrue(getChildrenNode(rootNode, 2) instanceof LimitNode);
+ assertTrue(getChildrenNode(rootNode, 3) instanceof TableScanNode);
+ }
+
static PlanNode getChildrenNode(PlanNode root, int idx) {
PlanNode result = root;
for (int i = 1; i <= idx; i++) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
index b779fbc53c7..02578aafc7c 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
@@ -60,8 +60,8 @@ import static org.junit.Assert.assertTrue;
public class SortTest {
- QueryId queryId = new QueryId("test_query");
- SessionInfo sessionInfo =
+ static QueryId queryId = new QueryId("test_query");
+ static SessionInfo sessionInfo =
new SessionInfo(
1L,
"iotdb-user",
@@ -69,9 +69,9 @@ public class SortTest {
IoTDBConstant.ClientVersion.V_1_0,
"db",
IClientSession.SqlDialect.TABLE);
- Metadata metadata = new TestMatadata();
+ static Metadata metadata = new TestMatadata();
String sql;
- Analysis actualAnalysis;
+ Analysis analysis;
MPPQueryContext context;
WarningCollector warningCollector = WarningCollector.NOOP;
LogicalQueryPlan logicalQueryPlan;
@@ -88,7 +88,7 @@ public class SortTest {
"table1.shenzhen.B1.XX",
"table1.shenzhen.B2.ZZ",
"table1.shanghai.A3.YY");
- List<String> originalDeviceEntries2 =
+ static List<String> originalDeviceEntries2 =
Arrays.asList("table1.shenzhen.B1.XX", "table1.shenzhen.B2.ZZ");
// order by some_ids, time, others; has filter
@@ -98,9 +98,9 @@ public class SortTest {
"SELECT time, tag3, tag1, cast(s2 as double), s2+s3, attr1 FROM table1
"
+ "where s1>1 and s1+s3>0 and cast(s1 as double)>1.0 order by tag2
desc, tag3 asc, time desc, s1+s2 desc offset 5 limit 10";
context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
- actualAnalysis = analyzeSQL(sql, metadata, context);
+ analysis = analyzeSQL(sql, metadata, context);
logicalQueryPlan =
- new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(actualAnalysis);
+ new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(analysis);
logicalPlanNode = logicalQueryPlan.getRootNode();
// LogicalPlan:
`Output-Offset-Limit-Project-StreamSort-Project-Filter-TableScan`
@@ -127,7 +127,7 @@ public class SortTest {
// DistributePlan:
`Output-Offset-Limit-Project-MergeSort-StreamSort-Project-Filter-TableScan`
// to
// `Output-Offset-Project-TopK-Limit-StreamSort-Project-Filter-TableScan`
- distributionPlanner = new TableDistributedPlanner(actualAnalysis,
logicalQueryPlan, context);
+ distributionPlanner = new TableDistributedPlanner(analysis,
logicalQueryPlan, context);
distributedQueryPlan = distributionPlanner.plan();
assertEquals(3, distributedQueryPlan.getFragments().size());
IdentitySinkNode identitySinkNode =
@@ -174,22 +174,22 @@ public class SortTest {
sql = "SELECT * FROM table1 order by tag2 desc, tag3 asc offset 5 limit
10";
context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
- actualAnalysis = analyzeSQL(sql, metadata, context);
+ analysis = analyzeSQL(sql, metadata, context);
logicalQueryPlan =
- new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(actualAnalysis);
+ new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(analysis);
logicalPlanNode = logicalQueryPlan.getRootNode();
// LogicalPlan: `Output-Offset-Limit-StreamSort-TableScan`
assertTrue(getChildrenNode(logicalPlanNode, 3) instanceof StreamSortNode);
- distributionPlanner = new TableDistributedPlanner(actualAnalysis,
logicalQueryPlan, context);
+ distributionPlanner = new TableDistributedPlanner(analysis,
logicalQueryPlan, context);
distributedQueryPlan = distributionPlanner.plan();
assertEquals(3, distributedQueryPlan.getFragments().size());
- // DistributedPlan: `Output-Offset-TopK-Limit-StreamSort-TableScan`
+ // DistributedPlan: `Output-Offset-TopK-Limit-TableScan`
identitySinkNode =
(IdentitySinkNode)
distributedQueryPlan.getFragments().get(0).getPlanNodeTree();
assertTrue(getChildrenNode(identitySinkNode, 3) instanceof TopKNode);
topKNode = (TopKNode) getChildrenNode(identitySinkNode, 3);
assertTrue(topKNode.getChildren().get(1) instanceof LimitNode);
- assertTrue(getChildrenNode(topKNode.getChildren().get(1), 1) instanceof
StreamSortNode);
+ assertTrue(getChildrenNode(topKNode.getChildren().get(1), 1) instanceof
TableScanNode);
}
// order by all_ids, time, others
@@ -200,9 +200,9 @@ public class SortTest {
"SELECT time, tag3, tag1, cast(s2 as double), s2+s3, attr1 FROM table1
"
+ "where s1>1 and s1+s3>0 and cast(s1 as double)>1.0 order by tag2
desc, tag1 desc, tag3 asc, time desc, s1+s2 desc offset 5 limit 10";
context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
- actualAnalysis = analyzeSQL(sql, metadata, context);
+ analysis = analyzeSQL(sql, metadata, context);
logicalQueryPlan =
- new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(actualAnalysis);
+ new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(analysis);
logicalPlanNode = logicalQueryPlan.getRootNode();
// LogicalPlan:
`Output-Offset-Limit-Project-StreamSort-Project-Filter-TableScan`
@@ -223,7 +223,7 @@ public class SortTest {
// DistributePlan: optimize
//
`Output-Offset-Limit-Project-MergeSort-StreamSort-Project-Filter-TableScan`
// to `Output-Offset-Project-TopK-Limit-Project-Filter-TableScan`
- distributionPlanner = new TableDistributedPlanner(actualAnalysis,
logicalQueryPlan, context);
+ distributionPlanner = new TableDistributedPlanner(analysis,
logicalQueryPlan, context);
distributedQueryPlan = distributionPlanner.plan();
assertEquals(3, distributedQueryPlan.getFragments().size());
outputNode =
@@ -278,9 +278,9 @@ public class SortTest {
"SELECT time, tag3, tag1, cast(s2 as double), s2+s3, attr1 FROM table1
"
+ "where s1>1 and s1+s3>0 and cast(s1 as double)>1.0 order by tag2
desc, tag1 desc, tag3 asc, time desc, s1+s2 desc";
context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
- actualAnalysis = analyzeSQL(sql, metadata, context);
+ analysis = analyzeSQL(sql, metadata, context);
logicalQueryPlan =
- new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(actualAnalysis);
+ new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(analysis);
logicalPlanNode = logicalQueryPlan.getRootNode();
// LogicalPlan: `Output-Project-StreamSort-Project-Filter-TableScan`
@@ -297,7 +297,7 @@ public class SortTest {
assertEquals(6, tableScanNode.getDeviceEntries().size());
// DistributePlan: optimize
`Output-Project-MergeSort-Project-Filter-TableScan`
- distributionPlanner = new TableDistributedPlanner(actualAnalysis,
logicalQueryPlan, context);
+ distributionPlanner = new TableDistributedPlanner(analysis,
logicalQueryPlan, context);
distributedQueryPlan = distributionPlanner.plan();
assertEquals(3, distributedQueryPlan.getFragments().size());
outputNode =
@@ -346,9 +346,9 @@ public class SortTest {
"SELECT time, tag3, tag1, cast(s2 as double), s2+s3, attr1 FROM table1
"
+ "where s1>1 and s1+s3>0 and cast(s1 as double)>1.0 order by tag2
desc, tag1 desc, s1+s2 desc, time desc offset 5 limit 10";
context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
- actualAnalysis = analyzeSQL(sql, metadata, context);
+ analysis = analyzeSQL(sql, metadata, context);
logicalQueryPlan =
- new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(actualAnalysis);
+ new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(analysis);
logicalPlanNode = logicalQueryPlan.getRootNode();
// LogicalPlan:
`Output-Offset-Limit-Project-StreamSort-Project-Filter-TableScan`
@@ -375,7 +375,7 @@ public class SortTest {
// DistributePlan: optimize
//
`Output-Offset-Limit-Project-MergeSort-StreamSort-Project-Filter-TableScan` to
// `Output-Offset-Project-TopK-Limit-StreamSort-Project-Filter-TableScan`
- distributionPlanner = new TableDistributedPlanner(actualAnalysis,
logicalQueryPlan, context);
+ distributionPlanner = new TableDistributedPlanner(analysis,
logicalQueryPlan, context);
distributedQueryPlan = distributionPlanner.plan();
assertEquals(3, distributedQueryPlan.getFragments().size());
outputNode =
@@ -426,9 +426,9 @@ public class SortTest {
"SELECT time, tag3, tag1, cast(s2 as double), s2+s3, attr1 FROM table1
"
+ "where s1>1 and s1+s3>0 and cast(s1 as double)>1.0 order by tag2
desc, tag1 desc, tag3 asc, s1+s2 desc, time desc offset 5 limit 10";
context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
- actualAnalysis = analyzeSQL(sql, metadata, context);
+ analysis = analyzeSQL(sql, metadata, context);
logicalQueryPlan =
- new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(actualAnalysis);
+ new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(analysis);
logicalPlanNode = logicalQueryPlan.getRootNode();
// LogicalPlan:
`Output-Offset-Limit-Project-StreamSort-Project-Filter-TableScan`
@@ -451,7 +451,7 @@ public class SortTest {
// DistributePlan: optimize
//
`Output-Offset-Limit-Project-MergeSort-StreamSort-Project-Filter-TableScan`
// to
`Output-Offset-Project-TopK-Limit-StreamSort-Project-Filter-TableScan`
- distributionPlanner = new TableDistributedPlanner(actualAnalysis,
logicalQueryPlan, context);
+ distributionPlanner = new TableDistributedPlanner(analysis,
logicalQueryPlan, context);
distributedQueryPlan = distributionPlanner.plan();
assertEquals(3, distributedQueryPlan.getFragments().size());
outputNode =
@@ -504,9 +504,9 @@ public class SortTest {
sql =
"SELECT time, tag3, tag1, cast(s2 as double), s2+s3, attr1 FROM table1
order by time desc, tag2 asc, tag3 desc, s1+s2 desc offset 5 limit 10";
context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
- actualAnalysis = analyzeSQL(sql, metadata, context);
+ analysis = analyzeSQL(sql, metadata, context);
logicalQueryPlan =
- new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(actualAnalysis);
+ new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(analysis);
logicalPlanNode = logicalQueryPlan.getRootNode();
assertTopKNoFilter(originalDeviceEntries1, originalDeviceEntries2, DESC,
15, 0, true);
@@ -516,9 +516,9 @@ public class SortTest {
"SELECT time, tag3, substring(tag1, 1), cast(s2 as double), s2+s3,
attr1 FROM table1 "
+ "where s1>1 and s1+s3>0 and cast(s1 as double)>1.0 order by time
desc, s1+s2 asc, tag2 asc, tag1 desc offset 5 limit 10";
context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
- actualAnalysis = analyzeSQL(sql, metadata, context);
+ analysis = analyzeSQL(sql, metadata, context);
logicalQueryPlan =
- new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(actualAnalysis);
+ new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(analysis);
logicalPlanNode = logicalQueryPlan.getRootNode();
assertTopKWithFilter(originalDeviceEntries1, originalDeviceEntries2, DESC,
0, 0, false);
@@ -527,9 +527,9 @@ public class SortTest {
"SELECT time, tag3, tag1, cast(s2 as double), s2+s3, attr1 FROM table1
"
+ "where s1>1 and s1+s3>0 and cast(s1 as double)>1.0 order by time
desc, s1+s2 asc, tag2 asc, tag3 desc, tag1 desc offset 5 limit 10";
context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
- actualAnalysis = analyzeSQL(sql, metadata, context);
+ analysis = analyzeSQL(sql, metadata, context);
logicalQueryPlan =
- new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(actualAnalysis);
+ new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(analysis);
logicalPlanNode = logicalQueryPlan.getRootNode();
assertTopKWithFilter(originalDeviceEntries1, originalDeviceEntries2, DESC,
0, 0, false);
@@ -538,9 +538,9 @@ public class SortTest {
"SELECT time, tag3, tag1, cast(s2 as double), s2+s3, attr1 FROM table1
"
+ "where s1>1 and s1+s3>0 and cast(s1 as double)>1.0 order by time
desc, tag2 asc, tag3 desc, tag1 asc, s1+s2 desc offset 5 limit 10";
context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
- actualAnalysis = analyzeSQL(sql, metadata, context);
+ analysis = analyzeSQL(sql, metadata, context);
logicalQueryPlan =
- new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(actualAnalysis);
+ new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(analysis);
logicalPlanNode = logicalQueryPlan.getRootNode();
assertTopKWithFilter(originalDeviceEntries1, originalDeviceEntries2, DESC,
0, 0, false);
@@ -553,9 +553,9 @@ public class SortTest {
"SELECT time, tag3, tag1, cast(s2 as double), s2+s3, attr1 FROM table1
"
+ "order by s1+s2 desc, tag2 desc, tag1 desc, time desc offset 5
limit 10";
context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
- actualAnalysis = analyzeSQL(sql, metadata, context);
+ analysis = analyzeSQL(sql, metadata, context);
logicalQueryPlan =
- new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(actualAnalysis);
+ new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(analysis);
logicalPlanNode = logicalQueryPlan.getRootNode();
assertTopKNoFilter(originalDeviceEntries1, originalDeviceEntries2, ASC, 0,
0, false);
@@ -564,9 +564,9 @@ public class SortTest {
"SELECT time, tag3, tag1, cast(s2 as double), s2+s3, attr1 FROM table1
"
+ "where s1>1 and s1+s3>0 and cast(s1 as double)>1.0 order by
s1+s2 desc, tag2 desc, tag1 desc, tag3 desc, time asc offset 5 limit 10";
context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
- actualAnalysis = analyzeSQL(sql, metadata, context);
+ analysis = analyzeSQL(sql, metadata, context);
logicalQueryPlan =
- new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(actualAnalysis);
+ new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(analysis);
logicalPlanNode = logicalQueryPlan.getRootNode();
assertTopKWithFilter(originalDeviceEntries1, originalDeviceEntries2, ASC,
0, 0, false);
@@ -575,9 +575,9 @@ public class SortTest {
"SELECT time, tag3, tag1, cast(s2 as double), s2+s3, attr1 FROM table1
"
+ "where s1>1 and s1+s3>0 and cast(s1 as double)>1.0 order by
s1+s2 desc, time desc, tag2 desc, tag1 desc offset 5 limit 10";
context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
- actualAnalysis = analyzeSQL(sql, metadata, context);
+ analysis = analyzeSQL(sql, metadata, context);
logicalQueryPlan =
- new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(actualAnalysis);
+ new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(analysis);
logicalPlanNode = logicalQueryPlan.getRootNode();
assertTopKWithFilter(originalDeviceEntries1, originalDeviceEntries2, ASC,
0, 0, false);
@@ -586,9 +586,9 @@ public class SortTest {
"SELECT time, tag3, tag1, cast(s2 as double), s2+s3, attr1 FROM table1
"
+ "where s1>1 and s1+s3>0 and cast(s1 as double)>1.0 order by
s1+s2 desc, time desc, tag2 desc, tag1 desc, tag3 asc offset 5 limit 10";
context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
- actualAnalysis = analyzeSQL(sql, metadata, context);
+ analysis = analyzeSQL(sql, metadata, context);
logicalQueryPlan =
- new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(actualAnalysis);
+ new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(analysis);
logicalPlanNode = logicalQueryPlan.getRootNode();
assertTopKWithFilter(originalDeviceEntries1, originalDeviceEntries2, ASC,
0, 0, false);
}
@@ -598,11 +598,11 @@ public class SortTest {
// columns in order and select is different
sql = "SELECT time, attr1, s1 FROM table1 order by attr2 limit 5";
context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
- actualAnalysis = analyzeSQL(sql, metadata, context);
+ analysis = analyzeSQL(sql, metadata, context);
logicalQueryPlan =
- new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(actualAnalysis);
+ new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(analysis);
logicalPlanNode = logicalQueryPlan.getRootNode();
- distributionPlanner = new TableDistributedPlanner(actualAnalysis,
logicalQueryPlan, context);
+ distributionPlanner = new TableDistributedPlanner(analysis,
logicalQueryPlan, context);
distributedQueryPlan = distributionPlanner.plan();
assertEquals(3, distributedQueryPlan.getFragments().size());
IdentitySinkNode sinkNode =
@@ -639,7 +639,7 @@ public class SortTest {
assertEquals(isPushLimitToEachDevice,
tableScanNode.isPushLimitToEachDevice());
// DistributePlan `Identity - Output - Offset - Project - TopK - {Exchange
+ TopK + Exchange}
- distributionPlanner = new TableDistributedPlanner(actualAnalysis,
logicalQueryPlan, context);
+ distributionPlanner = new TableDistributedPlanner(analysis,
logicalQueryPlan, context);
distributedQueryPlan = distributionPlanner.plan();
assertEquals(3, distributedQueryPlan.getFragments().size());
IdentitySinkNode identitySinkNode =
@@ -707,7 +707,7 @@ public class SortTest {
assertEquals(isPushLimitToEachDevice,
tableScanNode.isPushLimitToEachDevice());
// DistributePlan `Identity - Output - Offset - Project - TopK - {Exchange
+ TopK + Exchange}
- distributionPlanner = new TableDistributedPlanner(actualAnalysis,
logicalQueryPlan, context);
+ distributionPlanner = new TableDistributedPlanner(analysis,
logicalQueryPlan, context);
distributedQueryPlan = distributionPlanner.plan();
assertEquals(3, distributedQueryPlan.getFragments().size());
IdentitySinkNode identitySinkNode =
@@ -754,7 +754,7 @@ public class SortTest {
long expectedPushDownOffset,
boolean isPushLimitToEachDevice) {}
- public void assertTableScan(
+ public static void assertTableScan(
TableScanNode tableScanNode,
List<String> deviceEntries,
Ordering ordering,
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SubQueryTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SubQueryTest.java
new file mode 100644
index 00000000000..7d4aa16f82a
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SubQueryTest.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
+
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.LogicalPlanner;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributedPlanner;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.AnalyzerTest.analyzeSQL;
+import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.LimitOffsetPushDownTest.getChildrenNode;
+import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.SortTest.assertTableScan;
+import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.SortTest.metadata;
+import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.SortTest.queryId;
+import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.SortTest.sessionInfo;
+import static
org.apache.iotdb.db.queryengine.plan.statement.component.Ordering.ASC;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SubQueryTest {
+
+ String sql;
+ Analysis analysis;
+ MPPQueryContext context;
+ WarningCollector warningCollector = WarningCollector.NOOP;
+ LogicalQueryPlan logicalQueryPlan;
+ PlanNode logicalPlanNode;
+ OutputNode outputNode;
+ ProjectNode projectNode;
+ StreamSortNode streamSortNode;
+ TableDistributedPlanner distributionPlanner;
+ DistributedQueryPlan distributedQueryPlan;
+ TableScanNode tableScanNode;
+ List<String> originalDeviceEntries1 =
+ Arrays.asList(
+ "table1.shanghai.B3.YY",
+ "table1.shenzhen.B1.XX",
+ "table1.shenzhen.B2.ZZ",
+ "table1.shanghai.A3.YY");
+ static List<String> originalDeviceEntries2 =
+ Arrays.asList("table1.shenzhen.B1.XX", "table1.shenzhen.B2.ZZ");
+
+ @Test
+ public void subQueryTest1() {
+ // outer query has limit and sort,
+ // sub query only has sort,
+ // sort in sub query is invalid,
+ // limit can be pushed down into TableScan.
+ sql =
+ "SELECT time, tag2, attr2, CAST(add_s2 as double) "
+ + "FROM (SELECT time, SUBSTRING(tag1, 1) as sub_tag1, tag2, attr2,
s1, s2+1 as add_s2 FROM table1 WHERE s1>1 ORDER BY tag1 DESC) "
+ + "ORDER BY tag2 OFFSET 3 LIMIT 6";
+ context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
+ analysis = analyzeSQL(sql, metadata, context);
+ logicalQueryPlan =
+ new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(analysis);
+ logicalPlanNode = logicalQueryPlan.getRootNode();
+
+ // LogicalPlan: `Output - Offset - Limit - StreamSort - Project -
TableScan`
+ assertTrue(logicalPlanNode instanceof OutputNode);
+ assertTrue(getChildrenNode(logicalPlanNode, 1) instanceof OffsetNode);
+ assertTrue(getChildrenNode(logicalPlanNode, 2) instanceof LimitNode);
+ assertTrue(getChildrenNode(logicalPlanNode, 3) instanceof StreamSortNode);
+ assertTrue(getChildrenNode(logicalPlanNode, 4) instanceof ProjectNode);
+ assertTrue(getChildrenNode(logicalPlanNode, 5) instanceof TableScanNode);
+ tableScanNode = (TableScanNode) getChildrenNode(logicalPlanNode, 5);
+ assertEquals(9, tableScanNode.getPushDownLimit());
+ assertEquals(true, tableScanNode.isPushLimitToEachDevice());
+ assertEquals("(\"s1\" > 1)",
tableScanNode.getPushDownPredicate().toString());
+
+ /*
+ * IdentitySinkNode-163
+ * └──OutputNode-14
+ * └──OffsetNode-10
+ * └──TopKNode-11
+ * ├──ExchangeNode-159:
[SourceAddress:192.0.12.1/test_query.2.0/161]
+ * ├──LimitNode-137
+ * │ └──ProjectNode-118
+ * │ └──TableScanNode-115
+ * └──ExchangeNode-160:
[SourceAddress:192.0.10.1/test_query.3.0/162]
+ */
+ distributionPlanner = new TableDistributedPlanner(analysis,
logicalQueryPlan, context);
+ distributedQueryPlan = distributionPlanner.plan();
+ assertEquals(3, distributedQueryPlan.getFragments().size());
+ IdentitySinkNode identitySinkNode =
+ (IdentitySinkNode)
distributedQueryPlan.getFragments().get(0).getPlanNodeTree();
+ outputNode = (OutputNode) getChildrenNode(identitySinkNode, 1);
+ assertTrue(getChildrenNode(outputNode, 1) instanceof OffsetNode);
+ assertTrue(getChildrenNode(outputNode, 2) instanceof TopKNode);
+ TopKNode topKNode = (TopKNode) getChildrenNode(outputNode, 2);
+ assertTrue(topKNode.getChildren().get(0) instanceof ExchangeNode);
+ assertTrue(topKNode.getChildren().get(1) instanceof LimitNode);
+ assertTrue(topKNode.getChildren().get(2) instanceof ExchangeNode);
+ projectNode = (ProjectNode) getChildrenNode(topKNode.getChildren().get(1),
1);
+ assertTrue(getChildrenNode(projectNode, 1) instanceof TableScanNode);
+ tableScanNode = (TableScanNode) getChildrenNode(projectNode, 1);
+ assertTableScan(
+ tableScanNode,
+ Arrays.asList(
+ "table1.shanghai.A3.YY",
+ "table1.shenzhen.B1.XX",
+ "table1.shenzhen.B2.ZZ",
+ "table1.shanghai.B3.YY"),
+ ASC,
+ 9,
+ 0,
+ true);
+ /*
+ * IdentitySinkNode-161
+ * └──LimitNode-136
+ * └──ProjectNode-117
+ * └──TableScanNode-114
+ */
+ identitySinkNode =
+ (IdentitySinkNode)
distributedQueryPlan.getFragments().get(1).getPlanNodeTree();
+ assertTrue(getChildrenNode(identitySinkNode, 1) instanceof LimitNode);
+ assertTrue(getChildrenNode(identitySinkNode, 2) instanceof ProjectNode);
+ assertTrue(getChildrenNode(identitySinkNode, 3) instanceof TableScanNode);
+ tableScanNode = (TableScanNode) getChildrenNode(identitySinkNode, 3);
+ assertTableScan(
+ tableScanNode,
+ Arrays.asList("table1.shenzhen.B1.XX", "table1.shenzhen.B2.ZZ"),
+ ASC,
+ 9,
+ 0,
+ true);
+ }
+
+ @Test
+ public void subQueryTest2() {
+ // outer query has limit_1(count=10, offset=5) and sort,
+ // sub query has limit_2(count=3) and sort,
+ // only the limit and sort in sub query can be pushed down into TableScan.
+ sql =
+ "SELECT time, tag2, attr2, CAST(add_s2 as double) "
+ + "FROM (SELECT time, SUBSTRING(tag1, 1) as sub_tag1, tag2, attr2,
s1, s2+1 as add_s2 FROM table1 WHERE s1>1 ORDER BY tag1 DESC limit 3) "
+ + "ORDER BY tag2 ASC OFFSET 5 LIMIT 10";
+ context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
+ analysis = analyzeSQL(sql, metadata, context);
+ logicalQueryPlan =
+ new LogicalPlanner(context, metadata, sessionInfo,
warningCollector).plan(analysis);
+ logicalPlanNode = logicalQueryPlan.getRootNode();
+
+ // LogicalPlan: `Output - Offset - TopK - Project - Limit - Project -
StreamSort - Project -
+ // TableScan`
+ assertTrue(logicalPlanNode instanceof OutputNode);
+ assertTrue(getChildrenNode(logicalPlanNode, 1) instanceof OffsetNode);
+ assertTrue(getChildrenNode(logicalPlanNode, 2) instanceof TopKNode);
+ assertTrue(getChildrenNode(logicalPlanNode, 3) instanceof ProjectNode);
+ assertTrue(getChildrenNode(logicalPlanNode, 4) instanceof LimitNode);
+ assertTrue(getChildrenNode(logicalPlanNode, 5) instanceof ProjectNode);
+ assertTrue(getChildrenNode(logicalPlanNode, 6) instanceof StreamSortNode);
+ assertTrue(getChildrenNode(logicalPlanNode, 7) instanceof ProjectNode);
+ assertTrue(getChildrenNode(logicalPlanNode, 8) instanceof TableScanNode);
+ tableScanNode = (TableScanNode) getChildrenNode(logicalPlanNode, 8);
+ assertEquals(3, tableScanNode.getPushDownLimit());
+ assertTrue(tableScanNode.isPushLimitToEachDevice());
+ assertEquals("(\"s1\" > 1)",
tableScanNode.getPushDownPredicate().toString());
+
+ /*
+ * IdentitySinkNode-199
+ * └──OutputNode-16
+ * └──OffsetNode-12
+ * └──TopKNode-13
+ * └──ProjectNode-9
+ * └──ProjectNode-59
+ * └──TopKNode-6
+ * ├──ExchangeNode-195:
[SourceAddress:192.0.12.1/test_query.2.0/197]
+ * ├──LimitNode-172 (Notice: child StreamSort
has been eliminated)
+ * │ └──ProjectNode-150
+ * │ └──TableScanNode-147
+ * └──ExchangeNode-196:
[SourceAddress:192.0.10.1/test_query.3.0/198]
+ */
+ distributionPlanner = new TableDistributedPlanner(analysis,
logicalQueryPlan, context);
+ distributedQueryPlan = distributionPlanner.plan();
+ assertEquals(3, distributedQueryPlan.getFragments().size());
+ IdentitySinkNode identitySinkNode =
+ (IdentitySinkNode)
distributedQueryPlan.getFragments().get(0).getPlanNodeTree();
+ outputNode = (OutputNode) getChildrenNode(identitySinkNode, 1);
+ assertTrue(getChildrenNode(outputNode, 1) instanceof OffsetNode);
+ assertTrue(getChildrenNode(outputNode, 2) instanceof TopKNode);
+ assertTrue(getChildrenNode(outputNode, 3) instanceof ProjectNode);
+ assertTrue(getChildrenNode(outputNode, 4) instanceof ProjectNode);
+ assertTrue(getChildrenNode(outputNode, 5) instanceof TopKNode);
+ TopKNode topKNode = (TopKNode) getChildrenNode(outputNode, 5);
+ assertTrue(topKNode.getChildren().get(0) instanceof ExchangeNode);
+ assertTrue(topKNode.getChildren().get(1) instanceof LimitNode);
+ assertTrue(topKNode.getChildren().get(2) instanceof ExchangeNode);
+ LimitNode limitNode = (LimitNode) topKNode.getChildren().get(1);
+ assertTrue(getChildrenNode(limitNode, 1) instanceof ProjectNode);
+ assertTrue(getChildrenNode(limitNode, 2) instanceof TableScanNode);
+ tableScanNode = (TableScanNode) getChildrenNode(limitNode, 2);
+ assertTableScan(
+ tableScanNode,
+ Arrays.asList(
+ "table1.shenzhen.B1.XX",
+ "table1.shenzhen.B2.ZZ",
+ "table1.shanghai.B3.YY",
+ "table1.shanghai.A3.YY"),
+ ASC,
+ 3,
+ 0,
+ true);
+ /*
+ * IdentitySinkNode-161
+ * └──LimitNode-136
+ * └──ProjectNode-117
+ * └──TableScanNode-114
+ */
+ identitySinkNode =
+ (IdentitySinkNode)
distributedQueryPlan.getFragments().get(1).getPlanNodeTree();
+ assertTrue(getChildrenNode(identitySinkNode, 1) instanceof LimitNode);
+ assertTrue(getChildrenNode(identitySinkNode, 2) instanceof ProjectNode);
+ assertTrue(getChildrenNode(identitySinkNode, 3) instanceof TableScanNode);
+ tableScanNode = (TableScanNode) getChildrenNode(identitySinkNode, 3);
+ assertTableScan(
+ tableScanNode,
+ Arrays.asList("table1.shenzhen.B1.XX", "table1.shenzhen.B2.ZZ"),
+ ASC,
+ 3,
+ 0,
+ true);
+ }
+
+  /**
+   * Plans an outer {@code ORDER BY ... OFFSET ... LIMIT} query over a sub query that has its own
+   * {@code ORDER BY} and {@code LIMIT}, then checks the node chain of the logical plan and of the
+   * first two fragments of the distributed plan.
+   */
+  @Test
+  public void subQueryTest3() {
+    // outer query has limit_1 and sort,
+    // sub query has limit_2 and sort,
+    // only the limit and sort in sub query can be pushed down into TableScan.
+    sql =
+        "SELECT time, tag2, attr2, CAST(add_s2 as double) "
+            + "FROM (SELECT time, SUBSTRING(tag1, 1) as sub_tag1, tag2, attr2, s1, s2+1 as add_s2 FROM table1 WHERE s1>1 ORDER BY tag1 DESC limit 3) "
+            + "ORDER BY s1,tag2 ASC OFFSET 5 LIMIT 10";
+    context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
+    analysis = analyzeSQL(sql, metadata, context);
+    logicalQueryPlan =
+        new LogicalPlanner(context, metadata, sessionInfo, warningCollector).plan(analysis);
+    logicalPlanNode = logicalQueryPlan.getRootNode();
+
+    // LogicalPlan:
+    // `Output - Offset - ProjectNode - TopK - Project - Limit - Project - StreamSort - Project -
+    // TableScan`
+    assertTrue(logicalPlanNode instanceof OutputNode);
+    assertTrue(getChildrenNode(logicalPlanNode, 1) instanceof OffsetNode);
+    assertTrue(getChildrenNode(logicalPlanNode, 2) instanceof ProjectNode);
+    assertTrue(getChildrenNode(logicalPlanNode, 3) instanceof TopKNode);
+    assertTrue(getChildrenNode(logicalPlanNode, 4) instanceof ProjectNode);
+    assertTrue(getChildrenNode(logicalPlanNode, 5) instanceof LimitNode);
+    assertTrue(getChildrenNode(logicalPlanNode, 6) instanceof ProjectNode);
+    assertTrue(getChildrenNode(logicalPlanNode, 7) instanceof StreamSortNode);
+    assertTrue(getChildrenNode(logicalPlanNode, 8) instanceof ProjectNode);
+    assertTrue(getChildrenNode(logicalPlanNode, 9) instanceof TableScanNode);
+    tableScanNode = (TableScanNode) getChildrenNode(logicalPlanNode, 9);
+    // The sub query's `limit 3` and the `s1>1` predicate are expected on the scan itself.
+    assertEquals(3, tableScanNode.getPushDownLimit());
+    assertTrue(tableScanNode.isPushLimitToEachDevice());
+    assertEquals("(\"s1\" > 1)", tableScanNode.getPushDownPredicate().toString());
+
+    // Expected shape of fragment 0 (node ids are illustrative, only the node types are asserted):
+    /*
+     * IdentitySinkNode-205
+     *   └──OutputNode-16
+     *       └──OffsetNode-12
+     *           └──ProjectNode-43
+     *               └──TopKNode-13
+     *                   └──ProjectNode-9
+     *                       └──ProjectNode-59
+     *                           └──TopKNode-6
+     *                               ├──ExchangeNode-201: [SourceAddress:192.0.12.1/test_query.2.0/203]
+     *                               ├──LimitNode-177 (Notice: child StreamSort has been eliminated)
+     *                               │    └──ProjectNode-154
+     *                               │        └──TableScanNode-151
+     *                               └──ExchangeNode-202: [SourceAddress:192.0.10.1/test_query.3.0/204]
+     */
+    distributionPlanner = new TableDistributedPlanner(analysis, logicalQueryPlan, context);
+    distributedQueryPlan = distributionPlanner.plan();
+    assertEquals(3, distributedQueryPlan.getFragments().size());
+    IdentitySinkNode identitySinkNode =
+        (IdentitySinkNode) distributedQueryPlan.getFragments().get(0).getPlanNodeTree();
+    outputNode = (OutputNode) getChildrenNode(identitySinkNode, 1);
+    assertTrue(getChildrenNode(outputNode, 1) instanceof OffsetNode);
+    assertTrue(getChildrenNode(outputNode, 2) instanceof ProjectNode);
+    assertTrue(getChildrenNode(outputNode, 3) instanceof TopKNode);
+    assertTrue(getChildrenNode(outputNode, 4) instanceof ProjectNode);
+    assertTrue(getChildrenNode(outputNode, 5) instanceof ProjectNode);
+    assertTrue(getChildrenNode(outputNode, 6) instanceof TopKNode);
+    TopKNode topKNode = (TopKNode) getChildrenNode(outputNode, 6);
+    // The inner TopK merges one local branch (index 1) with two remote exchanges.
+    assertTrue(topKNode.getChildren().get(0) instanceof ExchangeNode);
+    assertTrue(topKNode.getChildren().get(1) instanceof LimitNode);
+    assertTrue(topKNode.getChildren().get(2) instanceof ExchangeNode);
+    LimitNode limitNode = (LimitNode) topKNode.getChildren().get(1);
+    assertTrue(getChildrenNode(limitNode, 1) instanceof ProjectNode);
+    assertTrue(getChildrenNode(limitNode, 2) instanceof TableScanNode);
+    tableScanNode = (TableScanNode) getChildrenNode(limitNode, 2);
+    assertTableScan(
+        tableScanNode,
+        Arrays.asList(
+            "table1.shenzhen.B1.XX",
+            "table1.shenzhen.B2.ZZ",
+            "table1.shanghai.B3.YY",
+            "table1.shanghai.A3.YY"),
+        ASC,
+        3,
+        0,
+        true);
+    /*
+     * IdentitySinkNode-161
+     *   └──LimitNode-136
+     *       └──ProjectNode-117
+     *           └──TableScanNode-114
+     */
+    identitySinkNode =
+        (IdentitySinkNode) distributedQueryPlan.getFragments().get(1).getPlanNodeTree();
+    assertTrue(getChildrenNode(identitySinkNode, 1) instanceof LimitNode);
+    assertTrue(getChildrenNode(identitySinkNode, 2) instanceof ProjectNode);
+    assertTrue(getChildrenNode(identitySinkNode, 3) instanceof TableScanNode);
+    tableScanNode = (TableScanNode) getChildrenNode(identitySinkNode, 3);
+    assertTableScan(
+        tableScanNode,
+        Arrays.asList("table1.shenzhen.B1.XX", "table1.shenzhen.B2.ZZ"),
+        ASC,
+        3,
+        0,
+        true);
+  }
+
+  /**
+   * Same scenario as an outer {@code ORDER BY ... OFFSET ... LIMIT} over a sorted, limited sub
+   * query, but the outer query additionally has a {@code WHERE} clause; checks that a FilterNode
+   * stays above the sub query's LimitNode in the logical plan and verifies the first two fragments
+   * of the distributed plan.
+   */
+  @Test
+  public void subQueryTest4() {
+    // outer query has limit_1, sort and filter,
+    // sub query has limit_2 and sort,
+    // only the limit and sort in sub query can be pushed down into TableScan.
+    sql =
+        "SELECT time, tag2, attr2, CAST(add_s2 as double) "
+            + "FROM (SELECT time, SUBSTRING(tag1, 1) as sub_tag1, tag2, attr2, s1, s2+1 as add_s2 FROM table1 "
+            + "WHERE s1>1 ORDER BY tag1 DESC limit 3) "
+            + "WHERE s1>1 ORDER BY s1,tag2 ASC OFFSET 5 LIMIT 10";
+    context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
+    analysis = analyzeSQL(sql, metadata, context);
+    logicalQueryPlan =
+        new LogicalPlanner(context, metadata, sessionInfo, warningCollector).plan(analysis);
+    logicalPlanNode = logicalQueryPlan.getRootNode();
+
+    // LogicalPlan:
+    // `Output - Offset - ProjectNode - TopK - Project - Filter - Limit - Project - StreamSort -
+    // Project - TableScan`
+    assertTrue(logicalPlanNode instanceof OutputNode);
+    assertTrue(getChildrenNode(logicalPlanNode, 1) instanceof OffsetNode);
+    assertTrue(getChildrenNode(logicalPlanNode, 2) instanceof ProjectNode);
+    assertTrue(getChildrenNode(logicalPlanNode, 3) instanceof TopKNode);
+    assertTrue(getChildrenNode(logicalPlanNode, 4) instanceof ProjectNode);
+    assertTrue(getChildrenNode(logicalPlanNode, 5) instanceof FilterNode);
+    assertTrue(getChildrenNode(logicalPlanNode, 6) instanceof LimitNode);
+    assertTrue(getChildrenNode(logicalPlanNode, 7) instanceof ProjectNode);
+    assertTrue(getChildrenNode(logicalPlanNode, 8) instanceof StreamSortNode);
+    assertTrue(getChildrenNode(logicalPlanNode, 9) instanceof ProjectNode);
+    assertTrue(getChildrenNode(logicalPlanNode, 10) instanceof TableScanNode);
+    tableScanNode = (TableScanNode) getChildrenNode(logicalPlanNode, 10);
+    // The sub query's `limit 3` and its `s1>1` predicate are expected on the scan itself.
+    assertEquals(3, tableScanNode.getPushDownLimit());
+    assertTrue(tableScanNode.isPushLimitToEachDevice());
+    assertEquals("(\"s1\" > 1)", tableScanNode.getPushDownPredicate().toString());
+
+    // Expected shape of fragment 0 (node ids are illustrative, only the node types are asserted):
+    /*
+     * IdentitySinkNode-219
+     *   └──OutputNode-17
+     *       └──OffsetNode-13
+     *           └──ProjectNode-45
+     *               └──TopKNode-14
+     *                   └──ProjectNode-10
+     *                       └──FilterNode-9
+     *                           └──ProjectNode-65
+     *                               └──TopKNode-6
+     *                                   ├──ExchangeNode-215: [SourceAddress:192.0.12.1/test_query.2.0/217]
+     *                                   ├──LimitNode-190
+     *                                   │    └──ProjectNode-166
+     *                                   │        └──TableScanNode-163
+     *                                   └──ExchangeNode-216: [SourceAddress:192.0.10.1/test_query.3.0/218]
+     */
+    distributionPlanner = new TableDistributedPlanner(analysis, logicalQueryPlan, context);
+    distributedQueryPlan = distributionPlanner.plan();
+    assertEquals(3, distributedQueryPlan.getFragments().size());
+    IdentitySinkNode identitySinkNode =
+        (IdentitySinkNode) distributedQueryPlan.getFragments().get(0).getPlanNodeTree();
+    outputNode = (OutputNode) getChildrenNode(identitySinkNode, 1);
+    assertTrue(getChildrenNode(outputNode, 1) instanceof OffsetNode);
+    assertTrue(getChildrenNode(outputNode, 2) instanceof ProjectNode);
+    assertTrue(getChildrenNode(outputNode, 3) instanceof TopKNode);
+    assertTrue(getChildrenNode(outputNode, 4) instanceof ProjectNode);
+    assertTrue(getChildrenNode(outputNode, 5) instanceof FilterNode);
+    assertTrue(getChildrenNode(outputNode, 6) instanceof ProjectNode);
+    assertTrue(getChildrenNode(outputNode, 7) instanceof TopKNode);
+    TopKNode topKNode = (TopKNode) getChildrenNode(outputNode, 7);
+    // The inner TopK merges one local branch (index 1) with two remote exchanges.
+    assertTrue(topKNode.getChildren().get(0) instanceof ExchangeNode);
+    assertTrue(topKNode.getChildren().get(1) instanceof LimitNode);
+    assertTrue(topKNode.getChildren().get(2) instanceof ExchangeNode);
+    LimitNode limitNode = (LimitNode) topKNode.getChildren().get(1);
+    assertTrue(getChildrenNode(limitNode, 1) instanceof ProjectNode);
+    assertTrue(getChildrenNode(limitNode, 2) instanceof TableScanNode);
+    tableScanNode = (TableScanNode) getChildrenNode(limitNode, 2);
+    assertTableScan(
+        tableScanNode,
+        Arrays.asList(
+            "table1.shenzhen.B1.XX",
+            "table1.shenzhen.B2.ZZ",
+            "table1.shanghai.B3.YY",
+            "table1.shanghai.A3.YY"),
+        ASC,
+        3,
+        0,
+        true);
+    /*
+     * IdentitySinkNode-161
+     *   └──LimitNode-136
+     *       └──ProjectNode-117
+     *           └──TableScanNode-114
+     */
+    identitySinkNode =
+        (IdentitySinkNode) distributedQueryPlan.getFragments().get(1).getPlanNodeTree();
+    assertTrue(getChildrenNode(identitySinkNode, 1) instanceof LimitNode);
+    assertTrue(getChildrenNode(identitySinkNode, 2) instanceof ProjectNode);
+    assertTrue(getChildrenNode(identitySinkNode, 3) instanceof TableScanNode);
+    tableScanNode = (TableScanNode) getChildrenNode(identitySinkNode, 3);
+    assertTableScan(
+        tableScanNode,
+        Arrays.asList("table1.shenzhen.B1.XX", "table1.shenzhen.B2.ZZ"),
+        ASC,
+        3,
+        0,
+        true);
+  }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/OffsetMatcher.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/OffsetMatcher.java
index 9cb92468d68..b8f3ec33093 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/OffsetMatcher.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/OffsetMatcher.java
@@ -18,6 +18,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
+import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkState;
public class OffsetMatcher implements Matcher {
@@ -42,4 +43,9 @@ public class OffsetMatcher implements Matcher {
checkState(shapeMatches(node));
return MatchResult.match();
}
+
+ /** Renders this matcher via Guava's toStringHelper, exposing the {@code rowCount} field under the name "offset". */
+ @Override
+ public String toString() {
+ return toStringHelper(this).add("offset", rowCount).toString();
+ }
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanAssert.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanAssert.java
index 1a68a5572f2..c3422f78760 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanAssert.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanAssert.java
@@ -70,7 +70,8 @@ public final class PlanAssert {
if (!containsGroupReferences(actual)) {
throw new AssertionError(
format(
- "Plan does not match, expected [\n\n%s\n] but found [\n\n%s\n]", pattern, actual));
+ "Plan does not match, expected [\n\n%s\n] but found [\n\n%s\n], matches:[%s]",
+ pattern, actual, matches));
}
// TODO support print plan tree
PlanNode resolvedPlan = resolveGroupReferences(actual, lookup);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
index c26f39224ec..1ec16841e1b 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DataType;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
@@ -300,6 +301,10 @@ public final class PlanMatchPattern {
return node(SortNode.class, source).with(new SortMatcher(orderBy));
}
+  /**
+   * Builds a pattern that matches a {@link StreamSortNode} on top of {@code source}; the node is
+   * additionally checked by a {@link SortMatcher} created from {@code orderBy}.
+   */
+  public static PlanMatchPattern streamSort(List<Ordering> orderBy, PlanMatchPattern source) {
+    return node(StreamSortNode.class, source).with(new SortMatcher(orderBy));
+  }
+
/*public static PlanMatchPattern topN(long count, List<Ordering> orderBy,
PlanMatchPattern source)
{
return topN(count, orderBy, TopNNode.Step.SINGLE, source);