MT: Planner for multi-threaded execution

New classes:
- ParallelPlanner: creates build plans, assigns plans to cohorts
- JoinBuildSink: DataSink for plan fragments that materialize build sides
- ids for plans (PlanId), hash tables (JoinTableId) and cohorts (CohortId); plan fragments now carry their plan id and cohort id
Tests: this adds a new test file section PARALLELPLANS and augments the tpc-h/-ds tests with those sections. In the interest of keeping this patch small I didn't augment other test files with that section yet (which will happen at a later date, to cover more corner cases). Change-Id: Ic3c34dd3f9190a131e6f03d901b4bfcd164a5174 Reviewed-on: http://gerrit.cloudera.org:8080/2846 Tested-by: Internal Jenkins Reviewed-by: Marcel Kornacker <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/3b7d5b7c Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/3b7d5b7c Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/3b7d5b7c Branch: refs/heads/master Commit: 3b7d5b7c178fff8b269b6854842cc5dcf2e4d725 Parents: 8e64273 Author: Marcel Kornacker <[email protected]> Authored: Mon Mar 21 20:13:30 2016 -0700 Committer: Tim Armstrong <[email protected]> Committed: Thu May 12 14:17:56 2016 -0700 ---------------------------------------------------------------------- be/src/service/query-options.cc | 12 + be/src/service/query-options.h | 5 +- common/thrift/DataSinks.thrift | 12 +- common/thrift/Frontend.thrift | 5 + common/thrift/ImpalaInternalService.thrift | 6 + common/thrift/ImpalaService.thrift | 3 + common/thrift/Planner.thrift | 10 + common/thrift/Types.thrift | 1 + .../impala/analysis/FunctionCallExpr.java | 3 +- .../impala/analysis/TupleDescriptor.java | 3 + .../com/cloudera/impala/common/TreeNode.java | 68 +- .../com/cloudera/impala/planner/CohortId.java | 39 + .../com/cloudera/impala/planner/DataSink.java | 1 - .../impala/planner/DistributedPlanner.java | 66 +- .../cloudera/impala/planner/ExchangeNode.java | 3 +- .../cloudera/impala/planner/HashJoinNode.java | 6 + .../cloudera/impala/planner/JoinBuildSink.java | 100 + .../com/cloudera/impala/planner/JoinNode.java | 6 + .../cloudera/impala/planner/JoinTableId.java | 44 + 
.../impala/planner/NestedLoopJoinNode.java | 4 + .../impala/planner/ParallelPlanner.java | 202 ++ .../cloudera/impala/planner/PlanFragment.java | 142 +- .../com/cloudera/impala/planner/PlanId.java | 39 + .../com/cloudera/impala/planner/PlanNode.java | 16 +- .../com/cloudera/impala/planner/Planner.java | 14 + .../com/cloudera/impala/service/Frontend.java | 24 +- .../impala/planner/PlannerTestBase.java | 18 +- .../impala/testutil/TestFileParser.java | 18 +- .../queries/PlannerTest/tpcds-all.test | 3400 ++++++++++++++++-- .../queries/PlannerTest/tpch-all.test | 1472 +++++++- 30 files changed, 5187 insertions(+), 555 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/be/src/service/query-options.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index 8f5a682..45571bf 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -388,6 +388,18 @@ Status impala::SetQueryOption(const string& key, const string& value, } break; } + case TImpalaQueryOptions::MT_NUM_CORES: { + StringParser::ParseResult result; + const int32_t num_cores = + StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result); + if (result != StringParser::PARSE_SUCCESS || num_cores < 0 || num_cores > 128) { + return Status( + Substitute("$0 is not valid for mt_num_cores. Valid values are in " + "[0, 128].", value)); + } + query_options->__set_mt_num_cores(num_cores); + break; + } default: // We hit this DCHECK(false) if we forgot to add the corresponding entry here // when we add a new query option. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/be/src/service/query-options.h ---------------------------------------------------------------------- diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h index c445658..a727c8d 100644 --- a/be/src/service/query-options.h +++ b/be/src/service/query-options.h @@ -32,7 +32,7 @@ class TQueryOptions; // the DCHECK. #define QUERY_OPTS_TABLE\ DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\ - TImpalaQueryOptions::PARQUET_FALLBACK_SCHEMA_RESOLUTION + 1);\ + TImpalaQueryOptions::MT_NUM_CORES + 1);\ QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\ QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\ QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\ @@ -75,7 +75,8 @@ class TQueryOptions; QUERY_OPT_FN(disable_row_runtime_filtering, DISABLE_ROW_RUNTIME_FILTERING)\ QUERY_OPT_FN(max_num_runtime_filters, MAX_NUM_RUNTIME_FILTERS)\ QUERY_OPT_FN(parquet_annotate_strings_utf8, PARQUET_ANNOTATE_STRINGS_UTF8)\ - QUERY_OPT_FN(parquet_fallback_schema_resolution, PARQUET_FALLBACK_SCHEMA_RESOLUTION); + QUERY_OPT_FN(parquet_fallback_schema_resolution, PARQUET_FALLBACK_SCHEMA_RESOLUTION)\ + QUERY_OPT_FN(mt_num_cores, MT_NUM_CORES); /// Converts a TQueryOptions struct into a map of key, value pairs. 
void TQueryOptionsToMap(const TQueryOptions& query_options, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/common/thrift/DataSinks.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift index 18019af..dd381fa 100644 --- a/common/thrift/DataSinks.thrift +++ b/common/thrift/DataSinks.thrift @@ -22,7 +22,8 @@ include "Partitions.thrift" enum TDataSinkType { DATA_STREAM_SINK, - TABLE_SINK + TABLE_SINK, + JOIN_BUILD_SINK } enum TSinkAction { @@ -73,6 +74,14 @@ struct TKuduTableSink { 2: optional bool ignore_not_found_or_duplicate; } +// Sink to create the build side of a JoinNode. +struct TJoinBuildSink { + 1: required Types.TJoinTableId join_table_id + + // only set for hash join build sinks + 2: required list<Exprs.TExpr> build_exprs +} + // Union type of all table sinks. struct TTableSink { 1: required Types.TTableId target_table_id @@ -86,4 +95,5 @@ struct TDataSink { 1: required TDataSinkType type 2: optional TDataStreamSink stream_sink 3: optional TTableSink table_sink + 4: optional TJoinBuildSink join_build_sink } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/common/thrift/Frontend.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift index 1842694..6363fcf 100644 --- a/common/thrift/Frontend.thrift +++ b/common/thrift/Frontend.thrift @@ -347,6 +347,11 @@ struct TQueryExecRequest { // it is unpartitioned. 2: required list<Planner.TPlanFragment> fragments + // Multi-threaded execution: sequence of plans; the last one materializes + // the query result + // TODO: this will eventually supercede 'fragments' + 14: optional list<Planner.TPlanFragmentTree> mt_plans + // Specifies the destination fragment of the output of each fragment. 
// parent_fragment_idx.size() == fragments.size() - 1 and // fragments[i] sends its output to fragments[dest_fragment_idx[i-1]] http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/common/thrift/ImpalaInternalService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index 1fcce1b..6c2fc3e 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -180,6 +180,12 @@ struct TQueryOptions { // is always, since fields IDs are NYI). Valid values are "position" (default) and // "name". 43: optional TParquetFallbackSchemaResolution parquet_fallback_schema_resolution = 0 + + // Multi-threaded execution: number of cores per query per node. + // > 1: multi-threaded execution mode, with given number of cores + // 1: single-threaded execution mode + // 0: multi-threaded execution mode, number of cores is the pool default + 44: optional i32 mt_num_cores = 1 } // Impala currently has two types of sessions: Beeswax and HiveServer2 http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/common/thrift/ImpalaService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index 3f5273e..c9535eb 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -209,6 +209,9 @@ enum TImpalaQueryOptions { // Determines how to resolve Parquet files' schemas in the absence of field IDs (which // is always, since fields IDs are NYI). Valid values are "position" and "name". PARQUET_FALLBACK_SCHEMA_RESOLUTION + + // Multi-threaded execution: number of cores per machine + MT_NUM_CORES } // The summary of an insert. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/common/thrift/Planner.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/Planner.thrift b/common/thrift/Planner.thrift index cf2cc04..43a08a9 100644 --- a/common/thrift/Planner.thrift +++ b/common/thrift/Planner.thrift @@ -76,3 +76,13 @@ struct TScanRangeLocations { // non-empty list 2: list<TScanRangeLocation> locations } + +// A plan: tree of plan fragments that materializes either a query result or the build +// side of a join used by another plan; it consists of a sequence of plan fragments. +// TODO: rename both this and PlanNodes.TPlan (TPlan should be something like TExecPlan +// or TExecTree) +struct TPlanFragmentTree { + 1: required i32 cohort_id + + 2: required list<TPlanFragment> fragments +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/common/thrift/Types.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift index 8b2c5a2..85a14dc 100644 --- a/common/thrift/Types.thrift +++ b/common/thrift/Types.thrift @@ -20,6 +20,7 @@ typedef i32 TPlanNodeId typedef i32 TTupleId typedef i32 TSlotId typedef i32 TTableId +typedef i32 TJoinTableId // TODO: Consider moving unrelated enums to better locations. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/analysis/FunctionCallExpr.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/analysis/FunctionCallExpr.java b/fe/src/main/java/com/cloudera/impala/analysis/FunctionCallExpr.java index 7950d4c..ca4097b 100644 --- a/fe/src/main/java/com/cloudera/impala/analysis/FunctionCallExpr.java +++ b/fe/src/main/java/com/cloudera/impala/analysis/FunctionCallExpr.java @@ -33,6 +33,7 @@ import com.cloudera.impala.thrift.TFunctionBinaryType; import com.google.common.base.Joiner; import com.google.common.base.Objects; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; public class FunctionCallExpr extends Expr { private final FunctionName fnName_; @@ -66,7 +67,7 @@ public class FunctionCallExpr extends Expr { fnName_ = fnName; params_ = params; isMergeAggFn_ = isMergeAggFn; - if (params.exprs() != null) children_ = params_.exprs(); + if (params.exprs() != null) children_ = Lists.newArrayList(params_.exprs()); } /** http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/analysis/TupleDescriptor.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/analysis/TupleDescriptor.java b/fe/src/main/java/com/cloudera/impala/analysis/TupleDescriptor.java index e042af1..63bcde5 100644 --- a/fe/src/main/java/com/cloudera/impala/analysis/TupleDescriptor.java +++ b/fe/src/main/java/com/cloudera/impala/analysis/TupleDescriptor.java @@ -110,10 +110,12 @@ public class TupleDescriptor { if (path_ == null) return null; return path_.getRootTable(); } + public TableName getTableName() { Table t = getTable(); return (t == null) ? 
null : t.getTableName(); } + public void setPath(Path p) { Preconditions.checkNotNull(p); Preconditions.checkState(p.isResolved()); @@ -128,6 +130,7 @@ public class TupleDescriptor { type_ = Path.getTypeAsStruct(p.destType()); } } + public Path getPath() { return path_; } public void setType(StructType type) { type_ = type; } public StructType getType() { return type_; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/common/TreeNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/common/TreeNode.java b/fe/src/main/java/com/cloudera/impala/common/TreeNode.java index a7d46ce..37e7995 100644 --- a/fe/src/main/java/com/cloudera/impala/common/TreeNode.java +++ b/fe/src/main/java/com/cloudera/impala/common/TreeNode.java @@ -21,12 +21,11 @@ import java.util.List; import com.cloudera.impala.util.Visitor; import com.google.common.base.Predicate; -public class TreeNode<NodeType extends TreeNode<NodeType>> { - protected List<NodeType> children_; - - protected TreeNode() { - this.children_ = new ArrayList<NodeType>(); - } +/** + * Generic tree structure. Only concrete subclasses of this can be instantiated. + */ +public abstract class TreeNode<NodeType extends TreeNode<NodeType>> { + protected ArrayList<NodeType> children_ = new ArrayList<NodeType>(); public NodeType getChild(int i) { return hasChild(i) ? children_.get(i) : null; @@ -36,13 +35,17 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> { children_.add(n); } + public void removeChild(NodeType n) { children_.remove(n); } + + public void clearChildren() { children_.clear(); } + public void addChildren(List<? 
extends NodeType> l) { children_.addAll(l); } public boolean hasChild(int i) { return children_.size() > i; } public void setChild(int index, NodeType n) { children_.set(index, n); } - public List<NodeType> getChildren() { return children_; } + public ArrayList<NodeType> getChildren() { return children_; } /** * Count the total number of nodes in this tree. Leaf node will return 1. @@ -50,9 +53,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> { */ public int numNodes() { int numNodes = 1; - for (NodeType child: children_) { - numNodes += child.numNodes(); - } + for (NodeType child: children_) numNodes += child.numNodes(); return numNodes; } @@ -72,10 +73,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> { matches.add((D) this); return; } - - for (NodeType child: children_) { - child.collect(predicate, matches); - } + for (NodeType child: children_) child.collect(predicate, matches); } /** @@ -89,10 +87,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> { matches.add((D) this); return; } - - for (NodeType child: children_) { - child.collect(cl, matches); - } + for (NodeType child: children_) child.collect(cl, matches); } /** @@ -102,13 +97,8 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> { */ public <C extends TreeNode<NodeType>, D extends C> void collectAll( Predicate<? super C> predicate, List<D> matches) { - if (predicate.apply((C) this)) { - matches.add((D) this); - } - - for (NodeType child: children_) { - child.collectAll(predicate, matches); - } + if (predicate.apply((C) this)) matches.add((D) this); + for (NodeType child: children_) child.collectAll(predicate, matches); } /** @@ -117,9 +107,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> { */ public static <C extends TreeNode<C>, D extends C> void collect( Collection<C> nodeList, Predicate<? 
super C> predicate, Collection<D> matches) { - for (C node: nodeList) { - node.collect(predicate, matches); - } + for (C node: nodeList) node.collect(predicate, matches); } /** @@ -128,9 +116,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> { */ public static <C extends TreeNode<C>, D extends C> void collect( Collection<C> nodeList, Class cl, Collection<D> matches) { - for (C node: nodeList) { - node.collect(cl, matches); - } + for (C node: nodeList) node.collect(cl, matches); } /** @@ -139,9 +125,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> { public <C extends TreeNode<NodeType>> boolean contains( Predicate<? super C> predicate) { if (predicate.apply((C) this)) return true; - for (NodeType child: children_) { - if (child.contains(predicate)) return true; - } + for (NodeType child: children_) if (child.contains(predicate)) return true; return false; } @@ -150,9 +134,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> { */ public boolean contains(Class cl) { if (cl.equals(getClass())) return true; - for (NodeType child: children_) { - if (child.contains(cl)) return true; - } + for (NodeType child: children_) if (child.contains(cl)) return true; return false; } @@ -162,9 +144,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> { */ public static <C extends TreeNode<C>, D extends C> boolean contains( Collection<C> nodeList, Predicate<? 
super C> predicate) { - for (C node: nodeList) { - if (node.contains(predicate)) return true; - } + for (C node: nodeList) if (node.contains(predicate)) return true; return false; } @@ -173,9 +153,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> { */ public static <C extends TreeNode<C>> boolean contains( List<C> nodeList, Class cl) { - for (C node: nodeList) { - if (node.contains(cl)) return true; - } + for (C node: nodeList) if (node.contains(cl)) return true; return false; } @@ -196,8 +174,6 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> { */ public <C extends TreeNode<NodeType>> void accept(Visitor<C> visitor) { visitor.visit((C) this); - for (NodeType p: children_) { - p.accept(visitor); - } + for (NodeType p: children_) p.accept(visitor); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/CohortId.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/CohortId.java b/fe/src/main/java/com/cloudera/impala/planner/CohortId.java new file mode 100644 index 0000000..2b5b2e4 --- /dev/null +++ b/fe/src/main/java/com/cloudera/impala/planner/CohortId.java @@ -0,0 +1,39 @@ +// Copyright 2016 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package com.cloudera.impala.planner; + +import com.cloudera.impala.common.Id; +import com.cloudera.impala.common.IdGenerator; + +public class CohortId extends Id<CohortId> { + // Construction only allowed via an IdGenerator. + protected CohortId(int id) { + super(id); + } + + public static IdGenerator<CohortId> createGenerator() { + return new IdGenerator<CohortId>() { + @Override + public CohortId getNextId() { return new CohortId(nextId_++); } + @Override + public CohortId getMaxId() { return new CohortId(nextId_ - 1); } + }; + } + + @Override + public String toString() { + return String.format("%02d", id_); + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/DataSink.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/DataSink.java b/fe/src/main/java/com/cloudera/impala/planner/DataSink.java index 084dd96..a298781 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/DataSink.java +++ b/fe/src/main/java/com/cloudera/impala/planner/DataSink.java @@ -29,7 +29,6 @@ import com.cloudera.impala.thrift.TExplainLevel; * The destination could be another plan fragment on a remote machine, * or a table into which the rows are to be inserted * (i.e., the destination of the last fragment of an INSERT statement). 
- * */ public abstract class DataSink { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java b/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java index d1a3069..bdf32e9 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java +++ b/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java @@ -230,7 +230,7 @@ public class DistributedPlanner { Preconditions.checkState(partitionHint == null || partitionHint); ExchangeNode exchNode = new ExchangeNode(ctx_.getNextNodeId()); - exchNode.addChild(inputFragment.getPlanRoot(), false); + exchNode.addChild(inputFragment.getPlanRoot()); exchNode.init(analyzer); Preconditions.checkState(exchNode.hasValidStats()); DataPartition partition = DataPartition.hashPartitioned(nonConstPartitionExprs); @@ -251,7 +251,7 @@ public class DistributedPlanner { throws ImpalaException { Preconditions.checkState(inputFragment.isPartitioned()); ExchangeNode mergePlan = new ExchangeNode(ctx_.getNextNodeId()); - mergePlan.addChild(inputFragment.getPlanRoot(), false); + mergePlan.addChild(inputFragment.getPlanRoot()); mergePlan.init(ctx_.getRootAnalyzer()); Preconditions.checkState(mergePlan.hasValidStats()); PlanFragment fragment = new PlanFragment(ctx_.getNextFragmentId(), mergePlan, @@ -292,7 +292,7 @@ public class DistributedPlanner { throws ImpalaException { node.setDistributionMode(DistributionMode.BROADCAST); node.setChild(0, leftChildFragment.getPlanRoot()); - connectChildFragment(node, 1, rightChildFragment); + connectChildFragment(node, 1, leftChildFragment, rightChildFragment); leftChildFragment.setPlanRoot(node); return leftChildFragment; } @@ -323,11 +323,11 @@ public class DistributedPlanner { lhsJoinExprs, rhsJoinExprs, analyzer)) { 
node.setChild(0, leftChildFragment.getPlanRoot()); node.setChild(1, rightChildFragment.getPlanRoot()); - // Redirect fragments sending to rightFragment to leftFragment. - for (PlanFragment fragment: fragments) { - if (fragment.getDestFragment() == rightChildFragment) { - fragment.setDestination(fragment.getDestNode()); - } + // fix up PlanNode.fragment_ for the migrated PlanNode tree of the rhs child + leftChildFragment.setFragmentInPlanTree(node.getChild(1)); + // Relocate input fragments of rightChildFragment to leftChildFragment. + for (PlanFragment rhsInput: rightChildFragment.getChildren()) { + leftChildFragment.getChildren().add(rhsInput); } // Remove right fragment because its plan tree has been merged into leftFragment. fragments.remove(rightChildFragment); @@ -346,7 +346,7 @@ public class DistributedPlanner { leftChildFragment.getDataPartition(), rhsJoinExprs, analyzer); if (rhsJoinPartition != null) { node.setChild(0, leftChildFragment.getPlanRoot()); - connectChildFragment(node, 1, rightChildFragment); + connectChildFragment(node, 1, leftChildFragment, rightChildFragment); rightChildFragment.setOutputPartition(rhsJoinPartition); leftChildFragment.setPlanRoot(node); return leftChildFragment; @@ -360,7 +360,7 @@ public class DistributedPlanner { rightChildFragment.getDataPartition(), lhsJoinExprs, analyzer); if (lhsJoinPartition != null) { node.setChild(1, rightChildFragment.getPlanRoot()); - connectChildFragment(node, 0, leftChildFragment); + connectChildFragment(node, 0, rightChildFragment, leftChildFragment); leftChildFragment.setOutputPartition(lhsJoinPartition); rightChildFragment.setPlanRoot(node); return rightChildFragment; @@ -379,11 +379,11 @@ public class DistributedPlanner { // on their respective join exprs. // The new fragment is hash-partitioned on the lhs input join exprs. 
ExchangeNode lhsExchange = new ExchangeNode(ctx_.getNextNodeId()); - lhsExchange.addChild(leftChildFragment.getPlanRoot(), false); + lhsExchange.addChild(leftChildFragment.getPlanRoot()); lhsExchange.computeStats(null); node.setChild(0, lhsExchange); ExchangeNode rhsExchange = new ExchangeNode(ctx_.getNextNodeId()); - rhsExchange.addChild(rightChildFragment.getPlanRoot(), false); + rhsExchange.addChild(rightChildFragment.getPlanRoot()); rhsExchange.computeStats(null); node.setChild(1, rhsExchange); @@ -502,7 +502,7 @@ public class DistributedPlanner { // the join; the build input is provided by an ExchangeNode, which is the // destination of the rightChildFragment's output node.setChild(0, leftChildFragment.getPlanRoot()); - connectChildFragment(node, 1, rightChildFragment); + connectChildFragment(node, 1, leftChildFragment, rightChildFragment); leftChildFragment.setPlanRoot(node); hjFragment = leftChildFragment; } else { @@ -624,15 +624,22 @@ public class DistributedPlanner { if (!childFragments.get(i).isPartitioned()) ++numUnpartitionedChildFragments; } + // remove all children to avoid them being tagged with the wrong + // fragment (in the PlanFragment c'tor; we haven't created ExchangeNodes yet) + unionNode.clearChildren(); + // If all child fragments are unpartitioned, return a single unpartitioned fragment // with a UnionNode that merges all child fragments. if (numUnpartitionedChildFragments == childFragments.size()) { - // Absorb the plan trees of all childFragments into unionNode. - for (int i = 0; i < childFragments.size(); ++i) { - unionNode.setChild(i, childFragments.get(i).getPlanRoot()); - } PlanFragment unionFragment = new PlanFragment(ctx_.getNextFragmentId(), unionNode, DataPartition.UNPARTITIONED); + // Absorb the plan trees of all childFragments into unionNode + // and fix up the fragment tree in the process. 
+ for (int i = 0; i < childFragments.size(); ++i) { + unionNode.addChild(childFragments.get(i).getPlanRoot()); + unionFragment.setFragmentInPlanTree(unionNode.getChild(i)); + unionFragment.addChildren(childFragments.get(i).getChildren()); + } unionNode.init(ctx_.getRootAnalyzer()); // All child fragments have been absorbed into unionFragment. fragments.removeAll(childFragments); @@ -640,22 +647,24 @@ public class DistributedPlanner { } // There is at least one partitioned child fragment. + PlanFragment unionFragment = new PlanFragment( + ctx_.getNextFragmentId(), unionNode, DataPartition.RANDOM); for (int i = 0; i < childFragments.size(); ++i) { PlanFragment childFragment = childFragments.get(i); if (childFragment.isPartitioned()) { - // Absorb the plan trees of all partitioned child fragments into unionNode. - unionNode.setChild(i, childFragment.getPlanRoot()); + // absorb the plan trees of all partitioned child fragments into unionNode + unionNode.addChild(childFragment.getPlanRoot()); + unionFragment.setFragmentInPlanTree(unionNode.getChild(i)); + unionFragment.addChildren(childFragment.getChildren()); fragments.remove(childFragment); } else { + // dummy entry for subsequent addition of the ExchangeNode + unionNode.addChild(null); // Connect the unpartitioned child fragments to unionNode via a random exchange. - connectChildFragment(unionNode, i, childFragment); + connectChildFragment(unionNode, i, unionFragment, childFragment); childFragment.setOutputPartition(DataPartition.RANDOM); } } - - // Fragment contains the UnionNode that consumes the data of all child fragments. - PlanFragment unionFragment = new PlanFragment(ctx_.getNextFragmentId(), - unionNode, DataPartition.RANDOM); unionNode.reorderOperands(ctx_.getRootAnalyzer()); unionNode.init(ctx_.getRootAnalyzer()); return unionFragment; @@ -678,13 +687,14 @@ public class DistributedPlanner { /** * Replace node's child at index childIdx with an ExchangeNode that receives its - * input from childFragment. 
+ * input from childFragment. ParentFragment contains node and the new ExchangeNode. */ private void connectChildFragment(PlanNode node, int childIdx, - PlanFragment childFragment) throws ImpalaException { + PlanFragment parentFragment, PlanFragment childFragment) throws ImpalaException { ExchangeNode exchangeNode = new ExchangeNode(ctx_.getNextNodeId()); - exchangeNode.addChild(childFragment.getPlanRoot(), false); + exchangeNode.addChild(childFragment.getPlanRoot()); exchangeNode.init(ctx_.getRootAnalyzer()); + exchangeNode.setFragment(parentFragment); node.setChild(childIdx, exchangeNode); childFragment.setDestination(exchangeNode); } @@ -703,7 +713,7 @@ public class DistributedPlanner { PlanFragment childFragment, DataPartition parentPartition) throws ImpalaException { ExchangeNode exchangeNode = new ExchangeNode(ctx_.getNextNodeId()); - exchangeNode.addChild(childFragment.getPlanRoot(), false); + exchangeNode.addChild(childFragment.getPlanRoot()); exchangeNode.init(ctx_.getRootAnalyzer()); PlanFragment parentFragment = new PlanFragment(ctx_.getNextFragmentId(), exchangeNode, parentPartition); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java b/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java index 22601b5..590f718 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java @@ -71,7 +71,7 @@ public class ExchangeNode extends PlanNode { Preconditions.checkState(conjuncts_.isEmpty()); } - public void addChild(PlanNode node, boolean copyConjuncts) { + public void addChild(PlanNode node) { // This ExchangeNode 'inherits' several parameters from its children. // Ensure that all children agree on them. 
if (!children_.isEmpty()) { @@ -84,7 +84,6 @@ public class ExchangeNode extends PlanNode { tupleIds_ = Lists.newArrayList(node.tupleIds_); nullableTupleIds_ = Sets.newHashSet(node.nullableTupleIds_); } - if (copyConjuncts) conjuncts_.addAll(Expr.cloneList(node.conjuncts_)); children_.add(node); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java b/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java index 10e0460..e641119 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java @@ -143,6 +143,12 @@ public class HashJoinNode extends JoinNode { output.append(String.format("%s%s [%s]\n", prefix, getDisplayLabel(), getDisplayLabelDetail())); + if (detailLevel.ordinal() > TExplainLevel.STANDARD.ordinal()) { + if (joinTableId_.isValid()) { + output.append( + detailPrefix + "hash-table-id=" + joinTableId_.toString() + "\n"); + } + } if (detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) { output.append(detailPrefix + "hash predicates: "); for (int i = 0; i < eqJoinConjuncts_.size(); ++i) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/JoinBuildSink.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/JoinBuildSink.java b/fe/src/main/java/com/cloudera/impala/planner/JoinBuildSink.java new file mode 100644 index 0000000..2971bf8 --- /dev/null +++ b/fe/src/main/java/com/cloudera/impala/planner/JoinBuildSink.java @@ -0,0 +1,100 @@ +// Copyright 2016 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.cloudera.impala.planner; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.cloudera.impala.analysis.Analyzer; +import com.cloudera.impala.analysis.BinaryPredicate; +import com.cloudera.impala.analysis.Expr; +import com.cloudera.impala.common.ImpalaException; +import com.cloudera.impala.thrift.TDataSink; +import com.cloudera.impala.thrift.TDataSinkType; +import com.cloudera.impala.thrift.TExplainLevel; +import com.cloudera.impala.thrift.TJoinBuildSink; +import com.cloudera.impala.thrift.TPlanNode; +import com.cloudera.impala.thrift.TPlanNodeType; +import com.cloudera.impala.thrift.TQueryOptions; +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +/** + * Sink to materialize the build side of a join. + */ +public class JoinBuildSink extends DataSink { + private final static Logger LOG = LoggerFactory.getLogger(JoinBuildSink.class); + + // id of join's build-side table assigned during planning + private final JoinTableId joinTableId_; + + private final List<Expr> buildExprs_ = Lists.newArrayList(); + + /** + * Creates sink for build side of 'joinNode' (extracts buildExprs_ from joinNode). 
+ */ + public JoinBuildSink(JoinTableId joinTableId, JoinNode joinNode) { + Preconditions.checkState(joinTableId.isValid()); + joinTableId_ = joinTableId; + Preconditions.checkNotNull(joinNode); + Preconditions.checkState(joinNode instanceof JoinNode); + if (!(joinNode instanceof HashJoinNode)) return; + for (Expr eqJoinConjunct: joinNode.getEqJoinConjuncts()) { + BinaryPredicate p = (BinaryPredicate) eqJoinConjunct; + // by convention the build exprs are the rhs of the join conjuncts + buildExprs_.add(p.getChild(1).clone()); + } + } + + public JoinTableId getJoinTableId() { return joinTableId_; } + + @Override + protected TDataSink toThrift() { + TDataSink result = new TDataSink(TDataSinkType.JOIN_BUILD_SINK); + TJoinBuildSink tBuildSink = new TJoinBuildSink(); + tBuildSink.setJoin_table_id(joinTableId_.asInt()); + for (Expr buildExpr: buildExprs_) { + tBuildSink.addToBuild_exprs(buildExpr.treeToThrift()); + } + result.setJoin_build_sink(tBuildSink); + return result; + } + + @Override + public String getExplainString(String prefix, String detailPrefix, + TExplainLevel detailLevel) { + StringBuilder output = new StringBuilder(); + output.append(String.format("%s%s\n", prefix, "JOIN BUILD")); + if (detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) { + output.append( + detailPrefix + "join-table-id=" + joinTableId_.toString() + + " plan-id=" + fragment_.getPlanId().toString() + + " cohort-id=" + fragment_.getCohortId().toString() + "\n"); + if (!buildExprs_.isEmpty()) { + output.append(detailPrefix + "build expressions: ") + .append(Expr.toSql(buildExprs_) + "\n"); + } + } + return output.toString(); + } + + @Override + public void computeCosts() { + // TODO: implement? 
+ } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java b/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java index 19e4724..34de765 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java @@ -59,6 +59,10 @@ public abstract class JoinNode extends PlanNode { protected List<BinaryPredicate> eqJoinConjuncts_; protected List<Expr> otherJoinConjuncts_; + // if valid, the rhs input is materialized outside of this node and is assigned + // joinTableId_ + protected JoinTableId joinTableId_ = JoinTableId.INVALID; + public enum DistributionMode { NONE("NONE"), BROADCAST("BROADCAST"), @@ -129,6 +133,8 @@ public abstract class JoinNode extends PlanNode { public DistributionMode getDistributionModeHint() { return distrModeHint_; } public DistributionMode getDistributionMode() { return distrMode_; } public void setDistributionMode(DistributionMode distrMode) { distrMode_ = distrMode; } + public JoinTableId getJoinTableId() { return joinTableId_; } + public void setJoinTableId(JoinTableId id) { joinTableId_ = id; } @Override public void init(Analyzer analyzer) throws ImpalaException { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/JoinTableId.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/JoinTableId.java b/fe/src/main/java/com/cloudera/impala/planner/JoinTableId.java new file mode 100644 index 0000000..4e0f542 --- /dev/null +++ b/fe/src/main/java/com/cloudera/impala/planner/JoinTableId.java @@ -0,0 +1,44 @@ +// Copyright 2016 Cloudera Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.cloudera.impala.planner; + +import com.cloudera.impala.common.Id; +import com.cloudera.impala.common.IdGenerator; + +public class JoinTableId extends Id<JoinTableId> { + // Construction only allowed via an IdGenerator. + protected JoinTableId(int id) { + super(id); + } + + public static JoinTableId INVALID; + static { + INVALID = new JoinTableId(Id.INVALID_ID); + } + + public static IdGenerator<JoinTableId> createGenerator() { + return new IdGenerator<JoinTableId>() { + @Override + public JoinTableId getNextId() { return new JoinTableId(nextId_++); } + @Override + public JoinTableId getMaxId() { return new JoinTableId(nextId_ - 1); } + }; + } + + @Override + public String toString() { + return String.format("%02d", id_); + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java b/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java index 474a60c..fd9ef2f 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java @@ -93,6 +93,10 @@ public class NestedLoopJoinNode extends JoinNode { displayName_, getDisplayLabelDetail())); } if 
(detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) { + if (joinTableId_.isValid()) { + output.append( + detailPrefix + "join table id: " + joinTableId_.toString() + "\n"); + } if (!otherJoinConjuncts_.isEmpty()) { output.append(detailPrefix + "join predicates: ") .append(getExplainString(otherJoinConjuncts_) + "\n"); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/ParallelPlanner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/ParallelPlanner.java b/fe/src/main/java/com/cloudera/impala/planner/ParallelPlanner.java new file mode 100644 index 0000000..f66ce15 --- /dev/null +++ b/fe/src/main/java/com/cloudera/impala/planner/ParallelPlanner.java @@ -0,0 +1,202 @@ +// Copyright 2016 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.cloudera.impala.planner; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.cloudera.impala.common.IdGenerator; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * The parallel planner is responsible for breaking up a single distributed plan + * (= tree of PlanFragments) into a (logical) tree of distributed plans. 
The root + * of that tree produces the query result, all the other ones produce intermediate + * join build sides. All plans that produce intermediate join build sides (one per join + * node in the recipient) for a single recipient plan are grouped together into a + * cohort. Since each plan may only produce a build side for at most one recipient + * plan, each plan belongs to exactly one cohort. + * + * TODO: if the input to the JoinBuildSink is the result of a grouping aggregation + * on the join keys, the AggregationNode should materialize the final hash table + * directly (instead of reading the hash table content and feeding it into a + * JoinBuildSink to build another hash table) + * + * TODO: instead of cohort ids, create a Plan class that is a subclass of TreeNode? + */ +public class ParallelPlanner { + private final static Logger LOG = LoggerFactory.getLogger(ParallelPlanner.class); + + private final IdGenerator<JoinTableId> joinTableIdGenerator_ = + JoinTableId.createGenerator(); + private final IdGenerator<PlanId> planIdGenerator_ = PlanId.createGenerator(); + private final IdGenerator<CohortId> cohortIdGenerator_ = CohortId.createGenerator(); + private final PlannerContext ctx_; + + private List<PlanFragment> planRoots_ = Lists.newArrayList(); + + public ParallelPlanner(PlannerContext ctx) { ctx_ = ctx; } + + /** + * Given a distributed plan, return list of plans ready for parallel execution. + * The last plan in the sequence materializes the query result, the preceding + * plans materialize the build sides of joins. + * Assigns cohortId and planId for all fragments. + * TODO: create class DistributedPlan with a PlanFragment member, so we don't + * need to distinguish PlanFragment and Plan through comments? 
+ */ + public List<PlanFragment> createPlans(PlanFragment root) { + root.setPlanId(planIdGenerator_.getNextId()); + root.setCohortId(cohortIdGenerator_.getNextId()); + planRoots_.add(root); + createBuildPlans(root, null); + return planRoots_; + } + + /** + * Recursively traverse tree of fragments of 'plan' from top to bottom and + * move all build inputs of joins into separate plans. 'buildCohortId' is the + * cohort id of the build plans of 'fragment' and may be null if the plan + * to which 'fragment' belongs has so far not required any build plans. + * Assign fragment's plan id and cohort id to children. + */ + private void createBuildPlans(PlanFragment fragment, CohortId buildCohortId) { + LOG.info("createbuildplans fragment " + fragment.getId().toString()); + List<JoinNode> joins = Lists.newArrayList(); + collectJoins(fragment.getPlanRoot(), joins); + if (!joins.isEmpty()) { + List<String> joinIds = Lists.newArrayList(); + for (JoinNode join: joins) joinIds.add(join.getId().toString()); + LOG.info("collected joins " + Joiner.on(" ").join(joinIds)); + + if (buildCohortId == null) buildCohortId = cohortIdGenerator_.getNextId(); + for (JoinNode join: joins) createBuildPlan(join, buildCohortId); + } + + if (!fragment.getChildren().isEmpty()) { + List<String> ids = Lists.newArrayList(); + for (PlanFragment c: fragment.getChildren()) ids.add(c.getId().toString()); + LOG.info("collected children " + Joiner.on(" ").join(ids) + " parent " + + fragment.getId().toString()); + } + for (PlanFragment child: fragment.getChildren()) { + child.setPlanId(fragment.getPlanId()); + child.setCohortId(fragment.getCohortId()); + createBuildPlans(child, buildCohortId); + } + } + + /** + * Collect all JoinNodes that aren't themselves the build side of a join node + * in this fragment or the rhs of a SubplanNode. 
+ */ + private void collectJoins(PlanNode node, List<JoinNode> result) { + if (node instanceof JoinNode) { + result.add((JoinNode)node); + // for joins, only descend through the probe side; + // we're recursively traversing the build side when constructing the build plan + // in createBuildPlan() + collectJoins(node.getChild(0), result); + return; + } + if (node instanceof ExchangeNode) return; + if (node instanceof SubplanNode) { + collectJoins(node.getChild(0), result); + return; + } + for (PlanNode child: node.getChildren()) collectJoins(child, result); + } + + /** + * Collect all ExchangeNodes in this fragment. + */ + private void collectExchangeNodes(PlanNode node, List<ExchangeNode> result) { + if (node instanceof ExchangeNode) { + result.add((ExchangeNode)node); + return; + } + for (PlanNode child: node.getChildren()) collectExchangeNodes(child, result); + } + + /** + * Create new plan that materializes build input of 'join' and assign it 'cohortId'. + * In the process, moves all fragments required for this materialization from tree + * rooted at 'join's fragment into the new plan. + * Also assigns the new plan a plan id. 
+ */ + private void createBuildPlan(JoinNode join, CohortId cohortId) { + LOG.info("createbuildplan " + join.getId().toString()); + Preconditions.checkNotNull(cohortId); + // collect all ExchangeNodes on the build side and their corresponding input + // fragments + final List<ExchangeNode> exchNodes = Lists.newArrayList(); + collectExchangeNodes(join.getChild(1), exchNodes); + + com.google.common.base.Predicate<PlanFragment> isInputFragment = + new com.google.common.base.Predicate<PlanFragment>() { + public boolean apply(PlanFragment f) { + // we're starting with the fragment containing the join, which might + // be terminal + if (f.getDestNode() == null) return false; + for (ExchangeNode exch: exchNodes) { + if (exch.getId() == f.getDestNode().getId()) return true; + } + return false; + } + }; + List<PlanFragment> inputFragments = Lists.newArrayList(); + join.getFragment().collect(isInputFragment, inputFragments); + Preconditions.checkState(exchNodes.size() == inputFragments.size()); + + // Create new fragment with JoinBuildSink that consumes the output of the + // join's rhs input (the one that materializes the build side). + // The new fragment has the same data partition as the join node's fragment. 
+ JoinBuildSink buildSink = + new JoinBuildSink(joinTableIdGenerator_.getNextId(), join); + join.setJoinTableId(buildSink.getJoinTableId()); + // c'tor fixes up PlanNode.fragment_ + PlanFragment buildFragment = new PlanFragment(ctx_.getNextFragmentId(), + join.getChild(1), join.getFragment().getDataPartition()); + buildFragment.setSink(buildSink); + + // move input fragments + for (int i = 0; i < exchNodes.size(); ++i) { + LOG.info("re-link fragment " + inputFragments.get(i).getId().toString() + " to " + + exchNodes.get(i).getFragment().getId().toString()); + Preconditions.checkState(exchNodes.get(i).getFragment() == buildFragment); + join.getFragment().removeChild(inputFragments.get(i)); + buildFragment.getChildren().add(inputFragments.get(i)); + } + + // assign plan and cohort id + buildFragment.setPlanId(planIdGenerator_.getNextId()); + PlanId parentPlanId = join.getFragment().getPlanId(); + buildFragment.setCohortId(cohortId); + + planRoots_.add(buildFragment); + LOG.info("new build fragment " + buildFragment.getId().toString()); + LOG.info("in cohort " + buildFragment.getCohortId().toString()); + LOG.info("for join node " + join.getId().toString()); + createBuildPlans(buildFragment, null); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java b/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java index f702271..85a4d40 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java +++ b/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java @@ -29,26 +29,36 @@ import com.cloudera.impala.catalog.HdfsTable; import com.cloudera.impala.common.AnalysisException; import com.cloudera.impala.common.InternalException; import com.cloudera.impala.common.NotImplementedException; +import 
com.cloudera.impala.common.TreeNode; import com.cloudera.impala.planner.JoinNode.DistributionMode; import com.cloudera.impala.thrift.TExplainLevel; import com.cloudera.impala.thrift.TPartitionType; +import com.cloudera.impala.thrift.TPlan; import com.cloudera.impala.thrift.TPlanFragment; +import com.cloudera.impala.thrift.TPlanFragmentTree; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; import com.google.common.collect.Lists; /** - * A PlanFragment is part of a tree of such fragments that together make - * up a complete execution plan for a single query. Each plan fragment can have - * one or many instances, each of which in turn is executed by a single node and the - * output sent to a specific instance of the destination fragment (or, in the case - * of the root fragment, is materialized in some form). + * PlanFragments form a tree structure via their ExchangeNodes. A tree of fragments + * connected in that way forms a plan. The output of a plan is produced by the root + * fragment and is either the result of the query or an intermediate result + * needed by a different plan (such as a hash table). * - * The plan fragment encapsulates the specific tree of execution nodes that + * Plans are grouped into cohorts based on the consumer of their output: all + * plans that materialize intermediate results for a particular consumer plan + * are grouped into a single cohort. + * + * A PlanFragment encapsulates the specific tree of execution nodes that * are used to produce the output of the plan fragment, as well as output exprs, * destination node, etc. If there are no output exprs, the full row that is * is produced by the plan root is marked as materialized. * + * A plan fragment can have one or many instances, each of which in turn is executed by + * an individual node and the output sent to a specific instance of the destination + * fragment (or, in the case of the root fragment, is materialized in some form). 
+ * + * A hash-partitioned plan fragment is the result of one or more hash-partitioning data * streams being received by plan nodes in this fragment. In the future, a fragment's * data partition could also be hash partitioned based on a scan node that is reading @@ -59,11 +69,17 @@ import com.google.common.collect.Lists; * - assemble with getters, etc. * - finalize() * - toThrift() + * + * TODO: the tree of PlanNodes is connected across fragment boundaries, which makes + * it impossible to search for things within a fragment (using TreeNode functions); + * fix that */ -public class PlanFragment { +public class PlanFragment extends TreeNode<PlanFragment> { private final static Logger LOG = LoggerFactory.getLogger(PlanFragment.class); private final PlanFragmentId fragmentId_; + private PlanId planId_; + private CohortId cohortId_; // root of plan tree executed by this fragment private PlanNode planRoot_; @@ -103,14 +119,26 @@ public class PlanFragment { * Does not traverse the children of ExchangeNodes because those must belong to a * different fragment. */ - private void setFragmentInPlanTree(PlanNode node) { + public void setFragmentInPlanTree(PlanNode node) { if (node == null) return; node.setFragment(this); - if (!(node instanceof ExchangeNode)) { - for (PlanNode child : node.getChildren()) { - setFragmentInPlanTree(child); - } - } + if (node instanceof ExchangeNode) return; + for (PlanNode child : node.getChildren()) setFragmentInPlanTree(child); + } + + /** + * Collect all PlanNodes that belong to the exec tree of this fragment.
+ */ + public void collectPlanNodes(List<PlanNode> nodes) { + Preconditions.checkNotNull(nodes); + collectPlanNodesHelper(planRoot_, nodes); + } + + private void collectPlanNodesHelper(PlanNode root, List<PlanNode> nodes) { + if (root == null) return; + nodes.add(root); + if (root instanceof ExchangeNode) return; + for (PlanNode child: root.getChildren()) collectPlanNodesHelper(child, nodes); } public void setOutputExprs(List<Expr> outputExprs) { @@ -216,31 +244,56 @@ public class PlanFragment { return result; } - public String getExplainString(TExplainLevel explainLevel) { + public TPlanFragmentTree treeToThrift() { + TPlanFragmentTree result = new TPlanFragmentTree(); + treeToThriftHelper(result); + return result; + } + + private void treeToThriftHelper(TPlanFragmentTree plan) { + plan.addToFragments(toThrift()); + for (PlanFragment child: children_) { + child.treeToThriftHelper(plan); + } + } + + public String getExplainString(TExplainLevel detailLevel) { + return getExplainString("", "", detailLevel); + } + + /** + * The root of the output tree will be prefixed by rootPrefix and the remaining plan + * output will be prefixed by prefix. 
+ */ + protected final String getExplainString(String rootPrefix, String prefix, + TExplainLevel detailLevel) { StringBuilder str = new StringBuilder(); Preconditions.checkState(dataPartition_ != null); - String rootPrefix = ""; - String prefix = ""; - String detailPrefix = "| "; - if (explainLevel == TExplainLevel.VERBOSE) { + String detailPrefix = prefix + "| "; // sink detail + if (detailLevel == TExplainLevel.VERBOSE) { + // we're printing a new tree, start over with the indentation prefix = " "; rootPrefix = " "; detailPrefix = prefix + "| "; str.append(String.format("%s:PLAN FRAGMENT [%s]\n", fragmentId_.toString(), dataPartition_.getExplainString())); if (sink_ != null && sink_ instanceof DataStreamSink) { - str.append(sink_.getExplainString(prefix, detailPrefix, explainLevel) + "\n"); + str.append(sink_.getExplainString(rootPrefix, prefix, detailLevel) + "\n"); } } - // Always print table sinks. - if (sink_ != null && sink_ instanceof TableSink) { - str.append(sink_.getExplainString(prefix, detailPrefix, explainLevel)); - if (explainLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) { + + String planRootPrefix = rootPrefix; + // Always print sinks other than DataStreamSinks. 
+ if (sink_ != null && !(sink_ instanceof DataStreamSink)) { + str.append(sink_.getExplainString(rootPrefix, detailPrefix, detailLevel)); + if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) { str.append(prefix + "|\n"); } + // we already used the root prefix for the sink + planRootPrefix = prefix; } if (planRoot_ != null) { - str.append(planRoot_.getExplainString(rootPrefix, prefix, explainLevel)); + str.append(planRoot_.getExplainString(planRootPrefix, prefix, detailLevel)); } return str.toString(); } @@ -251,6 +304,10 @@ public class PlanFragment { } public PlanFragmentId getId() { return fragmentId_; } + public PlanId getPlanId() { return planId_; } + public void setPlanId(PlanId id) { planId_ = id; } + public CohortId getCohortId() { return cohortId_; } + public void setCohortId(CohortId id) { cohortId_ = id; } public PlanFragment getDestFragment() { if (destNode_ == null) return null; return destNode_.getFragment(); @@ -270,7 +327,13 @@ public class PlanFragment { setFragmentInPlanTree(planRoot_); } - public void setDestination(ExchangeNode destNode) { destNode_ = destNode; } + public void setDestination(ExchangeNode destNode) { + destNode_ = destNode; + PlanFragment dest = getDestFragment(); + Preconditions.checkNotNull(dest); + dest.addChild(this); + } + public boolean hasSink() { return sink_ != null; } public DataSink getSink() { return sink_; } public void setSink(DataSink sink) { @@ -290,4 +353,33 @@ public class PlanFragment { planRoot_ = newRoot; planRoot_.setFragment(this); } + + /** + * Verify that the tree of PlanFragments and their contained tree of + * PlanNodes is constructed correctly. 
+ */ + public void verifyTree() { + // PlanNode.fragment_ is set correctly + List<PlanNode> nodes = Lists.newArrayList(); + collectPlanNodes(nodes); + List<PlanNode> exchNodes = Lists.newArrayList(); + for (PlanNode node: nodes) { + if (node instanceof ExchangeNode) exchNodes.add(node); + Preconditions.checkState(node.getFragment() == this); + } + + // all ExchangeNodes have registered input fragments + Preconditions.checkState(exchNodes.size() == getChildren().size()); + List<PlanFragment> childFragments = Lists.newArrayList(); + for (PlanNode exchNode: exchNodes) { + PlanFragment childFragment = exchNode.getChild(0).getFragment(); + Preconditions.checkState(!childFragments.contains(childFragment)); + childFragments.add(childFragment); + Preconditions.checkState(childFragment.getDestNode() == exchNode); + } + // all registered children are accounted for + Preconditions.checkState(getChildren().containsAll(childFragments)); + + for (PlanFragment child: getChildren()) child.verifyTree(); + } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/PlanId.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlanId.java b/fe/src/main/java/com/cloudera/impala/planner/PlanId.java new file mode 100644 index 0000000..1c7213a --- /dev/null +++ b/fe/src/main/java/com/cloudera/impala/planner/PlanId.java @@ -0,0 +1,39 @@ +// Copyright 2016 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package com.cloudera.impala.planner; + +import com.cloudera.impala.common.Id; +import com.cloudera.impala.common.IdGenerator; + +public class PlanId extends Id<PlanId> { + // Construction only allowed via an IdGenerator. + protected PlanId(int id) { + super(id); + } + + public static IdGenerator<PlanId> createGenerator() { + return new IdGenerator<PlanId>() { + @Override + public PlanId getNextId() { return new PlanId(nextId_++); } + @Override + public PlanId getMaxId() { return new PlanId(nextId_ - 1); } + }; + } + + @Override + public String toString() { + return String.format("%02d", id_); + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java b/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java index c393b4f..895df78 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java @@ -314,9 +314,17 @@ abstract public class PlanNode extends TreeNode<PlanNode> { String childHeadlinePrefix = prefix + "|--"; String childDetailPrefix = prefix + "| "; for (int i = children_.size() - 1; i >= 1; --i) { - expBuilder.append( - children_.get(i).getExplainString(childHeadlinePrefix, childDetailPrefix, - detailLevel)); + PlanNode child = getChild(i); + if (fragment_ != child.fragment_) { + // we're crossing a fragment boundary + expBuilder.append( + child.fragment_.getExplainString( + childHeadlinePrefix, childDetailPrefix, detailLevel)); + } else { + expBuilder.append( + child.getExplainString(childHeadlinePrefix, childDetailPrefix, + detailLevel)); + } if (printFiller) expBuilder.append(filler + "\n"); } expBuilder.append(children_.get(0).getExplainString(prefix, 
prefix, detailLevel)); @@ -327,7 +335,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> { /** * Return the node-specific details. * Subclass should override this function. - * Each line should be prefix by detailPrefix. + * Each line should be prefixed by detailPrefix. */ protected String getNodeExplainString(String rootPrefix, String detailPrefix, TExplainLevel detailLevel) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/Planner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/Planner.java b/fe/src/main/java/com/cloudera/impala/planner/Planner.java index cc6f923..f639c59 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/Planner.java +++ b/fe/src/main/java/com/cloudera/impala/planner/Planner.java @@ -96,6 +96,7 @@ public class Planner { } PlanFragment rootFragment = fragments.get(fragments.size() - 1); + rootFragment.verifyTree(); ExprSubstitutionMap rootNodeSmap = rootFragment.getPlanRoot().getOutputSmap(); List<Expr> resultExprs = null; if (ctx_.isInsertOrCtas()) { @@ -161,6 +162,19 @@ public class Planner { } /** + * Return a list of plans, each represented by the root of their fragment trees. + * TODO: roll into createPlan() + */ + public List<PlanFragment> createParallelPlans() throws ImpalaException { + ArrayList<PlanFragment> distrPlan = createPlan(); + Preconditions.checkNotNull(distrPlan); + ParallelPlanner planner = new ParallelPlanner(ctx_); + List<PlanFragment> parallelPlans = planner.createPlans(distrPlan.get(0)); + ctx_.getRootAnalyzer().getTimeline().markEvent("Parallel plans created"); + return parallelPlans; + } + + /** * Return combined explain string for all plan fragments. * Includes the estimated resource requirements from the request if set. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/service/Frontend.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/service/Frontend.java b/fe/src/main/java/com/cloudera/impala/service/Frontend.java index 4d21895..fc1f2fc 100644 --- a/fe/src/main/java/com/cloudera/impala/service/Frontend.java +++ b/fe/src/main/java/com/cloudera/impala/service/Frontend.java @@ -118,6 +118,7 @@ import com.cloudera.impala.thrift.TLoadDataReq; import com.cloudera.impala.thrift.TLoadDataResp; import com.cloudera.impala.thrift.TMetadataOpRequest; import com.cloudera.impala.thrift.TPlanFragment; +import com.cloudera.impala.thrift.TPlanFragmentTree; import com.cloudera.impala.thrift.TQueryCtx; import com.cloudera.impala.thrift.TQueryExecRequest; import com.cloudera.impala.thrift.TResetMetadataRequest; @@ -947,6 +948,23 @@ public class Frontend { // create plan LOG.debug("create plan"); Planner planner = new Planner(analysisResult, queryCtx); + if (RuntimeEnv.INSTANCE.isTestEnv() + && queryCtx.request.query_options.mt_num_cores != 1) { + // TODO: this is just to be able to run tests; implement this + List<PlanFragment> planRoots = planner.createParallelPlans(); + for (PlanFragment planRoot: planRoots) { + TPlanFragmentTree thriftPlan = planRoot.treeToThrift(); + queryExecRequest.addToMt_plans(thriftPlan); + } + queryExecRequest.setDesc_tbl(analysisResult.getAnalyzer().getDescTbl().toThrift()); + queryExecRequest.setQuery_ctx(queryCtx); + explainString.append(planner.getExplainString( + Lists.newArrayList(planRoots.get(0)), queryExecRequest, + TExplainLevel.STANDARD)); + queryExecRequest.setQuery_plan(explainString.toString()); + result.setQuery_exec_request(queryExecRequest); + return result; + } ArrayList<PlanFragment> fragments = planner.createPlan(); List<ScanNode> scanNodes = Lists.newArrayList(); @@ -954,11 +972,11 @@ public class Frontend { // 
queryExecRequest.dest_fragment_idx Map<PlanFragment, Integer> fragmentIdx = Maps.newHashMap(); - for (int fragmentId = 0; fragmentId < fragments.size(); ++fragmentId) { - PlanFragment fragment = fragments.get(fragmentId); + for (int idx = 0; idx < fragments.size(); ++idx) { + PlanFragment fragment = fragments.get(idx); Preconditions.checkNotNull(fragment.getPlanRoot()); fragment.getPlanRoot().collect(Predicates.instanceOf(ScanNode.class), scanNodes); - fragmentIdx.put(fragment, fragmentId); + fragmentIdx.put(fragment, idx); } // set fragment destinations http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java b/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java index 7abd81a..ce8980e 100644 --- a/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java +++ b/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java @@ -375,10 +375,9 @@ public class PlannerTestBase { * locations to actualScanRangeLocations; compares both to the appropriate sections * of 'testCase'. */ - private void RunTestCase(TestCase testCase, StringBuilder errorLog, + private void runTestCase(TestCase testCase, StringBuilder errorLog, StringBuilder actualOutput, String dbName, TQueryOptions options) throws CatalogException { - if (options == null) { options = defaultQueryOptions(); } else { @@ -396,11 +395,13 @@ public class PlannerTestBase { queryCtx.request.query_options = options; // Test single node plan, scan range locations, and column lineage. 
TExecRequest singleNodeExecRequest = - testPlan(testCase, true, queryCtx, errorLog, actualOutput); + testPlan(testCase, Section.PLAN, queryCtx, errorLog, actualOutput); checkScanRangeLocations(testCase, singleNodeExecRequest, errorLog, actualOutput); checkColumnLineage(testCase, singleNodeExecRequest, errorLog, actualOutput); // Test distributed plan. - testPlan(testCase, false, queryCtx, errorLog, actualOutput); + testPlan(testCase, Section.DISTRIBUTEDPLAN, queryCtx, errorLog, actualOutput); + // test parallel plans + testPlan(testCase, Section.PARALLELPLANS, queryCtx, errorLog, actualOutput); } /** @@ -410,17 +411,20 @@ public class PlannerTestBase { * Returns the produced exec request or null if there was an error generating * the plan. */ - private TExecRequest testPlan(TestCase testCase, boolean singleNodePlan, + private TExecRequest testPlan(TestCase testCase, Section section, TQueryCtx queryCtx, StringBuilder errorLog, StringBuilder actualOutput) { - Section section = (singleNodePlan) ? 
Section.PLAN : Section.DISTRIBUTEDPLAN; String query = testCase.getQuery(); queryCtx.request.setStmt(query); if (section == Section.PLAN) { queryCtx.request.getQuery_options().setNum_nodes(1); } else { + // for distributed and parallel execution we want to run on all available nodes queryCtx.request.getQuery_options().setNum_nodes( ImpalaInternalServiceConstants.NUM_NODES_ALL); } + if (section == Section.PARALLELPLANS) { + queryCtx.request.query_options.mt_num_cores = 2; + } ArrayList<String> expectedPlan = testCase.getSectionContents(section); boolean sectionExists = expectedPlan != null && !expectedPlan.isEmpty(); String expectedErrorMsg = getExpectedErrorMessage(expectedPlan); @@ -614,7 +618,7 @@ public class PlannerTestBase { actualOutput.append(testCase.getSectionAsString(Section.QUERY, true, "\n")); actualOutput.append("\n"); try { - RunTestCase(testCase, errorLog, actualOutput, dbName, options); + runTestCase(testCase, errorLog, actualOutput, dbName, options); } catch (CatalogException e) { errorLog.append(String.format("Failed to plan query\n%s\n%s", testCase.getQuery(), e.getMessage())); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/test/java/com/cloudera/impala/testutil/TestFileParser.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/com/cloudera/impala/testutil/TestFileParser.java b/fe/src/test/java/com/cloudera/impala/testutil/TestFileParser.java index cac975a..eb1ca6d 100644 --- a/fe/src/test/java/com/cloudera/impala/testutil/TestFileParser.java +++ b/fe/src/test/java/com/cloudera/impala/testutil/TestFileParser.java @@ -61,6 +61,7 @@ public class TestFileParser { RESULTS, PLAN, DISTRIBUTEDPLAN, + PARALLELPLANS, FILEERRORS, PARTITIONS, SETUP, @@ -177,10 +178,14 @@ public class TestFileParser { * Returns false if the current test case is invalid due to missing sections or query */ public boolean isValid() { - return !getQuery().isEmpty() && 
(!getSectionContents(Section.PLAN).isEmpty() || - !getSectionContents(Section.DISTRIBUTEDPLAN).isEmpty() || - !getSectionContents(Section.LINEAGE).isEmpty()); + return !getQuery().isEmpty() + && (!getSectionContents(Section.PLAN).isEmpty() + || !getSectionContents(Section.DISTRIBUTEDPLAN).isEmpty() + || !getSectionContents(Section.PARALLELPLANS).isEmpty() + || !getSectionContents(Section.LINEAGE).isEmpty()); } + + public boolean isEmpty() { return expectedResultSections.isEmpty(); } } private final List<TestCase> testCases = Lists.newArrayList(); @@ -243,7 +248,7 @@ public class TestFileParser { ++lineNum; if (line.startsWith("====") && sectionCount > 0) { currentTestCase.addSection(currentSection, sectionContents); - if(!currentTestCase.isValid()) { + if (!currentTestCase.isValid()) { throw new IllegalStateException("Invalid test case" + " at line " + currentTestCase.startLineNum + " detected."); } @@ -288,7 +293,7 @@ public class TestFileParser { } } - if(!currentTestCase.isValid()) { + if (!currentTestCase.isEmpty() && !currentTestCase.isValid()) { throw new IllegalStateException("Invalid test case" + " at line " + currentTestCase.startLineNum + " detected."); } @@ -308,7 +313,8 @@ public class TestFileParser { open(table); testCases.clear(); while (scanner.hasNextLine()) { - testCases.add(parseOneTestCase()); + TestCase testCase = parseOneTestCase(); + if (!testCase.isEmpty()) testCases.add(testCase); } } finally { close();
