Repository: incubator-impala
Updated Branches:
  refs/heads/master e301ca641 -> f2cd5bd51
IMPALA-3548: Prune runtime filters based on query options in the FE

Currently, the FE generates a number of runtime filters and assigns them to the
single-node plan without taking the values of the RUNTIME_FILTER_MODE and
DISABLE_ROW_RUNTIME_FILTERING query options into account. The backend then
removes filters from exec nodes based on the following rules:
1. If DISABLE_ROW_RUNTIME_FILTERING is set, filters are removed from the exec
   nodes whose targets are marked as not bound by partition columns.
2. If RUNTIME_FILTER_MODE is set to LOCAL, filters are removed from the exec
   nodes that are marked as remote targets.
This can confuse users: the EXPLAIN output may show runtime filters that are
never applied when the query is executed.

This change moves the runtime filter pruning logic to the planner in the FE.
Runtime filter assignment is now done on the distributed plan, and the above
constraints are enforced there directly; a condensed sketch of the new
per-target checks follows the diff below.

Change-Id: Id0f0b200e02442edcad8df3979f652d66c6e52eb
Reviewed-on: http://gerrit.cloudera.org:8080/7564
Tested-by: Impala Public Jenkins
Reviewed-by: Alex Behm <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/00fd8388
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/00fd8388
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/00fd8388

Branch: refs/heads/master
Commit: 00fd8388c37e1b3207dddf564463b7eeafe3f887
Parents: e301ca6
Author: Attila Jeges <[email protected]>
Authored: Tue Jul 18 17:25:23 2017 +0200
Committer: Alex Behm <[email protected]>
Committed: Thu Oct 26 17:06:32 2017 +0000

----------------------------------------------------------------------
 be/src/exec/partitioned-hash-join-builder.cc    |  14 +-
 be/src/exec/scan-node.cc                        |  12 +-
 be/src/runtime/coordinator.cc                   |   8 +-
 be/src/service/fe-support.cc                    |  30 ++
 .../impala/planner/DistributedPlanner.java      |   9 -
 .../java/org/apache/impala/planner/Planner.java |  15 +-
 .../impala/planner/RuntimeFilterGenerator.java  | 126 +++--
 .../org/apache/impala/service/FeSupport.java    |  45 ++
 .../org/apache/impala/planner/PlannerTest.java  |   5 +
 .../apache/impala/planner/PlannerTestBase.java  |  21 +-
 .../apache/impala/testutil/TestFileParser.java  |  63 ++-
 .../runtime-filter-query-options.test           | 535 +++++++++++++++++++
 12 files changed, 775 insertions(+), 108 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00fd8388/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc index 4b49fb0..bf5f891 100644 --- a/be/src/exec/partitioned-hash-join-builder.cc +++ b/be/src/exec/partitioned-hash-join-builder.cc @@ -103,16 +103,10 @@ Status PhjBuilder::InitExprsAndFilters(RuntimeState* state, } for (const TRuntimeFilterDesc& filter_desc : filter_descs) { - // If filter propagation not enabled, only consider building broadcast joins (that - // may be consumed by this fragment).
- if (state->query_options().runtime_filter_mode != TRuntimeFilterMode::GLOBAL && - !filter_desc.is_broadcast_join) { - continue; - } - if (state->query_options().disable_row_runtime_filtering && - !filter_desc.applied_on_partition_columns) { - continue; - } + DCHECK(state->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL || + filter_desc.is_broadcast_join); + DCHECK(!state->query_options().disable_row_runtime_filtering || + filter_desc.applied_on_partition_columns); ScalarExpr* filter_expr; RETURN_IF_ERROR( ScalarExpr::Create(filter_desc.src_expr, *row_desc_, state, &filter_expr)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00fd8388/be/src/exec/scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc index fd2453d..18fc473 100644 --- a/be/src/exec/scan-node.cc +++ b/be/src/exec/scan-node.cc @@ -62,14 +62,10 @@ Status ScanNode::Init(const TPlanNode& tnode, RuntimeState* state) { auto it = filter_desc.planid_to_target_ndx.find(tnode.node_id); DCHECK(it != filter_desc.planid_to_target_ndx.end()); const TRuntimeFilterTargetDesc& target = filter_desc.targets[it->second]; - if (state->query_options().runtime_filter_mode == TRuntimeFilterMode::LOCAL && - !target.is_local_target) { - continue; - } - if (query_options.disable_row_runtime_filtering && - !target.is_bound_by_partition_columns) { - continue; - } + DCHECK(state->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL || + target.is_local_target); + DCHECK(!query_options.disable_row_runtime_filtering || + target.is_bound_by_partition_columns); ScalarExpr* filter_expr; RETURN_IF_ERROR( ScalarExpr::Create(target.target_expr, *row_desc(), state, &filter_expr)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00fd8388/be/src/runtime/coordinator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index e26919d..af3ec21 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -281,9 +281,7 @@ void Coordinator::InitFilterRoutingTable() { for (const TPlanNode& plan_node: fragment_params.fragment.plan.nodes) { if (!plan_node.__isset.runtime_filters) continue; for (const TRuntimeFilterDesc& filter: plan_node.runtime_filters) { - if (filter_mode_ == TRuntimeFilterMode::LOCAL && !filter.has_local_targets) { - continue; - } + DCHECK(filter_mode_ == TRuntimeFilterMode::GLOBAL || filter.has_local_targets); FilterRoutingTable::iterator i = filter_routing_table_.emplace( filter.filter_id, FilterState(filter, plan_node.node_id)).first; FilterState* f = &(i->second); @@ -317,9 +315,7 @@ void Coordinator::InitFilterRoutingTable() { auto it = filter.planid_to_target_ndx.find(plan_node.node_id); DCHECK(it != filter.planid_to_target_ndx.end()); const TRuntimeFilterTargetDesc& t_target = filter.targets[it->second]; - if (filter_mode_ == TRuntimeFilterMode::LOCAL && !t_target.is_local_target) { - continue; - } + DCHECK(filter_mode_ == TRuntimeFilterMode::GLOBAL || t_target.is_local_target); f->targets()->emplace_back(t_target, fragment_params.fragment.idx); } else { DCHECK(false) << "Unexpected plan node with runtime filters: " http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00fd8388/be/src/service/fe-support.cc ---------------------------------------------------------------------- diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc index b7a5e81..35b5a15 
100644 --- a/be/src/service/fe-support.cc +++ b/be/src/service/fe-support.cc @@ -41,6 +41,7 @@ #include "runtime/raw-value.h" #include "runtime/runtime-state.h" #include "service/impala-server.h" +#include "service/query-options.h" #include "util/cpu-info.h" #include "util/debug-util.h" #include "util/disk-info.h" @@ -445,6 +446,31 @@ Java_org_apache_impala_service_FeSupport_NativePrioritizeLoad( return result_bytes; } +// Used to call native code from the FE to parse and set comma-delimited key=value query +// options. +extern "C" +JNIEXPORT jbyteArray JNICALL +Java_org_apache_impala_service_FeSupport_NativeParseQueryOptions( + JNIEnv* env, jclass caller_class, jstring csv_query_options, + jbyteArray tquery_options) { + TQueryOptions options; + THROW_IF_ERROR_RET(DeserializeThriftMsg(env, tquery_options, &options), env, + JniUtil::internal_exc_class(), nullptr); + + JniUtfCharGuard csv_query_options_guard; + THROW_IF_ERROR_RET( + JniUtfCharGuard::create(env, csv_query_options, &csv_query_options_guard), env, + JniUtil::internal_exc_class(), nullptr); + THROW_IF_ERROR_RET( + impala::ParseQueryOptions(csv_query_options_guard.get(), &options, NULL), env, + JniUtil::internal_exc_class(), nullptr); + + jbyteArray result_bytes = NULL; + THROW_IF_ERROR_RET(SerializeThriftMsg(env, &options, &result_bytes), env, + JniUtil::internal_exc_class(), result_bytes); + return result_bytes; +} + namespace impala { static JNINativeMethod native_methods[] = { @@ -468,6 +494,10 @@ static JNINativeMethod native_methods[] = { (char*)"NativePrioritizeLoad", (char*)"([B)[B", (void*)::Java_org_apache_impala_service_FeSupport_NativePrioritizeLoad }, + { + (char*)"NativeParseQueryOptions", (char*)"(Ljava/lang/String;[B)[B", + (void*)::Java_org_apache_impala_service_FeSupport_NativeParseQueryOptions + }, }; void InitFeSupport(bool disable_codegen) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00fd8388/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java index 6571036..e1d83d7 100644 --- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java @@ -488,15 +488,6 @@ public class DistributedPlanner { lhsHasCompatPartition, rhsHasCompatPartition, leftChildFragment, rightChildFragment, lhsJoinExprs, rhsJoinExprs, fragments); } - - for (RuntimeFilter filter: node.getRuntimeFilters()) { - filter.setIsBroadcast(distrMode == DistributionMode.BROADCAST); - filter.computeHasLocalTargets(); - // Work around IMPALA-3450, where cardinalities might be wrong in single-node plans - // with UNION and LIMITs. - // TODO: Remove. 
- filter.computeNdvEstimate(); - } return hjFragment; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00fd8388/fe/src/main/java/org/apache/impala/planner/Planner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java index 7992b7b..de4f8df 100644 --- a/fe/src/main/java/org/apache/impala/planner/Planner.java +++ b/fe/src/main/java/org/apache/impala/planner/Planner.java @@ -105,15 +105,6 @@ public class Planner { invertJoins(singleNodePlan, ctx_.isSingleNodeExec()); singleNodePlan = useNljForSingularRowBuilds(singleNodePlan, ctx_.getRootAnalyzer()); - // create runtime filters - if (ctx_.getQueryOptions().getRuntime_filter_mode() != TRuntimeFilterMode.OFF) { - // Always compute filters, even if the BE won't always use all of them. - RuntimeFilterGenerator.generateRuntimeFilters(ctx_.getRootAnalyzer(), - singleNodePlan, ctx_.getQueryOptions().getMax_num_runtime_filters()); - ctx_.getAnalysisResult().getTimeline().markEvent( - "Runtime filters computed"); - } - singleNodePlanner.validatePlan(singleNodePlan); if (ctx_.isSingleNodeExec()) { @@ -125,7 +116,13 @@ public class Planner { fragments = distributedPlanner.createPlanFragments(singleNodePlan); } + // Create runtime filters. PlanFragment rootFragment = fragments.get(fragments.size() - 1); + if (ctx_.getQueryOptions().getRuntime_filter_mode() != TRuntimeFilterMode.OFF) { + RuntimeFilterGenerator.generateRuntimeFilters(ctx_, rootFragment.getPlanRoot()); + ctx_.getAnalysisResult().getTimeline().markEvent("Runtime filters computed"); + } + rootFragment.verifyTree(); ExprSubstitutionMap rootNodeSmap = rootFragment.getPlanRoot().getOutputSmap(); List<Expr> resultExprs = null; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00fd8388/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java index 646eb48..98365e6 100644 --- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java +++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java @@ -39,9 +39,12 @@ import org.apache.impala.catalog.Table; import org.apache.impala.catalog.Type; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.IdGenerator; +import org.apache.impala.planner.JoinNode.DistributionMode; import org.apache.impala.planner.PlanNode; import org.apache.impala.thrift.TRuntimeFilterDesc; +import org.apache.impala.thrift.TRuntimeFilterMode; import org.apache.impala.thrift.TRuntimeFilterTargetDesc; + import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -138,14 +141,17 @@ public final class RuntimeFilterGenerator { // Expr on which the filter is applied public Expr expr; // Indicates if 'expr' is bound only by partition columns - public boolean isBoundByPartitionColumns = false; - // Indicates if 'node' is in the same fragment as the join that produces the - // filter - public boolean isLocalTarget = false; + public final boolean isBoundByPartitionColumns; + // Indicates if 'node' is in the same fragment as the join that produces the filter + public final boolean isLocalTarget; - public RuntimeFilterTarget(ScanNode targetNode, Expr targetExpr) 
{ + public RuntimeFilterTarget(ScanNode targetNode, Expr targetExpr, + boolean isBoundByPartitionColumns, boolean isLocalTarget) { + Preconditions.checkState(targetExpr.isBoundByTupleIds(targetNode.getTupleIds())); node = targetNode; expr = targetExpr; + this.isBoundByPartitionColumns = isBoundByPartitionColumns; + this.isLocalTarget = isLocalTarget; } public TRuntimeFilterTargetDesc toThrift() { @@ -346,25 +352,7 @@ public final class RuntimeFilterGenerator { return src_.getCardinality() / (double) src_.getChild(0).getCardinality(); } - public void addTarget(ScanNode node, Analyzer analyzer, Expr targetExpr) { - Preconditions.checkState(targetExpr.isBoundByTupleIds(node.getTupleIds())); - RuntimeFilterTarget target = new RuntimeFilterTarget(node, targetExpr); - targets_.add(target); - // Check if all the slots of targetExpr_ are bound by partition columns - TupleDescriptor baseTblDesc = node.getTupleDesc(); - Table tbl = baseTblDesc.getTable(); - if (tbl.getNumClusteringCols() == 0) return; - List<SlotId> sids = Lists.newArrayList(); - targetExpr.getIds(null, sids); - for (SlotId sid: sids) { - SlotDescriptor slotDesc = analyzer.getSlotDesc(sid); - if (slotDesc.getColumn() == null - || slotDesc.getColumn().getPosition() >= tbl.getNumClusteringCols()) { - return; - } - } - target.isBoundByPartitionColumns = true; - } + public void addTarget(RuntimeFilterTarget target) { targets_.add(target); } public void setIsBroadcast(boolean isBroadcast) { isBroadcastJoin_ = isBroadcast; } @@ -375,11 +363,8 @@ public final class RuntimeFilterGenerator { Preconditions.checkState(hasTargets()); for (RuntimeFilterTarget target: targets_) { Preconditions.checkNotNull(target.node.getFragment()); - boolean isLocal = - src_.getFragment().getId().equals(target.node.getFragment().getId()); - target.isLocalTarget = isLocal; - hasLocalTargets_ = hasLocalTargets_ || isLocal; - hasRemoteTargets_ = hasRemoteTargets_ || !isLocal; + hasLocalTargets_ = hasLocalTargets_ || target.isLocalTarget; + hasRemoteTargets_ = hasRemoteTargets_ || !target.isLocalTarget; } } @@ -406,11 +391,13 @@ public final class RuntimeFilterGenerator { /** * Generates and assigns runtime filters to a query plan tree. 
*/ - public static void generateRuntimeFilters(Analyzer analyzer, PlanNode plan, - int maxNumFilters) { - Preconditions.checkArgument(maxNumFilters >= 0); + public static void generateRuntimeFilters(PlannerContext ctx, PlanNode plan) { + Preconditions.checkNotNull(ctx); + Preconditions.checkNotNull(ctx.getQueryOptions()); + int maxNumFilters = ctx.getQueryOptions().getMax_num_runtime_filters(); + Preconditions.checkState(maxNumFilters >= 0); RuntimeFilterGenerator filterGenerator = new RuntimeFilterGenerator(); - filterGenerator.generateFilters(analyzer, plan); + filterGenerator.generateFilters(ctx, plan); List<RuntimeFilter> filters = Lists.newArrayList(filterGenerator.getRuntimeFilters()); if (filters.size() > maxNumFilters) { // If more than 'maxNumFilters' were generated, sort them by increasing selectivity @@ -428,6 +415,9 @@ public final class RuntimeFilterGenerator { } for (RuntimeFilter filter: filters.subList(0, Math.min(filters.size(), maxNumFilters))) { + filter.setIsBroadcast( + filter.src_.getDistributionMode() == DistributionMode.BROADCAST); + filter.computeHasLocalTargets(); if (LOG.isTraceEnabled()) LOG.trace("Runtime filter: " + filter.debugString()); filter.assignToPlanNodes(); } @@ -452,13 +442,13 @@ public final class RuntimeFilterGenerator { } /** - * Generates the runtime filters for a query by recursively traversing the single-node + * Generates the runtime filters for a query by recursively traversing the distributed * plan tree rooted at 'root'. In the top-down traversal of the plan tree, candidate * runtime filters are generated from equi-join predicates assigned to hash-join nodes. * In the bottom-up traversal of the plan tree, the filters are assigned to destination * (scan) nodes. Filters that cannot be assigned to a scan node are discarded. */ - private void generateFilters(Analyzer analyzer, PlanNode root) { + private void generateFilters(PlannerContext ctx, PlanNode root) { if (root instanceof HashJoinNode) { HashJoinNode joinNode = (HashJoinNode) root; List<Expr> joinConjuncts = Lists.newArrayList(); @@ -473,23 +463,23 @@ public final class RuntimeFilterGenerator { joinConjuncts.addAll(joinNode.getConjuncts()); List<RuntimeFilter> filters = Lists.newArrayList(); for (Expr conjunct: joinConjuncts) { - RuntimeFilter filter = RuntimeFilter.create(filterIdGenerator, analyzer, - conjunct, joinNode); + RuntimeFilter filter = RuntimeFilter.create(filterIdGenerator, + ctx.getRootAnalyzer(), conjunct, joinNode); if (filter == null) continue; registerRuntimeFilter(filter); filters.add(filter); } - generateFilters(analyzer, root.getChild(0)); + generateFilters(ctx, root.getChild(0)); // Finalize every runtime filter of that join. This is to ensure that we don't // assign a filter to a scan node from the right subtree of joinNode or ancestor // join nodes in case we don't find a destination node in the left subtree. for (RuntimeFilter runtimeFilter: filters) finalizeRuntimeFilter(runtimeFilter); - generateFilters(analyzer, root.getChild(1)); + generateFilters(ctx, root.getChild(1)); } else if (root instanceof ScanNode) { - assignRuntimeFilters(analyzer, (ScanNode) root); + assignRuntimeFilters(ctx, (ScanNode) root); } else { for (PlanNode childNode: root.getChildren()) { - generateFilters(analyzer, childNode); + generateFilters(ctx, childNode); } } } @@ -540,20 +530,66 @@ public final class RuntimeFilterGenerator { /** * Assigns runtime filters to a specific scan node 'scanNode'. 
- * The assigned filters are the ones for which 'scanNode' can be used a destination - * node. A scan node may be used as a destination node for multiple runtime filters. + * The assigned filters are the ones for which 'scanNode' can be used as a destination + * node. The following constraints are enforced when assigning filters to 'scanNode': + * 1. If the DISABLE_ROW_RUNTIME_FILTERING query option is set, a filter is only + * assigned to 'scanNode' if the filter target expression is bound by partition + * columns. + * 2. If the RUNTIME_FILTER_MODE query option is set to LOCAL, a filter is only assigned + * to 'scanNode' if the filter is produced within the same fragment that contains the + * scan node. + * A scan node may be used as a destination node for multiple runtime filters. * Currently, runtime filters can only be assigned to HdfsScanNodes. */ - private void assignRuntimeFilters(Analyzer analyzer, ScanNode scanNode) { + private void assignRuntimeFilters(PlannerContext ctx, ScanNode scanNode) { if (!(scanNode instanceof HdfsScanNode)) return; TupleId tid = scanNode.getTupleIds().get(0); if (!runtimeFiltersByTid_.containsKey(tid)) return; + Analyzer analyzer = ctx.getRootAnalyzer(); + boolean disableRowRuntimeFiltering = + ctx.getQueryOptions().isDisable_row_runtime_filtering(); + TRuntimeFilterMode runtimeFilterMode = ctx.getQueryOptions().getRuntime_filter_mode(); for (RuntimeFilter filter: runtimeFiltersByTid_.get(tid)) { if (filter.isFinalized()) continue; Expr targetExpr = computeTargetExpr(filter, tid, analyzer); if (targetExpr == null) continue; - filter.addTarget(scanNode, analyzer, targetExpr); + boolean isBoundByPartitionColumns = isBoundByPartitionColumns(analyzer, targetExpr, + scanNode); + if (disableRowRuntimeFiltering && !isBoundByPartitionColumns) continue; + boolean isLocalTarget = isLocalTarget(filter, scanNode); + if (runtimeFilterMode == TRuntimeFilterMode.LOCAL && !isLocalTarget) continue; + RuntimeFilter.RuntimeFilterTarget target = new RuntimeFilter.RuntimeFilterTarget( + scanNode, targetExpr, isBoundByPartitionColumns, isLocalTarget); + filter.addTarget(target); + } + } + + /** + * Check if 'targetNode' is local to the source node of 'filter'. + */ + static private boolean isLocalTarget(RuntimeFilter filter, ScanNode targetNode) { + return targetNode.getFragment().getId().equals(filter.src_.getFragment().getId()); + } + + /** + * Check if all the slots of 'targetExpr' are bound by partition columns. 
+ */ + static private boolean isBoundByPartitionColumns(Analyzer analyzer, Expr targetExpr, + ScanNode targetNode) { + Preconditions.checkState(targetExpr.isBoundByTupleIds(targetNode.getTupleIds())); + TupleDescriptor baseTblDesc = targetNode.getTupleDesc(); + Table tbl = baseTblDesc.getTable(); + if (tbl.getNumClusteringCols() == 0) return false; + List<SlotId> sids = Lists.newArrayList(); + targetExpr.getIds(null, sids); + for (SlotId sid : sids) { + SlotDescriptor slotDesc = analyzer.getSlotDesc(sid); + if (slotDesc.getColumn() == null + || slotDesc.getColumn().getPosition() >= tbl.getNumClusteringCols()) { + return false; + } } + return true; } /** http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00fd8388/fe/src/main/java/org/apache/impala/service/FeSupport.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java index 8b87962..48349c2 100644 --- a/fe/src/main/java/org/apache/impala/service/FeSupport.java +++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java @@ -44,6 +44,7 @@ import org.apache.impala.thrift.TExprBatch; import org.apache.impala.thrift.TPrioritizeLoadRequest; import org.apache.impala.thrift.TPrioritizeLoadResponse; import org.apache.impala.thrift.TQueryCtx; +import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TResultRow; import org.apache.impala.thrift.TStatus; import org.apache.impala.thrift.TSymbolLookupParams; @@ -84,6 +85,18 @@ public class FeSupport { // using Java Thrift bindings. public native static byte[] NativePrioritizeLoad(byte[] thriftReq); + // Parses a string of comma-separated key=value query options ('csvQueryOptions'), + // updates the existing query options ('queryOptions') with them and returns the + // resulting serialized TQueryOptions object. + // A note about the function's interface: ideally we wouldn't have to pass in the + // existing query options. We could just return the newly set query options to the + // caller and let the caller update the existing query options with the new ones. + // Unfortunately due to a bug in the thrift-generated TQueryOptions class, in some cases + // it is impossible to figure out whether a query option has been set explicitly or left + // at its default setting, therefore this approach would not work. + public native static byte[] NativeParseQueryOptions(String csvQueryOptions, + byte[] queryOptions); + /** * Locally caches the jar at the specified HDFS location. * @@ -261,6 +274,38 @@ public class FeSupport { } } + private static byte[] ParseQueryOptions(String csvQueryOptions, byte[] queryOptions) { + try { + return NativeParseQueryOptions(csvQueryOptions, queryOptions); + } catch (UnsatisfiedLinkError e) { + loadLibrary(); + } + return NativeParseQueryOptions(csvQueryOptions, queryOptions); + } + + /** + * Parses a string of comma-separated key=value query options. Returns a TQueryOptions + * object that contains the updated query options. 
+ */ + public static TQueryOptions ParseQueryOptions(String csvQueryOptions, + TQueryOptions queryOptions) throws InternalException { + Preconditions.checkNotNull(csvQueryOptions); + Preconditions.checkNotNull(queryOptions); + + TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory()); + try { + byte[] result = ParseQueryOptions(csvQueryOptions, + serializer.serialize(queryOptions)); + Preconditions.checkNotNull(result); + TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory()); + TQueryOptions updatedQueryOptions = new TQueryOptions(); + deserializer.deserialize(updatedQueryOptions, result); + return updatedQueryOptions; + } catch (TException e) { + throw new InternalException("Could not parse query options: " + e.getMessage(), e); + } + } + /** * This function should only be called explicitly by the FeSupport to ensure that * native functions are loaded. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00fd8388/fe/src/test/java/org/apache/impala/planner/PlannerTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index fc8ceab..f49e39c 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -297,6 +297,11 @@ public class PlannerTest extends PlannerTestBase { } @Test + public void testRuntimeFilterQueryOptions() { + runPlannerTestFile("runtime-filter-query-options"); + } + + @Test public void testConjunctOrdering() { runPlannerTestFile("conjunct-ordering"); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00fd8388/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java index ae6488d..6932539 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java @@ -402,15 +402,8 @@ public class PlannerTestBase extends FrontendTestBase { * of 'testCase'. */ private void runTestCase(TestCase testCase, StringBuilder errorLog, - StringBuilder actualOutput, String dbName, TQueryOptions options, - boolean ignoreExplainHeader) + StringBuilder actualOutput, String dbName, boolean ignoreExplainHeader) throws CatalogException { - if (options == null) { - options = defaultQueryOptions(); - } else { - options = mergeQueryOptions(defaultQueryOptions(), options); - } - String query = testCase.getQuery(); LOG.info("running query " + query); if (query.isEmpty()) { @@ -419,7 +412,7 @@ public class PlannerTestBase extends FrontendTestBase { } TQueryCtx queryCtx = TestUtils.createQueryContext( dbName, System.getProperty("user.name")); - queryCtx.client_request.query_options = options; + queryCtx.client_request.query_options = testCase.getOptions(); // Test single node plan, scan range locations, and column lineage. 
TExecRequest singleNodeExecRequest = testPlan(testCase, Section.PLAN, queryCtx, ignoreExplainHeader, errorLog, actualOutput); @@ -736,7 +729,12 @@ public class PlannerTestBase extends FrontendTestBase { private void runPlannerTestFile(String testFile, String dbName, TQueryOptions options, boolean ignoreExplainHeader) { String fileName = testDir_.resolve(testFile + ".test").toString(); - TestFileParser queryFileParser = new TestFileParser(fileName); + if (options == null) { + options = defaultQueryOptions(); + } else { + options = mergeQueryOptions(defaultQueryOptions(), options); + } + TestFileParser queryFileParser = new TestFileParser(fileName, options); StringBuilder actualOutput = new StringBuilder(); queryFileParser.parseFile(); @@ -745,8 +743,7 @@ public class PlannerTestBase extends FrontendTestBase { actualOutput.append(testCase.getSectionAsString(Section.QUERY, true, "\n")); actualOutput.append("\n"); try { - runTestCase(testCase, errorLog, actualOutput, dbName, options, - ignoreExplainHeader); + runTestCase(testCase, errorLog, actualOutput, dbName, ignoreExplainHeader); } catch (CatalogException e) { errorLog.append(String.format("Failed to plan query\n%s\n%s", testCase.getQuery(), e.getMessage())); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00fd8388/fe/src/test/java/org/apache/impala/testutil/TestFileParser.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/testutil/TestFileParser.java b/fe/src/test/java/org/apache/impala/testutil/TestFileParser.java index 5bfd018..0c62666 100644 --- a/fe/src/test/java/org/apache/impala/testutil/TestFileParser.java +++ b/fe/src/test/java/org/apache/impala/testutil/TestFileParser.java @@ -28,9 +28,14 @@ import java.util.EnumMap; import java.util.List; import java.util.Scanner; +import org.apache.impala.common.InternalException; +import org.apache.impala.service.FeSupport; +import org.apache.impala.thrift.TErrorCode; +import org.apache.impala.thrift.TQueryOptions; import org.apache.log4j.Logger; import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -40,6 +45,10 @@ import com.google.common.collect.Maps; * A test file has the following format: * * <QUERY STRING> + * ---- QUERYOPTIONS + * <QUERYOPTION1>=<VALUE1> + * <QUERYOPTION2>=<VALUE2> + * .. * ---- <SECTION NAME 1> * <EXPECTED CONTENTS 1> * ---- <SECTION NAME 2> @@ -48,6 +57,10 @@ import com.google.common.collect.Maps; * <EXPECTED CONTENTS 3> * ==== * <QUERY STRING> + * ---- QUERYOPTIONS + * <QUERYOPTION1>=<VALUE1> + * <QUERYOPTION2>=<VALUE2> + * .. * ---- <SECTION NAME 1> * <EXPECTED CONTENTS 1> * ---- <SECTION NAME 2> @@ -61,7 +74,7 @@ import com.google.common.collect.Maps; * without these lines included. * * Note that <QUERY STRING> and <EXPECTED CONTENTS> sections can consist of multiple - * lines. + * lines. QUERYOPTIONS sections may contain multiple <QUERYOPTION>=<VALUE> lines. 
*/ public class TestFileParser { private static final Logger LOG = Logger.getLogger(TestCase.class); @@ -82,7 +95,8 @@ public class TestFileParser { SETUP, ERRORS, SCANRANGELOCATIONS, - LINEAGE; + LINEAGE, + QUERYOPTIONS; // Return header line for this section public String getHeader() { @@ -101,14 +115,18 @@ public class TestFileParser { // Line number in the test case file where this case started private final int startLineNum; + private TQueryOptions options; - public TestCase(int lineNum) { - startLineNum = lineNum; + public TestCase(int lineNum, TQueryOptions options) { + this.startLineNum = lineNum; + this.options = options; } - public int getStartingLineNum() { - return startLineNum; - } + public int getStartingLineNum() { return startLineNum; } + + public TQueryOptions getOptions() { return this.options; } + + public void setOptions(TQueryOptions options) { this.options = options; } protected void addSection(Section section, ArrayList<String> contents) { expectedResultSections.put(section, contents); @@ -210,6 +228,7 @@ public class TestFileParser { private BufferedReader reader; private Scanner scanner; private boolean hasSetupSection = false; + private TQueryOptions options; /** * For backwards compatibility, if no title is found this is the order in which @@ -218,8 +237,9 @@ public class TestFileParser { static private final ArrayList<Section> defaultSectionOrder = Lists.newArrayList(Section.QUERY, Section.TYPES, Section.RESULTS); - public TestFileParser(String fileName) { + public TestFileParser(String fileName, TQueryOptions options) { this.fileName = fileName; + this.options = options; } public List<TestCase> getTestCases() { @@ -255,7 +275,8 @@ public class TestFileParser { private TestCase parseOneTestCase() { Section currentSection = Section.QUERY; ArrayList<String> sectionContents = Lists.newArrayList(); - TestCase currentTestCase = new TestCase(lineNum); + // Each test case in the test file has its own copy of query options. + TestCase currentTestCase = new TestCase(lineNum, options.deepCopy()); int sectionCount = 0; while (scanner.hasNextLine()) { @@ -263,6 +284,7 @@ public class TestFileParser { ++lineNum; if (line.startsWith("====") && sectionCount > 0) { currentTestCase.addSection(currentSection, sectionContents); + parseQueryOptions(currentTestCase); if (!currentTestCase.isValid()) { throw new IllegalStateException("Invalid test case" + " at line " + currentTestCase.startLineNum + " detected."); @@ -302,6 +324,10 @@ public class TestFileParser { LOG.warn("No section header found. Guessing: " + currentSection); } + if (!currentTestCase.getSectionContents(currentSection).isEmpty()) { + throw new IllegalStateException("Duplicate sections are not allowed: " + + currentSection); + } sectionContents = Lists.newArrayList(); } else { sectionContents.add(line); @@ -317,6 +343,25 @@ public class TestFileParser { } /** + * Parses QUERYOPTIONS section. Adds the parsed query options to "testCase.options". + * Throws an IllegalStateException if parsing failed. 
+ */ + private void parseQueryOptions(TestCase testCase) { + String optionsStr = testCase.getSectionAsString(Section.QUERYOPTIONS, false, ","); + if (optionsStr == null || optionsStr.isEmpty()) return; + + TQueryOptions result = null; + try { + result = FeSupport.ParseQueryOptions(optionsStr, testCase.getOptions()); + } catch (InternalException e) { + throw new IllegalStateException("Failed to parse query options: " + optionsStr + + " - " + e.getMessage(), e); + } + Preconditions.checkNotNull(result); + testCase.setOptions(result); + } + + /** * Parses a test file in its entirety and constructs a list of TestCases. */ public void parseFile() { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00fd8388/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test new file mode 100644 index 0000000..a8909be --- /dev/null +++ b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test @@ -0,0 +1,535 @@ +# Default query options +select /* +straight_join */ count(*) from functional.alltypes a + join /* +broadcast */ functional.alltypes b on b.id = a.id and + b.date_string_col = a.date_string_col + join /* +broadcast */ functional.alltypes c on c.month = a.month and + c.int_col = a.int_col + join /* +shuffle */ functional.alltypes d on d.bool_col = a.bool_col and d.year = a.year +---- PLAN +PLAN-ROOT SINK +| +07:AGGREGATE [FINALIZE] +| output: count(*) +| +06:HASH JOIN [INNER JOIN] +| hash predicates: a.bool_col = d.bool_col, a.year = d.year +| runtime filters: RF000 <- d.bool_col, RF001 <- d.year +| +|--03:SCAN HDFS [functional.alltypes d] +| partitions=24/24 files=24 size=478.45KB +| +05:HASH JOIN [INNER JOIN] +| hash predicates: a.int_col = c.int_col, a.month = c.month +| runtime filters: RF002 <- c.int_col, RF003 <- c.month +| +|--02:SCAN HDFS [functional.alltypes c] +| partitions=24/24 files=24 size=478.45KB +| +04:HASH JOIN [INNER JOIN] +| hash predicates: a.id = b.id, a.date_string_col = b.date_string_col +| runtime filters: RF004 <- b.id, RF005 <- b.date_string_col +| +|--01:SCAN HDFS [functional.alltypes b] +| partitions=24/24 files=24 size=478.45KB +| +00:SCAN HDFS [functional.alltypes a] + partitions=24/24 files=24 size=478.45KB + runtime filters: RF000 -> a.bool_col, RF001 -> a.year, RF002 -> a.int_col, RF003 -> a.month, RF004 -> a.id, RF005 -> a.date_string_col +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +13:AGGREGATE [FINALIZE] +| output: count:merge(*) +| +12:EXCHANGE [UNPARTITIONED] +| +07:AGGREGATE +| output: count(*) +| +06:HASH JOIN [INNER JOIN, PARTITIONED] +| hash predicates: a.bool_col = d.bool_col, a.year = d.year +| runtime filters: RF000 <- d.bool_col, RF001 <- d.year +| +|--11:EXCHANGE [HASH(d.bool_col,d.year)] +| | +| 03:SCAN HDFS [functional.alltypes d] +| partitions=24/24 files=24 size=478.45KB +| +10:EXCHANGE [HASH(a.bool_col,a.year)] +| +05:HASH JOIN [INNER JOIN, BROADCAST] +| hash predicates: a.int_col = c.int_col, a.month = c.month +| runtime filters: RF002 <- c.int_col, RF003 <- c.month +| +|--09:EXCHANGE [BROADCAST] +| | +| 02:SCAN HDFS [functional.alltypes c] +| partitions=24/24 files=24 size=478.45KB +| +04:HASH JOIN [INNER JOIN, BROADCAST] +| hash predicates: a.id = b.id, a.date_string_col = b.date_string_col +| runtime 
filters: RF004 <- b.id, RF005 <- b.date_string_col +| +|--08:EXCHANGE [BROADCAST] +| | +| 01:SCAN HDFS [functional.alltypes b] +| partitions=24/24 files=24 size=478.45KB +| +00:SCAN HDFS [functional.alltypes a] + partitions=24/24 files=24 size=478.45KB + runtime filters: RF000 -> a.bool_col, RF001 -> a.year, RF002 -> a.int_col, RF003 -> a.month, RF004 -> a.id, RF005 -> a.date_string_col +==== +# Keep only MAX_NUM_RUNTIME_FILTERS most selective filters, remove the rest. +# In this query RF000 (<- d.bool_col) and RF001 (<- d.year) are the least selective +# filters. +select /* +straight_join */ count(*) from functional.alltypes a + join /* +broadcast */ functional.alltypes b on b.id = a.id and + b.date_string_col = a.date_string_col + join /* +broadcast */ functional.alltypes c on c.month = a.month and + c.int_col = a.int_col + join /* +shuffle */ functional.alltypes d on d.bool_col = a.bool_col and d.year = a.year +---- QUERYOPTIONS +MAX_NUM_RUNTIME_FILTERS=4 +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +13:AGGREGATE [FINALIZE] +| output: count:merge(*) +| +12:EXCHANGE [UNPARTITIONED] +| +07:AGGREGATE +| output: count(*) +| +06:HASH JOIN [INNER JOIN, PARTITIONED] +| hash predicates: a.bool_col = d.bool_col, a.year = d.year +| +|--11:EXCHANGE [HASH(d.bool_col,d.year)] +| | +| 03:SCAN HDFS [functional.alltypes d] +| partitions=24/24 files=24 size=478.45KB +| +10:EXCHANGE [HASH(a.bool_col,a.year)] +| +05:HASH JOIN [INNER JOIN, BROADCAST] +| hash predicates: a.int_col = c.int_col, a.month = c.month +| runtime filters: RF002 <- c.int_col, RF003 <- c.month +| +|--09:EXCHANGE [BROADCAST] +| | +| 02:SCAN HDFS [functional.alltypes c] +| partitions=24/24 files=24 size=478.45KB +| +04:HASH JOIN [INNER JOIN, BROADCAST] +| hash predicates: a.id = b.id, a.date_string_col = b.date_string_col +| runtime filters: RF004 <- b.id, RF005 <- b.date_string_col +| +|--08:EXCHANGE [BROADCAST] +| | +| 01:SCAN HDFS [functional.alltypes b] +| partitions=24/24 files=24 size=478.45KB +| +00:SCAN HDFS [functional.alltypes a] + partitions=24/24 files=24 size=478.45KB + runtime filters: RF004 -> a.id, RF005 -> a.date_string_col, RF002 -> a.int_col, RF003 -> a.month +==== +# DISABLE_ROW_RUNTIME_FILTERING is set: only partition column filters are applied. 
+select /* +straight_join */ count(*) from functional.alltypes a + join /* +broadcast */ functional.alltypes b on b.id = a.id and + b.date_string_col = a.date_string_col + join /* +broadcast */ functional.alltypes c on c.month = a.month and + c.int_col = a.int_col + join /* +shuffle */ functional.alltypes d on d.bool_col = a.bool_col and d.year = a.year +---- QUERYOPTIONS +DISABLE_ROW_RUNTIME_FILTERING=true +---- PLAN +PLAN-ROOT SINK +| +07:AGGREGATE [FINALIZE] +| output: count(*) +| +06:HASH JOIN [INNER JOIN] +| hash predicates: a.bool_col = d.bool_col, a.year = d.year +| runtime filters: RF001 <- d.year +| +|--03:SCAN HDFS [functional.alltypes d] +| partitions=24/24 files=24 size=478.45KB +| +05:HASH JOIN [INNER JOIN] +| hash predicates: a.int_col = c.int_col, a.month = c.month +| runtime filters: RF003 <- c.month +| +|--02:SCAN HDFS [functional.alltypes c] +| partitions=24/24 files=24 size=478.45KB +| +04:HASH JOIN [INNER JOIN] +| hash predicates: a.id = b.id, a.date_string_col = b.date_string_col +| +|--01:SCAN HDFS [functional.alltypes b] +| partitions=24/24 files=24 size=478.45KB +| +00:SCAN HDFS [functional.alltypes a] + partitions=24/24 files=24 size=478.45KB + runtime filters: RF001 -> a.year, RF003 -> a.month +==== +# DISABLE_ROW_RUNTIME_FILTERING is set and MAX_NUM_RUNTIME_FILTERS is set to 2: only the 2 +# partition column filters are applied +select /* +straight_join */ count(*) from functional.alltypes a + join /* +broadcast */ functional.alltypes b on b.id = a.id and + b.date_string_col = a.date_string_col + join /* +broadcast */ functional.alltypes c on c.month = a.month and + c.int_col = a.int_col + join /* +shuffle */ functional.alltypes d on d.bool_col = a.bool_col and d.year = a.year +---- QUERYOPTIONS +DISABLE_ROW_RUNTIME_FILTERING=true +MAX_NUM_RUNTIME_FILTERS=2 +---- PLAN +PLAN-ROOT SINK +| +07:AGGREGATE [FINALIZE] +| output: count(*) +| +06:HASH JOIN [INNER JOIN] +| hash predicates: a.bool_col = d.bool_col, a.year = d.year +| runtime filters: RF001 <- d.year +| +|--03:SCAN HDFS [functional.alltypes d] +| partitions=24/24 files=24 size=478.45KB +| +05:HASH JOIN [INNER JOIN] +| hash predicates: a.int_col = c.int_col, a.month = c.month +| runtime filters: RF003 <- c.month +| +|--02:SCAN HDFS [functional.alltypes c] +| partitions=24/24 files=24 size=478.45KB +| +04:HASH JOIN [INNER JOIN] +| hash predicates: a.id = b.id, a.date_string_col = b.date_string_col +| +|--01:SCAN HDFS [functional.alltypes b] +| partitions=24/24 files=24 size=478.45KB +| +00:SCAN HDFS [functional.alltypes a] + partitions=24/24 files=24 size=478.45KB + runtime filters: RF001 -> a.year, RF003 -> a.month +==== +# RUNTIME_FILTER_MODE is set to LOCAL: only local filters are applied +select /* +straight_join */ count(*) from functional.alltypes a + join /* +broadcast */ functional.alltypes b on b.id = a.id and + b.date_string_col = a.date_string_col + join /* +broadcast */ functional.alltypes c on c.month = a.month and + c.int_col = a.int_col + join /* +shuffle */ functional.alltypes d on d.bool_col = a.bool_col and d.year = a.year +---- QUERYOPTIONS +RUNTIME_FILTER_MODE=LOCAL +---- PLAN +PLAN-ROOT SINK +| +07:AGGREGATE [FINALIZE] +| output: count(*) +| +06:HASH JOIN [INNER JOIN] +| hash predicates: a.bool_col = d.bool_col, a.year = d.year +| runtime filters: RF000 <- d.bool_col, RF001 <- d.year +| +|--03:SCAN HDFS [functional.alltypes d] +| partitions=24/24 files=24 size=478.45KB +| +05:HASH JOIN [INNER JOIN] +| hash predicates: a.int_col = c.int_col, a.month = c.month +| runtime filters: RF002 <- 
c.int_col, RF003 <- c.month +| +|--02:SCAN HDFS [functional.alltypes c] +| partitions=24/24 files=24 size=478.45KB +| +04:HASH JOIN [INNER JOIN] +| hash predicates: a.id = b.id, a.date_string_col = b.date_string_col +| runtime filters: RF004 <- b.id, RF005 <- b.date_string_col +| +|--01:SCAN HDFS [functional.alltypes b] +| partitions=24/24 files=24 size=478.45KB +| +00:SCAN HDFS [functional.alltypes a] + partitions=24/24 files=24 size=478.45KB + runtime filters: RF000 -> a.bool_col, RF001 -> a.year, RF002 -> a.int_col, RF003 -> a.month, RF004 -> a.id, RF005 -> a.date_string_col +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +13:AGGREGATE [FINALIZE] +| output: count:merge(*) +| +12:EXCHANGE [UNPARTITIONED] +| +07:AGGREGATE +| output: count(*) +| +06:HASH JOIN [INNER JOIN, PARTITIONED] +| hash predicates: a.bool_col = d.bool_col, a.year = d.year +| +|--11:EXCHANGE [HASH(d.bool_col,d.year)] +| | +| 03:SCAN HDFS [functional.alltypes d] +| partitions=24/24 files=24 size=478.45KB +| +10:EXCHANGE [HASH(a.bool_col,a.year)] +| +05:HASH JOIN [INNER JOIN, BROADCAST] +| hash predicates: a.int_col = c.int_col, a.month = c.month +| runtime filters: RF002 <- c.int_col, RF003 <- c.month +| +|--09:EXCHANGE [BROADCAST] +| | +| 02:SCAN HDFS [functional.alltypes c] +| partitions=24/24 files=24 size=478.45KB +| +04:HASH JOIN [INNER JOIN, BROADCAST] +| hash predicates: a.id = b.id, a.date_string_col = b.date_string_col +| runtime filters: RF004 <- b.id, RF005 <- b.date_string_col +| +|--08:EXCHANGE [BROADCAST] +| | +| 01:SCAN HDFS [functional.alltypes b] +| partitions=24/24 files=24 size=478.45KB +| +00:SCAN HDFS [functional.alltypes a] + partitions=24/24 files=24 size=478.45KB + runtime filters: RF002 -> a.int_col, RF003 -> a.month, RF004 -> a.id, RF005 -> a.date_string_col +==== +# RUNTIME_FILTER_MODE is set to LOCAL and MAX_NUM_RUNTIME_FILTERS is set to 3: only 3 +# local filters are kept, which means that both local and non-local filters are removed +# from the distributed plan. 
+select /* +straight_join */ count(*) from functional.alltypes a + join /* +broadcast */ functional.alltypes b on b.id = a.id and + b.date_string_col = a.date_string_col + join /* +broadcast */ functional.alltypes c on c.month = a.month and + c.int_col = a.int_col + join /* +shuffle */ functional.alltypes d on d.bool_col = a.bool_col and d.year = a.year +---- QUERYOPTIONS +RUNTIME_FILTER_MODE=LOCAL +MAX_NUM_RUNTIME_FILTERS=3 +---- PLAN +PLAN-ROOT SINK +| +07:AGGREGATE [FINALIZE] +| output: count(*) +| +06:HASH JOIN [INNER JOIN] +| hash predicates: a.bool_col = d.bool_col, a.year = d.year +| +|--03:SCAN HDFS [functional.alltypes d] +| partitions=24/24 files=24 size=478.45KB +| +05:HASH JOIN [INNER JOIN] +| hash predicates: a.int_col = c.int_col, a.month = c.month +| runtime filters: RF002 <- c.int_col +| +|--02:SCAN HDFS [functional.alltypes c] +| partitions=24/24 files=24 size=478.45KB +| +04:HASH JOIN [INNER JOIN] +| hash predicates: a.id = b.id, a.date_string_col = b.date_string_col +| runtime filters: RF004 <- b.id, RF005 <- b.date_string_col +| +|--01:SCAN HDFS [functional.alltypes b] +| partitions=24/24 files=24 size=478.45KB +| +00:SCAN HDFS [functional.alltypes a] + partitions=24/24 files=24 size=478.45KB + runtime filters: RF004 -> a.id, RF005 -> a.date_string_col, RF002 -> a.int_col +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +13:AGGREGATE [FINALIZE] +| output: count:merge(*) +| +12:EXCHANGE [UNPARTITIONED] +| +07:AGGREGATE +| output: count(*) +| +06:HASH JOIN [INNER JOIN, PARTITIONED] +| hash predicates: a.bool_col = d.bool_col, a.year = d.year +| +|--11:EXCHANGE [HASH(d.bool_col,d.year)] +| | +| 03:SCAN HDFS [functional.alltypes d] +| partitions=24/24 files=24 size=478.45KB +| +10:EXCHANGE [HASH(a.bool_col,a.year)] +| +05:HASH JOIN [INNER JOIN, BROADCAST] +| hash predicates: a.int_col = c.int_col, a.month = c.month +| runtime filters: RF002 <- c.int_col +| +|--09:EXCHANGE [BROADCAST] +| | +| 02:SCAN HDFS [functional.alltypes c] +| partitions=24/24 files=24 size=478.45KB +| +04:HASH JOIN [INNER JOIN, BROADCAST] +| hash predicates: a.id = b.id, a.date_string_col = b.date_string_col +| runtime filters: RF004 <- b.id, RF005 <- b.date_string_col +| +|--08:EXCHANGE [BROADCAST] +| | +| 01:SCAN HDFS [functional.alltypes b] +| partitions=24/24 files=24 size=478.45KB +| +00:SCAN HDFS [functional.alltypes a] + partitions=24/24 files=24 size=478.45KB + runtime filters: RF004 -> a.id, RF005 -> a.date_string_col, RF002 -> a.int_col +==== +# DISABLE_ROW_RUNTIME_FILTERING is set and RUNTIME_FILTER_MODE is set to LOCAL: only local +# partition column filters are applied +select /* +straight_join */ count(*) from functional.alltypes a + join /* +broadcast */ functional.alltypes b on b.id = a.id and + b.date_string_col = a.date_string_col + join /* +broadcast */ functional.alltypes c on c.month = a.month and + c.int_col = a.int_col + join /* +shuffle */ functional.alltypes d on d.bool_col = a.bool_col and d.year = a.year +---- QUERYOPTIONS +DISABLE_ROW_RUNTIME_FILTERING=true +RUNTIME_FILTER_MODE=LOCAL +---- PLAN +PLAN-ROOT SINK +| +07:AGGREGATE [FINALIZE] +| output: count(*) +| +06:HASH JOIN [INNER JOIN] +| hash predicates: a.bool_col = d.bool_col, a.year = d.year +| runtime filters: RF001 <- d.year +| +|--03:SCAN HDFS [functional.alltypes d] +| partitions=24/24 files=24 size=478.45KB +| +05:HASH JOIN [INNER JOIN] +| hash predicates: a.int_col = c.int_col, a.month = c.month +| runtime filters: RF003 <- c.month +| +|--02:SCAN HDFS [functional.alltypes c] +| partitions=24/24 files=24 size=478.45KB +| 
+04:HASH JOIN [INNER JOIN] +| hash predicates: a.id = b.id, a.date_string_col = b.date_string_col +| +|--01:SCAN HDFS [functional.alltypes b] +| partitions=24/24 files=24 size=478.45KB +| +00:SCAN HDFS [functional.alltypes a] + partitions=24/24 files=24 size=478.45KB + runtime filters: RF001 -> a.year, RF003 -> a.month +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +13:AGGREGATE [FINALIZE] +| output: count:merge(*) +| +12:EXCHANGE [UNPARTITIONED] +| +07:AGGREGATE +| output: count(*) +| +06:HASH JOIN [INNER JOIN, PARTITIONED] +| hash predicates: a.bool_col = d.bool_col, a.year = d.year +| +|--11:EXCHANGE [HASH(d.bool_col,d.year)] +| | +| 03:SCAN HDFS [functional.alltypes d] +| partitions=24/24 files=24 size=478.45KB +| +10:EXCHANGE [HASH(a.bool_col,a.year)] +| +05:HASH JOIN [INNER JOIN, BROADCAST] +| hash predicates: a.int_col = c.int_col, a.month = c.month +| runtime filters: RF003 <- c.month +| +|--09:EXCHANGE [BROADCAST] +| | +| 02:SCAN HDFS [functional.alltypes c] +| partitions=24/24 files=24 size=478.45KB +| +04:HASH JOIN [INNER JOIN, BROADCAST] +| hash predicates: a.id = b.id, a.date_string_col = b.date_string_col +| +|--08:EXCHANGE [BROADCAST] +| | +| 01:SCAN HDFS [functional.alltypes b] +| partitions=24/24 files=24 size=478.45KB +| +00:SCAN HDFS [functional.alltypes a] + partitions=24/24 files=24 size=478.45KB + runtime filters: RF003 -> a.month +==== +# RUNTIME_FILTER_MODE is OFF: no filters are applied +select /* +straight_join */ count(*) from functional.alltypes a + join /* +broadcast */ functional.alltypes b on b.id = a.id and + b.date_string_col = a.date_string_col + join /* +broadcast */ functional.alltypes c on c.month = a.month and + c.int_col = a.int_col + join /* +shuffle */ functional.alltypes d on d.bool_col = a.bool_col and d.year = a.year +---- QUERYOPTIONS +RUNTIME_FILTER_MODE=OFF +---- PLAN +PLAN-ROOT SINK +| +07:AGGREGATE [FINALIZE] +| output: count(*) +| +06:HASH JOIN [INNER JOIN] +| hash predicates: a.bool_col = d.bool_col, a.year = d.year +| +|--03:SCAN HDFS [functional.alltypes d] +| partitions=24/24 files=24 size=478.45KB +| +05:HASH JOIN [INNER JOIN] +| hash predicates: a.int_col = c.int_col, a.month = c.month +| +|--02:SCAN HDFS [functional.alltypes c] +| partitions=24/24 files=24 size=478.45KB +| +04:HASH JOIN [INNER JOIN] +| hash predicates: a.id = b.id, a.date_string_col = b.date_string_col +| +|--01:SCAN HDFS [functional.alltypes b] +| partitions=24/24 files=24 size=478.45KB +| +00:SCAN HDFS [functional.alltypes a] + partitions=24/24 files=24 size=478.45KB +==== +# MAX_NUM_RUNTIME_FILTERS is 0: no filters are applied +select /* +straight_join */ count(*) from functional.alltypes a + join /* +broadcast */ functional.alltypes b on b.id = a.id and + b.date_string_col = a.date_string_col + join /* +broadcast */ functional.alltypes c on c.month = a.month and + c.int_col = a.int_col + join /* +shuffle */ functional.alltypes d on d.bool_col = a.bool_col and d.year = a.year +---- QUERYOPTIONS +MAX_NUM_RUNTIME_FILTERS=0 +---- PLAN +PLAN-ROOT SINK +| +07:AGGREGATE [FINALIZE] +| output: count(*) +| +06:HASH JOIN [INNER JOIN] +| hash predicates: a.bool_col = d.bool_col, a.year = d.year +| +|--03:SCAN HDFS [functional.alltypes d] +| partitions=24/24 files=24 size=478.45KB +| +05:HASH JOIN [INNER JOIN] +| hash predicates: a.int_col = c.int_col, a.month = c.month +| +|--02:SCAN HDFS [functional.alltypes c] +| partitions=24/24 files=24 size=478.45KB +| +04:HASH JOIN [INNER JOIN] +| hash predicates: a.id = b.id, a.date_string_col = b.date_string_col +| +|--01:SCAN HDFS 
[functional.alltypes b] +| partitions=24/24 files=24 size=478.45KB +| +00:SCAN HDFS [functional.alltypes a] + partitions=24/24 files=24 size=478.45KB +====
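
For readers who want the net effect without walking the whole diff: after this
change, RuntimeFilterGenerator.assignRuntimeFilters() keeps a candidate
(filter, scan node) target only if it passes both query-option checks. The
stand-alone Java sketch below models just that decision; the enum, class, and
method names are invented for illustration, while the real code operates on
RuntimeFilter and ScanNode objects as shown in RuntimeFilterGenerator.java
above.

  import java.util.ArrayList;
  import java.util.List;

  // Simplified, self-contained model of the FE-side pruning decision.
  // All names here are illustrative; only the two boolean checks mirror
  // the 'continue' conditions in assignRuntimeFilters().
  public class FilterPruningSketch {
    enum RuntimeFilterMode { OFF, LOCAL, GLOBAL }

    static class CandidateTarget {
      final boolean boundByPartitionColumns; // target expr uses only partition columns
      final boolean localTarget;             // scan runs in the filter's source fragment
      CandidateTarget(boolean boundByPartitionColumns, boolean localTarget) {
        this.boundByPartitionColumns = boundByPartitionColumns;
        this.localTarget = localTarget;
      }
    }

    // A target survives only if it violates neither constraint.
    static boolean keepTarget(CandidateTarget t, boolean disableRowRuntimeFiltering,
        RuntimeFilterMode mode) {
      if (disableRowRuntimeFiltering && !t.boundByPartitionColumns) return false;
      if (mode == RuntimeFilterMode.LOCAL && !t.localTarget) return false;
      return true;
    }

    public static void main(String[] args) {
      List<CandidateTarget> candidates = new ArrayList<>();
      candidates.add(new CandidateTarget(true, false));  // partition-bound but remote
      candidates.add(new CandidateTarget(false, true));  // local but row-level
      candidates.add(new CandidateTarget(true, true));   // partition-bound and local
      // With DISABLE_ROW_RUNTIME_FILTERING=true and RUNTIME_FILTER_MODE=LOCAL,
      // only the third candidate survives -- matching the combined
      // DISABLE_ROW_RUNTIME_FILTERING + RUNTIME_FILTER_MODE=LOCAL test case
      // above, where only RF003 remains in the distributed plan.
      for (CandidateTarget t : candidates) {
        System.out.println(keepTarget(t, true, RuntimeFilterMode.LOCAL));
      }
    }
  }

Because both checks now run in the planner, the BE code in
partitioned-hash-join-builder.cc, scan-node.cc, and coordinator.cc can demote
its former pruning logic to DCHECKs, as the diff shows.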

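Separately, the FeSupport comment above explains why ParseQueryOptions() takes
the caller's existing TQueryOptions and returns an updated copy instead of
returning only the newly parsed options: the thrift-generated class cannot
reliably distinguish an explicitly set option from a default one, so the merge
must happen on top of the existing options. Below is a minimal plain-Java
analogue of that merge-on-top semantics, with invented names and a Map standing
in for the Thrift struct.

  import java.util.HashMap;
  import java.util.LinkedHashMap;
  import java.util.Map;

  // Illustrative model only: merging parsed "k1=v1,k2=v2" pairs onto existing
  // options. Because we overlay onto the caller's map, we never need to ask
  // whether an existing entry was set explicitly or is just a default.
  public class QueryOptionsMergeSketch {
    static Map<String, String> parseAndMerge(String csv, Map<String, String> existing) {
      Map<String, String> merged = new LinkedHashMap<>(existing);
      for (String pair : csv.split(",")) {
        int eq = pair.indexOf('=');
        if (eq <= 0) throw new IllegalArgumentException("Malformed option: " + pair);
        merged.put(pair.substring(0, eq).trim().toUpperCase(),
            pair.substring(eq + 1).trim());
      }
      return merged;
    }

    public static void main(String[] args) {
      Map<String, String> options = new HashMap<>();
      options.put("RUNTIME_FILTER_MODE", "GLOBAL");
      options.put("MAX_NUM_RUNTIME_FILTERS", "10");
      // The same shape of input a QUERYOPTIONS test section provides:
      System.out.println(parseAndMerge(
          "RUNTIME_FILTER_MODE=LOCAL,MAX_NUM_RUNTIME_FILTERS=3", options));
    }
  }

In the actual change, TestFileParser.parseQueryOptions() performs this step per
test case via FeSupport.ParseQueryOptions(), starting from a deep copy of the
file-level defaults so that each test case owns its own query options.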