http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/PlanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java index 978e0ac..d5c72a0 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java @@ -65,6 +65,10 @@ import com.google.common.math.LongMath; abstract public class PlanNode extends TreeNode<PlanNode> { private final static Logger LOG = LoggerFactory.getLogger(PlanNode.class); + // The size of buffer used in spilling nodes. Used in computeResourceProfile(). + // TODO: IMPALA-3200: get from query option + protected final static long SPILLABLE_BUFFER_BYTES = 8L * 1024L * 1024L; + // String used for this node in getExplainString(). protected String displayName_; @@ -110,13 +114,13 @@ abstract public class PlanNode extends TreeNode<PlanNode> { // set in computeStats(); invalid: -1 protected int numNodes_; + // resource requirements and estimates for this plan node. + // set in computeResourceProfile(). + protected ResourceProfile resourceProfile_ = null; + // sum of tupleIds_' avgSerializedSizes; set in computeStats() protected float avgRowSize_; - // estimated per-host memory requirement for this node; - // set in computeCosts(); invalid: -1 - protected long perHostMemCost_ = -1; - // If true, disable codegen for this plan node. 
protected boolean disableCodegen_; @@ -187,9 +191,9 @@ abstract public class PlanNode extends TreeNode<PlanNode> { } public long getLimit() { return limit_; } public boolean hasLimit() { return limit_ > -1; } - public long getPerHostMemCost() { return perHostMemCost_; } public long getCardinality() { return cardinality_; } public int getNumNodes() { return numNodes_; } + public ResourceProfile getResourceProfile() { return resourceProfile_; } public float getAvgRowSize() { return avgRowSize_; } public void setFragment(PlanFragment fragment) { fragment_ = fragment; } public PlanFragment getFragment() { return fragment_; } @@ -235,8 +239,8 @@ abstract public class PlanNode extends TreeNode<PlanNode> { conjuncts_.clear(); } - public String getExplainString() { - return getExplainString("", "", TExplainLevel.VERBOSE); + public String getExplainString(TQueryOptions queryOptions) { + return getExplainString("", "", queryOptions, TExplainLevel.VERBOSE); } protected void setDisplayName(String s) { displayName_ = s; } @@ -269,7 +273,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> { * output will be prefixed by prefix. */ protected final String getExplainString(String rootPrefix, String prefix, - TExplainLevel detailLevel) { + TQueryOptions queryOptions, TExplainLevel detailLevel) { StringBuilder expBuilder = new StringBuilder(); String detailPrefix = prefix; String filler; @@ -302,11 +306,12 @@ abstract public class PlanNode extends TreeNode<PlanNode> { // Output cardinality, cost estimates and tuple Ids only when explain plan level // is extended or above. if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) { - // Print estimated output cardinality and memory cost. - expBuilder.append(PrintUtils.printHosts(detailPrefix, numNodes_)); - expBuilder.append(PrintUtils.printMemCost(" ", perHostMemCost_) + "\n"); + // Print resource profile. 
+ expBuilder.append(detailPrefix); + expBuilder.append(resourceProfile_.getExplainString()); + expBuilder.append("\n"); - // Print tuple ids and row size. + // Print tuple ids, row size and cardinality. expBuilder.append(detailPrefix + "tuple-ids="); for (int i = 0; i < tupleIds_.size(); ++i) { TupleId tupleId = tupleIds_.get(i); @@ -331,15 +336,23 @@ abstract public class PlanNode extends TreeNode<PlanNode> { // we're crossing a fragment boundary expBuilder.append( child.fragment_.getExplainString( - childHeadlinePrefix, childDetailPrefix, detailLevel)); + childHeadlinePrefix, childDetailPrefix, queryOptions, detailLevel)); } else { - expBuilder.append( - child.getExplainString(childHeadlinePrefix, childDetailPrefix, - detailLevel)); + expBuilder.append(child.getExplainString(childHeadlinePrefix, + childDetailPrefix, queryOptions, detailLevel)); } if (printFiller) expBuilder.append(filler + "\n"); } - expBuilder.append(children_.get(0).getExplainString(prefix, prefix, detailLevel)); + PlanFragment childFragment = children_.get(0).fragment_; + if (fragment_ != childFragment && detailLevel == TExplainLevel.EXTENDED) { + // we're crossing a fragment boundary - print the fragment header. 
+ expBuilder.append(prefix); + expBuilder.append( + childFragment.getFragmentHeaderString(queryOptions.getMt_dop())); + expBuilder.append("\n"); + } + expBuilder.append( + children_.get(0).getExplainString(prefix, prefix, queryOptions, detailLevel)); } return expBuilder.toString(); } @@ -379,7 +392,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> { TExecStats estimatedStats = new TExecStats(); estimatedStats.setCardinality(cardinality_); - estimatedStats.setMemory_used(perHostMemCost_); + estimatedStats.setMemory_used(resourceProfile_.getMemEstimateBytes()); msg.setLabel(getDisplayLabel()); msg.setLabel_detail(getDisplayLabelDetail()); msg.setEstimated_stats(estimatedStats); @@ -605,13 +618,12 @@ abstract public class PlanNode extends TreeNode<PlanNode> { public boolean isBlockingNode() { return false; } /** - * Estimates the cost of executing this PlanNode. Currently only sets perHostMemCost_. - * May only be called after this PlanNode has been placed in a PlanFragment because - * the cost computation is dependent on the enclosing fragment's data partition. + * Compute resources consumed when executing this PlanNode, initializing + * 'resourceProfile_'. May only be called after this PlanNode has been placed in a + * PlanFragment because the cost computation is dependent on the enclosing fragment's + * data partition. */ - public void computeCosts(TQueryOptions queryOptions) { - perHostMemCost_ = 0; - } + public abstract void computeResourceProfile(TQueryOptions queryOptions); /** * The input cardinality is the sum of output cardinalities of its children.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java index a199f54..fba9149 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java @@ -20,6 +20,7 @@ package org.apache.impala.planner; import org.apache.impala.thrift.TDataSink; import org.apache.impala.thrift.TDataSinkType; import org.apache.impala.thrift.TExplainLevel; +import org.apache.impala.thrift.TQueryOptions; /** * Sink for the root of a query plan that produces result rows. Allows coordination @@ -28,9 +29,15 @@ import org.apache.impala.thrift.TExplainLevel; */ public class PlanRootSink extends DataSink { - public String getExplainString(String prefix, String detailPrefix, - TExplainLevel explainLevel) { - return String.format("%sPLAN-ROOT SINK\n", prefix); + public void appendSinkExplainString(String prefix, String detailPrefix, + TQueryOptions queryOptions, TExplainLevel explainLevel, StringBuilder output) { + output.append(String.format("%sPLAN-ROOT SINK\n", prefix)); + } + + @Override + public void computeResourceProfile(TQueryOptions queryOptions) { + // TODO: add a memory estimate + resourceProfile_ = new ResourceProfile(0, 0); } protected TDataSink toThrift() { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/Planner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java index 8842c9c..ad5bba5 100644 --- a/fe/src/main/java/org/apache/impala/planner/Planner.java +++ b/fe/src/main/java/org/apache/impala/planner/Planner.java @@ 
-40,6 +40,7 @@ import org.apache.impala.service.BackendConfig; import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TQueryCtx; import org.apache.impala.thrift.TQueryExecRequest; +import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TRuntimeFilterMode; import org.apache.impala.thrift.TTableName; import org.apache.impala.util.MaxRowsProcessedVisitor; @@ -266,11 +267,14 @@ public class Planner { TQueryExecRequest request, TExplainLevel explainLevel) { StringBuilder str = new StringBuilder(); boolean hasHeader = false; - if (request.isSetPer_host_mem_req() && request.isSetPer_host_vcores()) { - str.append( - String.format("Estimated Per-Host Requirements: Memory=%s VCores=%s\n", - PrintUtils.printBytes(request.getPer_host_mem_req()), - request.per_host_vcores)); + if (request.isSetPer_host_min_reservation()) { + str.append(String.format("Per-Host Resource Reservation: Memory=%s\n", + PrintUtils.printBytes(request.getPer_host_min_reservation()))) ; + hasHeader = true; + } + if (request.isSetPer_host_mem_estimate()) { + str.append(String.format("Per-Host Resource Estimates: Memory=%s\n", + PrintUtils.printBytes(request.getPer_host_mem_estimate()))); hasHeader = true; } @@ -324,12 +328,12 @@ public class Planner { if (explainLevel.ordinal() < TExplainLevel.VERBOSE.ordinal()) { // Print the non-fragmented parallel plan. - str.append(fragments.get(0).getExplainString(explainLevel)); + str.append(fragments.get(0).getExplainString(ctx_.getQueryOptions(), explainLevel)); } else { // Print the fragmented parallel plan. 
for (int i = 0; i < fragments.size(); ++i) { PlanFragment fragment = fragments.get(i); - str.append(fragment.getExplainString(explainLevel)); + str.append(fragment.getExplainString(ctx_.getQueryOptions(), explainLevel)); if (i < fragments.size() - 1) str.append("\n"); } } @@ -337,91 +341,46 @@ public class Planner { } /** - * Returns true if the fragments are for a trivial, coordinator-only query: - * Case 1: Only an EmptySetNode, e.g. query has a limit 0. - * Case 2: Query has only constant exprs. - */ - private static boolean isTrivialCoordOnlyPlan(List<PlanFragment> fragments) { - Preconditions.checkNotNull(fragments); - Preconditions.checkState(!fragments.isEmpty()); - if (fragments.size() > 1) return false; - PlanNode root = fragments.get(0).getPlanRoot(); - if (root instanceof EmptySetNode) return true; - if (root instanceof UnionNode && ((UnionNode) root).isConstantUnion()) return true; - return false; - } - - /** - * Estimates the per-host memory and CPU requirements for the given plan fragments, - * and sets the results in request. - * Optionally excludes the requirements for unpartitioned fragments. + * Estimates the per-host resource requirements for the given plans, and sets the + * results in request. * TODO: The LOG.warn() messages should eventually become Preconditions checks * once resource estimation is more robust. - * TODO: Revisit and possibly remove during MT work, particularly references to vcores. */ - public void computeResourceReqs(List<PlanFragment> fragments, - boolean excludeUnpartitionedFragments, + public void computeResourceReqs(List<PlanFragment> planRoots, TQueryExecRequest request) { - Preconditions.checkState(!fragments.isEmpty()); + Preconditions.checkState(!planRoots.isEmpty()); Preconditions.checkNotNull(request); - // Compute pipelined plan node sets. 
- ArrayList<PipelinedPlanNodeSet> planNodeSets = - PipelinedPlanNodeSet.computePlanNodeSets(fragments.get(0).getPlanRoot()); - - // Compute the max of the per-host mem and vcores requirement. - // Note that the max mem and vcores may come from different plan node sets. - long maxPerHostMem = Long.MIN_VALUE; - int maxPerHostVcores = Integer.MIN_VALUE; - for (PipelinedPlanNodeSet planNodeSet: planNodeSets) { - if (!planNodeSet.computeResourceEstimates( - excludeUnpartitionedFragments, ctx_.getQueryOptions())) { - continue; - } - long perHostMem = planNodeSet.getPerHostMem(); - int perHostVcores = planNodeSet.getPerHostVcores(); - if (perHostMem > maxPerHostMem) maxPerHostMem = perHostMem; - if (perHostVcores > maxPerHostVcores) maxPerHostVcores = perHostVcores; - } - - // Do not ask for more cores than are in the RuntimeEnv. - maxPerHostVcores = Math.min(maxPerHostVcores, RuntimeEnv.INSTANCE.getNumCores()); - - // Special case for some trivial coordinator-only queries (IMPALA-3053, IMPALA-1092). - if (isTrivialCoordOnlyPlan(fragments)) { - maxPerHostMem = 1024; - maxPerHostVcores = 1; - } - - // Set costs to zero if there are only unpartitioned fragments and - // excludeUnpartitionedFragments is true. - // TODO: handle this case with a better indication for unknown, e.g. -1 or not set. - if (maxPerHostMem == Long.MIN_VALUE || maxPerHostVcores == Integer.MIN_VALUE) { - boolean allUnpartitioned = true; - for (PlanFragment fragment: fragments) { - if (fragment.isPartitioned()) { - allUnpartitioned = false; - break; - } + // Compute the sum over all plans. + // TODO: Revisit during MT work - scheduling of fragments will change and computing + // the sum may not be correct or optimal. + ResourceProfile totalResources = ResourceProfile.invalid(); + for (PlanFragment planRoot: planRoots) { + ResourceProfile planMaxResources = ResourceProfile.invalid(); + ArrayList<PlanFragment> fragments = planRoot.getNodesPreOrder(); + // Compute pipelined plan node sets. 
+ ArrayList<PipelinedPlanNodeSet> planNodeSets = + PipelinedPlanNodeSet.computePlanNodeSets(fragments.get(0).getPlanRoot()); + + // Compute the max of the per-host resources requirement. + // Note that the different maxes may come from different plan node sets. + for (PipelinedPlanNodeSet planNodeSet : planNodeSets) { + TQueryOptions queryOptions = ctx_.getQueryOptions(); + ResourceProfile perHostResources = + planNodeSet.computePerHostResources(queryOptions); + if (!perHostResources.isValid()) continue; + planMaxResources = ResourceProfile.max(planMaxResources, perHostResources); } - if (allUnpartitioned && excludeUnpartitionedFragments) { - maxPerHostMem = 0; - maxPerHostVcores = 0; - } - } - - if (maxPerHostMem < 0 || maxPerHostMem == Long.MIN_VALUE) { - LOG.warn("Invalid per-host memory requirement: " + maxPerHostMem); - } - if (maxPerHostVcores < 0 || maxPerHostVcores == Integer.MIN_VALUE) { - LOG.warn("Invalid per-host virtual cores requirement: " + maxPerHostVcores); + totalResources = ResourceProfile.sum(totalResources, planMaxResources); } - request.setPer_host_mem_req(maxPerHostMem); - request.setPer_host_vcores((short) maxPerHostVcores); + Preconditions.checkState(totalResources.getMemEstimateBytes() >= 0); + Preconditions.checkState(totalResources.getMinReservationBytes() >= 0); + request.setPer_host_mem_estimate(totalResources.getMemEstimateBytes()); + request.setPer_host_min_reservation(totalResources.getMinReservationBytes()); if (LOG.isTraceEnabled()) { - LOG.trace("Estimated per-host peak memory requirement: " + maxPerHostMem); - LOG.trace("Estimated per-host virtual cores requirement: " + maxPerHostVcores); + LOG.trace("Per-host min buffer : " + totalResources.getMinReservationBytes()); + LOG.trace("Estimated per-host memory: " + totalResources.getMemEstimateBytes()); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java 
---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java b/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java new file mode 100644 index 0000000..c0dc607 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.planner; + +import org.apache.impala.common.PrintUtils; + +/** + * The resources that will be consumed by a set of plan nodes. + */ +public class ResourceProfile { + // If the computed values are valid. + private final boolean isValid_; + + // Estimated memory consumption in bytes. + // TODO: IMPALA-5013: currently we are inconsistent about how these estimates are + // derived or what they mean. Re-evaluate what they mean and either deprecate or + // fix them. + private final long memEstimateBytes_; + + // Minimum buffer reservation required to execute in bytes. 
+ private final long minReservationBytes_; + + private ResourceProfile(boolean isValid, long memEstimateBytes, long minReservationBytes) { + isValid_ = isValid; + memEstimateBytes_ = memEstimateBytes; + minReservationBytes_ = minReservationBytes; + } + + public ResourceProfile(long memEstimateBytes, long minReservationBytes) { + this(true, memEstimateBytes, minReservationBytes); + } + + public static ResourceProfile invalid() { + return new ResourceProfile(false, -1, -1); + } + + public boolean isValid() { return isValid_; } + public long getMemEstimateBytes() { return memEstimateBytes_; } + public long getMinReservationBytes() { return minReservationBytes_; } + + // Return a string with the resource profile information suitable for display in an + // explain plan in a format like: "resource1=value resource2=value" + public String getExplainString() { + StringBuilder output = new StringBuilder(); + output.append("mem-estimate="); + output.append(isValid_ ? PrintUtils.printBytes(memEstimateBytes_) : "invalid"); + output.append(" mem-reservation="); + output.append(isValid_ ? PrintUtils.printBytes(minReservationBytes_) : "invalid"); + return output.toString(); + } + + // Returns a profile with the max of each value in 'p1' and 'p2'. + public static ResourceProfile max(ResourceProfile p1, ResourceProfile p2) { + if (!p1.isValid()) return p2; + if (!p2.isValid()) return p1; + return new ResourceProfile( + Math.max(p1.getMemEstimateBytes(), p2.getMemEstimateBytes()), + Math.max(p1.getMinReservationBytes(), p2.getMinReservationBytes())); + } + + // Returns a profile with the sum of each value in 'p1' and 'p2'. 
+ public static ResourceProfile sum(ResourceProfile p1, ResourceProfile p2) { + if (!p1.isValid()) return p2; + if (!p2.isValid()) return p1; + return new ResourceProfile(p1.getMemEstimateBytes() + p2.getMemEstimateBytes(), + p1.getMinReservationBytes() + p2.getMinReservationBytes()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/SelectNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/SelectNode.java b/fe/src/main/java/org/apache/impala/planner/SelectNode.java index e09d572..c346df9 100644 --- a/fe/src/main/java/org/apache/impala/planner/SelectNode.java +++ b/fe/src/main/java/org/apache/impala/planner/SelectNode.java @@ -27,6 +27,8 @@ import org.apache.impala.analysis.Expr; import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TPlanNode; import org.apache.impala.thrift.TPlanNodeType; +import org.apache.impala.thrift.TQueryOptions; + import com.google.common.base.Preconditions; /** @@ -80,6 +82,12 @@ public class SelectNode extends PlanNode { } @Override + public void computeResourceProfile(TQueryOptions queryOptions) { + // TODO: add an estimate + resourceProfile_ = new ResourceProfile(0, 0); + } + + @Override protected String getNodeExplainString(String prefix, String detailPrefix, TExplainLevel detailLevel) { StringBuilder output = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java b/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java index 5b66d18..82a1c41 100644 --- a/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java +++ 
b/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java @@ -22,6 +22,8 @@ import org.apache.impala.common.ImpalaException; import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TPlanNode; import org.apache.impala.thrift.TPlanNodeType; +import org.apache.impala.thrift.TQueryOptions; + import com.google.common.base.Preconditions; /** @@ -64,6 +66,12 @@ public class SingularRowSrcNode extends PlanNode { } @Override + public void computeResourceProfile(TQueryOptions queryOptions) { + // TODO: add an estimate + resourceProfile_ = new ResourceProfile(0, 0); + } + + @Override protected String getNodeExplainString(String prefix, String detailPrefix, TExplainLevel detailLevel) { StringBuilder output = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/SortNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java index 0533b22..ef05499 100644 --- a/fe/src/main/java/org/apache/impala/planner/SortNode.java +++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java @@ -205,10 +205,12 @@ public class SortNode extends PlanNode { } @Override - public void computeCosts(TQueryOptions queryOptions) { + public void computeResourceProfile(TQueryOptions queryOptions) { Preconditions.checkState(hasValidStats()); if (useTopN_) { - perHostMemCost_ = (long) Math.ceil((cardinality_ + offset_) * avgRowSize_); + long perInstanceMemEstimate = + (long) Math.ceil((cardinality_ + offset_) * avgRowSize_); + resourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 0); return; } @@ -233,7 +235,15 @@ public class SortNode extends PlanNode { // doubles the block size when there are var-len columns present. 
if (hasVarLenSlots) blockSize *= 2; double numInputBlocks = Math.ceil(fullInputSize / blockSize); - perHostMemCost_ = blockSize * (long) Math.ceil(Math.sqrt(numInputBlocks)); + long perInstanceMemEstimate = blockSize * (long) Math.ceil(Math.sqrt(numInputBlocks)); + + // Must be kept in sync with min_buffers_required in Sorter in be. + long perInstanceMinReservation = 3 * SPILLABLE_BUFFER_BYTES; + if (info_.getSortTupleDescriptor().hasVarLenSlots()) { + perInstanceMinReservation *= 2; + } + resourceProfile_ = + new ResourceProfile(perInstanceMemEstimate, perInstanceMinReservation); } private static String getDisplayName(boolean isTopN, boolean isMergeOnly) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/SubplanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java index 6143255..cbe7087 100644 --- a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java @@ -22,6 +22,8 @@ import org.apache.impala.common.InternalException; import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TPlanNode; import org.apache.impala.thrift.TPlanNodeType; +import org.apache.impala.thrift.TQueryOptions; + import com.google.common.base.Preconditions; /** @@ -91,6 +93,12 @@ public class SubplanNode extends PlanNode { } @Override + public void computeResourceProfile(TQueryOptions queryOptions) { + // TODO: add an estimate + resourceProfile_ = new ResourceProfile(0, 0); + } + + @Override protected String getNodeExplainString(String prefix, String detailPrefix, TExplainLevel detailLevel) { StringBuilder output = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/UnionNode.java 
---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/UnionNode.java b/fe/src/main/java/org/apache/impala/planner/UnionNode.java index d724c59..6b8d331 100644 --- a/fe/src/main/java/org/apache/impala/planner/UnionNode.java +++ b/fe/src/main/java/org/apache/impala/planner/UnionNode.java @@ -30,6 +30,7 @@ import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TExpr; import org.apache.impala.thrift.TPlanNode; import org.apache.impala.thrift.TPlanNodeType; +import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TUnionNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -127,6 +128,12 @@ public class UnionNode extends PlanNode { } } + @Override + public void computeResourceProfile(TQueryOptions queryOptions) { + // TODO: add an estimate + resourceProfile_ = new ResourceProfile(0, 0); + } + /** * Returns true if rows from the child with 'childTupleIds' and 'childResultExprs' can * be returned directly by the union node (without materialization into a new tuple). 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/UnnestNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/UnnestNode.java b/fe/src/main/java/org/apache/impala/planner/UnnestNode.java index 35abc55..5847e62 100644 --- a/fe/src/main/java/org/apache/impala/planner/UnnestNode.java +++ b/fe/src/main/java/org/apache/impala/planner/UnnestNode.java @@ -24,6 +24,7 @@ import org.apache.impala.common.ImpalaException; import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TPlanNode; import org.apache.impala.thrift.TPlanNodeType; +import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TUnnestNode; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -73,6 +74,12 @@ public class UnnestNode extends PlanNode { } @Override + public void computeResourceProfile(TQueryOptions queryOptions) { + // TODO: add an estimate + resourceProfile_ = new ResourceProfile(0, 0); + } + + @Override protected String getNodeExplainString(String prefix, String detailPrefix, TExplainLevel detailLevel) { StringBuilder output = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/service/Frontend.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index d3cefa6..1348129 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -962,6 +962,9 @@ public class Frontend { } } + // Clear pre-existing lists to avoid adding duplicate entries in FE tests. 
+ queryCtx.unsetTables_missing_stats(); + queryCtx.unsetTables_with_corrupt_stats(); for (TTableName tableName: tablesMissingStats) { queryCtx.addToTables_missing_stats(tableName); } @@ -972,16 +975,6 @@ public class Frontend { queryCtx.addToTables_missing_diskids(tableName); } - // Compute resource requirements after scan range locations because the cost - // estimates of scan nodes rely on them. - try { - planner.computeResourceReqs(fragments, true, queryExecRequest); - } catch (Exception e) { - // Turn exceptions into a warning to allow the query to execute. - LOG.error("Failed to compute resource requirements for query\n" + - queryCtx.client_request.getStmt(), e); - } - // The fragment at this point has all state set, serialize it to thrift. for (PlanFragment fragment: fragments) { TPlanFragment thriftFragment = fragment.toThrift(); @@ -1020,6 +1013,10 @@ public class Frontend { createPlanExecInfo(planRoot, planner, queryCtx, result)); } + // Compute resource requirements after scan range locations because the cost + // estimates of scan nodes rely on them. + planner.computeResourceReqs(planRoots, result); + // Optionally disable spilling in the backend. Allow spilling if there are plan hints // or if all tables have stats. boolean disableSpilling = http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/test/java/org/apache/impala/planner/PlannerTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index 5148f68..363c59c 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -369,4 +369,14 @@ public class PlannerTest extends PlannerTestBase { // Check that the effective MT_DOP is as expected. 
Assert.assertEquals(actualMtDop, expectedMtDop); } + + @Test + public void testResourceRequirements() { + // Tests the resource requirement computation from the planner. + TQueryOptions options = defaultQueryOptions(); + options.setExplain_level(TExplainLevel.EXTENDED); + options.setNum_scanner_threads(1); // Required so that output doesn't vary by machine + runPlannerTestFile("resource-requirements", options, false); + } + } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java index d354897..eceaeca 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java @@ -390,7 +390,8 @@ public class PlannerTestBase extends FrontendTestBase { * of 'testCase'. */ private void runTestCase(TestCase testCase, StringBuilder errorLog, - StringBuilder actualOutput, String dbName, TQueryOptions options) + StringBuilder actualOutput, String dbName, TQueryOptions options, + boolean ignoreExplainHeader) throws CatalogException { if (options == null) { options = defaultQueryOptions(); @@ -408,16 +409,18 @@ public class PlannerTestBase extends FrontendTestBase { dbName, System.getProperty("user.name")); queryCtx.client_request.query_options = options; // Test single node plan, scan range locations, and column lineage. 
- TExecRequest singleNodeExecRequest = - testPlan(testCase, Section.PLAN, queryCtx, errorLog, actualOutput); + TExecRequest singleNodeExecRequest = testPlan(testCase, Section.PLAN, queryCtx, + ignoreExplainHeader, errorLog, actualOutput); validateTableIds(singleNodeExecRequest); checkScanRangeLocations(testCase, singleNodeExecRequest, errorLog, actualOutput); checkColumnLineage(testCase, singleNodeExecRequest, errorLog, actualOutput); checkLimitCardinality(query, singleNodeExecRequest, errorLog); // Test distributed plan. - testPlan(testCase, Section.DISTRIBUTEDPLAN, queryCtx, errorLog, actualOutput); + testPlan(testCase, Section.DISTRIBUTEDPLAN, queryCtx, ignoreExplainHeader, errorLog, + actualOutput); // test parallel plans - testPlan(testCase, Section.PARALLELPLANS, queryCtx, errorLog, actualOutput); + testPlan(testCase, Section.PARALLELPLANS, queryCtx, ignoreExplainHeader, errorLog, + actualOutput); } /** @@ -471,19 +474,26 @@ public class PlannerTestBase extends FrontendTestBase { * * Returns the produced exec request or null if there was an error generating * the plan. + * + * If ignoreExplainHeader is true, the explain header with warnings and resource + * estimates is stripped out. 
*/ private TExecRequest testPlan(TestCase testCase, Section section, - TQueryCtx queryCtx, StringBuilder errorLog, StringBuilder actualOutput) { + TQueryCtx queryCtx, boolean ignoreExplainHeader, + StringBuilder errorLog, StringBuilder actualOutput) { String query = testCase.getQuery(); queryCtx.client_request.setStmt(query); + TQueryOptions queryOptions = queryCtx.client_request.getQuery_options(); if (section == Section.PLAN) { - queryCtx.client_request.getQuery_options().setNum_nodes(1); + queryOptions.setNum_nodes(1); } else { // for distributed and parallel execution we want to run on all available nodes - queryCtx.client_request.getQuery_options().setNum_nodes( + queryOptions.setNum_nodes( ImpalaInternalServiceConstants.NUM_NODES_ALL); } - if (section == Section.PARALLELPLANS) { + if (section == Section.PARALLELPLANS + && (!queryOptions.isSetMt_dop() || queryOptions.getMt_dop() == 0)) { + // Set mt_dop to force production of parallel plans. queryCtx.client_request.query_options.setMt_dop(2); } ArrayList<String> expectedPlan = testCase.getSectionContents(section); @@ -507,7 +517,8 @@ public class PlannerTestBase extends FrontendTestBase { // Failed to produce an exec request. 
if (execRequest == null) return null; - String explainStr = removeExplainHeader(explainBuilder.toString()); + String explainStr = explainBuilder.toString(); + if (ignoreExplainHeader) explainStr = removeExplainHeader(explainStr); actualOutput.append(explainStr); LOG.info(section.toString() + ":" + explainStr); if (expectedErrorMsg != null) { @@ -702,10 +713,16 @@ public class PlannerTestBase extends FrontendTestBase { } protected void runPlannerTestFile(String testFile, TQueryOptions options) { - runPlannerTestFile(testFile, "default", options); + runPlannerTestFile(testFile, options, true); + } + + protected void runPlannerTestFile(String testFile, TQueryOptions options, + boolean ignoreExplainHeader) { + runPlannerTestFile(testFile, "default", options, ignoreExplainHeader); } - private void runPlannerTestFile(String testFile, String dbName, TQueryOptions options) { + private void runPlannerTestFile(String testFile, String dbName, TQueryOptions options, + boolean ignoreExplainHeader) { String fileName = testDir_ + "/" + testFile + ".test"; TestFileParser queryFileParser = new TestFileParser(fileName); StringBuilder actualOutput = new StringBuilder(); @@ -716,7 +733,8 @@ public class PlannerTestBase extends FrontendTestBase { actualOutput.append(testCase.getSectionAsString(Section.QUERY, true, "\n")); actualOutput.append("\n"); try { - runTestCase(testCase, errorLog, actualOutput, dbName, options); + runTestCase(testCase, errorLog, actualOutput, dbName, options, + ignoreExplainHeader); } catch (CatalogException e) { errorLog.append(String.format("Failed to plan query\n%s\n%s", testCase.getQuery(), e.getMessage())); @@ -743,10 +761,10 @@ public class PlannerTestBase extends FrontendTestBase { } protected void runPlannerTestFile(String testFile) { - runPlannerTestFile(testFile, "default", null); + runPlannerTestFile(testFile, "default", null, true); } protected void runPlannerTestFile(String testFile, String dbName) { - runPlannerTestFile(testFile, dbName, null); + 
runPlannerTestFile(testFile, dbName, null, true); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test index 3a9855d..7effd9b 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test @@ -4,42 +4,44 @@ from tpch_nested_parquet.customer c, c.c_orders o, o.o_lineitems where 5 + 5 < c_custkey and o_orderkey = (2 + 2) and (coalesce(2, 3, 4) * 10) + l_linenumber < (0 * 1) ---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B | 01:SUBPLAN -| hosts=3 per-host-mem=unavailable +| mem-estimate=0B mem-reservation=0B | tuple-ids=2,1,0 row-size=52B cardinality=1500000 | |--08:NESTED LOOP JOIN [CROSS JOIN] -| | hosts=3 per-host-mem=unavailable +| | mem-estimate=24B mem-reservation=0B | | tuple-ids=2,1,0 row-size=52B cardinality=100 | | | |--02:SINGULAR ROW SRC | | parent-subplan=01 -| | hosts=3 per-host-mem=unavailable +| | mem-estimate=0B mem-reservation=0B | | tuple-ids=0 row-size=24B cardinality=1 | | | 04:SUBPLAN -| | hosts=3 per-host-mem=unavailable +| | mem-estimate=0B mem-reservation=0B | | tuple-ids=2,1 row-size=28B cardinality=100 | | | |--07:NESTED LOOP JOIN [CROSS JOIN] -| | | hosts=3 per-host-mem=unavailable +| | | mem-estimate=24B mem-reservation=0B | | | tuple-ids=2,1 row-size=28B cardinality=10 | | | | | |--05:SINGULAR ROW SRC | | | parent-subplan=04 -| | | hosts=3 per-host-mem=unavailable +| | | mem-estimate=0B mem-reservation=0B | | | tuple-ids=1 row-size=24B cardinality=1 | | | | | 06:UNNEST [o.o_lineitems] | | parent-subplan=04 -| | 
hosts=3 per-host-mem=unavailable +| | mem-estimate=0B mem-reservation=0B | | tuple-ids=2 row-size=0B cardinality=10 | | | 03:UNNEST [c.c_orders o] | parent-subplan=01 -| hosts=3 per-host-mem=unavailable +| mem-estimate=0B mem-reservation=0B | tuple-ids=1 row-size=0B cardinality=10 | 00:SCAN HDFS [tpch_nested_parquet.customer c] @@ -51,7 +53,7 @@ PLAN-ROOT SINK columns missing stats: c_orders parquet statistics predicates: c_custkey > 10 parquet dictionary predicates: c_custkey > 10 - hosts=3 per-host-mem=unavailable + mem-estimate=176.00MB mem-reservation=0B tuple-ids=0 row-size=24B cardinality=15000 ==== # Test HBase scan node. @@ -59,7 +61,9 @@ select * from functional_hbase.stringids where string_col = cast(4 as string) and 2 + 3 = tinyint_col and id between concat('1', '0') and upper('20') ---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B | 00:SCAN HBASE [functional_hbase.stringids] start key: 10 @@ -68,19 +72,21 @@ PLAN-ROOT SINK predicates: tinyint_col = 5, string_col = '4' table stats: 10000 rows total column stats: all - hosts=100 per-host-mem=unavailable + mem-estimate=1.00GB mem-reservation=0B tuple-ids=0 row-size=119B cardinality=1 ==== # Test datasource scan node. select * from functional.alltypes_datasource where tinyint_col < (pow(2, 8)) and float_col != 0 and 1 + 1 > int_col ---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B | 00:SCAN DATA SOURCE [functional.alltypes_datasource] data source predicates: tinyint_col < 256, int_col < 2 predicates: float_col != 0 - hosts=1 per-host-mem=unavailable + mem-estimate=1.00GB mem-reservation=0B tuple-ids=0 row-size=116B cardinality=500 ==== # Test aggregation. 
@@ -91,20 +97,22 @@ having 1024 * 1024 * count(*) % 2 = 0 and (sm > 1 or sm > 1) and (sm between 5 and 10) ---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B | 01:AGGREGATE [FINALIZE] | output: sum(2 + id), count(*) | group by: timestamp_col = TIMESTAMP '2016-11-15 00:00:00' | having: sum(2 + id) <= 10, sum(2 + id) > 1, sum(2 + id) >= 5, 1048576 * count(*) % 2 = 0 -| hosts=3 per-host-mem=unavailable +| mem-estimate=10.00MB mem-reservation=264.00MB | tuple-ids=1 row-size=17B cardinality=0 | 00:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB table stats: 7300 rows total column stats: all - hosts=3 per-host-mem=unavailable + mem-estimate=128.00MB mem-reservation=0B tuple-ids=0 row-size=20B cardinality=7300 ==== # Test hash join. @@ -114,13 +122,15 @@ left outer join functional.alltypes b a.int_col between 0 + 0 + 0 + b.bigint_col and b.bigint_col + ascii('a')) where round(1.11 + 2.22 + 3.33 + 4.44, 1) < cast(b.double_col as decimal(3, 2)) ---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B | 02:HASH JOIN [LEFT OUTER JOIN] | hash predicates: 2 + a.id = b.id - 2 | other join predicates: a.int_col <= b.bigint_col + 97, a.int_col >= 0 + b.bigint_col | other predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1 -| hosts=3 per-host-mem=unavailable +| mem-estimate=15.68KB mem-reservation=136.00MB | tuple-ids=0,1N row-size=28B cardinality=7300 | |--01:SCAN HDFS [functional.alltypes b] @@ -129,14 +139,14 @@ PLAN-ROOT SINK | table stats: 7300 rows total | column stats: all | parquet dictionary predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1 -| hosts=3 per-host-mem=unavailable +| mem-estimate=128.00MB mem-reservation=0B | tuple-ids=1 row-size=20B cardinality=730 | 00:SCAN HDFS [functional.alltypes a] partitions=24/24 files=24 size=478.45KB table stats: 7300 rows total column stats: all - hosts=3 
per-host-mem=unavailable + mem-estimate=128.00MB mem-reservation=0B tuple-ids=0 row-size=8B cardinality=7300 ==== # Test nested-loop join. Same as above but and with a disjunction in the On clause. @@ -147,12 +157,14 @@ left outer join functional.alltypes b a.int_col between 0 + 0 + 0 + b.bigint_col and b.bigint_col + ascii('a')) where cast(b.double_col as decimal(3, 2)) > round(1.11 + 2.22 + 3.33 + 4.44, 1) ---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B | 02:NESTED LOOP JOIN [LEFT OUTER JOIN] | join predicates: (2 + a.id = b.id - 2 OR a.int_col >= 0 + b.bigint_col AND a.int_col <= b.bigint_col + 97) | predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1 -| hosts=3 per-host-mem=unavailable +| mem-estimate=14.26KB mem-reservation=0B | tuple-ids=0,1N row-size=28B cardinality=7300 | |--01:SCAN HDFS [functional.alltypes b] @@ -161,14 +173,14 @@ PLAN-ROOT SINK | table stats: 7300 rows total | column stats: all | parquet dictionary predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1 -| hosts=3 per-host-mem=unavailable +| mem-estimate=128.00MB mem-reservation=0B | tuple-ids=1 row-size=20B cardinality=730 | 00:SCAN HDFS [functional.alltypes a] partitions=24/24 files=24 size=478.45KB table stats: 7300 rows total column stats: all - hosts=3 per-host-mem=unavailable + mem-estimate=128.00MB mem-reservation=0B tuple-ids=0 row-size=8B cardinality=7300 ==== # Test distinct aggregation with grouping. 
@@ -177,26 +189,28 @@ from functional.alltypes group by timestamp_col = cast('2015-11-15' as timestamp) + interval 1 year having 1024 * 1024 * count(*) % 2 = 0 ---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B | 02:AGGREGATE [FINALIZE] | output: sum(2 + id), count:merge(*) | group by: timestamp_col = TIMESTAMP '2016-11-15 00:00:00' | having: 1048576 * count(*) % 2 = 0 -| hosts=3 per-host-mem=unavailable +| mem-estimate=10.00MB mem-reservation=264.00MB | tuple-ids=2 row-size=17B cardinality=0 | 01:AGGREGATE | output: count(*) | group by: timestamp_col = TIMESTAMP '2016-11-15 00:00:00', 2 + id -| hosts=3 per-host-mem=unavailable +| mem-estimate=10.00MB mem-reservation=264.00MB | tuple-ids=1 row-size=17B cardinality=7300 | 00:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB table stats: 7300 rows total column stats: all - hosts=3 per-host-mem=unavailable + mem-estimate=128.00MB mem-reservation=0B tuple-ids=0 row-size=20B cardinality=7300 ==== # Test non-grouping distinct aggregation. 
@@ -204,25 +218,27 @@ select sum(distinct 1 + 1 + id) from functional.alltypes having 1024 * 1024 * count(*) % 2 = 0 ---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B | 02:AGGREGATE [FINALIZE] | output: sum(2 + id), count:merge(*) | having: 1048576 * zeroifnull(count(*)) % 2 = 0 -| hosts=3 per-host-mem=unavailable +| mem-estimate=10.00MB mem-reservation=0B | tuple-ids=2 row-size=16B cardinality=0 | 01:AGGREGATE | output: count(*) | group by: 2 + id -| hosts=3 per-host-mem=unavailable +| mem-estimate=10.00MB mem-reservation=264.00MB | tuple-ids=1 row-size=16B cardinality=7300 | 00:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB table stats: 7300 rows total column stats: all - hosts=3 per-host-mem=unavailable + mem-estimate=128.00MB mem-reservation=0B tuple-ids=0 row-size=4B cardinality=7300 ==== # Test analytic eval node. @@ -231,44 +247,48 @@ select first_value(1 + 1 + int_col - (1 - 1)) over order by greatest(greatest(10, 20), bigint_col)) from functional.alltypes ---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B | 02:ANALYTIC | functions: first_value(2 + int_col - 0) | partition by: concat('ab', string_col) | order by: greatest(20, bigint_col) ASC | window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW -| hosts=3 per-host-mem=unavailable +| mem-estimate=0B mem-reservation=16.00MB | tuple-ids=3,2 row-size=37B cardinality=7300 | 01:SORT | order by: concat('ab', string_col) ASC NULLS FIRST, greatest(20, bigint_col) ASC -| hosts=3 per-host-mem=unavailable +| mem-estimate=16.00MB mem-reservation=48.00MB | tuple-ids=3 row-size=29B cardinality=7300 | 00:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB table stats: 7300 rows total column stats: all - hosts=3 per-host-mem=unavailable + mem-estimate=128.00MB mem-reservation=0B tuple-ids=0 row-size=29B cardinality=7300 ==== # Test sort node. 
select int_col from functional.alltypes order by id * abs((factorial(5) / power(2, 4))) ---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B | 01:SORT | order by: id * 7.5 ASC -| hosts=3 per-host-mem=unavailable +| mem-estimate=8.00MB mem-reservation=24.00MB | tuple-ids=1 row-size=8B cardinality=7300 | 00:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB table stats: 7300 rows total column stats: all - hosts=3 per-host-mem=unavailable + mem-estimate=128.00MB mem-reservation=0B tuple-ids=0 row-size=8B cardinality=7300 ==== # Test HDFS table sink. @@ -276,15 +296,16 @@ insert into functional.alltypes (id, int_col) partition(year,month) select id, int_col, cast(1 + 1 + 1 + year as int), cast(month - (1 - 1 - 1) as int) from functional.alltypessmall ---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(CAST(3 + year AS INT),CAST(month - -1 AS INT))] | partitions=4 -| hosts=1 per-host-mem=unavailable +| mem-estimate=1.56KB mem-reservation=0B | 00:SCAN HDFS [functional.alltypessmall] partitions=4/4 files=4 size=6.32KB table stats: 100 rows total column stats: all - hosts=3 per-host-mem=unavailable + mem-estimate=32.00MB mem-reservation=0B tuple-ids=0 row-size=16B cardinality=100 ==== # Constant folding does not work across query blocks. 
@@ -295,11 +316,13 @@ select sum(id + c3) from ) v2 ) v3 ---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B | 01:AGGREGATE [FINALIZE] | output: sum(id + 10 + 20 + 30) -| hosts=3 per-host-mem=unavailable +| mem-estimate=10.00MB mem-reservation=0B | tuple-ids=4 row-size=8B cardinality=1 | 00:SCAN HDFS [functional.alltypes] @@ -307,6 +330,6 @@ PLAN-ROOT SINK table stats: 7300 rows total column stats: all limit: 2 - hosts=3 per-host-mem=unavailable + mem-estimate=128.00MB mem-reservation=0B tuple-ids=0 row-size=4B cardinality=2 ==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test index b064d2b..bad3299 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test @@ -1,100 +1,112 @@ select * from functional_kudu.zipcode_incomes where id = '8600000US00601' ---- PLAN -F00:PLAN FRAGMENT [UNPARTITIONED] +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK + | mem-estimate=0B mem-reservation=0B | 00:SCAN KUDU [functional_kudu.zipcode_incomes] kudu predicates: id = '8600000US00601' - hosts=3 per-host-mem=unavailable + mem-estimate=0B mem-reservation=0B tuple-ids=0 row-size=124B cardinality=1 ---- DISTRIBUTEDPLAN -F01:PLAN FRAGMENT [UNPARTITIONED] +F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK + | mem-estimate=0B mem-reservation=0B | 01:EXCHANGE [UNPARTITIONED] - hosts=3 per-host-mem=unavailable + mem-estimate=0B mem-reservation=0B tuple-ids=0 row-size=124B cardinality=1 -F00:PLAN FRAGMENT [RANDOM] +F00:PLAN 
FRAGMENT [RANDOM] hosts=3 instances=3 DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED] + | mem-estimate=0B mem-reservation=0B 00:SCAN KUDU [functional_kudu.zipcode_incomes] kudu predicates: id = '8600000US00601' - hosts=3 per-host-mem=0B + mem-estimate=0B mem-reservation=0B tuple-ids=0 row-size=124B cardinality=1 ==== # The cardinality from "zip = '2'" should dominate. select * from functional_kudu.zipcode_incomes where id != '1' and zip = '2' ---- PLAN -F00:PLAN FRAGMENT [UNPARTITIONED] +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK + | mem-estimate=0B mem-reservation=0B | 00:SCAN KUDU [functional_kudu.zipcode_incomes] predicates: id != '1' kudu predicates: zip = '2' - hosts=3 per-host-mem=unavailable + mem-estimate=0B mem-reservation=0B tuple-ids=0 row-size=124B cardinality=1 ---- DISTRIBUTEDPLAN -F01:PLAN FRAGMENT [UNPARTITIONED] +F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK + | mem-estimate=0B mem-reservation=0B | 01:EXCHANGE [UNPARTITIONED] - hosts=3 per-host-mem=unavailable + mem-estimate=0B mem-reservation=0B tuple-ids=0 row-size=124B cardinality=1 -F00:PLAN FRAGMENT [RANDOM] +F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED] + | mem-estimate=0B mem-reservation=0B 00:SCAN KUDU [functional_kudu.zipcode_incomes] predicates: id != '1' kudu predicates: zip = '2' - hosts=3 per-host-mem=0B + mem-estimate=0B mem-reservation=0B tuple-ids=0 row-size=124B cardinality=1 ==== select * from functional_kudu.zipcode_incomes where id > '1' and zip > '2' ---- PLAN -F00:PLAN FRAGMENT [UNPARTITIONED] +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK + | mem-estimate=0B mem-reservation=0B | 00:SCAN KUDU [functional_kudu.zipcode_incomes] kudu predicates: zip > '2', id > '1' - hosts=3 per-host-mem=unavailable + mem-estimate=0B mem-reservation=0B tuple-ids=0 row-size=124B cardinality=3317 ---- DISTRIBUTEDPLAN -F01:PLAN FRAGMENT [UNPARTITIONED] 
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK + | mem-estimate=0B mem-reservation=0B | 01:EXCHANGE [UNPARTITIONED] - hosts=3 per-host-mem=unavailable + mem-estimate=0B mem-reservation=0B tuple-ids=0 row-size=124B cardinality=3317 -F00:PLAN FRAGMENT [RANDOM] +F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED] + | mem-estimate=0B mem-reservation=0B 00:SCAN KUDU [functional_kudu.zipcode_incomes] kudu predicates: zip > '2', id > '1' - hosts=3 per-host-mem=0B + mem-estimate=0B mem-reservation=0B tuple-ids=0 row-size=124B cardinality=3317 ==== select * from functional_kudu.zipcode_incomes where id = '1' or id = '2' ---- PLAN -F00:PLAN FRAGMENT [UNPARTITIONED] +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK + | mem-estimate=0B mem-reservation=0B | 00:SCAN KUDU [functional_kudu.zipcode_incomes] predicates: id = '1' OR id = '2' - hosts=3 per-host-mem=unavailable + mem-estimate=0B mem-reservation=0B tuple-ids=0 row-size=124B cardinality=2 ---- DISTRIBUTEDPLAN -F01:PLAN FRAGMENT [UNPARTITIONED] +F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK + | mem-estimate=0B mem-reservation=0B | 01:EXCHANGE [UNPARTITIONED] - hosts=3 per-host-mem=unavailable + mem-estimate=0B mem-reservation=0B tuple-ids=0 row-size=124B cardinality=2 -F00:PLAN FRAGMENT [RANDOM] +F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED] + | mem-estimate=0B mem-reservation=0B 00:SCAN KUDU [functional_kudu.zipcode_incomes] predicates: id = '1' OR id = '2' - hosts=3 per-host-mem=0B + mem-estimate=0B mem-reservation=0B tuple-ids=0 row-size=124B cardinality=2 ==== select * from functional_kudu.alltypes where @@ -121,13 +133,14 @@ double_col in (cast('inf' as double)) and string_col not in ("bar") and id in (int_col) ---- PLAN -F00:PLAN FRAGMENT [UNPARTITIONED] +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK + | 
mem-estimate=0B mem-reservation=0B | 00:SCAN KUDU [functional_kudu.alltypes] predicates: id IN (int_col), string_col NOT IN ('bar'), bigint_col IN (9999999999999999999), double_col IN (CAST('inf' AS DOUBLE)), float_col IN (CAST('NaN' AS FLOAT)), int_col IN (9999999999), smallint_col IN (99999, 2), tinyint_col IN (1, 999), bool_col IN (1) kudu predicates: double_col IN (0.0), float_col IN (0.0), bigint_col IN (1, 2), int_col IN (1, 2), smallint_col IN (0, 2), string_col IN ('foo', 'foo '), tinyint_col IN (1, 2), bool_col IN (TRUE) - hosts=3 per-host-mem=unavailable + mem-estimate=0B mem-reservation=0B tuple-ids=0 row-size=126B cardinality=4 ==== select * from functional_kudu.alltypes where @@ -135,12 +148,13 @@ tinyint_col is not null and smallint_col is null and cast(date_string_col as tinyint) is null ---- PLAN -F00:PLAN FRAGMENT [UNPARTITIONED] +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK + | mem-estimate=0B mem-reservation=0B | 00:SCAN KUDU [functional_kudu.alltypes] predicates: CAST(date_string_col AS TINYINT) IS NULL kudu predicates: smallint_col IS NULL, tinyint_col IS NOT NULL - hosts=3 per-host-mem=unavailable + mem-estimate=0B mem-reservation=0B tuple-ids=0 row-size=126B cardinality=730 ==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test index 4d9544d..e3dd297 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test @@ -39,17 +39,19 @@ group by bigint_col order by cnt, bigint_col limit 10 ---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT 
SINK +| mem-estimate=0B mem-reservation=0B | 02:TOP-N [LIMIT=10] | order by: count(int_col) ASC, bigint_col ASC -| hosts=3 per-host-mem=unavailable +| mem-estimate=160B mem-reservation=0B | tuple-ids=2 row-size=16B cardinality=10 | 01:AGGREGATE [FINALIZE] | output: count(int_col) | group by: bigint_col -| hosts=3 per-host-mem=unavailable +| mem-estimate=128.00MB mem-reservation=264.00MB | tuple-ids=1 row-size=16B cardinality=unavailable | 00:SCAN HDFS [functional_parquet.alltypes] @@ -59,36 +61,40 @@ PLAN-ROOT SINK column stats: unavailable parquet statistics predicates: id < 10 parquet dictionary predicates: id < 10 - hosts=3 per-host-mem=unavailable + mem-estimate=16.00MB mem-reservation=0B tuple-ids=0 row-size=16B cardinality=unavailable ---- PARALLELPLANS +F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B | 05:MERGING-EXCHANGE [UNPARTITIONED] | order by: count(int_col) ASC, bigint_col ASC | limit: 10 -| hosts=3 per-host-mem=unavailable +| mem-estimate=0B mem-reservation=0B | tuple-ids=2 row-size=16B cardinality=10 | +F01:PLAN FRAGMENT [HASH(bigint_col)] hosts=3 instances=9 02:TOP-N [LIMIT=10] | order by: count(int_col) ASC, bigint_col ASC -| hosts=3 per-host-mem=160B +| mem-estimate=160B mem-reservation=0B | tuple-ids=2 row-size=16B cardinality=10 | 04:AGGREGATE [FINALIZE] | output: count:merge(int_col) | group by: bigint_col -| hosts=3 per-host-mem=128.00MB +| mem-estimate=128.00MB mem-reservation=264.00MB | tuple-ids=1 row-size=16B cardinality=unavailable | 03:EXCHANGE [HASH(bigint_col)] -| hosts=3 per-host-mem=0B +| mem-estimate=0B mem-reservation=0B | tuple-ids=1 row-size=16B cardinality=unavailable | +F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9 01:AGGREGATE [STREAMING] | output: count(int_col) | group by: bigint_col -| hosts=3 per-host-mem=128.00MB +| mem-estimate=128.00MB mem-reservation=0B | tuple-ids=1 row-size=16B cardinality=unavailable | 00:SCAN HDFS [functional_parquet.alltypes, RANDOM] @@ 
-98,27 +104,29 @@ PLAN-ROOT SINK column stats: unavailable parquet statistics predicates: id < 10 parquet dictionary predicates: id < 10 - hosts=3 per-host-mem=16.00MB + mem-estimate=16.00MB mem-reservation=0B tuple-ids=0 row-size=16B cardinality=unavailable ==== -# Single-table scan/filter/analysic should work. +# Single-table scan/filter/analytic should work. select row_number() over(partition by int_col order by id) from functional_parquet.alltypes where id < 10 ---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B | 02:ANALYTIC | functions: row_number() | partition by: int_col | order by: id ASC | window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW -| hosts=3 per-host-mem=unavailable +| mem-estimate=0B mem-reservation=16.00MB | tuple-ids=4,3 row-size=16B cardinality=unavailable | 01:SORT | order by: int_col ASC NULLS FIRST, id ASC -| hosts=3 per-host-mem=unavailable +| mem-estimate=0B mem-reservation=24.00MB | tuple-ids=4 row-size=8B cardinality=unavailable | 00:SCAN HDFS [functional_parquet.alltypes] @@ -128,32 +136,36 @@ PLAN-ROOT SINK column stats: unavailable parquet statistics predicates: id < 10 parquet dictionary predicates: id < 10 - hosts=3 per-host-mem=unavailable + mem-estimate=16.00MB mem-reservation=0B tuple-ids=0 row-size=8B cardinality=unavailable ---- PARALLELPLANS +F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B | 04:EXCHANGE [UNPARTITIONED] -| hosts=3 per-host-mem=unavailable +| mem-estimate=0B mem-reservation=0B | tuple-ids=4,3 row-size=16B cardinality=unavailable | +F01:PLAN FRAGMENT [HASH(int_col)] hosts=3 instances=9 02:ANALYTIC | functions: row_number() | partition by: int_col | order by: id ASC | window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW -| hosts=3 per-host-mem=0B +| mem-estimate=0B mem-reservation=16.00MB | tuple-ids=4,3 row-size=16B cardinality=unavailable | 01:SORT | order by: int_col ASC NULLS 
FIRST, id ASC -| hosts=3 per-host-mem=0B +| mem-estimate=0B mem-reservation=24.00MB | tuple-ids=4 row-size=8B cardinality=unavailable | 03:EXCHANGE [HASH(int_col)] -| hosts=3 per-host-mem=0B +| mem-estimate=0B mem-reservation=0B | tuple-ids=0 row-size=8B cardinality=unavailable | +F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9 00:SCAN HDFS [functional_parquet.alltypes, RANDOM] partitions=24/24 files=24 size=156.57KB predicates: id < 10 @@ -161,7 +173,7 @@ PLAN-ROOT SINK column stats: unavailable parquet statistics predicates: id < 10 parquet dictionary predicates: id < 10 - hosts=3 per-host-mem=16.00MB + mem-estimate=16.00MB mem-reservation=0B tuple-ids=0 row-size=8B cardinality=unavailable ==== # Nested-loop join in a subplan should work. @@ -169,42 +181,44 @@ select * from tpch_nested_parquet.customer c, c.c_orders o, o.o_lineitems where c_custkey < 10 and o_orderkey < 5 and l_linenumber < 3 ---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B | 01:SUBPLAN -| hosts=3 per-host-mem=unavailable +| mem-estimate=0B mem-reservation=0B | tuple-ids=2,1,0 row-size=562B cardinality=1500000 | |--08:NESTED LOOP JOIN [CROSS JOIN] -| | hosts=3 per-host-mem=unavailable +| | mem-estimate=254B mem-reservation=0B | | tuple-ids=2,1,0 row-size=562B cardinality=100 | | | |--02:SINGULAR ROW SRC | | parent-subplan=01 -| | hosts=3 per-host-mem=unavailable +| | mem-estimate=0B mem-reservation=0B | | tuple-ids=0 row-size=254B cardinality=1 | | | 04:SUBPLAN -| | hosts=3 per-host-mem=unavailable +| | mem-estimate=0B mem-reservation=0B | | tuple-ids=2,1 row-size=308B cardinality=100 | | | |--07:NESTED LOOP JOIN [CROSS JOIN] -| | | hosts=3 per-host-mem=unavailable +| | | mem-estimate=124B mem-reservation=0B | | | tuple-ids=2,1 row-size=308B cardinality=10 | | | | | |--05:SINGULAR ROW SRC | | | parent-subplan=04 -| | | hosts=3 per-host-mem=unavailable +| | | mem-estimate=0B mem-reservation=0B | | | tuple-ids=1 row-size=124B 
cardinality=1 | | | | | 06:UNNEST [o.o_lineitems] | | parent-subplan=04 -| | hosts=3 per-host-mem=unavailable +| | mem-estimate=0B mem-reservation=0B | | tuple-ids=2 row-size=0B cardinality=10 | | | 03:UNNEST [c.c_orders o] | parent-subplan=01 -| hosts=3 per-host-mem=unavailable +| mem-estimate=0B mem-reservation=0B | tuple-ids=1 row-size=0B cardinality=10 | 00:SCAN HDFS [tpch_nested_parquet.customer c] @@ -216,49 +230,52 @@ PLAN-ROOT SINK columns missing stats: c_orders parquet statistics predicates: c_custkey < 10 parquet dictionary predicates: c_custkey < 10 - hosts=3 per-host-mem=unavailable + mem-estimate=88.00MB mem-reservation=0B tuple-ids=0 row-size=254B cardinality=15000 ---- PARALLELPLANS +F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B | 09:EXCHANGE [UNPARTITIONED] -| hosts=3 per-host-mem=unavailable +| mem-estimate=0B mem-reservation=0B | tuple-ids=2,1,0 row-size=562B cardinality=1500000 | +F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9 01:SUBPLAN -| hosts=3 per-host-mem=0B +| mem-estimate=0B mem-reservation=0B | tuple-ids=2,1,0 row-size=562B cardinality=1500000 | |--08:NESTED LOOP JOIN [CROSS JOIN] -| | hosts=3 per-host-mem=254B +| | mem-estimate=254B mem-reservation=0B | | tuple-ids=2,1,0 row-size=562B cardinality=100 | | | |--02:SINGULAR ROW SRC | | parent-subplan=01 -| | hosts=3 per-host-mem=0B +| | mem-estimate=0B mem-reservation=0B | | tuple-ids=0 row-size=254B cardinality=1 | | | 04:SUBPLAN -| | hosts=3 per-host-mem=0B +| | mem-estimate=0B mem-reservation=0B | | tuple-ids=2,1 row-size=308B cardinality=100 | | | |--07:NESTED LOOP JOIN [CROSS JOIN] -| | | hosts=3 per-host-mem=124B +| | | mem-estimate=124B mem-reservation=0B | | | tuple-ids=2,1 row-size=308B cardinality=10 | | | | | |--05:SINGULAR ROW SRC | | | parent-subplan=04 -| | | hosts=3 per-host-mem=0B +| | | mem-estimate=0B mem-reservation=0B | | | tuple-ids=1 row-size=124B cardinality=1 | | | | | 06:UNNEST [o.o_lineitems] | | 
parent-subplan=04 -| | hosts=3 per-host-mem=0B +| | mem-estimate=0B mem-reservation=0B | | tuple-ids=2 row-size=0B cardinality=10 | | | 03:UNNEST [c.c_orders o] | parent-subplan=01 -| hosts=3 per-host-mem=0B +| mem-estimate=0B mem-reservation=0B | tuple-ids=1 row-size=0B cardinality=10 | 00:SCAN HDFS [tpch_nested_parquet.customer c, RANDOM] @@ -270,7 +287,7 @@ PLAN-ROOT SINK columns missing stats: c_orders parquet statistics predicates: c_custkey < 10 parquet dictionary predicates: c_custkey < 10 - hosts=3 per-host-mem=88.00MB + mem-estimate=88.00MB mem-reservation=0B tuple-ids=0 row-size=254B cardinality=15000 ==== # Hash-join in a subplan should work. @@ -278,34 +295,36 @@ select c.* from tpch_nested_parquet.customer c, c.c_orders o1, c.c_orders o2 where o1.o_orderkey = o2.o_orderkey + 2 and o1.o_orderkey < 5 ---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B | 01:SUBPLAN -| hosts=3 per-host-mem=unavailable +| mem-estimate=0B mem-reservation=0B | tuple-ids=1,0,2 row-size=286B cardinality=1500000 | |--06:HASH JOIN [INNER JOIN] | | hash predicates: o1.o_orderkey = o2.o_orderkey + 2 -| | hosts=3 per-host-mem=unavailable +| | mem-estimate=0B mem-reservation=136.00MB | | tuple-ids=1,0,2 row-size=286B cardinality=10 | | | |--04:UNNEST [c.c_orders o2] | | parent-subplan=01 -| | hosts=3 per-host-mem=unavailable +| | mem-estimate=0B mem-reservation=0B | | tuple-ids=2 row-size=0B cardinality=10 | | | 05:NESTED LOOP JOIN [CROSS JOIN] -| | hosts=3 per-host-mem=unavailable +| | mem-estimate=270B mem-reservation=0B | | tuple-ids=1,0 row-size=278B cardinality=10 | | | |--02:SINGULAR ROW SRC | | parent-subplan=01 -| | hosts=3 per-host-mem=unavailable +| | mem-estimate=0B mem-reservation=0B | | tuple-ids=0 row-size=270B cardinality=1 | | | 03:UNNEST [c.c_orders o1] | parent-subplan=01 -| hosts=3 per-host-mem=unavailable +| mem-estimate=0B mem-reservation=0B | tuple-ids=1 row-size=0B cardinality=10 | 00:SCAN HDFS 
[tpch_nested_parquet.customer c] @@ -314,41 +333,44 @@ PLAN-ROOT SINK predicates on o1: o1.o_orderkey < 5 table stats: 150000 rows total columns missing stats: c_orders, c_orders - hosts=3 per-host-mem=unavailable + mem-estimate=88.00MB mem-reservation=0B tuple-ids=0 row-size=270B cardinality=150000 ---- PARALLELPLANS +F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B | 07:EXCHANGE [UNPARTITIONED] -| hosts=3 per-host-mem=unavailable +| mem-estimate=0B mem-reservation=0B | tuple-ids=1,0,2 row-size=286B cardinality=1500000 | +F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9 01:SUBPLAN -| hosts=3 per-host-mem=0B +| mem-estimate=0B mem-reservation=0B | tuple-ids=1,0,2 row-size=286B cardinality=1500000 | |--06:HASH JOIN [INNER JOIN] | | hash predicates: o1.o_orderkey = o2.o_orderkey + 2 -| | hosts=3 per-host-mem=0B +| | mem-estimate=0B mem-reservation=136.00MB | | tuple-ids=1,0,2 row-size=286B cardinality=10 | | | |--04:UNNEST [c.c_orders o2] | | parent-subplan=01 -| | hosts=3 per-host-mem=0B +| | mem-estimate=0B mem-reservation=0B | | tuple-ids=2 row-size=0B cardinality=10 | | | 05:NESTED LOOP JOIN [CROSS JOIN] -| | hosts=3 per-host-mem=270B +| | mem-estimate=270B mem-reservation=0B | | tuple-ids=1,0 row-size=278B cardinality=10 | | | |--02:SINGULAR ROW SRC | | parent-subplan=01 -| | hosts=3 per-host-mem=0B +| | mem-estimate=0B mem-reservation=0B | | tuple-ids=0 row-size=270B cardinality=1 | | | 03:UNNEST [c.c_orders o1] | parent-subplan=01 -| hosts=3 per-host-mem=0B +| mem-estimate=0B mem-reservation=0B | tuple-ids=1 row-size=0B cardinality=10 | 00:SCAN HDFS [tpch_nested_parquet.customer c, RANDOM] @@ -357,6 +379,6 @@ PLAN-ROOT SINK predicates on o1: o1.o_orderkey < 5 table stats: 150000 rows total columns missing stats: c_orders, c_orders - hosts=3 per-host-mem=88.00MB + mem-estimate=88.00MB mem-reservation=0B tuple-ids=0 row-size=270B cardinality=150000 ==== 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test index 4712b96..df5f99d 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test @@ -7,21 +7,23 @@ select count(*) from functional_parquet.alltypes where int_col > 1 and int_col * rand() > 50 and int_col is null and int_col > tinyint_col; ---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B | 01:AGGREGATE [FINALIZE] | output: count(*) -| hosts=3 per-host-mem=unavailable +| mem-estimate=10.00MB mem-reservation=0B | tuple-ids=1 row-size=8B cardinality=1 | 00:SCAN HDFS [functional_parquet.alltypes] - partitions=24/24 files=24 size=165.17KB + partitions=24/24 files=24 size=156.57KB predicates: int_col IS NULL, int_col > 1, int_col > tinyint_col, int_col * rand() > 50 table stats: unavailable column stats: unavailable parquet statistics predicates: int_col > 1 parquet dictionary predicates: int_col > 1 - hosts=3 per-host-mem=unavailable + mem-estimate=32.00MB mem-reservation=0B tuple-ids=0 row-size=5B cardinality=unavailable ==== # Test a variety of types @@ -32,20 +34,22 @@ and double_col > 100.00 and date_string_col > '1993-10-01' and string_col > 'aaa and timestamp_cmp(timestamp_col, '2016-11-20 00:00:00') = 1 and year > 2000 and month < 12; ---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK +| mem-estimate=0B mem-reservation=0B | 01:AGGREGATE [FINALIZE] | output: count(*) -| hosts=3 per-host-mem=unavailable +| mem-estimate=10.00MB mem-reservation=0B | 
tuple-ids=1 row-size=8B cardinality=1 | 00:SCAN HDFS [functional_parquet.alltypes] - partitions=22/24 files=22 size=151.24KB + partitions=22/24 files=22 size=143.36KB predicates: bool_col, bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, smallint_col > 50, tinyint_col < 50, string_col > 'aaaa', mod(int_col, 2) = 1, timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = 1, date_string_col > '1993-10-01' table stats: unavailable columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col parquet statistics predicates: bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, smallint_col > 50, tinyint_col < 50, string_col > 'aaaa', date_string_col > '1993-10-01' parquet dictionary predicates: bool_col, bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, smallint_col > 50, tinyint_col < 50, string_col > 'aaaa', mod(int_col, 2) = 1, timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = 1, date_string_col > '1993-10-01' - hosts=3 per-host-mem=unavailable + mem-estimate=128.00MB mem-reservation=0B tuple-ids=0 row-size=80B cardinality=unavailable ====
