IMPALA-5789: Add always_false flag in bloom filter

This patch adds an always_false flag to bloom filters. The flag is set
as long as nothing has been inserted into the bloom filter. HdfsScanner
uses this flag to terminate the scan early at file and split granularity.

Testing: Existing tests pass. Two test cases are added to check
that an always-false runtime filter can filter out files and splits.
In single-node perf tests, the time spent on
primitive_empty_build_join_1 is reduced by 75%.

Change-Id: If680240a3cd4583fc97c3192177d86d9567c4f8d
Reviewed-on: http://gerrit.cloudera.org:8080/8170
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/359b409b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/359b409b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/359b409b

Branch: refs/heads/master
Commit: 359b409b9636e20414270bc7a642d9116304eac1
Parents: 57bde36
Author: Tianyi Wang <[email protected]>
Authored: Fri Oct 6 13:31:35 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri Oct 20 03:59:49 2017 +0000

----------------------------------------------------------------------
 be/src/common/global-flags.cc                   |  5 +-
 be/src/exec/base-sequence-scanner.cc            |  6 ++
 be/src/exec/base-sequence-scanner.h             |  3 +
 be/src/exec/filter-context.cc                   | 22 ++++--
 be/src/exec/filter-context.h                    |  5 ++
 be/src/exec/hdfs-parquet-scanner.cc             | 24 ++----
 be/src/exec/hdfs-scan-node-base.cc              | 24 +-----
 be/src/exec/hdfs-scan-node.cc                   | 15 +---
 be/src/exec/hdfs-scanner.cc                     | 29 +++-----
 be/src/runtime/coordinator-backend-state.cc     | 27 +------
 be/src/runtime/coordinator-backend-state.h      |  5 +-
 be/src/runtime/coordinator-filter-state.h       | 17 ++---
 be/src/runtime/coordinator.cc                   | 78 ++++++--------------
 be/src/runtime/runtime-filter-ir.cc             |  2 -
 be/src/runtime/runtime-filter.h                 |  5 +-
 be/src/runtime/runtime-filter.inline.h          |  4 +
 be/src/util/bloom-filter-ir.cc                  |  2 +
 be/src/util/bloom-filter.cc                     | 46 +++++++-----
 be/src/util/bloom-filter.h                      |  7 ++
 common/thrift/ImpalaInternalService.thrift      |  7 +-
 .../custom_cluster/test_always_false_filter.py  | 54 ++++++++++++++
 tests/query_test/test_runtime_filters.py        | 10 +++
 22 files changed, 200 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/359b409b/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 5d3879f..cec27eb 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -125,9 +125,8 @@ DEFINE_int32(stress_fn_ctx_alloc, 0, "A stress option which 
causes memory alloca
     "flag. Effective in debug builds only.");
 DEFINE_int32(stress_datastream_recvr_delay_ms, 0, "A stress option that causes 
data "
     "stream receiver registration to be delayed. Effective in debug builds 
only.");
-DEFINE_bool(skip_file_runtime_filtering, false, "Skips file-based runtime 
filtering in "
-    "order to provide a regression test for IMPALA-3798. Effective in debug 
builds "
-    "only.");
+DEFINE_bool(skip_file_runtime_filtering, false, "Skips file-based runtime 
filtering for "
+    "testing purposes. Effective in debug builds only.");
 DEFINE_int32(fault_injection_rpc_delay_ms, 0, "A fault injection option that 
causes "
     "rpc server handling to be delayed to trigger an RPC timeout on the caller 
side. "
     "Effective in debug builds only.");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/359b409b/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc 
b/be/src/exec/base-sequence-scanner.cc
index da67bb5..c22f18d 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -68,6 +68,12 @@ Status 
BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
   return Status::OK();
 }
 
+bool BaseSequenceScanner::FileFormatIsSequenceBased(THdfsFileFormat::type 
format) {
+  return format == THdfsFileFormat::SEQUENCE_FILE ||
+         format == THdfsFileFormat::RC_FILE ||
+         format == THdfsFileFormat::AVRO;
+}
+
 BaseSequenceScanner::BaseSequenceScanner(HdfsScanNodeBase* node, RuntimeState* 
state)
   : HdfsScanner(node, state) {
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/359b409b/be/src/exec/base-sequence-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.h 
b/be/src/exec/base-sequence-scanner.h
index 785182e..887ff6f 100644
--- a/be/src/exec/base-sequence-scanner.h
+++ b/be/src/exec/base-sequence-scanner.h
@@ -52,6 +52,9 @@ class BaseSequenceScanner : public HdfsScanner {
                                    const std::vector<HdfsFileDesc*>& files)
                                    WARN_UNUSED_RESULT;
 
+  /// Returns true if 'format' uses a scanner derived from BaseSequenceScanner.
+  static bool FileFormatIsSequenceBased(THdfsFileFormat::type format);
+
   virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT;
   virtual void Close(RowBatch* row_batch);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/359b409b/be/src/exec/filter-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/filter-context.cc b/be/src/exec/filter-context.cc
index a882365..61e104f 100644
--- a/be/src/exec/filter-context.cc
+++ b/be/src/exec/filter-context.cc
@@ -18,7 +18,7 @@
 #include "exec/filter-context.h"
 
 #include "codegen/codegen-anyval.h"
-#include "runtime/runtime-filter.h"
+#include "runtime/runtime-filter.inline.h"
 #include "runtime/tuple-row.h"
 #include "util/runtime-profile-counters.h"
 
@@ -34,13 +34,10 @@ const std::string FilterStats::ROWS_KEY = "Rows";
 const char* FilterContext::LLVM_CLASS_NAME = "struct.impala::FilterContext";
 
 FilterStats::FilterStats(RuntimeProfile* runtime_profile, bool 
is_partition_filter) {
-  DCHECK(runtime_profile != NULL);
+  DCHECK(runtime_profile != nullptr);
   profile = runtime_profile;
-  if (is_partition_filter) {
-    RegisterCounterGroup(FilterStats::SPLITS_KEY);
-    RegisterCounterGroup(FilterStats::FILES_KEY);
-  }
-
+  RegisterCounterGroup(FilterStats::SPLITS_KEY);
+  RegisterCounterGroup(FilterStats::FILES_KEY);
   // TODO: These only apply to Parquet, so only register them in that case.
   RegisterCounterGroup(FilterStats::ROWS_KEY);
   if (is_partition_filter) RegisterCounterGroup(FilterStats::ROW_GROUPS_KEY);
@@ -377,3 +374,14 @@ Status FilterContext::CodegenInsert(
   }
   return Status::OK();
 }
+
+bool FilterContext::CheckForAlwaysFalse(const std::string& stats_name,
+    const std::vector<FilterContext>& ctxs) {
+  for (const FilterContext& ctx : ctxs) {
+    if (ctx.filter->AlwaysFalse()) {
+      ctx.stats->IncrCounters(stats_name, 1, 1, 1);
+      return true;
+    }
+  }
+  return false;
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/359b409b/be/src/exec/filter-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/filter-context.h b/be/src/exec/filter-context.h
index 5c232f8..dcd2a94 100644
--- a/be/src/exec/filter-context.h
+++ b/be/src/exec/filter-context.h
@@ -123,6 +123,11 @@ struct FilterContext {
   /// 'fn' is set to the generated function. On failure, an error status is 
returned.
   static Status CodegenInsert(LlvmCodeGen* codegen, ScalarExpr* filter_expr,
       llvm::Function** fn) WARN_UNUSED_RESULT;
+
+  // Returns true if any filter in 'ctxs' is always-false. In that case the
+  // 'stats_name' counters of the first such filter are incremented.
+  static bool CheckForAlwaysFalse(const std::string& stats_name,
+      const std::vector<FilterContext>& ctxs);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/359b409b/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc 
b/be/src/exec/hdfs-parquet-scanner.cc
index 6cfaee0..455c315 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -17,32 +17,18 @@
 
 #include "exec/hdfs-parquet-scanner.h"
 
-#include <limits> // for std::numeric_limits
 #include <queue>
 
-#include <gflags/gflags.h>
 #include <gutil/strings/substitute.h>
 
 #include "codegen/codegen-anyval.h"
-#include "codegen/llvm-codegen.h"
-#include "common/logging.h"
-#include "exec/hdfs-scanner.h"
 #include "exec/hdfs-scan-node.h"
 #include "exec/parquet-column-readers.h"
 #include "exec/parquet-column-stats.h"
 #include "exec/scanner-context.inline.h"
-#include "exprs/scalar-expr.h"
 #include "runtime/collection-value-builder.h"
-#include "runtime/descriptors.h"
 #include "runtime/runtime-state.h"
-#include "runtime/mem-pool.h"
-#include "runtime/row-batch.h"
 #include "runtime/runtime-filter.inline.h"
-#include "runtime/tuple-row.h"
-#include "runtime/tuple.h"
-#include "runtime/string-value.h"
-#include "util/debug-util.h"
-#include "util/error-util.h"
 #include "rpc/thrift-util.h"
 
 #include "common/names.h"
@@ -409,9 +395,13 @@ Status HdfsParquetScanner::ProcessSplit() {
   DCHECK(scan_node_->HasRowBatchQueue());
   HdfsScanNode* scan_node = static_cast<HdfsScanNode*>(scan_node_);
   do {
-    unique_ptr<RowBatch> batch = unique_ptr<RowBatch>(
-        new RowBatch(scan_node_->row_desc(), state_->batch_size(),
-        scan_node_->mem_tracker()));
+    if (FilterContext::CheckForAlwaysFalse(FilterStats::SPLITS_KEY,
+        context_->filter_ctxs())) {
+      eos_ = true;
+      break;
+    }
+    unique_ptr<RowBatch> batch = 
std::make_unique<RowBatch>(scan_node_->row_desc(),
+        state_->batch_size(), scan_node_->mem_tracker());
     Status status = GetNextInternal(batch.get());
     // Always add batch to the queue because it may contain data referenced by 
previously
     // appended batches.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/359b409b/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc 
b/be/src/exec/hdfs-scan-node-base.cc
index 1c6a4de..49c66b8 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -24,37 +24,18 @@
 #include "exec/hdfs-avro-scanner.h"
 #include "exec/hdfs-parquet-scanner.h"
 
-#include <sstream>
 #include <avro/errors.h>
 #include <avro/schema.h>
 #include <boost/filesystem.hpp>
 #include <gutil/strings/substitute.h>
 
 #include "codegen/llvm-codegen.h"
-#include "common/logging.h"
-#include "common/object-pool.h"
-#include "exprs/scalar-expr.h"
-#include "exprs/scalar-expr-evaluator.h"
-#include "runtime/descriptors.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
-#include "runtime/mem-pool.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/raw-value.h"
-#include "runtime/row-batch.h"
-#include "runtime/string-buffer.h"
-#include "util/bit-util.h"
-#include "util/container-util.h"
-#include "util/debug-util.h"
 #include "util/disk-info.h"
-#include "util/error-util.h"
 #include "util/hdfs-util.h"
-#include "util/impalad-metrics.h"
 #include "util/periodic-counter-updater.h"
-#include "util/runtime-profile-counters.h"
-
-#include "gen-cpp/PlanNodes_types.h"
 
 #include "common/names.h"
 
@@ -664,12 +645,13 @@ void HdfsScanNodeBase::InitNullCollectionValues(RowBatch* 
row_batch) const {
 
 bool HdfsScanNodeBase::PartitionPassesFilters(int32_t partition_id,
     const string& stats_name, const vector<FilterContext>& filter_ctxs) {
-  if (filter_ctxs.size() == 0) return true;
+  if (filter_ctxs.empty()) return true;
+  if (FilterContext::CheckForAlwaysFalse(stats_name, filter_ctxs)) return 
false;
   DCHECK_EQ(filter_ctxs.size(), filter_ctxs_.size())
       << "Mismatched number of filter contexts";
   Tuple* template_tuple = partition_template_tuple_map_[partition_id];
   // Defensive - if template_tuple is NULL, there can be no filters on 
partition columns.
-  if (template_tuple == NULL) return true;
+  if (template_tuple == nullptr) return true;
   TupleRow* tuple_row_mem = reinterpret_cast<TupleRow*>(&template_tuple);
   for (const FilterContext& ctx: filter_ctxs) {
     int target_ndx = ctx.filter->filter_desc().planid_to_target_ndx.at(id_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/359b409b/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 6aeb6a9..8cc85ff 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -21,6 +21,7 @@
 #include <sstream>
 
 #include "common/logging.h"
+#include "exec/base-sequence-scanner.h"
 #include "exec/hdfs-scanner.h"
 #include "exec/scanner-context.h"
 #include "runtime/descriptors.h"
@@ -484,18 +485,6 @@ exit:
   expr_results_pool.FreeAll();
 }
 
-namespace {
-
-// Returns true if 'format' uses a scanner derived from BaseSequenceScanner. 
Used to
-// workaround IMPALA-3798.
-bool FileFormatIsSequenceBased(THdfsFileFormat::type format) {
-  return format == THdfsFileFormat::SEQUENCE_FILE ||
-      format == THdfsFileFormat::RC_FILE ||
-      format == THdfsFileFormat::AVRO;
-}
-
-}
-
 Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
     MemPool* expr_results_pool, DiskIoMgr::ScanRange* scan_range) {
   DCHECK(scan_range != NULL);
@@ -512,7 +501,7 @@ Status HdfsScanNode::ProcessSplit(const 
vector<FilterContext>& filter_ctxs,
   // process the header split, the remaining scan ranges in the file will not 
be marked as
   // done. See FilePassesFilterPredicates() for the correct logic to mark all 
splits in a
   // file as done; the correct fix here is to do that for every file in a 
thread-safe way.
-  if (!FileFormatIsSequenceBased(partition->file_format())) {
+  if 
(!BaseSequenceScanner::FileFormatIsSequenceBased(partition->file_format())) {
     if (!PartitionPassesFilters(partition_id, FilterStats::SPLITS_KEY, 
filter_ctxs)) {
       // Avoid leaking unread buffers in scan_range.
       scan_range->Cancel(Status::CANCELLED);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/359b409b/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index e0f3016..cae6004 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -17,36 +17,20 @@
 
 #include "exec/hdfs-scanner.h"
 
-#include <sstream>
-#include <boost/algorithm/string.hpp>
-
 #include "codegen/codegen-anyval.h"
-#include "codegen/llvm-codegen.h"
-#include "common/logging.h"
-#include "common/object-pool.h"
+#include "exec/base-sequence-scanner.h"
 #include "exec/text-converter.h"
 #include "exec/hdfs-scan-node.h"
 #include "exec/hdfs-scan-node-mt.h"
 #include "exec/read-write-util.h"
 #include "exec/text-converter.inline.h"
-#include "exprs/scalar-expr.h"
 #include "runtime/collection-value-builder.h"
-#include "runtime/descriptors.h"
 #include "runtime/hdfs-fs-cache.h"
-#include "runtime/runtime-state.h"
-#include "runtime/mem-pool.h"
-#include "runtime/row-batch.h"
-#include "runtime/string-value.h"
+#include "runtime/runtime-filter.inline.h"
 #include "runtime/tuple-row.h"
-#include "runtime/tuple.h"
 #include "util/bitmap.h"
 #include "util/codec.h"
-#include "util/debug-util.h"
-#include "util/runtime-profile-counters.h"
-#include "util/sse-util.h"
-#include "util/string-parser.h"
 #include "util/test-info.h"
-#include "gen-cpp/PlanNodes_types.h"
 
 #include "common/names.h"
 
@@ -118,6 +102,15 @@ Status HdfsScanner::ProcessSplit() {
   DCHECK(scan_node_->HasRowBatchQueue());
   HdfsScanNode* scan_node = static_cast<HdfsScanNode*>(scan_node_);
   do {
+    // IMPALA-3798: Split-level runtime filtering is disabled with 
sequence-based file
+    // formats.
+    bool is_sequence_based = BaseSequenceScanner::FileFormatIsSequenceBased(
+        context_->partition_descriptor()->file_format());
+    if (!is_sequence_based && 
FilterContext::CheckForAlwaysFalse(FilterStats::SPLITS_KEY,
+        context_->filter_ctxs())) {
+      eos_ = true;
+      break;
+    }
     unique_ptr<RowBatch> batch = 
std::make_unique<RowBatch>(scan_node_->row_desc(),
         state_->batch_size(), scan_node_->mem_tracker());
     Status status = GetNextInternal(batch.get());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/359b409b/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc 
b/be/src/runtime/coordinator-backend-state.cc
index 0ee4bd7..a62d8cc 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -17,31 +17,20 @@
 
 #include "runtime/coordinator-backend-state.h"
 
-#include <sstream>
-#include <string>
 #include <boost/lexical_cast.hpp>
-#include <boost/thread/locks.hpp>
-#include <boost/thread/lock_guard.hpp>
-#include <boost/accumulators/accumulators.hpp>
 
 #include "common/object-pool.h"
 #include "exec/exec-node.h"
 #include "exec/scan-node.h"
-#include "scheduling/query-schedule.h"
 #include "runtime/exec-env.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/debug-options.h"
 #include "runtime/client-cache.h"
-#include "runtime/client-cache-types.h"
 #include "runtime/backend-client.h"
 #include "runtime/coordinator-filter-state.h"
-#include "util/error-util.h"
 #include "util/uid-util.h"
 #include "util/network-util.h"
 #include "util/counting-barrier.h"
-#include "util/progress-updater.h"
-#include "gen-cpp/Types_types.h"
-#include "gen-cpp/ImpalaInternalService_types.h"
 #include "gen-cpp/ImpalaInternalService_constants.h"
 
 #include "common/names.h"
@@ -400,26 +389,18 @@ bool Coordinator::BackendState::Cancel() {
   return true;
 }
 
-void Coordinator::BackendState::PublishFilter(
-    shared_ptr<TPublishFilterParams> rpc_params) {
-  DCHECK_EQ(rpc_params->dst_query_id, query_id_);
-  if (fragments_.count(rpc_params->dst_fragment_idx) == 0) return;
+void Coordinator::BackendState::PublishFilter(const TPublishFilterParams& 
rpc_params) {
+  DCHECK_EQ(rpc_params.dst_query_id, query_id_);
+  if (fragments_.count(rpc_params.dst_fragment_idx) == 0) return;
   Status status;
   ImpalaBackendConnection backend_client(
       ExecEnv::GetInstance()->impalad_client_cache(), host_, &status);
   if (!status.ok()) return;
-  // Make a local copy of the shared 'master' set of parameters
-  TPublishFilterParams local_params(*rpc_params);
-  local_params.__set_bloom_filter(rpc_params->bloom_filter);
   TPublishFilterResult res;
-  status = backend_client.DoRpc(&ImpalaBackendClient::PublishFilter, 
local_params, &res);
+  status = backend_client.DoRpc(&ImpalaBackendClient::PublishFilter, 
rpc_params, &res);
   if (!status.ok()) {
     LOG(WARNING) << "Error publishing filter, continuing..." << 
status.GetDetail();
   }
-  // TODO: switch back to the following once we fix the lifecycle
-  // problems of Coordinator
-  //std::cref(fragment_inst->impalad_address()),
-  //std::cref(fragment_inst->fragment_instance_id())));
 }
 
 Coordinator::BackendState::InstanceStats::InstanceStats(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/359b409b/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h 
b/be/src/runtime/coordinator-backend-state.h
index 4ea2f33..73acef9 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -81,10 +81,7 @@ class Coordinator::BackendState {
 
   /// Make a PublishFilter rpc with given params if this backend has instances 
of the
   /// fragment with idx == rpc_params->dst_fragment_idx, otherwise do nothing.
-  /// This takes by-value parameters because we cannot guarantee that the 
originating
-  /// coordinator won't be destroyed while this executes.
-  /// TODO: switch to references when we fix the lifecycle problems of 
coordinators.
-  void PublishFilter(std::shared_ptr<TPublishFilterParams> rpc_params);
+  void PublishFilter(const TPublishFilterParams& rpc_params);
 
   /// Cancel execution at this backend if anything is running. Returns true
   /// if cancellation was attempted, false otherwise.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/359b409b/be/src/runtime/coordinator-filter-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-filter-state.h 
b/be/src/runtime/coordinator-filter-state.h
index 61dece9..08944b8 100644
--- a/be/src/runtime/coordinator-filter-state.h
+++ b/be/src/runtime/coordinator-filter-state.h
@@ -58,9 +58,12 @@ class Coordinator::FilterState {
  public:
   FilterState(const TRuntimeFilterDesc& desc, const TPlanNodeId& src)
     : desc_(desc), src_(src), pending_count_(0), first_arrival_time_(0L),
-      completion_time_(0L), disabled_(false) { }
+      completion_time_(0L) {
+    // bloom_filter_ is a disjunction so the unit value is always_false.
+    bloom_filter_.always_false = true;
+  }
 
-  TBloomFilter* bloom_filter() { return bloom_filter_.get(); }
+  TBloomFilter& bloom_filter() { return bloom_filter_; }
   boost::unordered_set<int>* src_fragment_instance_idxs() {
     return &src_fragment_instance_idxs_;
   }
@@ -75,7 +78,7 @@ class Coordinator::FilterState {
   const TRuntimeFilterDesc& desc() const { return desc_; }
   int pending_count() const { return pending_count_; }
   void set_pending_count(int pending_count) { pending_count_ = pending_count; }
-  bool disabled() const { return disabled_; }
+  bool disabled() const { return bloom_filter_.always_true; }
 
   /// Aggregates partitioned join filters and updates memory consumption.
   /// Disables filter if always_true filter is received or OOM is hit.
@@ -98,13 +101,12 @@ class Coordinator::FilterState {
   int pending_count_;
 
   /// BloomFilter aggregated from all source plan nodes, to be broadcast to all
-  /// destination plan fragment instances. Owned by this object so that it can 
be
-  /// deallocated once finished with. Only set for partitioned joins 
(broadcast joins
+  /// destination plan fragment instances. Only set for partitioned joins 
(broadcast joins
   /// need no aggregation).
   /// In order to avoid memory spikes, an incoming filter is moved (vs. 
copied) to the
   /// output structure in the case of a broadcast join. Similarly, for 
partitioned joins,
   /// the filter is moved from the following member to the output structure.
-  std::unique_ptr<TBloomFilter> bloom_filter_;
+  TBloomFilter bloom_filter_;
 
   /// Time at which first local filter arrived.
   int64_t first_arrival_time_;
@@ -112,9 +114,6 @@ class Coordinator::FilterState {
   /// Time at which all local filters arrived.
   int64_t completion_time_;
 
-  /// True if the filter is permanently disabled for this query.
-  bool disabled_;
-
   /// TODO: Add a per-object lock so that we can avoid holding the global 
filter_lock_
   /// for every filter update.
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/359b409b/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index e022a21..e26919d 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -17,39 +17,21 @@
 
 #include "runtime/coordinator.h"
 
-#include <map>
-#include <memory>
 #include <thrift/protocol/TDebugProtocol.h>
 #include <boost/algorithm/string/join.hpp>
-#include <boost/accumulators/accumulators.hpp>
-#include <boost/accumulators/statistics/stats.hpp>
-#include <boost/accumulators/statistics/min.hpp>
-#include <boost/accumulators/statistics/mean.hpp>
-#include <boost/accumulators/statistics/median.hpp>
-#include <boost/accumulators/statistics/max.hpp>
-#include <boost/accumulators/statistics/variance.hpp>
-#include <boost/bind.hpp>
 #include <boost/filesystem.hpp>
 #include <boost/lexical_cast.hpp>
-#include <boost/unordered_set.hpp>
 #include <boost/algorithm/string/split.hpp>
 #include <boost/algorithm/string.hpp>
 #include <gutil/strings/substitute.h>
-#include <errno.h>
 
-#include "common/logging.h"
 #include "exec/data-sink.h"
 #include "exec/plan-root-sink.h"
-#include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/ImpalaInternalService_constants.h"
-#include "gen-cpp/ImpalaInternalService_types.h"
-#include "gen-cpp/Partitions_types.h"
-#include "gen-cpp/PlanNodes_types.h"
 #include "runtime/exec-env.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/hdfs-fs-cache.h"
-#include "runtime/mem-tracker.h"
 #include "runtime/query-exec-mgr.h"
 #include "runtime/coordinator-filter-state.h"
 #include "runtime/coordinator-backend-state.h"
@@ -58,14 +40,9 @@
 #include "scheduling/scheduler.h"
 #include "util/bloom-filter.h"
 #include "util/counting-barrier.h"
-#include "util/debug-util.h"
-#include "util/error-util.h"
 #include "util/hdfs-bulk-ops.h"
 #include "util/hdfs-util.h"
 #include "util/histogram-metric.h"
-#include "util/network-util.h"
-#include "util/pretty-printer.h"
-#include "util/runtime-profile.h"
 #include "util/table-printer.h"
 
 #include "common/names.h"
@@ -1097,8 +1074,7 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& 
params) {
   DCHECK(filter_routing_table_complete_)
       << "Filter received before routing table complete";
 
-  // Make a 'master' copy that will be shared by all concurrent delivery RPC 
attempts.
-  shared_ptr<TPublishFilterParams> rpc_params(new TPublishFilterParams());
+  TPublishFilterParams rpc_params;
   unordered_set<int> target_fragment_idxs;
   {
     lock_guard<SpinLock> l(filter_lock_);
@@ -1129,7 +1105,6 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& 
params) {
 
     if (state->pending_count() > 0 && !state->disabled()) return;
     // At this point, we either disabled this filter or aggregation is 
complete.
-    DCHECK(state->disabled() || state->pending_count() == 0);
 
     // No more updates are pending on this filter ID. Create a distribution 
payload and
     // offer it to the queue.
@@ -1141,29 +1116,23 @@ void Coordinator::UpdateFilter(const 
TUpdateFilterParams& params) {
     }
 
     // Assign outgoing bloom filter.
-    if (state->bloom_filter() != nullptr) {
-      // Complete filter case.
-      // TODO: Replace with move() in Thrift 0.9.3.
-      TBloomFilter* aggregated_filter = state->bloom_filter();
-      filter_mem_tracker_->Release(aggregated_filter->directory.size());
-      swap(rpc_params->bloom_filter, *aggregated_filter);
-      DCHECK_EQ(aggregated_filter->directory.size(), 0);
-    } else {
-      // Disabled filter case (due to OOM or due to receiving an always_true 
filter).
-      rpc_params->bloom_filter.always_true = true;
-    }
-
+    TBloomFilter& aggregated_filter = state->bloom_filter();
+    filter_mem_tracker_->Release(aggregated_filter.directory.capacity());
+    swap(rpc_params.bloom_filter, aggregated_filter);
+    DCHECK(rpc_params.bloom_filter.always_false || 
rpc_params.bloom_filter.always_true ||
+        rpc_params.bloom_filter.directory.size() != 0);
+    rpc_params.__isset.bloom_filter = true;
+    DCHECK_EQ(aggregated_filter.directory.capacity(), 0);
     // Filter is complete, and can be released.
     state->Disable(filter_mem_tracker_);
-    DCHECK(state->bloom_filter() == nullptr);
   }
 
-  rpc_params->__set_dst_query_id(query_id());
-  rpc_params->__set_filter_id(params.filter_id);
+  rpc_params.__set_dst_query_id(query_id());
+  rpc_params.__set_filter_id(params.filter_id);
 
   for (BackendState* bs: backend_states_) {
     for (int fragment_idx: target_fragment_idxs) {
-      rpc_params->__set_dst_fragment_idx(fragment_idx);
+      rpc_params.__set_dst_fragment_idx(fragment_idx);
       bs->PublishFilter(rpc_params);
     }
   }
@@ -1171,6 +1140,7 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& 
params) {
 
 void Coordinator::FilterState::ApplyUpdate(const TUpdateFilterParams& params,
     Coordinator* coord) {
+  DCHECK(!disabled());
   DCHECK_GT(pending_count_, 0);
   DCHECK_EQ(completion_time_, 0L);
   if (first_arrival_time_ == 0L) {
@@ -1180,8 +1150,8 @@ void Coordinator::FilterState::ApplyUpdate(const 
TUpdateFilterParams& params,
   --pending_count_;
   if (params.bloom_filter.always_true) {
     Disable(coord->filter_mem_tracker_);
-  } else if (bloom_filter_.get() == nullptr) {
-    int64_t heap_space = params.bloom_filter.directory.size();
+  } else if (bloom_filter_.always_false) {
+    int64_t heap_space = params.bloom_filter.directory.capacity();
     if (!coord->filter_mem_tracker_->TryConsume(heap_space)) {
       VLOG_QUERY << "Not enough memory to allocate filter: "
                  << PrettyPrinter::Print(heap_space, TUnit::BYTES)
@@ -1189,31 +1159,29 @@ void Coordinator::FilterState::ApplyUpdate(const 
TUpdateFilterParams& params,
       // Disable, as one missing update means a correct filter cannot be 
produced.
       Disable(coord->filter_mem_tracker_);
     } else {
-      bloom_filter_.reset(new TBloomFilter());
       // Workaround for fact that parameters are const& for Thrift RPCs - yet 
we want to
       // move the payload from the request rather than copy it and take double 
the memory
       // cost. After this point, params.bloom_filter is an empty filter and 
should not be
       // read.
-      TBloomFilter* non_const_filter =
-          &const_cast<TBloomFilter&>(params.bloom_filter);
-      swap(*bloom_filter_.get(), *non_const_filter);
+      TBloomFilter* non_const_filter = 
&const_cast<TBloomFilter&>(params.bloom_filter);
+      swap(bloom_filter_, *non_const_filter);
       DCHECK_EQ(non_const_filter->directory.size(), 0);
     }
   } else {
-    BloomFilter::Or(params.bloom_filter, bloom_filter_.get());
+    BloomFilter::Or(params.bloom_filter, &bloom_filter_);
   }
 
-  if (pending_count_ == 0 || disabled_) {
+  if (pending_count_ == 0 || disabled()) {
     completion_time_ = coord->query_events_->ElapsedTime();
   }
 }
 
 void Coordinator::FilterState::Disable(MemTracker* tracker) {
-  disabled_ = true;
-  if (bloom_filter_.get() == nullptr) return;
-  int64_t heap_space = bloom_filter_.get()->directory.size();
-  tracker->Release(heap_space);
-  bloom_filter_.reset();
+  bloom_filter_.always_true = true;
+  bloom_filter_.always_false = false;
+  int64_t capacity = bloom_filter_.directory.capacity();
+  bloom_filter_.directory.clear();
+  tracker->Release(capacity);
 }
 
 const TUniqueId& Coordinator::query_id() const {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/359b409b/be/src/runtime/runtime-filter-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-ir.cc 
b/be/src/runtime/runtime-filter-ir.cc
index 1c1ecc0..4e386cb 100644
--- a/be/src/runtime/runtime-filter-ir.cc
+++ b/be/src/runtime/runtime-filter-ir.cc
@@ -17,8 +17,6 @@
 
 #include "runtime/runtime-filter.h"
 
-#include "runtime/raw-value.h"
-
 using namespace impala;
 
 bool IR_ALWAYS_INLINE RuntimeFilter::Eval(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/359b409b/be/src/runtime/runtime-filter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index 7b6066a..5d9531b 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -76,9 +76,10 @@ class RuntimeFilter {
   /// false otherwise.
   bool WaitForArrival(int32_t timeout_ms) const;
 
-  /// Returns true if the filter returns true for all elements, i.e. Eval(v) returns true
-  /// for all v.
+  /// Returns true if the filter returns true/false for all elements, i.e. Eval(v) returns
+  /// true/false for all v.
   inline bool AlwaysTrue() const;
+  inline bool AlwaysFalse() const;
 
   /// Frequency with which to check for filter arrival in WaitForArrival()
   static const int SLEEP_PERIOD_MS;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/359b409b/be/src/runtime/runtime-filter.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.inline.h b/be/src/runtime/runtime-filter.inline.h
index 2ab77de..128cafd 100644
--- a/be/src/runtime/runtime-filter.inline.h
+++ b/be/src/runtime/runtime-filter.inline.h
@@ -49,6 +49,10 @@ inline bool RuntimeFilter::AlwaysTrue() const  {
   return HasBloomFilter() && bloom_filter_ == BloomFilter::ALWAYS_TRUE_FILTER;
 }
 
+inline bool RuntimeFilter::AlwaysFalse() const {
+  return bloom_filter_ != BloomFilter::ALWAYS_TRUE_FILTER && 
bloom_filter_->AlwaysFalse();
+}
+
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/359b409b/be/src/util/bloom-filter-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bloom-filter-ir.cc b/be/src/util/bloom-filter-ir.cc
index 4c56149..9a2e299 100644
--- a/be/src/util/bloom-filter-ir.cc
+++ b/be/src/util/bloom-filter-ir.cc
@@ -22,11 +22,13 @@
 using namespace impala;
 
 void BloomFilter::InsertNoAvx2(const uint32_t hash) noexcept {
+  always_false_ = false;
   const uint32_t bucket_idx = HashUtil::Rehash32to32(hash) & directory_mask_;
   BucketInsert(bucket_idx, hash);
 }
 
 void BloomFilter::InsertAvx2(const uint32_t hash) noexcept {
+  always_false_ = false;
   const uint32_t bucket_idx = HashUtil::Rehash32to32(hash) & directory_mask_;
   BucketInsertAVX2(bucket_idx, hash);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/359b409b/be/src/util/bloom-filter.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bloom-filter.cc b/be/src/util/bloom-filter.cc
index d8d149e..f8ce625 100644
--- a/be/src/util/bloom-filter.cc
+++ b/be/src/util/bloom-filter.cc
@@ -17,13 +17,7 @@
 
 #include "util/bloom-filter.h"
 
-#include <stdlib.h>
-
-#include <algorithm>
-
-#include "common/logging.h"
 #include "runtime/runtime-state.h"
-#include "util/hash-util.h"
 
 using namespace std;
 
@@ -32,7 +26,8 @@ namespace impala {
 constexpr uint32_t BloomFilter::REHASH[8] __attribute__((aligned(32)));
 
 BloomFilter::BloomFilter(const int log_heap_space)
-  : // Since log_heap_space is in bytes, we need to convert it to the number of tiny Bloom
+  : always_false_(true),
+    // Since log_heap_space is in bytes, we need to convert it to the number of tiny Bloom
     // filters we will use.
     log_num_buckets_(std::max(1, log_heap_space - LOG_BUCKET_BYTE_SIZE)),
     // Don't use log_num_buckets_ if it will lead to undefined behavior by a shift
@@ -55,8 +50,11 @@ BloomFilter::BloomFilter(const int log_heap_space)
 
 BloomFilter::BloomFilter(const TBloomFilter& thrift)
     : BloomFilter(thrift.log_heap_space) {
-  DCHECK_EQ(thrift.directory.size(), directory_size());
-  memcpy(directory_, &thrift.directory[0], thrift.directory.size());
+  if (!thrift.always_false) {
+    always_false_ = false;
+    DCHECK_EQ(thrift.directory.size(), directory_size());
+    memcpy(directory_, &thrift.directory[0], thrift.directory.size());
+  }
 }
 
 BloomFilter::~BloomFilter() {
@@ -68,15 +66,22 @@ BloomFilter::~BloomFilter() {
 
 void BloomFilter::ToThrift(TBloomFilter* thrift) const {
   thrift->log_heap_space = log_num_buckets_ + LOG_BUCKET_BYTE_SIZE;
-  string tmp(reinterpret_cast<const char*>(directory_), directory_size());
-  thrift->directory.swap(tmp);
+  if (always_false_) {
+    thrift->always_false = true;
+    thrift->always_true = false;
+    return;
+  }
+  thrift->directory.assign(reinterpret_cast<const char*>(directory_),
+      static_cast<unsigned long>(directory_size()));
+  thrift->always_false = false;
   thrift->always_true = false;
 }
 
 void BloomFilter::ToThrift(const BloomFilter* filter, TBloomFilter* thrift) {
-  DCHECK(thrift != NULL);
-  if (filter == NULL) {
+  DCHECK(thrift != nullptr);
+  if (filter == nullptr) {
     thrift->always_true = true;
+    DCHECK_EQ(thrift->always_false, false);
     return;
   }
   filter->ToThrift(thrift);
@@ -171,14 +176,15 @@ OrEqualArrayAvx(size_t n, const char* __restrict__ in, char* __restrict__ out) {
 } //namespace
 
 void BloomFilter::Or(const TBloomFilter& in, TBloomFilter* out) {
-  DCHECK(out != NULL);
+  DCHECK(out != nullptr);
+  DCHECK(&in != out);
+  // These cases are impossible in current code. If they become possible in the future,
+  // memory usage should be tracked accordingly.
+  DCHECK(!out->always_false);
+  DCHECK(!out->always_true);
+  DCHECK(!in.always_true);
+  if (in.always_false) return;
   DCHECK_EQ(in.log_heap_space, out->log_heap_space);
-  if (&in == out) return;
-  out->always_true |= in.always_true;
-  if (out->always_true) {
-    out->directory.resize(0);
-    return;
-  }
   DCHECK_EQ(in.directory.size(), out->directory.size())
       << "Equal log heap space " << in.log_heap_space
       << ", but different directory sizes: " << in.directory.size() << ", "

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/359b409b/be/src/util/bloom-filter.h
----------------------------------------------------------------------
diff --git a/be/src/util/bloom-filter.h b/be/src/util/bloom-filter.h
index 4a50cfc..2e225a2 100644
--- a/be/src/util/bloom-filter.h
+++ b/be/src/util/bloom-filter.h
@@ -69,6 +69,8 @@ class BloomFilter {
   /// is NULL, it is interpreted as a complete filter which contains all elements.
   static void ToThrift(const BloomFilter* filter, TBloomFilter* thrift);
 
+  bool AlwaysFalse() const { return always_false_; }
+
   /// Adds an element to the BloomFilter. The function used to generate 'hash' need not
   /// have good uniformity, but it should have low collision probability. For instance, if
   /// the set of values is 32-bit ints, the identity function is a valid hash function for
@@ -106,6 +108,9 @@ class BloomFilter {
   }
 
  private:
+  // always_false_ is true when the bloom filter hasn't had any elements inserted.
+  bool always_false_;
+
   /// The BloomFilter is divided up into Buckets
   static const uint64_t BUCKET_WORDS = 8;
   typedef uint32_t BucketWord;
@@ -182,6 +187,7 @@ class BloomFilter {
 // a split Bloom filter, but log2(256) * 8 = 64 random bits for a standard Bloom filter.
 
 inline void ALWAYS_INLINE BloomFilter::Insert(const uint32_t hash) noexcept {
+  always_false_ = false;
   const uint32_t bucket_idx = HashUtil::Rehash32to32(hash) & directory_mask_;
   if (CpuInfo::IsSupported(CpuInfo::AVX2)) {
     BucketInsertAVX2(bucket_idx, hash);
@@ -191,6 +197,7 @@ inline void ALWAYS_INLINE BloomFilter::Insert(const uint32_t hash) noexcept {
 }
 
 inline bool ALWAYS_INLINE BloomFilter::Find(const uint32_t hash) const 
noexcept {
+  if (always_false_) return false;
   const uint32_t bucket_idx = HashUtil::Rehash32to32(hash) & directory_mask_;
   if (CpuInfo::IsSupported(CpuInfo::AVX2)) {
     return BucketFindAVX2(bucket_idx, hash);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/359b409b/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index cda5083..1f261f4 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -765,9 +765,10 @@ struct TBloomFilter {
   // BloomFilter::directory_.
   2: binary directory
 
-  // If true, this filter allows all elements to pass (i.e. its selectivity is 1). If
-  // true, 'directory' and 'log_heap_space' are not meaningful.
-  4: required bool always_true
+  // If always_true or always_false is true, 'directory' and 'log_heap_space' are not
+  // meaningful.
+  3: required bool always_true
+  4: required bool always_false
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/359b409b/tests/custom_cluster/test_always_false_filter.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_always_false_filter.py b/tests/custom_cluster/test_always_false_filter.py
new file mode 100644
index 0000000..461c128
--- /dev/null
+++ b/tests/custom_cluster/test_always_false_filter.py
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+import re
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.skip import SkipIfBuildType
+
+class TestAlwaysFalseFilter(CustomClusterTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @SkipIfBuildType.not_dev_build
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--skip_file_runtime_filtering=true")
+  def test_skip_split(self, cursor):
+    """IMPALA-5789: Test that always false filter filters out splits when file-level
+    filtering is disabled. The filtering is not enabled in seq-based file formats."""
+    cursor.execute("SET RUNTIME_FILTER_MODE=GLOBAL")
+    cursor.execute("SET RUNTIME_FILTER_WAIT_TIME_MS=10000")
+    query = """select STRAIGHT_JOIN * from alltypes inner join
+            (select * from alltypessmall where smallint_col=-1) v
+            on v.year = alltypes.year"""
+    # Manually iterate through file formats instead of creating a test matrix to prevent
+    # the cluster from restarting multiple times.
+    for table_suffix in ['_avro', '_rc', '_seq']:
+      cursor.execute("use functional" + table_suffix)
+      cursor.execute(query)
+      profile = cursor.get_profile()
+      assert re.search("Files rejected: [^0] \([^0]\)", profile) is None
+      assert re.search("Splits rejected: [^0] \([^0]\)", profile) is None
+    for table_suffix in ['', '_parquet']:
+      cursor.execute("use functional" + table_suffix)
+      cursor.execute(query)
+      profile = cursor.get_profile()
+      assert re.search("Files rejected: [^0] \([^0]\)", profile) is None
+      assert re.search("Splits rejected: 8 \(8\)", profile) is not None
+

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/359b409b/tests/query_test/test_runtime_filters.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_runtime_filters.py b/tests/query_test/test_runtime_filters.py
index 82b5792..f631ad1 100644
--- a/tests/query_test/test_runtime_filters.py
+++ b/tests/query_test/test_runtime_filters.py
@@ -17,6 +17,7 @@
 #
 
 import pytest
+import re
 import time
 
 from tests.common.impala_test_suite import ImpalaTestSuite
@@ -47,6 +48,15 @@ class TestRuntimeFilters(ImpalaTestSuite):
     assert duration < 60, \
       "Query took too long (%ss, possibly waiting for missing filters?)" % 
str(duration)
 
+  def test_file_filtering(self, vector):
+    self.change_database(self.client, vector.get_value('table_format'))
+    self.execute_query("SET RUNTIME_FILTER_MODE=GLOBAL")
+    self.execute_query("SET RUNTIME_FILTER_WAIT_TIME_MS=10000")
+    result = self.execute_query("""select STRAIGHT_JOIN * from alltypes inner 
join
+                                (select * from alltypessmall where 
smallint_col=-1) v
+                                on v.year = alltypes.year""")
+    assert re.search("Files rejected: 8 \(8\)", result.runtime_profile) is not 
None
+    assert re.search("Splits rejected: [^0] \([^0]\)", result.runtime_profile) 
is None
 
 @SkipIfLocal.multiple_impalad
 class TestRuntimeRowFilters(ImpalaTestSuite):


Reply via email to