IMPALA-4270: Gracefully fail unsupported queries with MT_DOP > 0.

MT_DOP > 0 is only supported for plans without distributed joins or table sinks. Adds validation during planning so that unsupported queries fail gracefully with a NotImplementedException instead of being executed.
For scans in queries that are executable with MT_DOP > 0 we either use the optimized MT scan node BE implementation (only Parquet), or we use the conventional scan node with num_scanner_threads=1. TODO: Still need to add end-to-end tests. Change-Id: I91a60ea7b6e3ae4ee44be856615ddd3cd0af476d Reviewed-on: http://gerrit.cloudera.org:8080/4677 Reviewed-by: Alex Behm <alex.b...@cloudera.com> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/04802535 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/04802535 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/04802535 Branch: refs/heads/master Commit: 04802535661979c50e5d06ef04e62eee677b901e Parents: b0e87c6 Author: Alex Behm <alex.b...@cloudera.com> Authored: Mon Oct 10 11:03:43 2016 -0700 Committer: Internal Jenkins <cloudera-hud...@gerrit.cloudera.org> Committed: Mon Oct 17 09:22:57 2016 +0000 ---------------------------------------------------------------------- be/src/exec/exec-node.cc | 5 +- common/thrift/PlanNodes.thrift | 5 + .../org/apache/impala/analysis/Analyzer.java | 10 +- .../org/apache/impala/planner/HdfsScanNode.java | 28 +- .../java/org/apache/impala/planner/Planner.java | 13 +- .../apache/impala/planner/PlannerContext.java | 10 +- .../impala/planner/SingleNodePlanner.java | 24 +- .../org/apache/impala/planner/PlannerTest.java | 21 +- .../apache/impala/planner/PlannerTestBase.java | 18 +- .../queries/PlannerTest/mt-dop-validation.test | 350 +++++++++++++++++++ 10 files changed, 450 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/be/src/exec/exec-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc index 837fc09..df491dd 100644 --- 
a/be/src/exec/exec-node.cc +++ b/be/src/exec/exec-node.cc @@ -264,9 +264,12 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode, switch (tnode.node_type) { case TPlanNodeType::HDFS_SCAN_NODE: *node = pool->Add(new HdfsScanNode(pool, tnode, descs)); - if (state->query_options().mt_dop > 0) { + if (tnode.hdfs_scan_node.use_mt_scan_node) { + DCHECK_GT(state->query_options().mt_dop, 0); *node = pool->Add(new HdfsScanNodeMt(pool, tnode, descs)); } else { + DCHECK(state->query_options().mt_dop == 0 + || state->query_options().num_scanner_threads == 1); *node = pool->Add(new HdfsScanNode(pool, tnode, descs)); } // If true, this node requests codegen over interpretation for conjuncts http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/common/thrift/PlanNodes.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift index 4cf1357..49fcfbb 100644 --- a/common/thrift/PlanNodes.thrift +++ b/common/thrift/PlanNodes.thrift @@ -202,6 +202,11 @@ struct THdfsScanNode { // Number of header lines to skip at the beginning of each file of this table. Only set // for hdfs text files. 6: optional i32 skip_header_line_count + + // Indicates whether the MT scan node implementation should be used. + // If this is true then the MT_DOP query option must be > 0. + // TODO: Remove this option when the MT scan node supports all file formats. 
+ 7: optional bool use_mt_scan_node } struct TDataSourceScanNode { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/fe/src/main/java/org/apache/impala/analysis/Analyzer.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java index 3edddf2..f9909b1 100644 --- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java +++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java @@ -30,9 +30,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.impala.analysis.Path.PathType; import org.apache.impala.authorization.AuthorizationConfig; import org.apache.impala.authorization.Privilege; @@ -66,10 +63,14 @@ import org.apache.impala.thrift.TCatalogObjectType; import org.apache.impala.thrift.TLineageGraph; import org.apache.impala.thrift.TNetworkAddress; import org.apache.impala.thrift.TQueryCtx; +import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.util.DisjointSet; import org.apache.impala.util.EventSequence; import org.apache.impala.util.ListMap; import org.apache.impala.util.TSessionStateUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; @@ -2246,6 +2247,9 @@ public class Analyzer { public String getDefaultDb() { return globalState_.queryCtx.session.database; } public User getUser() { return user_; } public TQueryCtx getQueryCtx() { return globalState_.queryCtx; } + public TQueryOptions getQueryOptions() { + return globalState_.queryCtx.getRequest().getQuery_options(); + } public AuthorizationConfig getAuthzConfig() { return globalState_.authzConfig; } public ListMap<TNetworkAddress> getHostIndex() { return globalState_.hostIndex; } public 
ColumnLineageGraph getColumnLineageGraph() { return globalState_.lineageGraph; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index 4052867..3d52aa4 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -21,9 +21,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.Set; import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.Expr; @@ -55,6 +53,9 @@ import org.apache.impala.thrift.TScanRange; import org.apache.impala.thrift.TScanRangeLocation; import org.apache.impala.thrift.TScanRangeLocations; import org.apache.impala.util.MembershipSnapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.common.base.Joiner; import com.google.common.base.Objects; import com.google.common.base.Objects.ToStringHelper; @@ -107,6 +108,9 @@ public class HdfsScanNode extends ScanNode { // True if this scan node should use codegen for evaluting conjuncts. private boolean codegenConjuncts_; + // True if this scan node should use the MT implementation in the backend. + private boolean useMtScanNode_; + // Conjuncts that can be evaluated while materializing the items (tuples) of // collection-typed slots. Maps from tuple descriptor to the conjuncts bound by that // tuple. Uses a linked hash map for consistent display in explain. 
@@ -168,7 +172,16 @@ public class HdfsScanNode extends ScanNode { computeMemLayout(analyzer); // compute scan range locations - computeScanRangeLocations(analyzer); + Set<HdfsFileFormat> fileFormats = computeScanRangeLocations(analyzer); + + // Determine backend scan node implementation to use. The optimized MT implementation + // is currently only supported for Parquet. + if (analyzer.getQueryOptions().mt_dop > 0 && + fileFormats.size() == 1 && fileFormats.contains(HdfsFileFormat.PARQUET)) { + useMtScanNode_ = true; + } else { + useMtScanNode_ = false; + } // do this at the end so it can take all conjuncts and scan ranges into account computeStats(analyzer); @@ -298,12 +311,15 @@ public class HdfsScanNode extends ScanNode { /** * Computes scan ranges (hdfs splits) plus their storage locations, including volume * ids, based on the given maximum number of bytes each scan range should scan. + * Returns the set of file formats being scanned. */ - private void computeScanRangeLocations(Analyzer analyzer) { + private Set<HdfsFileFormat> computeScanRangeLocations(Analyzer analyzer) { long maxScanRangeLength = analyzer.getQueryCtx().getRequest().getQuery_options() .getMax_scan_range_length(); scanRanges_ = Lists.newArrayList(); + Set<HdfsFileFormat> fileFormats = Sets.newHashSet(); for (HdfsPartition partition: partitions_) { + fileFormats.add(partition.getFileFormat()); Preconditions.checkState(partition.getId() >= 0); for (HdfsPartition.FileDescriptor fileDesc: partition.getFileDescriptors()) { for (THdfsFileBlock thriftBlock: fileDesc.getFileBlocks()) { @@ -353,6 +369,7 @@ public class HdfsScanNode extends ScanNode { } } } + return fileFormats; } /** @@ -542,6 +559,7 @@ public class HdfsScanNode extends ScanNode { if (skipHeaderLineCount_ > 0) { msg.hdfs_scan_node.setSkip_header_line_count(skipHeaderLineCount_); } + msg.hdfs_scan_node.setUse_mt_scan_node(useMtScanNode_); } @Override 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/fe/src/main/java/org/apache/impala/planner/Planner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java index ed4c677..8abb901 100644 --- a/fe/src/main/java/org/apache/impala/planner/Planner.java +++ b/fe/src/main/java/org/apache/impala/planner/Planner.java @@ -21,9 +21,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.impala.analysis.AnalysisContext; import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.ColumnLineageGraph; @@ -43,6 +40,9 @@ import org.apache.impala.thrift.TQueryExecRequest; import org.apache.impala.thrift.TRuntimeFilterMode; import org.apache.impala.thrift.TTableName; import org.apache.impala.util.MaxRowsProcessedVisitor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -117,12 +117,13 @@ public class Planner { "Runtime filters computed"); } + singleNodePlanner.validatePlan(singleNodePlan); + if (ctx_.isSingleNodeExec()) { // create one fragment containing the entire single-node plan tree fragments = Lists.newArrayList(new PlanFragment( ctx_.getNextFragmentId(), singleNodePlan, DataPartition.UNPARTITIONED)); } else { - singleNodePlanner.validatePlan(singleNodePlan); // create distributed plan fragments = distributedPlanner.createPlanFragments(singleNodePlan); } @@ -200,10 +201,14 @@ public class Planner { * TODO: roll into createPlan() */ public List<PlanFragment> createParallelPlans() throws ImpalaException { + Preconditions.checkState(ctx_.getQueryOptions().mt_dop > 0); ArrayList<PlanFragment> distrPlan = createPlan(); Preconditions.checkNotNull(distrPlan); 
ParallelPlanner planner = new ParallelPlanner(ctx_); List<PlanFragment> parallelPlans = planner.createPlans(distrPlan.get(0)); + // Only use one scanner thread per scan-node instance since intra-node + // parallelism is achieved via multiple fragment instances. + ctx_.getQueryOptions().setNum_scanner_threads(1); ctx_.getRootAnalyzer().getTimeline().markEvent("Parallel plans created"); return parallelPlans; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/fe/src/main/java/org/apache/impala/planner/PlannerContext.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java index 3275a7a..721acf9 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java +++ b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java @@ -25,6 +25,7 @@ import org.apache.impala.analysis.QueryStmt; import org.apache.impala.common.IdGenerator; import org.apache.impala.thrift.TQueryCtx; import org.apache.impala.thrift.TQueryOptions; + import com.google.common.collect.Lists; /** @@ -79,9 +80,7 @@ public class PlannerContext { public QueryStmt getQueryStmt() { return queryStmt_; } public TQueryCtx getQueryCtx() { return queryCtx_; } - public TQueryOptions getQueryOptions() { - return queryCtx_.getRequest().getQuery_options(); - } + public TQueryOptions getQueryOptions() { return getRootAnalyzer().getQueryOptions(); } public AnalysisContext.AnalysisResult getAnalysisResult() { return analysisResult_; } public Analyzer getRootAnalyzer() { return analysisResult_.getAnalyzer(); } public boolean isSingleNodeExec() { return getQueryOptions().num_nodes == 1; } @@ -91,7 +90,10 @@ public class PlannerContext { return analysisResult_.isInsertStmt() || analysisResult_.isCreateTableAsSelectStmt(); } public boolean isQuery() { return analysisResult_.isQueryStmt(); } - + public boolean hasTableSink() { + 
return isInsertOrCtas() || analysisResult_.isUpdateStmt() + || analysisResult_.isDeleteStmt(); + } public boolean hasSubplan() { return !subplans_.isEmpty(); } public SubplanNode getSubplan() { return subplans_.getFirst(); } public boolean pushSubplan(SubplanNode n) { return subplans_.offerFirst(n); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java index b686fe6..434e36d 100644 --- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java @@ -27,9 +27,6 @@ import java.util.ListIterator; import java.util.Map; import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.impala.analysis.AggregateInfo; import org.apache.impala.analysis.AnalyticInfo; import org.apache.impala.analysis.Analyzer; @@ -67,6 +64,10 @@ import org.apache.impala.common.ImpalaException; import org.apache.impala.common.InternalException; import org.apache.impala.common.NotImplementedException; import org.apache.impala.common.Pair; +import org.apache.impala.common.RuntimeEnv; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; @@ -148,11 +149,22 @@ public class SingleNodePlanner { } /** - * Validates a single-node plan by checking that it does not contain right or - * full outer joins with no equi-join conjuncts that are not inside the right child - * of a SubplanNode. Throws a NotImplementedException if plan validation fails. 
+ * Checks that the given single-node plan is executable: + * - It may not contain right or full outer joins with no equi-join conjuncts that + * are not inside the right child of a SubplanNode. + * - MT_DOP > 0 is not supported for plans with base table joins or table sinks. + * Throws a NotImplementedException if plan validation fails. */ public void validatePlan(PlanNode planNode) throws NotImplementedException { + if (ctx_.getQueryOptions().mt_dop > 0 && !RuntimeEnv.INSTANCE.isTestEnv() + && (planNode instanceof JoinNode || ctx_.hasTableSink())) { + throw new NotImplementedException( + "MT_DOP not supported for plans with base table joins or table sinks."); + } + + // As long as MT_DOP == 0 any join can run in a single-node plan. + if (ctx_.isSingleNodeExec() && ctx_.getQueryOptions().mt_dop == 0) return; + if (planNode instanceof NestedLoopJoinNode) { JoinNode joinNode = (JoinNode) planNode; JoinOperator joinOp = joinNode.getJoinOp(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/fe/src/test/java/org/apache/impala/planner/PlannerTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index 88a8631..6250969 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -17,14 +17,13 @@ package org.apache.impala.planner; -import org.junit.Assume; -import org.junit.Test; - import org.apache.impala.catalog.Db; import org.apache.impala.common.RuntimeEnv; import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TRuntimeFilterMode; +import org.junit.Assume; +import org.junit.Test; // All planner tests, except for S3 specific tests should go here. 
public class PlannerTest extends PlannerTestBase { @@ -279,4 +278,20 @@ public class PlannerTest extends PlannerTestBase { Assume.assumeTrue(RuntimeEnv.INSTANCE.isKuduSupported()); runPlannerTestFile("tpch-kudu"); } + + @Test + public void testMtDopValidation() { + // Tests that queries supported with mt_dop > 0 produce a parallel plan, or + // throw a NotImplementedException otherwise (e.g. plan has a distributed join). + TQueryOptions options = defaultQueryOptions(); + options.setMt_dop(3); + try { + // Temporarily unset the test env such that unsupported queries with mt_dop > 0 + // throw an exception. Those are otherwise allowed for testing parallel plans. + RuntimeEnv.INSTANCE.setTestEnv(false); + runPlannerTestFile("mt-dop-validation", options); + } finally { + RuntimeEnv.INSTANCE.setTestEnv(true); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java index 284d7e5..9c12b89 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java @@ -33,13 +33,6 @@ import java.util.regex.Pattern; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.fs.Path; -import org.apache.kudu.client.KuduClient; -import org.apache.kudu.client.KuduScanToken; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.impala.analysis.ColumnLineageGraph; import org.apache.impala.catalog.CatalogException; import org.apache.impala.common.FrontendTestBase; @@ -72,6 +65,13 @@ import org.apache.impala.thrift.TTableDescriptor; import org.apache.impala.thrift.TTupleDescriptor; import 
org.apache.impala.thrift.TUpdateMembershipRequest; import org.apache.impala.util.MembershipSnapshot; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduScanToken; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -503,6 +503,7 @@ public class PlannerTestBase extends FrontendTestBase { // Query exec request may not be set for DDL, e.g., CTAS. String locationsStr = null; if (execRequest != null && execRequest.isSetQuery_exec_request()) { + if (execRequest.query_exec_request.fragments == null) return; buildMaps(execRequest.query_exec_request); // If we optimize the partition key scans, we may get all the partition key values // from the metadata and don't reference any table. Skip the check in this case. @@ -563,7 +564,8 @@ public class PlannerTestBase extends FrontendTestBase { String query, TExecRequest execRequest, StringBuilder errorLog) { if (execRequest == null) return; if (!execRequest.isSetQuery_exec_request() - || execRequest.query_exec_request == null) { + || execRequest.query_exec_request == null + || execRequest.query_exec_request.fragments == null) { return; } for (TPlanFragment planFragment : execRequest.query_exec_request.fragments) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test new file mode 100644 index 0000000..fe25599 --- /dev/null +++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test @@ -0,0 +1,350 @@ +# Distributed nested-loop join not allowed. 
+select count(*) from +functional_parquet.alltypestiny a, +functional_parquet.alltypestiny b +---- PLAN +not implemented: MT_DOP not supported for plans with base table joins or table sinks. +---- PARALLELPLANS +not implemented: MT_DOP not supported for plans with base table joins or table sinks. +==== +# Distributed hash-join not allowed. +select count(*) from +functional_parquet.alltypestiny a, +functional_parquet.alltypestiny b +where a.id = b.id +---- PLAN +not implemented: MT_DOP not supported for plans with base table joins or table sinks. +---- PARALLELPLANS +not implemented: MT_DOP not supported for plans with base table joins or table sinks. +==== +# Insert not allowed. +insert into functional_parquet.alltypes partition(year,month) +select * from functional_parquet.alltypessmall +---- PLAN +not implemented: MT_DOP not supported for plans with base table joins or table sinks. +---- PARALLELPLANS +not implemented: MT_DOP not supported for plans with base table joins or table sinks. +==== +# CTAS not allowed. +create table ctas_mt_dop_test as select * from functional_parquet.alltypes +---- PLAN +not implemented: MT_DOP not supported for plans with base table joins or table sinks. +---- PARALLELPLANS +not implemented: MT_DOP not supported for plans with base table joins or table sinks. +==== +# Single-table scan/filter/agg/topn should work. 
+select count(int_col) cnt from functional_parquet.alltypes +where id < 10 +group by bigint_col +order by cnt, bigint_col +limit 10 +---- PLAN +PLAN-ROOT SINK +| +02:TOP-N [LIMIT=10] +| order by: count(int_col) ASC, bigint_col ASC +| hosts=3 per-host-mem=unavailable +| tuple-ids=2 row-size=16B cardinality=10 +| +01:AGGREGATE [FINALIZE] +| output: count(int_col) +| group by: bigint_col +| hosts=3 per-host-mem=unavailable +| tuple-ids=1 row-size=16B cardinality=unavailable +| +00:SCAN HDFS [functional_parquet.alltypes] + partitions=24/24 files=24 size=156.57KB + predicates: id < 10 + table stats: unavailable + column stats: unavailable + hosts=3 per-host-mem=unavailable + tuple-ids=0 row-size=16B cardinality=unavailable +---- PARALLELPLANS +PLAN-ROOT SINK +| +05:MERGING-EXCHANGE [UNPARTITIONED] +| order by: count(int_col) ASC, bigint_col ASC +| limit: 10 +| hosts=3 per-host-mem=unavailable +| tuple-ids=2 row-size=16B cardinality=10 +| +02:TOP-N [LIMIT=10] +| order by: count(int_col) ASC, bigint_col ASC +| hosts=3 per-host-mem=unavailable +| tuple-ids=2 row-size=16B cardinality=10 +| +04:AGGREGATE [FINALIZE] +| output: count:merge(int_col) +| group by: bigint_col +| hosts=3 per-host-mem=unavailable +| tuple-ids=1 row-size=16B cardinality=unavailable +| +03:EXCHANGE [HASH(bigint_col)] +| hosts=3 per-host-mem=unavailable +| tuple-ids=1 row-size=16B cardinality=unavailable +| +01:AGGREGATE [STREAMING] +| output: count(int_col) +| group by: bigint_col +| hosts=3 per-host-mem=unavailable +| tuple-ids=1 row-size=16B cardinality=unavailable +| +00:SCAN HDFS [functional_parquet.alltypes, RANDOM] + partitions=24/24 files=24 size=156.57KB + predicates: id < 10 + table stats: unavailable + column stats: unavailable + hosts=3 per-host-mem=unavailable + tuple-ids=0 row-size=16B cardinality=unavailable +==== +# Single-table scan/filter/analysic should work. 
+select row_number() over(partition by int_col order by id) +from functional_parquet.alltypes +where id < 10 +---- PLAN +PLAN-ROOT SINK +| +02:ANALYTIC +| functions: row_number() +| partition by: int_col +| order by: id ASC +| window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW +| hosts=3 per-host-mem=unavailable +| tuple-ids=3,2 row-size=16B cardinality=unavailable +| +01:SORT +| order by: int_col ASC NULLS FIRST, id ASC +| hosts=3 per-host-mem=unavailable +| tuple-ids=3 row-size=8B cardinality=unavailable +| +00:SCAN HDFS [functional_parquet.alltypes] + partitions=24/24 files=24 size=156.57KB + predicates: id < 10 + table stats: unavailable + column stats: unavailable + hosts=3 per-host-mem=unavailable + tuple-ids=0 row-size=8B cardinality=unavailable +---- PARALLELPLANS +PLAN-ROOT SINK +| +04:EXCHANGE [UNPARTITIONED] +| hosts=3 per-host-mem=unavailable +| tuple-ids=3,2 row-size=16B cardinality=unavailable +| +02:ANALYTIC +| functions: row_number() +| partition by: int_col +| order by: id ASC +| window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW +| hosts=3 per-host-mem=unavailable +| tuple-ids=3,2 row-size=16B cardinality=unavailable +| +01:SORT +| order by: int_col ASC NULLS FIRST, id ASC +| hosts=3 per-host-mem=unavailable +| tuple-ids=3 row-size=8B cardinality=unavailable +| +03:EXCHANGE [HASH(int_col)] +| hosts=3 per-host-mem=unavailable +| tuple-ids=0 row-size=8B cardinality=unavailable +| +00:SCAN HDFS [functional_parquet.alltypes, RANDOM] + partitions=24/24 files=24 size=156.57KB + predicates: id < 10 + table stats: unavailable + column stats: unavailable + hosts=3 per-host-mem=unavailable + tuple-ids=0 row-size=8B cardinality=unavailable +==== +# Nested-loop join in a subplan should work. 
+select * +from tpch_nested_parquet.customer c, c.c_orders o, o.o_lineitems +where c_custkey < 10 and o_orderkey < 5 and l_linenumber < 3 +---- PLAN +PLAN-ROOT SINK +| +01:SUBPLAN +| hosts=3 per-host-mem=unavailable +| tuple-ids=2,1,0 row-size=562B cardinality=1500000 +| +|--08:NESTED LOOP JOIN [CROSS JOIN] +| | hosts=3 per-host-mem=unavailable +| | tuple-ids=2,1,0 row-size=562B cardinality=100 +| | +| |--02:SINGULAR ROW SRC +| | parent-subplan=01 +| | hosts=3 per-host-mem=unavailable +| | tuple-ids=0 row-size=254B cardinality=1 +| | +| 04:SUBPLAN +| | hosts=3 per-host-mem=unavailable +| | tuple-ids=2,1 row-size=308B cardinality=100 +| | +| |--07:NESTED LOOP JOIN [CROSS JOIN] +| | | hosts=3 per-host-mem=unavailable +| | | tuple-ids=2,1 row-size=308B cardinality=10 +| | | +| | |--05:SINGULAR ROW SRC +| | | parent-subplan=04 +| | | hosts=3 per-host-mem=unavailable +| | | tuple-ids=1 row-size=124B cardinality=1 +| | | +| | 06:UNNEST [o.o_lineitems] +| | parent-subplan=04 +| | hosts=3 per-host-mem=unavailable +| | tuple-ids=2 row-size=0B cardinality=10 +| | +| 03:UNNEST [c.c_orders o] +| parent-subplan=01 +| hosts=3 per-host-mem=unavailable +| tuple-ids=1 row-size=0B cardinality=10 +| +00:SCAN HDFS [tpch_nested_parquet.customer c] + partitions=1/1 files=4 size=292.36MB + predicates: c_custkey < 10, !empty(c.c_orders) + predicates on o: !empty(o.o_lineitems), o_orderkey < 5 + predicates on o_lineitems: l_linenumber < 3 + table stats: 150000 rows total + columns missing stats: c_orders + hosts=3 per-host-mem=unavailable + tuple-ids=0 row-size=254B cardinality=15000 +---- PARALLELPLANS +PLAN-ROOT SINK +| +09:EXCHANGE [UNPARTITIONED] +| hosts=3 per-host-mem=unavailable +| tuple-ids=2,1,0 row-size=562B cardinality=1500000 +| +01:SUBPLAN +| hosts=3 per-host-mem=unavailable +| tuple-ids=2,1,0 row-size=562B cardinality=1500000 +| +|--08:NESTED LOOP JOIN [CROSS JOIN] +| | hosts=3 per-host-mem=unavailable +| | tuple-ids=2,1,0 row-size=562B cardinality=100 +| | +| |--02:SINGULAR 
ROW SRC +| | parent-subplan=01 +| | hosts=3 per-host-mem=unavailable +| | tuple-ids=0 row-size=254B cardinality=1 +| | +| 04:SUBPLAN +| | hosts=3 per-host-mem=unavailable +| | tuple-ids=2,1 row-size=308B cardinality=100 +| | +| |--07:NESTED LOOP JOIN [CROSS JOIN] +| | | hosts=3 per-host-mem=unavailable +| | | tuple-ids=2,1 row-size=308B cardinality=10 +| | | +| | |--05:SINGULAR ROW SRC +| | | parent-subplan=04 +| | | hosts=3 per-host-mem=unavailable +| | | tuple-ids=1 row-size=124B cardinality=1 +| | | +| | 06:UNNEST [o.o_lineitems] +| | parent-subplan=04 +| | hosts=3 per-host-mem=unavailable +| | tuple-ids=2 row-size=0B cardinality=10 +| | +| 03:UNNEST [c.c_orders o] +| parent-subplan=01 +| hosts=3 per-host-mem=unavailable +| tuple-ids=1 row-size=0B cardinality=10 +| +00:SCAN HDFS [tpch_nested_parquet.customer c, RANDOM] + partitions=1/1 files=4 size=292.36MB + predicates: c_custkey < 10, !empty(c.c_orders) + predicates on o: !empty(o.o_lineitems), o_orderkey < 5 + predicates on o_lineitems: l_linenumber < 3 + table stats: 150000 rows total + columns missing stats: c_orders + hosts=3 per-host-mem=unavailable + tuple-ids=0 row-size=254B cardinality=15000 +==== +# Hash-join in a subplan should work. 
+select c.* +from tpch_nested_parquet.customer c, c.c_orders o1, c.c_orders o2 +where o1.o_orderkey = o2.o_orderkey + 2 and o1.o_orderkey < 5 +---- PLAN +PLAN-ROOT SINK +| +01:SUBPLAN +| hosts=3 per-host-mem=unavailable +| tuple-ids=1,0,2 row-size=286B cardinality=1500000 +| +|--06:HASH JOIN [INNER JOIN] +| | hash predicates: o1.o_orderkey = o2.o_orderkey + 2 +| | hosts=3 per-host-mem=unavailable +| | tuple-ids=1,0,2 row-size=286B cardinality=10 +| | +| |--04:UNNEST [c.c_orders o2] +| | parent-subplan=01 +| | hosts=3 per-host-mem=unavailable +| | tuple-ids=2 row-size=0B cardinality=10 +| | +| 05:NESTED LOOP JOIN [CROSS JOIN] +| | hosts=3 per-host-mem=unavailable +| | tuple-ids=1,0 row-size=278B cardinality=10 +| | +| |--02:SINGULAR ROW SRC +| | parent-subplan=01 +| | hosts=3 per-host-mem=unavailable +| | tuple-ids=0 row-size=270B cardinality=1 +| | +| 03:UNNEST [c.c_orders o1] +| parent-subplan=01 +| hosts=3 per-host-mem=unavailable +| tuple-ids=1 row-size=0B cardinality=10 +| +00:SCAN HDFS [tpch_nested_parquet.customer c] + partitions=1/1 files=4 size=292.36MB + predicates: !empty(c.c_orders), !empty(c.c_orders) + predicates on o1: o1.o_orderkey < 5 + table stats: 150000 rows total + columns missing stats: c_orders, c_orders + hosts=3 per-host-mem=unavailable + tuple-ids=0 row-size=270B cardinality=150000 +---- PARALLELPLANS +PLAN-ROOT SINK +| +07:EXCHANGE [UNPARTITIONED] +| hosts=3 per-host-mem=unavailable +| tuple-ids=1,0,2 row-size=286B cardinality=1500000 +| +01:SUBPLAN +| hosts=3 per-host-mem=unavailable +| tuple-ids=1,0,2 row-size=286B cardinality=1500000 +| +|--06:HASH JOIN [INNER JOIN] +| | hash predicates: o1.o_orderkey = o2.o_orderkey + 2 +| | hosts=3 per-host-mem=unavailable +| | tuple-ids=1,0,2 row-size=286B cardinality=10 +| | +| |--04:UNNEST [c.c_orders o2] +| | parent-subplan=01 +| | hosts=3 per-host-mem=unavailable +| | tuple-ids=2 row-size=0B cardinality=10 +| | +| 05:NESTED LOOP JOIN [CROSS JOIN] +| | hosts=3 per-host-mem=unavailable +| | 
tuple-ids=1,0 row-size=278B cardinality=10 +| | +| |--02:SINGULAR ROW SRC +| | parent-subplan=01 +| | hosts=3 per-host-mem=unavailable +| | tuple-ids=0 row-size=270B cardinality=1 +| | +| 03:UNNEST [c.c_orders o1] +| parent-subplan=01 +| hosts=3 per-host-mem=unavailable +| tuple-ids=1 row-size=0B cardinality=10 +| +00:SCAN HDFS [tpch_nested_parquet.customer c, RANDOM] + partitions=1/1 files=4 size=292.36MB + predicates: !empty(c.c_orders), !empty(c.c_orders) + predicates on o1: o1.o_orderkey < 5 + table stats: 150000 rows total + columns missing stats: c_orders, c_orders + hosts=3 per-host-mem=unavailable + tuple-ids=0 row-size=270B cardinality=150000 +====