IMPALA-3798: Disable per-split filtering for sequence-based scanners

If a runtime filter rejects a sequence-based format's header split (but
not the entire file, which may happen if the filter has not arrived in
time), the scanner will never mark all splits for that file
complete. This is because BaseSequenceScanner issues scan ranges after
parsing the header splits, and until those ranges are processed,
RangeComplete() and AddDiskIoRanges() will not be called - those methods
update progress_ and num_unqueued_files_
respectively. HdfsScanNode::ScannerThread() reads those variables to
decide whether to exit, and as a result will spin forever.

This bug therefore only shows up when there is >1 scan range per file.

This patch disables per-split filtering for Avro, RC and sequence files
as a stopgap until a permanent fix is in place; that fix would mark all
scan ranges for a file as done as soon as one range is filtered out.

Testing:

A custom cluster test is added which disables file filtering, emulating
the race condition that leads to the hang when a query that filters
scan ranges is run. Without the fix, this test hangs; with the fix, the
query completes as expected. MAX_SCAN_RANGE_LENGTH is used to ensure >1
scan range per file.

Change-Id: I4770dd77fd4258c24115d72b572c727b770bd75d
Reviewed-on: http://gerrit.cloudera.org:8080/3524
Tested-by: Henry Robinson <[email protected]>
Reviewed-by: Henry Robinson <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7cb86315
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7cb86315
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7cb86315

Branch: refs/heads/master
Commit: 7cb863154ab3f8d40b1bb73635ef03f5910ee64c
Parents: 3dccb01
Author: Henry Robinson <[email protected]>
Authored: Tue Jun 28 13:55:19 2016 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Tue Jul 5 13:37:25 2016 -0700

----------------------------------------------------------------------
 be/src/common/global-flags.cc                   |  3 ++
 be/src/exec/hdfs-scan-node.cc                   | 46 ++++++++++++++-----
 tests/custom_cluster/test_seq_file_filtering.py | 47 ++++++++++++++++++++
 3 files changed, 86 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7cb86315/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 88f1daf..296b338 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -95,6 +95,9 @@ DEFINE_int32(stress_free_pool_alloc, 0, "A stress option 
which causes memory all
     "debug builds only.");
 DEFINE_int32(stress_datastream_recvr_delay_ms, 0, "A stress option that causes 
data "
     "stream receiver registration to be delayed. Effective in debug builds 
only.");
+DEFINE_bool(skip_file_runtime_filtering, false, "Skips file-based runtime 
filtering in "
+    "order to provide a regression test for IMPALA-3798. Effective in debug 
builds "
+    "only.");
 #endif
 
 DEFINE_bool(disable_kudu, false, "If true, Kudu features will be disabled.");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7cb86315/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index cb4f2f7..8e808cd 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -67,6 +67,10 @@ DEFINE_int32(runtime_filter_wait_time_ms, 1000, "(Advanced) 
the maximum time, in
 DECLARE_string(cgroup_hierarchy_path);
 DECLARE_bool(enable_rm);
 
+#ifndef NDEBUG
+DECLARE_bool(skip_file_runtime_filtering);
+#endif
+
 namespace filesystem = boost::filesystem;
 using namespace impala;
 using namespace llvm;
@@ -184,6 +188,9 @@ Status HdfsScanNode::Init(const TPlanNode& tnode, 
RuntimeState* state) {
 
 bool HdfsScanNode::FilePassesFilterPredicates(const vector<FilterContext>& 
filter_ctxs,
     const THdfsFileFormat::type& format, HdfsFileDesc* file) {
+#ifndef NDEBUG
+  if (FLAGS_skip_file_runtime_filtering) return true;
+#endif
   if (filter_ctxs_.size() == 0) return true;
   ScanRangeMetadata* metadata =
       reinterpret_cast<ScanRangeMetadata*>(file->splits[0]->meta_data());
@@ -1132,7 +1139,7 @@ void HdfsScanNode::ScannerThread() {
 }
 
 bool HdfsScanNode::PartitionPassesFilterPredicates(int32_t partition_id,
-    const string& stats_name,  const vector<FilterContext>& filter_ctxs) {
+    const string& stats_name, const vector<FilterContext>& filter_ctxs) {
   if (filter_ctxs.size() == 0) return true;
   DCHECK_EQ(filter_ctxs.size(), filter_ctxs_.size())
       << "Mismatched number of filter contexts";
@@ -1152,12 +1159,24 @@ bool 
HdfsScanNode::PartitionPassesFilterPredicates(int32_t partition_id,
     bool processed = ctx.filter->HasBloomFilter();
     bool passed_filter = ctx.filter->Eval<void>(e, ctx.expr->root()->type());
     ctx.stats->IncrCounters(stats_name, 1, processed, !passed_filter);
-    if (!passed_filter)  return false;
+    if (!passed_filter) return false;
   }
 
   return true;
 }
 
+namespace {
+
+// Returns true if 'format' uses a scanner derived from BaseSequenceScanner. 
Used to
+// work around IMPALA-3798.
+bool FileFormatIsSequenceBased(THdfsFileFormat::type format) {
+  return format == THdfsFileFormat::SEQUENCE_FILE ||
+      format == THdfsFileFormat::RC_FILE ||
+      format == THdfsFileFormat::AVRO;
+}
+
+}
+
 Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
     DiskIoMgr::ScanRange* scan_range) {
 
@@ -1171,14 +1190,21 @@ Status HdfsScanNode::ProcessSplit(const 
vector<FilterContext>& filter_ctxs,
                             << " partition_id=" << partition_id
                             << "\n" << 
PrintThrift(runtime_state_->fragment_params());
 
-  if (!PartitionPassesFilterPredicates(partition_id, FilterStats::SPLITS_KEY,
-          filter_ctxs)) {
-    // Avoid leaking unread buffers in scan_range.
-    scan_range->Cancel(Status::CANCELLED);
-    // Mark scan range as done.
-    scan_ranges_complete_counter()->Add(1);
-    progress_.Update(1);
-    return Status::OK();
+  // IMPALA-3798: Filtering before the scanner is created can cause hangs if a 
header
+  // split is filtered out, for sequence-based file formats. If the scanner 
does not
+  // process the header split, the remaining scan ranges in the file will not 
be marked as
+  // done. See FilePassesFilterPredicates() for the correct logic to mark all 
splits in a
+  // file as done; the correct fix here is to do that for every file in a 
thread-safe way.
+  if (!FileFormatIsSequenceBased(partition->file_format())) {
+    if (!PartitionPassesFilterPredicates(partition_id, FilterStats::SPLITS_KEY,
+            filter_ctxs)) {
+      // Avoid leaking unread buffers in scan_range.
+      scan_range->Cancel(Status::CANCELLED);
+      // Mark scan range as done.
+      scan_ranges_complete_counter()->Add(1);
+      progress_.Update(1);
+      return Status::OK();
+    }
   }
 
   ScannerContext context(runtime_state_, this, partition, scan_range, 
filter_ctxs);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7cb86315/tests/custom_cluster/test_seq_file_filtering.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_seq_file_filtering.py 
b/tests/custom_cluster/test_seq_file_filtering.py
new file mode 100644
index 0000000..41d869b
--- /dev/null
+++ b/tests/custom_cluster/test_seq_file_filtering.py
@@ -0,0 +1,47 @@
+# Copyright 2016 Cloudera Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+class TestImpala3798(CustomClusterTestSuite):
+  """Regression test for IMPALA-3798, which is a hang that occurs when an Avro 
file is not
+  filtered by a runtime filter, but its header split is (this only occurs when 
the filter
+  arrives after file filtering is attempted, but before per-split filtering 
is).
+
+  The debug flag --skip_file_runtime_filtering disables per-file filtering, 
mimicking the
+  race that leads to the hang.
+  """
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--skip_file_runtime_filtering=true")
+  def test_sequence_file_filtering_race(self, vector):
+    impalad = self.cluster.get_any_impalad()
+    client = impalad.service.create_beeswax_client()
+    client.execute("SET RUNTIME_FILTER_MODE=GLOBAL")
+    client.execute("SET RUNTIME_FILTER_WAIT_TIME_MS=10000")
+
+    # Set scan range length shorter than file length to ensure more than one 
split per
+    # file (which is necessary to trigger IMPALA-3798).
+    client.execute("SET MAX_SCAN_RANGE_LENGTH=1024")
+    # To trigger the bug, there must be a partition filter that eliminates at 
least one
+    # file. In this case, we choose a filter that eliminates all files, since 
there is no
+    # int_col = -3 in alltypes.
+    client.execute("select STRAIGHT_JOIN * from functional_avro.alltypes a 
join " +
+                   "[SHUFFLE] functional_avro.alltypes b on a.month = b.id " +
+                   "and b.int_col = -3")

Reply via email to