Repository: incubator-impala
Updated Branches:
  refs/heads/master e301ca641 -> f2cd5bd51


IMPALA-3548: Prune runtime filters based on query options in the FE

Currently, the FE generates a number of runtime filters and assigns
them to the single-node plan without taking the values of the
RUNTIME_FILTER_MODE and DISABLE_ROW_RUNTIME_FILTERING query options
into account.

The backend then removes filters from exec nodes, based on the
following rules:
1. If DISABLE_ROW_RUNTIME_FILTERING is set, filters are removed from
   the exec nodes whose targets are not bound by partition columns.

2. If RUNTIME_FILTER_MODE is set to LOCAL, filters are removed from
   the exec nodes that are marked as remote targets.

This may confuse users: the EXPLAIN output can show runtime filters
that are never applied when the query is executed.

This change moves the runtime filter pruning logic to the planner in
the FE. Runtime filter assignment is now done on the distributed
plan, and the above constraints are enforced there directly.
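
To illustrate, the checks added to
RuntimeFilterGenerator.assignRuntimeFilters() (see the diff below)
boil down to the following sketch; variable names match the patch and
the surrounding loop is omitted:

    // Skip a filter whose target expr is not bound by partition columns
    // when DISABLE_ROW_RUNTIME_FILTERING is set.
    if (disableRowRuntimeFiltering && !isBoundByPartitionColumns) continue;
    // Skip a filter whose target is in a different fragment than the
    // producing join when RUNTIME_FILTER_MODE is LOCAL.
    if (runtimeFilterMode == TRuntimeFilterMode.LOCAL && !isLocalTarget) continue;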

Change-Id: Id0f0b200e02442edcad8df3979f652d66c6e52eb
Reviewed-on: http://gerrit.cloudera.org:8080/7564
Tested-by: Impala Public Jenkins
Reviewed-by: Alex Behm <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/00fd8388
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/00fd8388
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/00fd8388

Branch: refs/heads/master
Commit: 00fd8388c37e1b3207dddf564463b7eeafe3f887
Parents: e301ca6
Author: Attila Jeges <[email protected]>
Authored: Tue Jul 18 17:25:23 2017 +0200
Committer: Alex Behm <[email protected]>
Committed: Thu Oct 26 17:06:32 2017 +0000

----------------------------------------------------------------------
 be/src/exec/partitioned-hash-join-builder.cc    |  14 +-
 be/src/exec/scan-node.cc                        |  12 +-
 be/src/runtime/coordinator.cc                   |   8 +-
 be/src/service/fe-support.cc                    |  30 ++
 .../impala/planner/DistributedPlanner.java      |   9 -
 .../java/org/apache/impala/planner/Planner.java |  15 +-
 .../impala/planner/RuntimeFilterGenerator.java  | 126 +++--
 .../org/apache/impala/service/FeSupport.java    |  45 ++
 .../org/apache/impala/planner/PlannerTest.java  |   5 +
 .../apache/impala/planner/PlannerTestBase.java  |  21 +-
 .../apache/impala/testutil/TestFileParser.java  |  63 ++-
 .../runtime-filter-query-options.test           | 535 +++++++++++++++++++
 12 files changed, 775 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00fd8388/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 4b49fb0..bf5f891 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -103,16 +103,10 @@ Status PhjBuilder::InitExprsAndFilters(RuntimeState* state,
   }
 
   for (const TRuntimeFilterDesc& filter_desc : filter_descs) {
-    // If filter propagation not enabled, only consider building broadcast joins (that
-    // may be consumed by this fragment).
-    if (state->query_options().runtime_filter_mode != TRuntimeFilterMode::GLOBAL &&
-        !filter_desc.is_broadcast_join) {
-      continue;
-    }
-    if (state->query_options().disable_row_runtime_filtering &&
-        !filter_desc.applied_on_partition_columns) {
-      continue;
-    }
+    DCHECK(state->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL ||
+        filter_desc.is_broadcast_join);
+    DCHECK(!state->query_options().disable_row_runtime_filtering ||
+        filter_desc.applied_on_partition_columns);
     ScalarExpr* filter_expr;
     RETURN_IF_ERROR(
         ScalarExpr::Create(filter_desc.src_expr, *row_desc_, state, &filter_expr));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00fd8388/be/src/exec/scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index fd2453d..18fc473 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -62,14 +62,10 @@ Status ScanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
     auto it = filter_desc.planid_to_target_ndx.find(tnode.node_id);
     DCHECK(it != filter_desc.planid_to_target_ndx.end());
     const TRuntimeFilterTargetDesc& target = filter_desc.targets[it->second];
-    if (state->query_options().runtime_filter_mode == TRuntimeFilterMode::LOCAL &&
-        !target.is_local_target) {
-      continue;
-    }
-    if (query_options.disable_row_runtime_filtering &&
-        !target.is_bound_by_partition_columns) {
-      continue;
-    }
+    DCHECK(state->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL ||
+        target.is_local_target);
+    DCHECK(!query_options.disable_row_runtime_filtering ||
+        target.is_bound_by_partition_columns);
     ScalarExpr* filter_expr;
     RETURN_IF_ERROR(
         ScalarExpr::Create(target.target_expr, *row_desc(), state, &filter_expr));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00fd8388/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index e26919d..af3ec21 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -281,9 +281,7 @@ void Coordinator::InitFilterRoutingTable() {
     for (const TPlanNode& plan_node: fragment_params.fragment.plan.nodes) {
       if (!plan_node.__isset.runtime_filters) continue;
       for (const TRuntimeFilterDesc& filter: plan_node.runtime_filters) {
-        if (filter_mode_ == TRuntimeFilterMode::LOCAL && !filter.has_local_targets) {
-          continue;
-        }
+        DCHECK(filter_mode_ == TRuntimeFilterMode::GLOBAL || filter.has_local_targets);
         FilterRoutingTable::iterator i = filter_routing_table_.emplace(
             filter.filter_id, FilterState(filter, plan_node.node_id)).first;
         FilterState* f = &(i->second);
@@ -317,9 +315,7 @@ void Coordinator::InitFilterRoutingTable() {
           auto it = filter.planid_to_target_ndx.find(plan_node.node_id);
           DCHECK(it != filter.planid_to_target_ndx.end());
           const TRuntimeFilterTargetDesc& t_target = filter.targets[it->second];
-          if (filter_mode_ == TRuntimeFilterMode::LOCAL && !t_target.is_local_target) {
-            continue;
-          }
+          DCHECK(filter_mode_ == TRuntimeFilterMode::GLOBAL || t_target.is_local_target);
           f->targets()->emplace_back(t_target, fragment_params.fragment.idx);
         } else {
           DCHECK(false) << "Unexpected plan node with runtime filters: "

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00fd8388/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index b7a5e81..35b5a15 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -41,6 +41,7 @@
 #include "runtime/raw-value.h"
 #include "runtime/runtime-state.h"
 #include "service/impala-server.h"
+#include "service/query-options.h"
 #include "util/cpu-info.h"
 #include "util/debug-util.h"
 #include "util/disk-info.h"
@@ -445,6 +446,31 @@ Java_org_apache_impala_service_FeSupport_NativePrioritizeLoad(
   return result_bytes;
 }
 
+// Used to call native code from the FE to parse and set comma-delimited key=value query
+// options.
+extern "C"
+JNIEXPORT jbyteArray JNICALL
+Java_org_apache_impala_service_FeSupport_NativeParseQueryOptions(
+    JNIEnv* env, jclass caller_class, jstring csv_query_options,
+    jbyteArray tquery_options) {
+  TQueryOptions options;
+  THROW_IF_ERROR_RET(DeserializeThriftMsg(env, tquery_options, &options), env,
+      JniUtil::internal_exc_class(), nullptr);
+
+  JniUtfCharGuard csv_query_options_guard;
+  THROW_IF_ERROR_RET(
+      JniUtfCharGuard::create(env, csv_query_options, &csv_query_options_guard), env,
+      JniUtil::internal_exc_class(), nullptr);
+  THROW_IF_ERROR_RET(
+      impala::ParseQueryOptions(csv_query_options_guard.get(), &options, NULL), env,
+      JniUtil::internal_exc_class(), nullptr);
+
+  jbyteArray result_bytes = NULL;
+  THROW_IF_ERROR_RET(SerializeThriftMsg(env, &options, &result_bytes), env,
+      JniUtil::internal_exc_class(), result_bytes);
+  return result_bytes;
+}
+
 namespace impala {
 
 static JNINativeMethod native_methods[] = {
@@ -468,6 +494,10 @@ static JNINativeMethod native_methods[] = {
     (char*)"NativePrioritizeLoad", (char*)"([B)[B",
     (void*)::Java_org_apache_impala_service_FeSupport_NativePrioritizeLoad
   },
+  {
+    (char*)"NativeParseQueryOptions", (char*)"(Ljava/lang/String;[B)[B",
+    (void*)::Java_org_apache_impala_service_FeSupport_NativeParseQueryOptions
+  },
 };
 
 void InitFeSupport(bool disable_codegen) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00fd8388/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index 6571036..e1d83d7 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -488,15 +488,6 @@ public class DistributedPlanner {
           lhsHasCompatPartition, rhsHasCompatPartition, leftChildFragment,
           rightChildFragment, lhsJoinExprs, rhsJoinExprs, fragments);
     }
-
-    for (RuntimeFilter filter: node.getRuntimeFilters()) {
-      filter.setIsBroadcast(distrMode == DistributionMode.BROADCAST);
-      filter.computeHasLocalTargets();
-      // Work around IMPALA-3450, where cardinalities might be wrong in single-node plans
-      // with UNION and LIMITs.
-      // TODO: Remove.
-      filter.computeNdvEstimate();
-    }
     return hjFragment;
  }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00fd8388/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 7992b7b..de4f8df 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -105,15 +105,6 @@ public class Planner {
     invertJoins(singleNodePlan, ctx_.isSingleNodeExec());
     singleNodePlan = useNljForSingularRowBuilds(singleNodePlan, ctx_.getRootAnalyzer());
 
-    // create runtime filters
-    if (ctx_.getQueryOptions().getRuntime_filter_mode() != TRuntimeFilterMode.OFF) {
-      // Always compute filters, even if the BE won't always use all of them.
-      RuntimeFilterGenerator.generateRuntimeFilters(ctx_.getRootAnalyzer(),
-          singleNodePlan, ctx_.getQueryOptions().getMax_num_runtime_filters());
-      ctx_.getAnalysisResult().getTimeline().markEvent(
-          "Runtime filters computed");
-    }
-
     singleNodePlanner.validatePlan(singleNodePlan);
 
     if (ctx_.isSingleNodeExec()) {
@@ -125,7 +116,13 @@ public class Planner {
       fragments = distributedPlanner.createPlanFragments(singleNodePlan);
     }
 
+    // Create runtime filters.
     PlanFragment rootFragment = fragments.get(fragments.size() - 1);
+    if (ctx_.getQueryOptions().getRuntime_filter_mode() != TRuntimeFilterMode.OFF) {
+      RuntimeFilterGenerator.generateRuntimeFilters(ctx_, rootFragment.getPlanRoot());
+      ctx_.getAnalysisResult().getTimeline().markEvent("Runtime filters computed");
+    }
+
     rootFragment.verifyTree();
     ExprSubstitutionMap rootNodeSmap = rootFragment.getPlanRoot().getOutputSmap();
     List<Expr> resultExprs = null;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00fd8388/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index 646eb48..98365e6 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -39,9 +39,12 @@ import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.IdGenerator;
+import org.apache.impala.planner.JoinNode.DistributionMode;
 import org.apache.impala.planner.PlanNode;
 import org.apache.impala.thrift.TRuntimeFilterDesc;
+import org.apache.impala.thrift.TRuntimeFilterMode;
 import org.apache.impala.thrift.TRuntimeFilterTargetDesc;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -138,14 +141,17 @@ public final class RuntimeFilterGenerator {
       // Expr on which the filter is applied
       public Expr expr;
       // Indicates if 'expr' is bound only by partition columns
-      public boolean isBoundByPartitionColumns = false;
-      // Indicates if 'node' is in the same fragment as the join that produces the
-      // filter
-      public boolean isLocalTarget = false;
+      public final boolean isBoundByPartitionColumns;
+      // Indicates if 'node' is in the same fragment as the join that produces the filter
+      public final boolean isLocalTarget;
 
-      public RuntimeFilterTarget(ScanNode targetNode, Expr targetExpr) {
+      public RuntimeFilterTarget(ScanNode targetNode, Expr targetExpr,
+          boolean isBoundByPartitionColumns, boolean isLocalTarget) {
+        Preconditions.checkState(targetExpr.isBoundByTupleIds(targetNode.getTupleIds()));
         node = targetNode;
         expr = targetExpr;
+        this.isBoundByPartitionColumns = isBoundByPartitionColumns;
+        this.isLocalTarget = isLocalTarget;
       }
 
       public TRuntimeFilterTargetDesc toThrift() {
@@ -346,25 +352,7 @@ public final class RuntimeFilterGenerator {
       return src_.getCardinality() / (double) src_.getChild(0).getCardinality();
     }
 
-    public void addTarget(ScanNode node, Analyzer analyzer, Expr targetExpr) {
-      Preconditions.checkState(targetExpr.isBoundByTupleIds(node.getTupleIds()));
-      RuntimeFilterTarget target = new RuntimeFilterTarget(node, targetExpr);
-      targets_.add(target);
-      // Check if all the slots of targetExpr_ are bound by partition columns
-      TupleDescriptor baseTblDesc = node.getTupleDesc();
-      Table tbl = baseTblDesc.getTable();
-      if (tbl.getNumClusteringCols() == 0) return;
-      List<SlotId> sids = Lists.newArrayList();
-      targetExpr.getIds(null, sids);
-      for (SlotId sid: sids) {
-        SlotDescriptor slotDesc = analyzer.getSlotDesc(sid);
-        if (slotDesc.getColumn() == null
-            || slotDesc.getColumn().getPosition() >= tbl.getNumClusteringCols()) {
-          return;
-        }
-      }
-      target.isBoundByPartitionColumns = true;
-    }
+    public void addTarget(RuntimeFilterTarget target) { targets_.add(target); }
 
     public void setIsBroadcast(boolean isBroadcast) { isBroadcastJoin_ = isBroadcast; }
 
@@ -375,11 +363,8 @@ public final class RuntimeFilterGenerator {
       Preconditions.checkState(hasTargets());
       for (RuntimeFilterTarget target: targets_) {
         Preconditions.checkNotNull(target.node.getFragment());
-        boolean isLocal =
-            src_.getFragment().getId().equals(target.node.getFragment().getId());
-        target.isLocalTarget = isLocal;
-        hasLocalTargets_ = hasLocalTargets_ || isLocal;
-        hasRemoteTargets_ = hasRemoteTargets_ || !isLocal;
+        hasLocalTargets_ = hasLocalTargets_ || target.isLocalTarget;
+        hasRemoteTargets_ = hasRemoteTargets_ || !target.isLocalTarget;
       }
     }
 
@@ -406,11 +391,13 @@ public final class RuntimeFilterGenerator {
   /**
    * Generates and assigns runtime filters to a query plan tree.
    */
-  public static void generateRuntimeFilters(Analyzer analyzer, PlanNode plan,
-      int maxNumFilters) {
-    Preconditions.checkArgument(maxNumFilters >= 0);
+  public static void generateRuntimeFilters(PlannerContext ctx, PlanNode plan) {
+    Preconditions.checkNotNull(ctx);
+    Preconditions.checkNotNull(ctx.getQueryOptions());
+    int maxNumFilters = ctx.getQueryOptions().getMax_num_runtime_filters();
+    Preconditions.checkState(maxNumFilters >= 0);
     RuntimeFilterGenerator filterGenerator = new RuntimeFilterGenerator();
-    filterGenerator.generateFilters(analyzer, plan);
+    filterGenerator.generateFilters(ctx, plan);
     List<RuntimeFilter> filters = Lists.newArrayList(filterGenerator.getRuntimeFilters());
     if (filters.size() > maxNumFilters) {
       // If more than 'maxNumFilters' were generated, sort them by increasing selectivity
@@ -428,6 +415,9 @@ public final class RuntimeFilterGenerator {
     }
     for (RuntimeFilter filter:
          filters.subList(0, Math.min(filters.size(), maxNumFilters))) {
+      filter.setIsBroadcast(
+          filter.src_.getDistributionMode() == DistributionMode.BROADCAST);
+      filter.computeHasLocalTargets();
       if (LOG.isTraceEnabled()) LOG.trace("Runtime filter: " + filter.debugString());
       filter.assignToPlanNodes();
     }
@@ -452,13 +442,13 @@ public final class RuntimeFilterGenerator {
   }
 
   /**
-   * Generates the runtime filters for a query by recursively traversing the single-node
+   * Generates the runtime filters for a query by recursively traversing the distributed
    * plan tree rooted at 'root'. In the top-down traversal of the plan tree, candidate
    * runtime filters are generated from equi-join predicates assigned to hash-join nodes.
    * In the bottom-up traversal of the plan tree, the filters are assigned to destination
    * (scan) nodes. Filters that cannot be assigned to a scan node are discarded.
    */
-  private void generateFilters(Analyzer analyzer, PlanNode root) {
+  private void generateFilters(PlannerContext ctx, PlanNode root) {
     if (root instanceof HashJoinNode) {
       HashJoinNode joinNode = (HashJoinNode) root;
       List<Expr> joinConjuncts = Lists.newArrayList();
@@ -473,23 +463,23 @@ public final class RuntimeFilterGenerator {
       joinConjuncts.addAll(joinNode.getConjuncts());
       List<RuntimeFilter> filters = Lists.newArrayList();
       for (Expr conjunct: joinConjuncts) {
-        RuntimeFilter filter = RuntimeFilter.create(filterIdGenerator, analyzer,
-            conjunct, joinNode);
+        RuntimeFilter filter = RuntimeFilter.create(filterIdGenerator,
+            ctx.getRootAnalyzer(), conjunct, joinNode);
         if (filter == null) continue;
         registerRuntimeFilter(filter);
         filters.add(filter);
       }
-      generateFilters(analyzer, root.getChild(0));
+      generateFilters(ctx, root.getChild(0));
       // Finalize every runtime filter of that join. This is to ensure that we don't
       // assign a filter to a scan node from the right subtree of joinNode or ancestor
       // join nodes in case we don't find a destination node in the left subtree.
       for (RuntimeFilter runtimeFilter: filters) finalizeRuntimeFilter(runtimeFilter);
-      generateFilters(analyzer, root.getChild(1));
+      generateFilters(ctx, root.getChild(1));
     } else if (root instanceof ScanNode) {
-      assignRuntimeFilters(analyzer, (ScanNode) root);
+      assignRuntimeFilters(ctx, (ScanNode) root);
     } else {
       for (PlanNode childNode: root.getChildren()) {
-        generateFilters(analyzer, childNode);
+        generateFilters(ctx, childNode);
       }
     }
   }
@@ -540,20 +530,66 @@ public final class RuntimeFilterGenerator {
 
   /**
    * Assigns runtime filters to a specific scan node 'scanNode'.
-   * The assigned filters are the ones for which 'scanNode' can be used a destination
-   * node. A scan node may be used as a destination node for multiple runtime filters.
+   * The assigned filters are the ones for which 'scanNode' can be used as a destination
+   * node. The following constraints are enforced when assigning filters to 'scanNode':
+   * 1. If the DISABLE_ROW_RUNTIME_FILTERING query option is set, a filter is only
+   *    assigned to 'scanNode' if the filter target expression is bound by partition
+   *    columns.
+   * 2. If the RUNTIME_FILTER_MODE query option is set to LOCAL, a filter is only assigned
+   *    to 'scanNode' if the filter is produced within the same fragment that contains the
+   *    scan node.
+   * A scan node may be used as a destination node for multiple runtime filters.
    * Currently, runtime filters can only be assigned to HdfsScanNodes.
    */
-  private void assignRuntimeFilters(Analyzer analyzer, ScanNode scanNode) {
+  private void assignRuntimeFilters(PlannerContext ctx, ScanNode scanNode) {
     if (!(scanNode instanceof HdfsScanNode)) return;
     TupleId tid = scanNode.getTupleIds().get(0);
     if (!runtimeFiltersByTid_.containsKey(tid)) return;
+    Analyzer analyzer = ctx.getRootAnalyzer();
+    boolean disableRowRuntimeFiltering =
+        ctx.getQueryOptions().isDisable_row_runtime_filtering();
+    TRuntimeFilterMode runtimeFilterMode = ctx.getQueryOptions().getRuntime_filter_mode();
     for (RuntimeFilter filter: runtimeFiltersByTid_.get(tid)) {
       if (filter.isFinalized()) continue;
       Expr targetExpr = computeTargetExpr(filter, tid, analyzer);
       if (targetExpr == null) continue;
-      filter.addTarget(scanNode, analyzer, targetExpr);
+      boolean isBoundByPartitionColumns = isBoundByPartitionColumns(analyzer, targetExpr,
+          scanNode);
+      if (disableRowRuntimeFiltering && !isBoundByPartitionColumns) continue;
+      boolean isLocalTarget = isLocalTarget(filter, scanNode);
+      if (runtimeFilterMode == TRuntimeFilterMode.LOCAL && !isLocalTarget) continue;
+      RuntimeFilter.RuntimeFilterTarget target = new RuntimeFilter.RuntimeFilterTarget(
+          scanNode, targetExpr, isBoundByPartitionColumns, isLocalTarget);
+      filter.addTarget(target);
+    }
+  }
+
+  /**
+   * Check if 'targetNode' is local to the source node of 'filter'.
+   */
+  static private boolean isLocalTarget(RuntimeFilter filter, ScanNode targetNode) {
+    return targetNode.getFragment().getId().equals(filter.src_.getFragment().getId());
+  }
+
+  /**
+   * Check if all the slots of 'targetExpr' are bound by partition columns.
+   */
+  static private boolean isBoundByPartitionColumns(Analyzer analyzer, Expr targetExpr,
+      ScanNode targetNode) {
+    Preconditions.checkState(targetExpr.isBoundByTupleIds(targetNode.getTupleIds()));
+    TupleDescriptor baseTblDesc = targetNode.getTupleDesc();
+    Table tbl = baseTblDesc.getTable();
+    if (tbl.getNumClusteringCols() == 0) return false;
+    List<SlotId> sids = Lists.newArrayList();
+    targetExpr.getIds(null, sids);
+    for (SlotId sid : sids) {
+      SlotDescriptor slotDesc = analyzer.getSlotDesc(sid);
+      if (slotDesc.getColumn() == null
+          || slotDesc.getColumn().getPosition() >= tbl.getNumClusteringCols()) {
+        return false;
+      }
     }
+    return true;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00fd8388/fe/src/main/java/org/apache/impala/service/FeSupport.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java
index 8b87962..48349c2 100644
--- a/fe/src/main/java/org/apache/impala/service/FeSupport.java
+++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java
@@ -44,6 +44,7 @@ import org.apache.impala.thrift.TExprBatch;
 import org.apache.impala.thrift.TPrioritizeLoadRequest;
 import org.apache.impala.thrift.TPrioritizeLoadResponse;
 import org.apache.impala.thrift.TQueryCtx;
+import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TResultRow;
 import org.apache.impala.thrift.TStatus;
 import org.apache.impala.thrift.TSymbolLookupParams;
@@ -84,6 +85,18 @@ public class FeSupport {
   // using Java Thrift bindings.
   public native static byte[] NativePrioritizeLoad(byte[] thriftReq);
 
+  // Parses a string of comma-separated key=value query options ('csvQueryOptions'),
+  // updates the existing query options ('queryOptions') with them and returns the
+  // resulting serialized TQueryOptions object.
+  // A note about the function's interface: ideally we wouldn't have to pass in the
+  // existing query options. We could just return the newly set query options to the
+  // caller and let the caller update the existing query options with the new ones.
+  // Unfortunately due to a bug in the thrift-generated TQueryOptions class, in some cases
+  // it is impossible to figure out whether a query option has been set explicitly or left
+  // at its default setting, therefore this approach would not work.
+  public native static byte[] NativeParseQueryOptions(String csvQueryOptions,
+      byte[] queryOptions);
+
   /**
    * Locally caches the jar at the specified HDFS location.
    *
@@ -261,6 +274,38 @@ public class FeSupport {
     }
   }
 
+  private static byte[] ParseQueryOptions(String csvQueryOptions, byte[] queryOptions) {
+    try {
+      return NativeParseQueryOptions(csvQueryOptions, queryOptions);
+    } catch (UnsatisfiedLinkError e) {
+      loadLibrary();
+    }
+    return NativeParseQueryOptions(csvQueryOptions, queryOptions);
+  }
+
+  /**
+   * Parses a string of comma-separated key=value query options. Returns a TQueryOptions
+   * object that contains the updated query options.
+   */
+  public static TQueryOptions ParseQueryOptions(String csvQueryOptions,
+      TQueryOptions queryOptions) throws InternalException {
+    Preconditions.checkNotNull(csvQueryOptions);
+    Preconditions.checkNotNull(queryOptions);
+
+    TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
+    try {
+      byte[] result = ParseQueryOptions(csvQueryOptions,
+          serializer.serialize(queryOptions));
+      Preconditions.checkNotNull(result);
+      TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
+      TQueryOptions updatedQueryOptions = new TQueryOptions();
+      deserializer.deserialize(updatedQueryOptions, result);
+      return updatedQueryOptions;
+    } catch (TException e) {
+      throw new InternalException("Could not parse query options: " + e.getMessage(), e);
+    }
+  }
+
   /**
   * This function should only be called explicitly by the FeSupport to ensure that
    * native functions are loaded.
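
For reference, a hypothetical caller would exercise the new API roughly
like this (a minimal sketch; the option string is made up for
illustration, and error handling is elided):

    // Overlay two comma-separated options on top of existing defaults; the
    // BE parses and validates them and returns the merged TQueryOptions.
    TQueryOptions defaults = new TQueryOptions();
    TQueryOptions updated = FeSupport.ParseQueryOptions(
        "MAX_NUM_RUNTIME_FILTERS=4,RUNTIME_FILTER_MODE=LOCAL", defaults);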

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00fd8388/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index fc8ceab..f49e39c 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -297,6 +297,11 @@ public class PlannerTest extends PlannerTestBase {
   }
 
   @Test
+  public void testRuntimeFilterQueryOptions() {
+    runPlannerTestFile("runtime-filter-query-options");
+  }
+
+  @Test
   public void testConjunctOrdering() {
     runPlannerTestFile("conjunct-ordering");
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00fd8388/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index ae6488d..6932539 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -402,15 +402,8 @@ public class PlannerTestBase extends FrontendTestBase {
    * of 'testCase'.
    */
   private void runTestCase(TestCase testCase, StringBuilder errorLog,
-      StringBuilder actualOutput, String dbName, TQueryOptions options,
-      boolean ignoreExplainHeader)
+      StringBuilder actualOutput, String dbName, boolean ignoreExplainHeader)
       throws CatalogException {
-    if (options == null) {
-      options = defaultQueryOptions();
-    } else {
-      options = mergeQueryOptions(defaultQueryOptions(), options);
-    }
-
     String query = testCase.getQuery();
     LOG.info("running query " + query);
     if (query.isEmpty()) {
@@ -419,7 +412,7 @@ public class PlannerTestBase extends FrontendTestBase {
     }
     TQueryCtx queryCtx = TestUtils.createQueryContext(
         dbName, System.getProperty("user.name"));
-    queryCtx.client_request.query_options = options;
+    queryCtx.client_request.query_options = testCase.getOptions();
     // Test single node plan, scan range locations, and column lineage.
    TExecRequest singleNodeExecRequest = testPlan(testCase, Section.PLAN, queryCtx,
         ignoreExplainHeader, errorLog, actualOutput);
@@ -736,7 +729,12 @@ public class PlannerTestBase extends FrontendTestBase {
  private void runPlannerTestFile(String testFile, String dbName, TQueryOptions options,
       boolean ignoreExplainHeader) {
     String fileName = testDir_.resolve(testFile + ".test").toString();
-    TestFileParser queryFileParser = new TestFileParser(fileName);
+    if (options == null) {
+      options = defaultQueryOptions();
+    } else {
+      options = mergeQueryOptions(defaultQueryOptions(), options);
+    }
+    TestFileParser queryFileParser = new TestFileParser(fileName, options);
     StringBuilder actualOutput = new StringBuilder();
 
     queryFileParser.parseFile();
@@ -745,8 +743,7 @@ public class PlannerTestBase extends FrontendTestBase {
      actualOutput.append(testCase.getSectionAsString(Section.QUERY, true, "\n"));
       actualOutput.append("\n");
       try {
-        runTestCase(testCase, errorLog, actualOutput, dbName, options,
-                ignoreExplainHeader);
+        runTestCase(testCase, errorLog, actualOutput, dbName, ignoreExplainHeader);
       } catch (CatalogException e) {
         errorLog.append(String.format("Failed to plan query\n%s\n%s",
             testCase.getQuery(), e.getMessage()));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00fd8388/fe/src/test/java/org/apache/impala/testutil/TestFileParser.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/testutil/TestFileParser.java b/fe/src/test/java/org/apache/impala/testutil/TestFileParser.java
index 5bfd018..0c62666 100644
--- a/fe/src/test/java/org/apache/impala/testutil/TestFileParser.java
+++ b/fe/src/test/java/org/apache/impala/testutil/TestFileParser.java
@@ -28,9 +28,14 @@ import java.util.EnumMap;
 import java.util.List;
 import java.util.Scanner;
 
+import org.apache.impala.common.InternalException;
+import org.apache.impala.service.FeSupport;
+import org.apache.impala.thrift.TErrorCode;
+import org.apache.impala.thrift.TQueryOptions;
 import org.apache.log4j.Logger;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -40,6 +45,10 @@ import com.google.common.collect.Maps;
  * A test file has the following format:
  *
  * <QUERY STRING>
+ * ---- QUERYOPTIONS
+ * <QUERYOPTION1>=<VALUE1>
+ * <QUERYOPTION2>=<VALUE2>
+ * ..
  * ---- <SECTION NAME 1>
  * <EXPECTED CONTENTS 1>
  * ---- <SECTION NAME 2>
@@ -48,6 +57,10 @@ import com.google.common.collect.Maps;
  * <EXPECTED CONTENTS 3>
  * ====
  * <QUERY STRING>
+ * ---- QUERYOPTIONS
+ * <QUERYOPTION1>=<VALUE1>
+ * <QUERYOPTION2>=<VALUE2>
+ * ..
  * ---- <SECTION NAME 1>
  * <EXPECTED CONTENTS 1>
  * ---- <SECTION NAME 2>
@@ -61,7 +74,7 @@ import com.google.common.collect.Maps;
  * without these lines included.
  *
 * Note that <QUERY STRING> and <EXPECTED CONTENTS> sections can consist of multiple
- * lines.
+ * lines. QUERYOPTIONS sections may contain multiple <QUERYOPTION>=<VALUE> lines.
  */
 public class TestFileParser {
   private static final Logger LOG = Logger.getLogger(TestCase.class);
@@ -82,7 +95,8 @@ public class TestFileParser {
     SETUP,
     ERRORS,
     SCANRANGELOCATIONS,
-    LINEAGE;
+    LINEAGE,
+    QUERYOPTIONS;
 
     // Return header line for this section
     public String getHeader() {
@@ -101,14 +115,18 @@ public class TestFileParser {
 
     // Line number in the test case file where this case started
     private final int startLineNum;
+    private TQueryOptions options;
 
-    public TestCase(int lineNum) {
-      startLineNum = lineNum;
+    public TestCase(int lineNum, TQueryOptions options) {
+      this.startLineNum = lineNum;
+      this.options = options;
     }
 
-    public int getStartingLineNum() {
-      return startLineNum;
-    }
+    public int getStartingLineNum() { return startLineNum; }
+
+    public TQueryOptions getOptions() { return this.options; }
+
+    public void setOptions(TQueryOptions options) { this.options = options; }
 
     protected void addSection(Section section, ArrayList<String> contents) {
       expectedResultSections.put(section, contents);
@@ -210,6 +228,7 @@ public class TestFileParser {
   private BufferedReader reader;
   private Scanner scanner;
   private boolean hasSetupSection = false;
+  private TQueryOptions options;
 
   /**
   * For backwards compatibility, if no title is found this is the order in which
@@ -218,8 +237,9 @@ public class TestFileParser {
   static private final ArrayList<Section> defaultSectionOrder =
     Lists.newArrayList(Section.QUERY, Section.TYPES, Section.RESULTS);
 
-  public TestFileParser(String fileName) {
+  public TestFileParser(String fileName, TQueryOptions options) {
     this.fileName = fileName;
+    this.options = options;
   }
 
   public List<TestCase> getTestCases() {
@@ -255,7 +275,8 @@ public class TestFileParser {
   private TestCase parseOneTestCase() {
     Section currentSection = Section.QUERY;
     ArrayList<String> sectionContents = Lists.newArrayList();
-    TestCase currentTestCase = new TestCase(lineNum);
+    // Each test case in the test file has its own copy of query options.
+    TestCase currentTestCase = new TestCase(lineNum, options.deepCopy());
     int sectionCount = 0;
 
     while (scanner.hasNextLine()) {
@@ -263,6 +284,7 @@ public class TestFileParser {
       ++lineNum;
       if (line.startsWith("====") && sectionCount > 0) {
         currentTestCase.addSection(currentSection, sectionContents);
+        parseQueryOptions(currentTestCase);
         if (!currentTestCase.isValid()) {
           throw new IllegalStateException("Invalid test case" +
               " at line " + currentTestCase.startLineNum + " detected.");
@@ -302,6 +324,10 @@ public class TestFileParser {
           LOG.warn("No section header found. Guessing: " + currentSection);
         }
 
+        if (!currentTestCase.getSectionContents(currentSection).isEmpty()) {
+          throw new IllegalStateException("Duplicate sections are not allowed: 
"
+              + currentSection);
+        }
         sectionContents = Lists.newArrayList();
       } else {
         sectionContents.add(line);
@@ -317,6 +343,25 @@ public class TestFileParser {
   }
 
   /**
+   * Parses QUERYOPTIONS section. Adds the parsed query options to "testCase.options".
+   * Throws an IllegalStateException if parsing failed.
+   */
+  private void parseQueryOptions(TestCase testCase) {
+    String optionsStr = testCase.getSectionAsString(Section.QUERYOPTIONS, false, ",");
+    if (optionsStr == null || optionsStr.isEmpty()) return;
+
+    TQueryOptions result = null;
+    try {
+      result = FeSupport.ParseQueryOptions(optionsStr, testCase.getOptions());
+    } catch (InternalException e) {
+      throw new IllegalStateException("Failed to parse query options: " + 
optionsStr +
+          " - " + e.getMessage(), e);
+    }
+    Preconditions.checkNotNull(result);
+    testCase.setOptions(result);
+  }
+
+  /**
    * Parses a test file in its entirety and constructs a list of TestCases.
    */
   public void parseFile() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00fd8388/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test
new file mode 100644
index 0000000..a8909be
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test
@@ -0,0 +1,535 @@
+# Default query options
+select /* +straight_join */ count(*) from functional.alltypes a
+  join /* +broadcast */ functional.alltypes b on b.id = a.id and
+    b.date_string_col = a.date_string_col
+  join /* +broadcast */ functional.alltypes c on c.month = a.month and
+    c.int_col = a.int_col
+  join /* +shuffle */ functional.alltypes d on d.bool_col = a.bool_col and d.year = a.year
+---- PLAN
+PLAN-ROOT SINK
+|
+07:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+06:HASH JOIN [INNER JOIN]
+|  hash predicates: a.bool_col = d.bool_col, a.year = d.year
+|  runtime filters: RF000 <- d.bool_col, RF001 <- d.year
+|
+|--03:SCAN HDFS [functional.alltypes d]
+|     partitions=24/24 files=24 size=478.45KB
+|
+05:HASH JOIN [INNER JOIN]
+|  hash predicates: a.int_col = c.int_col, a.month = c.month
+|  runtime filters: RF002 <- c.int_col, RF003 <- c.month
+|
+|--02:SCAN HDFS [functional.alltypes c]
+|     partitions=24/24 files=24 size=478.45KB
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
+|  runtime filters: RF004 <- b.id, RF005 <- b.date_string_col
+|
+|--01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> a.bool_col, RF001 -> a.year, RF002 -> a.int_col, RF003 -> a.month, RF004 -> a.id, RF005 -> a.date_string_col
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+13:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|
+12:EXCHANGE [UNPARTITIONED]
+|
+07:AGGREGATE
+|  output: count(*)
+|
+06:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: a.bool_col = d.bool_col, a.year = d.year
+|  runtime filters: RF000 <- d.bool_col, RF001 <- d.year
+|
+|--11:EXCHANGE [HASH(d.bool_col,d.year)]
+|  |
+|  03:SCAN HDFS [functional.alltypes d]
+|     partitions=24/24 files=24 size=478.45KB
+|
+10:EXCHANGE [HASH(a.bool_col,a.year)]
+|
+05:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: a.int_col = c.int_col, a.month = c.month
+|  runtime filters: RF002 <- c.int_col, RF003 <- c.month
+|
+|--09:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN HDFS [functional.alltypes c]
+|     partitions=24/24 files=24 size=478.45KB
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
+|  runtime filters: RF004 <- b.id, RF005 <- b.date_string_col
+|
+|--08:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> a.bool_col, RF001 -> a.year, RF002 -> a.int_col, RF003 -> a.month, RF004 -> a.id, RF005 -> a.date_string_col
+====
+# Keep only MAX_NUM_RUNTIME_FILTERS most selective filters, remove the rest.
+# In this query RF000 (<- d.bool_col) and RF001 (<- d.year) are the least selective
+# filters.
+select /* +straight_join */ count(*) from functional.alltypes a
+  join /* +broadcast */ functional.alltypes b on b.id = a.id and
+    b.date_string_col = a.date_string_col
+  join /* +broadcast */ functional.alltypes c on c.month = a.month and
+    c.int_col = a.int_col
+  join /* +shuffle */ functional.alltypes d on d.bool_col = a.bool_col and d.year = a.year
+---- QUERYOPTIONS
+MAX_NUM_RUNTIME_FILTERS=4
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+13:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|
+12:EXCHANGE [UNPARTITIONED]
+|
+07:AGGREGATE
+|  output: count(*)
+|
+06:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: a.bool_col = d.bool_col, a.year = d.year
+|
+|--11:EXCHANGE [HASH(d.bool_col,d.year)]
+|  |
+|  03:SCAN HDFS [functional.alltypes d]
+|     partitions=24/24 files=24 size=478.45KB
+|
+10:EXCHANGE [HASH(a.bool_col,a.year)]
+|
+05:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: a.int_col = c.int_col, a.month = c.month
+|  runtime filters: RF002 <- c.int_col, RF003 <- c.month
+|
+|--09:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN HDFS [functional.alltypes c]
+|     partitions=24/24 files=24 size=478.45KB
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
+|  runtime filters: RF004 <- b.id, RF005 <- b.date_string_col
+|
+|--08:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF004 -> a.id, RF005 -> a.date_string_col, RF002 -> a.int_col, RF003 -> a.month
+====
+# DISABLE_ROW_RUNTIME_FILTERING is set: only partition column filters are applied.
+select /* +straight_join */ count(*) from functional.alltypes a
+  join /* +broadcast */ functional.alltypes b on b.id = a.id and
+    b.date_string_col = a.date_string_col
+  join /* +broadcast */ functional.alltypes c on c.month = a.month and
+    c.int_col = a.int_col
+  join /* +shuffle */ functional.alltypes d on d.bool_col = a.bool_col and d.year = a.year
+---- QUERYOPTIONS
+DISABLE_ROW_RUNTIME_FILTERING=true
+---- PLAN
+PLAN-ROOT SINK
+|
+07:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+06:HASH JOIN [INNER JOIN]
+|  hash predicates: a.bool_col = d.bool_col, a.year = d.year
+|  runtime filters: RF001 <- d.year
+|
+|--03:SCAN HDFS [functional.alltypes d]
+|     partitions=24/24 files=24 size=478.45KB
+|
+05:HASH JOIN [INNER JOIN]
+|  hash predicates: a.int_col = c.int_col, a.month = c.month
+|  runtime filters: RF003 <- c.month
+|
+|--02:SCAN HDFS [functional.alltypes c]
+|     partitions=24/24 files=24 size=478.45KB
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
+|
+|--01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF001 -> a.year, RF003 -> a.month
+====
+# DISABLE_ROW_RUNTIME_FILTERING is set and MAX_NUM_RUNTIME_FILTERS is set to 2: only the 2
+# partition column filters are applied
+select /* +straight_join */ count(*) from functional.alltypes a
+  join /* +broadcast */ functional.alltypes b on b.id = a.id and
+    b.date_string_col = a.date_string_col
+  join /* +broadcast */ functional.alltypes c on c.month = a.month and
+    c.int_col = a.int_col
+  join /* +shuffle */ functional.alltypes d on d.bool_col = a.bool_col and d.year = a.year
+---- QUERYOPTIONS
+DISABLE_ROW_RUNTIME_FILTERING=true
+MAX_NUM_RUNTIME_FILTERS=2
+---- PLAN
+PLAN-ROOT SINK
+|
+07:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+06:HASH JOIN [INNER JOIN]
+|  hash predicates: a.bool_col = d.bool_col, a.year = d.year
+|  runtime filters: RF001 <- d.year
+|
+|--03:SCAN HDFS [functional.alltypes d]
+|     partitions=24/24 files=24 size=478.45KB
+|
+05:HASH JOIN [INNER JOIN]
+|  hash predicates: a.int_col = c.int_col, a.month = c.month
+|  runtime filters: RF003 <- c.month
+|
+|--02:SCAN HDFS [functional.alltypes c]
+|     partitions=24/24 files=24 size=478.45KB
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
+|
+|--01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF001 -> a.year, RF003 -> a.month
+====
+# RUNTIME_FILTER_MODE is set to LOCAL: only local filters are applied
+select /* +straight_join */ count(*) from functional.alltypes a
+  join /* +broadcast */ functional.alltypes b on b.id = a.id and
+    b.date_string_col = a.date_string_col
+  join /* +broadcast */ functional.alltypes c on c.month = a.month and
+    c.int_col = a.int_col
+  join /* +shuffle */ functional.alltypes d on d.bool_col = a.bool_col and d.year = a.year
+---- QUERYOPTIONS
+RUNTIME_FILTER_MODE=LOCAL
+---- PLAN
+PLAN-ROOT SINK
+|
+07:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+06:HASH JOIN [INNER JOIN]
+|  hash predicates: a.bool_col = d.bool_col, a.year = d.year
+|  runtime filters: RF000 <- d.bool_col, RF001 <- d.year
+|
+|--03:SCAN HDFS [functional.alltypes d]
+|     partitions=24/24 files=24 size=478.45KB
+|
+05:HASH JOIN [INNER JOIN]
+|  hash predicates: a.int_col = c.int_col, a.month = c.month
+|  runtime filters: RF002 <- c.int_col, RF003 <- c.month
+|
+|--02:SCAN HDFS [functional.alltypes c]
+|     partitions=24/24 files=24 size=478.45KB
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
+|  runtime filters: RF004 <- b.id, RF005 <- b.date_string_col
+|
+|--01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> a.bool_col, RF001 -> a.year, RF002 -> a.int_col, RF003 -> a.month, RF004 -> a.id, RF005 -> a.date_string_col
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+13:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|
+12:EXCHANGE [UNPARTITIONED]
+|
+07:AGGREGATE
+|  output: count(*)
+|
+06:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: a.bool_col = d.bool_col, a.year = d.year
+|
+|--11:EXCHANGE [HASH(d.bool_col,d.year)]
+|  |
+|  03:SCAN HDFS [functional.alltypes d]
+|     partitions=24/24 files=24 size=478.45KB
+|
+10:EXCHANGE [HASH(a.bool_col,a.year)]
+|
+05:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: a.int_col = c.int_col, a.month = c.month
+|  runtime filters: RF002 <- c.int_col, RF003 <- c.month
+|
+|--09:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN HDFS [functional.alltypes c]
+|     partitions=24/24 files=24 size=478.45KB
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
+|  runtime filters: RF004 <- b.id, RF005 <- b.date_string_col
+|
+|--08:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF002 -> a.int_col, RF003 -> a.month, RF004 -> a.id, RF005 -> a.date_string_col
+====
+# RUNTIME_FILTER_MODE is set to LOCAL and MAX_NUM_RUNTIME_FILTERS is set to 3: only 3
+# local filters are kept, which means that both local and non-local filters are removed
+# from the distributed plan.
+select /* +straight_join */ count(*) from functional.alltypes a
+  join /* +broadcast */ functional.alltypes b on b.id = a.id and
+    b.date_string_col = a.date_string_col
+  join /* +broadcast */ functional.alltypes c on c.month = a.month and
+    c.int_col = a.int_col
+  join /* +shuffle */ functional.alltypes d on d.bool_col = a.bool_col and d.year = a.year
+---- QUERYOPTIONS
+RUNTIME_FILTER_MODE=LOCAL
+MAX_NUM_RUNTIME_FILTERS=3
+---- PLAN
+PLAN-ROOT SINK
+|
+07:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+06:HASH JOIN [INNER JOIN]
+|  hash predicates: a.bool_col = d.bool_col, a.year = d.year
+|
+|--03:SCAN HDFS [functional.alltypes d]
+|     partitions=24/24 files=24 size=478.45KB
+|
+05:HASH JOIN [INNER JOIN]
+|  hash predicates: a.int_col = c.int_col, a.month = c.month
+|  runtime filters: RF002 <- c.int_col
+|
+|--02:SCAN HDFS [functional.alltypes c]
+|     partitions=24/24 files=24 size=478.45KB
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
+|  runtime filters: RF004 <- b.id, RF005 <- b.date_string_col
+|
+|--01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF004 -> a.id, RF005 -> a.date_string_col, RF002 -> a.int_col
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+13:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|
+12:EXCHANGE [UNPARTITIONED]
+|
+07:AGGREGATE
+|  output: count(*)
+|
+06:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: a.bool_col = d.bool_col, a.year = d.year
+|
+|--11:EXCHANGE [HASH(d.bool_col,d.year)]
+|  |
+|  03:SCAN HDFS [functional.alltypes d]
+|     partitions=24/24 files=24 size=478.45KB
+|
+10:EXCHANGE [HASH(a.bool_col,a.year)]
+|
+05:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: a.int_col = c.int_col, a.month = c.month
+|  runtime filters: RF002 <- c.int_col
+|
+|--09:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN HDFS [functional.alltypes c]
+|     partitions=24/24 files=24 size=478.45KB
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
+|  runtime filters: RF004 <- b.id, RF005 <- b.date_string_col
+|
+|--08:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF004 -> a.id, RF005 -> a.date_string_col, RF002 -> a.int_col
+====
+# DISABLE_ROW_RUNTIME_FILTERING is set and RUNTIME_FILTER_MODE is set to LOCAL: only local
+# partition column filters are applied
+select /* +straight_join */ count(*) from functional.alltypes a
+  join /* +broadcast */ functional.alltypes b on b.id = a.id and
+    b.date_string_col = a.date_string_col
+  join /* +broadcast */ functional.alltypes c on c.month = a.month and
+    c.int_col = a.int_col
+  join /* +shuffle */ functional.alltypes d on d.bool_col = a.bool_col and d.year = a.year
+---- QUERYOPTIONS
+DISABLE_ROW_RUNTIME_FILTERING=true
+RUNTIME_FILTER_MODE=LOCAL
+---- PLAN
+PLAN-ROOT SINK
+|
+07:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+06:HASH JOIN [INNER JOIN]
+|  hash predicates: a.bool_col = d.bool_col, a.year = d.year
+|  runtime filters: RF001 <- d.year
+|
+|--03:SCAN HDFS [functional.alltypes d]
+|     partitions=24/24 files=24 size=478.45KB
+|
+05:HASH JOIN [INNER JOIN]
+|  hash predicates: a.int_col = c.int_col, a.month = c.month
+|  runtime filters: RF003 <- c.month
+|
+|--02:SCAN HDFS [functional.alltypes c]
+|     partitions=24/24 files=24 size=478.45KB
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
+|
+|--01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF001 -> a.year, RF003 -> a.month
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+13:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|
+12:EXCHANGE [UNPARTITIONED]
+|
+07:AGGREGATE
+|  output: count(*)
+|
+06:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: a.bool_col = d.bool_col, a.year = d.year
+|
+|--11:EXCHANGE [HASH(d.bool_col,d.year)]
+|  |
+|  03:SCAN HDFS [functional.alltypes d]
+|     partitions=24/24 files=24 size=478.45KB
+|
+10:EXCHANGE [HASH(a.bool_col,a.year)]
+|
+05:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: a.int_col = c.int_col, a.month = c.month
+|  runtime filters: RF003 <- c.month
+|
+|--09:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN HDFS [functional.alltypes c]
+|     partitions=24/24 files=24 size=478.45KB
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
+|
+|--08:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF003 -> a.month
+====
+# RUNTIME_FILTER_MODE is OFF: no filters are applied
+select /* +straight_join */ count(*) from functional.alltypes a
+  join /* +broadcast */ functional.alltypes b on b.id = a.id and
+    b.date_string_col = a.date_string_col
+  join /* +broadcast */ functional.alltypes c on c.month = a.month and
+    c.int_col = a.int_col
+  join /* +shuffle */ functional.alltypes d on d.bool_col = a.bool_col and d.year = a.year
+---- QUERYOPTIONS
+RUNTIME_FILTER_MODE=OFF
+---- PLAN
+PLAN-ROOT SINK
+|
+07:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+06:HASH JOIN [INNER JOIN]
+|  hash predicates: a.bool_col = d.bool_col, a.year = d.year
+|
+|--03:SCAN HDFS [functional.alltypes d]
+|     partitions=24/24 files=24 size=478.45KB
+|
+05:HASH JOIN [INNER JOIN]
+|  hash predicates: a.int_col = c.int_col, a.month = c.month
+|
+|--02:SCAN HDFS [functional.alltypes c]
+|     partitions=24/24 files=24 size=478.45KB
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
+|
+|--01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+====
+# MAX_NUM_RUNTIME_FILTERS is 0: no filters are applied
+select /* +straight_join */ count(*) from functional.alltypes a
+  join /* +broadcast */ functional.alltypes b on b.id = a.id and
+    b.date_string_col = a.date_string_col
+  join /* +broadcast */ functional.alltypes c on c.month = a.month and
+    c.int_col = a.int_col
+  join /* +shuffle */ functional.alltypes d on d.bool_col = a.bool_col and d.year = a.year
+---- QUERYOPTIONS
+MAX_NUM_RUNTIME_FILTERS=0
+---- PLAN
+PLAN-ROOT SINK
+|
+07:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+06:HASH JOIN [INNER JOIN]
+|  hash predicates: a.bool_col = d.bool_col, a.year = d.year
+|
+|--03:SCAN HDFS [functional.alltypes d]
+|     partitions=24/24 files=24 size=478.45KB
+|
+05:HASH JOIN [INNER JOIN]
+|  hash predicates: a.int_col = c.int_col, a.month = c.month
+|
+|--02:SCAN HDFS [functional.alltypes c]
+|     partitions=24/24 files=24 size=478.45KB
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id, a.date_string_col = b.date_string_col
+|
+|--01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+====
