IMPALA-6679,IMPALA-6678: reduce scan reservation

This commit contains two related changes.

IMPALA-6679: defer scanner reservation increases
------------------------------------------------
When starting each scan range, check to see how big the initial scan
range is (the full thing for row-based formats, the footer for
Parquet) and determine whether more reservation would be useful.
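The stepwise increase described above can be sketched roughly as follows.
This is an illustrative sketch only: FakeReservationClient,
RoundUpToMultiple and IncreaseIncrementally are hypothetical names, and the
fixed-limit client is an assumed stand-in for the real buffer pool client.
The idea is to grow to the next max-sized I/O buffer multiple (or to the
ideal value) and to stop at the first denied increase, keeping whatever
partial increase succeeded.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for a buffer pool client: grants an increase only
// while the total reservation stays within a fixed limit.
struct FakeReservationClient {
  int64_t limit;
  int64_t reservation;
  bool IncreaseReservation(int64_t bytes) {
    assert(bytes > 0);
    if (reservation + bytes > limit) return false;
    reservation += bytes;
    return true;
  }
};

// Rounds 'value' up to a multiple of 'factor' (factor > 0).
int64_t RoundUpToMultiple(int64_t value, int64_t factor) {
  return (value + factor - 1) / factor * factor;
}

// Tries to grow 'curr' towards 'ideal' one max-sized buffer at a time.
// Returns the reservation reached, which may be a partial increase.
int64_t IncreaseIncrementally(FakeReservationClient* client, int64_t curr,
    int64_t ideal, int64_t max_buffer_size) {
  while (curr < ideal) {
    // Next target: the next buffer multiple above 'curr', capped at 'ideal'.
    int64_t target =
        std::min(ideal, RoundUpToMultiple(curr + 1, max_buffer_size));
    if (!client->IncreaseReservation(target - curr)) break;
    curr = target;
  }
  return curr;
}
```

With a 20MB limit, a 1MB starting reservation, a 24MB ideal and 8MB
buffers, this sketch would stop at 16MB because the last 8MB step is denied.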

For Parquet, base the ideal reservation on the actual column layout
of each file. This avoids reserving memory that we won't use for
the actual files that we're scanning. This also avoids the need to
estimate ideal reservation in the planner.
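The per-column calculation can be sketched as below, with behaviour
inferred from the expectations in the ComputeIdealReservation unit test
included in this change. The function names, the constants and the cap of
three max-sized buffers per range are assumptions made for illustration,
not the DiskIoMgr implementation itself.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Assumed buffer size limits, matching the values used in the unit test.
constexpr int64_t kMinBufferSize = 64 * 1024;
constexpr int64_t kMaxBufferSize = 8 * 1024 * 1024;
// Assumed cap on the number of max-sized buffers useful for one range.
constexpr int64_t kMaxBuffersPerRange = 3;

// Rounds 'v' up to the next power of two; returns 1 for v <= 1.
int64_t RoundUpToPowerOfTwo(int64_t v) {
  int64_t result = 1;
  while (result < v) result <<= 1;
  return result;
}

// Ideal reservation for a single scan range of 'range_len' bytes.
int64_t IdealBufferReservation(int64_t range_len) {
  assert(range_len >= 0);
  if (range_len < kMaxBufferSize) {
    // Small range: one power-of-two buffer, at least the minimum size.
    return std::max(kMinBufferSize, RoundUpToPowerOfTwo(range_len));
  }
  // Large range: whole max-sized buffers, capped at kMaxBuffersPerRange.
  int64_t num_buffers = (range_len + kMaxBufferSize - 1) / kMaxBufferSize;
  return std::min(num_buffers, kMaxBuffersPerRange) * kMaxBufferSize;
}

// Ideal reservation for a file: the per-column values are simply summed.
int64_t IdealReservation(const std::vector<int64_t>& col_range_lengths) {
  int64_t total = 0;
  for (int64_t len : col_range_lengths) total += IdealBufferReservation(len);
  return total;
}
```

Under these assumptions a file with one 8MB column and one column of
64KB + 1 bytes would ideally reserve 8MB + 128KB.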

We also release scanner thread reservations above the minimum as
soon as threads complete, so that resources can be released slightly
earlier.
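The amount that can be handed back when a scanner thread finishes can be
sketched as follows. ReleasableBytes is a hypothetical helper written for
illustration; it mirrors the min() computation in
ReturnReservationFromScannerThread() and assumes the node must always hold
on to its minimum reservation.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Returns how many of the 'bytes_returned' by a finished scanner thread can
// be released without dropping below 'min_reservation'.
int64_t ReleasableBytes(int64_t curr_reservation, int64_t min_reservation,
    int64_t bytes_returned) {
  assert(curr_reservation >= min_reservation);
  int64_t headroom = curr_reservation - min_reservation;
  return std::max<int64_t>(0, std::min(bytes_returned, headroom));
}
```

For example, with a current reservation of 10 units and a minimum of 4, a
thread returning 8 units lets only 6 be released.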

IMPALA-6678: estimate Parquet column size for reservation
---------------------------------------------------------
This change also reduces reservation computed by the planner in certain
cases by estimating the on-disk size of column data based on stats. It
also reduces the default per-column reservation to 4MB since it appears
that < 8MB columns are generally common in practice and the method for
estimating column size is biased towards over-estimating. There are two
main cases to consider for the performance implications:
* Memory is available to improve query perf - if we underestimate, we
  can increase the reservation so we can do "efficient" 8MB I/Os for
  large columns.
* The ideal reservation is not available - query performance is affected
  because we can't overlap I/O and compute as much and may do smaller
  (probably 4MB) I/Os. However, we should avoid pathological behaviour
  like tiny I/Os.

When stats are not available, we just default to reserving 4MB per
column, which typically is more memory than required. When stats are
available, the memory required can be reduced below that default when
the heuristics tell us with high confidence that the column data for
most or all files is smaller than 4MB.

The stats-based heuristic could reduce scan performance if two things
happen together: the conservative heuristics significantly underestimate
the column size, and memory is constrained such that we can't increase
the scan reservation at runtime (in which case the memory might be used
by a different operator or scanner thread).

Observability:
Added counters to track when threads were not spawned due to reservation
and to track when reservation increases are requested and denied. These
allow determining if performance may have been affected by memory
availability.

Testing:
Updated test_mem_usage_scaling.py memory requirements and added steps
to regenerate the requirements. Looped the test for a while to flush out
flakiness.

Added targeted planner and query tests for reservation calculations and
increases.

Change-Id: Ifc80e05118a9eef72cac8e2308418122e3ee0842
Reviewed-on: http://gerrit.cloudera.org:8080/9757
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-on: http://gerrit.cloudera.org:8080/10273


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/83a70a7a
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/83a70a7a
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/83a70a7a

Branch: refs/heads/2.x
Commit: 83a70a7ae0a1bfc1fc7c2448e73c95ee2d7f7c09
Parents: 385afe5
Author: Tim Armstrong <[email protected]>
Authored: Fri Mar 16 13:27:46 2018 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Thu May 3 15:28:03 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-orc-scanner.cc                 |   1 +
 be/src/exec/hdfs-parquet-scanner-test.cc        |  82 +-
 be/src/exec/hdfs-parquet-scanner.cc             |  51 +-
 be/src/exec/hdfs-parquet-scanner.h              |  19 +-
 be/src/exec/hdfs-scan-node-base.cc              |  62 +-
 be/src/exec/hdfs-scan-node-base.h               |  30 +-
 be/src/exec/hdfs-scan-node-mt.cc                |  13 +-
 be/src/exec/hdfs-scan-node.cc                   | 104 +--
 be/src/exec/hdfs-scan-node.h                    |  56 +-
 be/src/exec/scanner-context.cc                  |   9 +-
 be/src/exec/scanner-context.h                   |  23 +-
 be/src/runtime/io/disk-io-mgr.cc                |  14 +
 be/src/runtime/io/disk-io-mgr.h                 |   4 +
 be/src/util/runtime-profile.cc                  |   9 +
 be/src/util/runtime-profile.h                   |   4 +
 common/thrift/PlanNodes.thrift                  |   3 -
 .../org/apache/impala/catalog/ColumnStats.java  |   1 +
 .../apache/impala/catalog/HdfsPartition.java    |   9 +
 .../org/apache/impala/planner/HdfsScanNode.java | 337 +++++--
 .../org/apache/impala/testutil/TestUtils.java   |  19 +
 testdata/bin/compute-table-stats.sh             |   2 +-
 .../queries/PlannerTest/constant-folding.test   |  32 +-
 .../PlannerTest/fk-pk-join-detection.test       |  88 +-
 .../queries/PlannerTest/max-row-size.test       |  90 +-
 .../PlannerTest/min-max-runtime-filters.test    |   2 +-
 .../queries/PlannerTest/mt-dop-validation.test  |  48 +-
 .../queries/PlannerTest/parquet-filtering.test  |  52 +-
 .../queries/PlannerTest/partition-pruning.test  |   2 +-
 .../PlannerTest/resource-requirements.test      | 927 +++++++++++--------
 .../PlannerTest/sort-expr-materialization.test  |  18 +-
 .../PlannerTest/spillable-buffer-sizing.test    | 204 ++--
 .../queries/PlannerTest/tablesample.test        |  24 +-
 .../queries/PlannerTest/union.test              |  10 +-
 .../admission-reject-mem-estimate.test          |  69 +-
 .../admission-reject-min-reservation.test       |   6 +-
 .../queries/QueryTest/explain-level2.test       |   4 +-
 .../queries/QueryTest/explain-level3.test       |   4 +-
 .../queries/QueryTest/nested-types-tpch.test    |   4 +-
 .../queries/QueryTest/scanner-reservation.test  |  55 ++
 .../QueryTest/single-node-nlj-exhaustive.test   |   5 +-
 .../queries/QueryTest/spilling-aggs.test        |   8 +-
 .../queries/QueryTest/spilling.test             |   4 +-
 .../custom_cluster/test_admission_controller.py |   2 +-
 tests/query_test/test_mem_usage_scaling.py      |  15 +-
 tests/query_test/test_queries.py                |   3 -
 tests/query_test/test_scanners.py               |  44 +-
 tests/query_test/test_sort.py                   |   7 +-
 tests/stress/extract_min_mem.py                 |  49 +
 48 files changed, 1638 insertions(+), 990 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/83a70a7a/be/src/exec/hdfs-orc-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index f12230e..b755e7a 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -118,6 +118,7 @@ void HdfsOrcScanner::ScanRangeInputStream::read(void* buf, uint64_t length,
     status = ExecEnv::GetInstance()->disk_io_mgr()->StartScanRange(
           scanner_->scan_node_->reader_context(), range, &needs_buffers);
     DCHECK(!status.ok() || !needs_buffers) << "Already provided a buffer";
+    if (status.ok()) status = range->GetNext(&io_buffer);
   }
   if (io_buffer != nullptr) range->ReturnBuffer(move(io_buffer));
   if (!status.ok()) throw ResourceError(status);

http://git-wip-us.apache.org/repos/asf/impala/blob/83a70a7a/be/src/exec/hdfs-parquet-scanner-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner-test.cc b/be/src/exec/hdfs-parquet-scanner-test.cc
index cbc6e76..85fa3ef 100644
--- a/be/src/exec/hdfs-parquet-scanner-test.cc
+++ b/be/src/exec/hdfs-parquet-scanner-test.cc
@@ -16,6 +16,8 @@
 // under the License.
 
 #include "exec/hdfs-parquet-scanner.h"
+#include "runtime/test-env.h"
+#include "service/fe-support.h"
 #include "testutil/gtest-util.h"
 
 #include "common/names.h"
@@ -23,14 +25,87 @@
 static const int64_t MIN_BUFFER_SIZE = 64 * 1024;
 static const int64_t MAX_BUFFER_SIZE = 8 * 1024 * 1024;
 
+DECLARE_int64(min_buffer_size);
+DECLARE_int32(read_size);
+
 namespace impala {
 
 class HdfsParquetScannerTest : public testing::Test {
+ public:
+  virtual void SetUp() {
+    // Override min/max buffer sizes picked up by DiskIoMgr.
+    FLAGS_min_buffer_size = MIN_BUFFER_SIZE;
+    FLAGS_read_size = MAX_BUFFER_SIZE;
+
+    test_env_.reset(new TestEnv);
+    ASSERT_OK(test_env_->Init());
+  }
+
+  virtual void TearDown() {
+    test_env_.reset();
+  }
+
  protected:
+  void TestComputeIdealReservation(const vector<int64_t>& col_range_lengths,
+      int64_t expected_ideal_reservation);
   void TestDivideReservation(const vector<int64_t>& col_range_lengths,
       int64_t total_col_reservation, const vector<int64_t>& expected_reservations);
+
+  boost::scoped_ptr<TestEnv> test_env_;
 };
 
+/// Test the ComputeIdealReservation returns 'expected_ideal_reservation' for a list
+/// of columns with 'col_range_lengths'.
+void HdfsParquetScannerTest::TestComputeIdealReservation(
+    const vector<int64_t>& col_range_lengths, int64_t expected_ideal_reservation) {
+  EXPECT_EQ(expected_ideal_reservation,
+      HdfsParquetScanner::ComputeIdealReservation(col_range_lengths));
+}
+
+TEST_F(HdfsParquetScannerTest, ComputeIdealReservation) {
+  // Should round up to nearest power-of-two buffer size if < max scan range buffer.
+  TestComputeIdealReservation({0}, MIN_BUFFER_SIZE);
+  TestComputeIdealReservation({1}, MIN_BUFFER_SIZE);
+  TestComputeIdealReservation({MIN_BUFFER_SIZE - 1}, MIN_BUFFER_SIZE);
+  TestComputeIdealReservation({MIN_BUFFER_SIZE}, MIN_BUFFER_SIZE);
+  TestComputeIdealReservation({MIN_BUFFER_SIZE + 2}, 2 * MIN_BUFFER_SIZE);
+  TestComputeIdealReservation({4 * MIN_BUFFER_SIZE + 1234}, 8 * MIN_BUFFER_SIZE);
+  TestComputeIdealReservation({MAX_BUFFER_SIZE - 10}, MAX_BUFFER_SIZE);
+  TestComputeIdealReservation({MAX_BUFFER_SIZE}, MAX_BUFFER_SIZE);
+
+  // Should round to nearest max I/O buffer size if >= max scan range buffer, up to 3
+  // buffers.
+  TestComputeIdealReservation({MAX_BUFFER_SIZE + 1}, 2 * MAX_BUFFER_SIZE);
+  TestComputeIdealReservation({MAX_BUFFER_SIZE * 2 - 1}, 2 * MAX_BUFFER_SIZE);
+  TestComputeIdealReservation({MAX_BUFFER_SIZE * 2}, 2 * MAX_BUFFER_SIZE);
+  TestComputeIdealReservation({MAX_BUFFER_SIZE * 2 + 1}, 3 * MAX_BUFFER_SIZE);
+  TestComputeIdealReservation({MAX_BUFFER_SIZE * 3 + 1}, 3 * MAX_BUFFER_SIZE);
+  TestComputeIdealReservation({MAX_BUFFER_SIZE * 100 + 27}, 3 * MAX_BUFFER_SIZE);
+
+  // Ideal reservations from multiple ranges are simply added together.
+  TestComputeIdealReservation({1, 2}, 2 * MIN_BUFFER_SIZE);
+  TestComputeIdealReservation(
+      {MAX_BUFFER_SIZE, MAX_BUFFER_SIZE - 1}, 2 * MAX_BUFFER_SIZE);
+  TestComputeIdealReservation(
+      {MAX_BUFFER_SIZE, MIN_BUFFER_SIZE + 1}, MAX_BUFFER_SIZE + 2 * MIN_BUFFER_SIZE);
+  TestComputeIdealReservation(
+      {MAX_BUFFER_SIZE, MAX_BUFFER_SIZE * 128}, 4 * MAX_BUFFER_SIZE);
+  TestComputeIdealReservation(
+      {MAX_BUFFER_SIZE * 7, MAX_BUFFER_SIZE * 128, MAX_BUFFER_SIZE * 1000},
+      3L * 3L * MAX_BUFFER_SIZE);
+
+  // Test col size that doesn't fit in int32.
+  TestComputeIdealReservation({MAX_BUFFER_SIZE * 1024L}, 3L * MAX_BUFFER_SIZE);
+
+  // Test sum of reservations that doesn't fit in int32.
+  vector<int64_t> col_range_lengths;
+  const int64_t LARGE_NUM_RANGES = 10000;
+  for (int i = 0; i < LARGE_NUM_RANGES; ++i) {
+    col_range_lengths.push_back(4 * MAX_BUFFER_SIZE);
+  }
+  TestComputeIdealReservation(col_range_lengths, LARGE_NUM_RANGES * 3L * MAX_BUFFER_SIZE);
+}
+
 /// Test that DivideReservationBetweenColumns() returns 'expected_reservations' for
 /// inputs 'col_range_lengths' and 'total_col_reservation'.
 void HdfsParquetScannerTest::TestDivideReservation(const vector<int64_t>& col_range_lengths,
@@ -93,4 +168,9 @@ TEST_F(HdfsParquetScannerTest, DivideReservation) {
 
 }
 
-IMPALA_TEST_MAIN();
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport();
+  return RUN_ALL_TESTS();
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/83a70a7a/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 8f9d8ca..b40816d 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -53,14 +53,23 @@ const int16_t HdfsParquetScanner::INVALID_POS;
 
 const char* HdfsParquetScanner::LLVM_CLASS_NAME = "class.impala::HdfsParquetScanner";
 
-const string PARQUET_MEM_LIMIT_EXCEEDED =
+static const string PARQUET_MEM_LIMIT_EXCEEDED =
     "HdfsParquetScanner::$0() failed to allocate $1 bytes for $2.";
 
 namespace impala {
 
+static const string IDEAL_RESERVATION_COUNTER_NAME = "ParquetRowGroupIdealReservation";
+static const string ACTUAL_RESERVATION_COUNTER_NAME = "ParquetRowGroupActualReservation";
+
 Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
   DCHECK(!files.empty());
+  // Add Parquet-specific counters.
+  ADD_SUMMARY_STATS_COUNTER(
+      scan_node->runtime_profile(), IDEAL_RESERVATION_COUNTER_NAME, TUnit::BYTES);
+  ADD_SUMMARY_STATS_COUNTER(
+      scan_node->runtime_profile(), ACTUAL_RESERVATION_COUNTER_NAME, TUnit::BYTES);
+
   for (HdfsFileDesc* file : files) {
     // If the file size is less than 12 bytes, it is an invalid Parquet file.
     if (file->file_length < 12) {
@@ -161,14 +170,10 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
 
   // Release I/O buffers immediately to make sure they are cleaned up
   // in case we return a non-OK status anywhere below.
-  int64_t stream_reservation = stream_->reservation();
   stream_ = nullptr;
   context_->ReleaseCompletedResources(true);
   context_->ClearStreams();
   RETURN_IF_ERROR(footer_status);
-  // The scanner-wide stream was used only to read the file footer.  Each column has added
-  // its own stream. We can use the reservation from 'stream_' for the columns now.
-  total_col_reservation_ = stream_reservation;
 
   // Parse the file schema into an internal representation for schema resolution.
   schema_resolver_.reset(new ParquetSchemaResolver(*scan_node_->hdfs_table(),
@@ -1469,39 +1474,61 @@ Status HdfsParquetScanner::InitScalarColumns() {
     }
     RETURN_IF_ERROR(scalar_reader->Reset(*file_desc, col_chunk, row_group_idx_));
   }
-  RETURN_IF_ERROR(
-      DivideReservationBetweenColumns(scalar_readers_, total_col_reservation_));
+  RETURN_IF_ERROR(DivideReservationBetweenColumns(scalar_readers_));
   return Status::OK();
 }
 
 Status HdfsParquetScanner::DivideReservationBetweenColumns(
-    const vector<BaseScalarColumnReader*>& column_readers,
-    int64_t reservation_to_distribute) {
+    const vector<BaseScalarColumnReader*>& column_readers) {
   DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
   const int64_t min_buffer_size = io_mgr->min_buffer_size();
   const int64_t max_buffer_size = io_mgr->max_buffer_size();
   // The HdfsScanNode reservation calculation in the planner ensures that we have
   // reservation for at least one buffer per column.
-  if (reservation_to_distribute < min_buffer_size * column_readers.size()) {
+  if (context_->total_reservation() < min_buffer_size * column_readers.size()) {
     return Status(TErrorCode::INTERNAL_ERROR,
         Substitute("Not enough reservation in Parquet scanner for file '$0'. Need at "
                    "least $1 bytes per column for $2 columns but had $3 bytes",
             filename(), min_buffer_size, column_readers.size(),
-            reservation_to_distribute));
+            context_->total_reservation()));
   }
 
   vector<int64_t> col_range_lengths(column_readers.size());
   for (int i = 0; i < column_readers.size(); ++i) {
     col_range_lengths[i] = column_readers[i]->scan_range()->len();
   }
+
+  // The scanner-wide stream was used only to read the file footer.  Each column has added
+  // its own stream. We can use the total reservation now that 'stream_''s resources have
+  // been released. We may benefit from increasing reservation further, so let's compute
+  // the ideal reservation to scan all the columns.
+  int64_t ideal_reservation = ComputeIdealReservation(col_range_lengths);
+  if (ideal_reservation > context_->total_reservation()) {
+    context_->TryIncreaseReservation(ideal_reservation);
+  }
+  scan_node_->runtime_profile()->GetSummaryStatsCounter(ACTUAL_RESERVATION_COUNTER_NAME)->
+      UpdateCounter(context_->total_reservation());
+  scan_node_->runtime_profile()->GetSummaryStatsCounter(IDEAL_RESERVATION_COUNTER_NAME)->
+      UpdateCounter(ideal_reservation);
+
   vector<pair<int, int64_t>> tmp_reservations = DivideReservationBetweenColumnsHelper(
-      min_buffer_size, max_buffer_size, col_range_lengths, reservation_to_distribute);
+      min_buffer_size, max_buffer_size, col_range_lengths, context_->total_reservation());
   for (auto& tmp_reservation : tmp_reservations) {
     column_readers[tmp_reservation.first]->set_io_reservation(tmp_reservation.second);
   }
   return Status::OK();
 }
 
+int64_t HdfsParquetScanner::ComputeIdealReservation(
+    const vector<int64_t>& col_range_lengths) {
+  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
+  int64_t ideal_reservation = 0;
+  for (int64_t len : col_range_lengths) {
+    ideal_reservation += io_mgr->ComputeIdealBufferReservation(len);
+  }
+  return ideal_reservation;
+}
+
 vector<pair<int, int64_t>> HdfsParquetScanner::DivideReservationBetweenColumnsHelper(
     int64_t min_buffer_size, int64_t max_buffer_size,
     const vector<int64_t>& col_range_lengths, int64_t reservation_to_distribute) {

http://git-wip-us.apache.org/repos/asf/impala/blob/83a70a7a/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 69749f8..82e761a 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -407,9 +407,6 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Scan range for the metadata.
   const io::ScanRange* metadata_range_;
 
-  /// Reservation available for scanning columns, in bytes.
-  int64_t total_col_reservation_ = 0;
-
   /// Pool to copy dictionary page buffer into. This pool is shared across all the
   /// pages in a column chunk.
   boost::scoped_ptr<MemPool> dictionary_pool_;
@@ -573,13 +570,19 @@ class HdfsParquetScanner : public HdfsScanner {
   /// does not start any scan ranges.
   Status InitScalarColumns() WARN_UNUSED_RESULT;
 
-  /// Decides how to divide 'reservation_to_distribute' bytes of reservation between the
-  /// columns. Sets the reservation on each corresponding reader in 'column_readers'.
+  /// Decides how to divide stream_->reservation() between the columns. May increase
+  /// the reservation if more reservation would enable more efficient I/O for the
+  /// current columns being scanned. Sets the reservation on each corresponding reader
+  /// in 'column_readers'.
   Status DivideReservationBetweenColumns(
-      const std::vector<BaseScalarColumnReader*>& column_readers,
-      int64_t reservation_to_distribute);
+      const std::vector<BaseScalarColumnReader*>& column_readers);
+
+  /// Compute the ideal reservation to scan a file with scan range lengths
+  /// 'col_range_lengths' given the min and max buffer size of the singleton DiskIoMgr
+  /// in ExecEnv.
+  static int64_t ComputeIdealReservation(const std::vector<int64_t>& col_range_lengths);
 
-  /// Helper for DivideReservationBetweenColumns. Implements the core algorithm for
+  /// Helper for DivideReservationBetweenColumns(). Implements the core algorithm for
   /// dividing a reservation of 'reservation_to_distribute' bytes between columns with
   /// scan range lengths 'col_range_lengths' given a min and max buffer size. Returns
   /// a vector with an entry per column with the index into 'col_range_lengths' and the

http://git-wip-us.apache.org/repos/asf/impala/blob/83a70a7a/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index af6bac3..51ea9f3 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -69,7 +69,6 @@ const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD = 64 * 1024 * 1024;
 HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
     const DescriptorTbl& descs)
     : ScanNode(pool, tnode, descs),
-      ideal_scan_range_reservation_(tnode.hdfs_scan_node.ideal_scan_range_reservation),
       min_max_tuple_id_(tnode.hdfs_scan_node.__isset.min_max_tuple_id ?
           tnode.hdfs_scan_node.min_max_tuple_id : -1),
       skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ?
@@ -86,7 +85,6 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
           &tnode.hdfs_scan_node.dictionary_filter_conjuncts : nullptr),
       disks_accessed_bitmap_(TUnit::UNIT, 0),
       active_hdfs_read_thread_counter_(TUnit::UNIT, 0) {
-  DCHECK_GE(ideal_scan_range_reservation_, resource_profile_.min_reservation);
 }
 
 HdfsScanNodeBase::~HdfsScanNodeBase() {
@@ -346,16 +344,6 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   }
 
   RETURN_IF_ERROR(ClaimBufferReservation(state));
-  // We got the minimum reservation. Now try to get ideal reservation.
-  if (resource_profile_.min_reservation != ideal_scan_range_reservation_) {
-    bool increased = buffer_pool_client_.IncreaseReservation(
-        ideal_scan_range_reservation_ - resource_profile_.min_reservation);
-    VLOG_FILE << "Increasing reservation from minimum "
-              << resource_profile_.min_reservation << "B to ideal "
-              << ideal_scan_range_reservation_ << "B "
-              << (increased ? "succeeded" : "failed");
-  }
-
   reader_context_ = runtime_state_->io_mgr()->RegisterContext();
 
   // Initialize HdfsScanNode specific counters
@@ -388,6 +376,11 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   average_hdfs_read_thread_concurrency_ = runtime_profile()->AddSamplingCounter(
       AVERAGE_HDFS_READ_THREAD_CONCURRENCY, &active_hdfs_read_thread_counter_);
 
+  initial_range_ideal_reservation_stats_ = ADD_SUMMARY_STATS_COUNTER(runtime_profile(),
+      "InitialRangeIdealReservation", TUnit::BYTES);
+  initial_range_actual_reservation_stats_ = ADD_SUMMARY_STATS_COUNTER(runtime_profile(),
+      "InitialRangeActualReservation", TUnit::BYTES);
+
   bytes_read_local_ = ADD_COUNTER(runtime_profile(), "BytesReadLocal",
       TUnit::BYTES);
   bytes_read_short_circuit_ = ADD_COUNTER(runtime_profile(), "BytesReadShortCircuit",
@@ -519,6 +512,51 @@ bool HdfsScanNodeBase::FilePassesFilterPredicates(
   return true;
 }
 
+Status HdfsScanNodeBase::StartNextScanRange(int64_t* reservation,
+    ScanRange** scan_range) {
+  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
+  bool needs_buffers;
+  RETURN_IF_ERROR(io_mgr->GetNextUnstartedRange(
+        reader_context_.get(), scan_range, &needs_buffers));
+  if (*scan_range == nullptr) return Status::OK();
+  if (needs_buffers) {
+    // Check if we should increase our reservation to read this range more efficiently.
+    // E.g. if we are scanning a large text file, we might want extra I/O buffers to
+    // improve throughput. Note that if this is a columnar format like Parquet,
+    // '*scan_range' is the small footer range only so we won't request an increase.
+    int64_t ideal_scan_range_reservation =
+        io_mgr->ComputeIdealBufferReservation((*scan_range)->len());
+    *reservation = IncreaseReservationIncrementally(*reservation, ideal_scan_range_reservation);
+    initial_range_ideal_reservation_stats_->UpdateCounter(ideal_scan_range_reservation);
+    initial_range_actual_reservation_stats_->UpdateCounter(*reservation);
+    RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
+        reader_context_.get(), &buffer_pool_client_, *scan_range, *reservation));
+  }
+  return Status::OK();
+}
+
+int64_t HdfsScanNodeBase::IncreaseReservationIncrementally(int64_t curr_reservation,
+      int64_t ideal_reservation) {
+  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
+  // Check if we could use at least one more max-sized I/O buffer for this range. Don't
+  // increase in smaller increments since we may not be able to use additional smaller
+  // buffers.
+  while (curr_reservation < ideal_reservation) {
+    // Increase to the next I/O buffer multiple or to the ideal reservation.
+    int64_t target = min(ideal_reservation,
+        BitUtil::RoundUpToPowerOf2(curr_reservation + 1, io_mgr->max_buffer_size()));
+    DCHECK_LT(curr_reservation, target);
+    bool increased = buffer_pool_client_.IncreaseReservation(target - curr_reservation);
+    VLOG_FILE << "Increasing reservation from "
+              << PrettyPrinter::PrintBytes(curr_reservation) << " to "
+              << PrettyPrinter::PrintBytes(target) << " "
+              << (increased ? "succeeded" : "failed");
+    if (!increased) break;
+    curr_reservation = target;
+  }
+  return curr_reservation;
+}
+
 ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
     int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
     const BufferOpts& buffer_opts,

http://git-wip-us.apache.org/repos/asf/impala/blob/83a70a7a/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 3a9c37f..4c0a233 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -318,13 +318,19 @@ class HdfsScanNodeBase : public ScanNode {
   bool PartitionPassesFilters(int32_t partition_id, const std::string& stats_name,
       const std::vector<FilterContext>& filter_ctxs);
 
+  /// Helper to increase reservation from 'curr_reservation' up to 'ideal_reservation'
+  /// that may succeed in getting a partial increase if the full increase is not
+  /// possible. First increases to an I/O buffer multiple then increases in I/O buffer
+  /// sized increments. 'curr_reservation' can refer to a "share' of the total
+  /// reservation of the buffer pool client, e.g. the 'share" belonging to a single
+  /// scanner thread. Returns the new reservation after increases.
+  int64_t IncreaseReservationIncrementally(int64_t curr_reservation,
+      int64_t ideal_reservation);
+
  protected:
   friend class ScannerContext;
   friend class HdfsScanner;
 
-  /// Ideal reservation to process each input split, computed by the planner.
-  const int64_t ideal_scan_range_reservation_;
-
   /// Tuple id of the tuple used to evaluate conjuncts on parquet::Statistics.
   const int min_max_tuple_id_;
 
@@ -481,6 +487,13 @@ class HdfsScanNodeBase : public ScanNode {
   /// taken where there are i concurrent hdfs read thread running. Created in Open().
   std::vector<RuntimeProfile::Counter*>* hdfs_read_thread_concurrency_bucket_ = nullptr;
 
+  /// Track stats about ideal/actual reservation for initial scan ranges so we can
+  /// determine if the scan got all of the reservation it wanted. Does not include
+  /// subsequent reservation increases done by scanner implementation (e.g. for Parquet
+  /// columns).
+  RuntimeProfile::SummaryStatsCounter* initial_range_ideal_reservation_stats_ = nullptr;
+  RuntimeProfile::SummaryStatsCounter* initial_range_actual_reservation_stats_ = nullptr;
+
   /// Pool for allocating some amounts of memory that is shared between scanners.
   /// e.g. partition key tuple and their string buffers
   boost::scoped_ptr<MemPool> scan_node_pool_;
@@ -495,6 +508,17 @@ class HdfsScanNodeBase : public ScanNode {
   /// Only valid to call if !initial_ranges_issued_. Sets initial_ranges_issued_ to true.
   Status IssueInitialScanRanges(RuntimeState* state) WARN_UNUSED_RESULT;
 
+  /// Gets the next scan range to process and allocates buffer for it. 'reservation' is
+  /// an in/out argument with the current reservation available for this range. It may
+  /// be increased by this function up to a computed "ideal" reservation, in which case
+  /// *reservation is increased to reflect the new reservation.
+  ///
+  /// Returns Status::OK() and sets 'scan_range' if it gets a range to process. Returns
+  /// Status::OK() and sets 'scan_range' to NULL when no more ranges are left to process.
+  /// Returns an error status if there was an error getting the range or allocating
+  /// buffers.
+  Status StartNextScanRange(int64_t* reservation, io::ScanRange** scan_range);
+
   /// Create and open new scanner for this partition type.
   /// If the scanner is successfully created and opened, it is returned in 'scanner'.
   Status CreateAndOpenScanner(HdfsPartitionDescriptor* partition,

http://git-wip-us.apache.org/repos/asf/impala/blob/83a70a7a/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index 2786742..4573978 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -78,26 +78,19 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
       scanner_->Close(row_batch);
       scanner_.reset();
     }
-    DiskIoMgr* io_mgr = runtime_state_->io_mgr();
-    bool needs_buffers;
-    RETURN_IF_ERROR(io_mgr->GetNextUnstartedRange(
-        reader_context_.get(), &scan_range_, &needs_buffers));
+    int64_t scanner_reservation = buffer_pool_client_.GetReservation();
+    RETURN_IF_ERROR(StartNextScanRange(&scanner_reservation, &scan_range_));
     if (scan_range_ == nullptr) {
       *eos = true;
       StopAndFinalizeCounters();
       return Status::OK();
     }
-    int64_t scanner_reservation = buffer_pool_client_.GetReservation();
-    if (needs_buffers) {
-      RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(reader_context_.get(),
-          &buffer_pool_client_, scan_range_, scanner_reservation));
-    }
     ScanRangeMetadata* metadata =
         static_cast<ScanRangeMetadata*>(scan_range_->meta_data());
     int64_t partition_id = metadata->partition_id;
     HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id);
     scanner_ctx_.reset(new ScannerContext(runtime_state_, this, &buffer_pool_client_,
-        partition, filter_ctxs(), expr_results_pool()));
+        scanner_reservation, partition, filter_ctxs(), expr_results_pool()));
     scanner_ctx_->AddStream(scan_range_, scanner_reservation);
     Status status = CreateAndOpenScanner(partition, scanner_ctx_.get(), &scanner_);
     if (!status.ok()) {

http://git-wip-us.apache.org/repos/asf/impala/blob/83a70a7a/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index e8dac08..81b834f 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -156,6 +156,8 @@ Status HdfsScanNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state));
   row_batches_get_timer_ = ADD_TIMER(runtime_profile(), "RowBatchQueueGetWaitTime");
   row_batches_put_timer_ = ADD_TIMER(runtime_profile(), "RowBatchQueuePutWaitTime");
+  scanner_thread_reservations_denied_counter_ =
+      ADD_COUNTER(runtime_profile(), "NumScannerThreadReservationsDenied", TUnit::UNIT);
   return Status::OK();
 }
 
@@ -172,7 +174,6 @@ Status HdfsScanNode::Open(RuntimeState* state) {
     max_num_scanner_threads_ = runtime_state_->query_options().num_scanner_threads;
   }
   DCHECK_GT(max_num_scanner_threads_, 0);
-  spare_reservation_.Store(buffer_pool_client_.GetReservation());
   thread_avail_cb_id_ = runtime_state_->resource_pool()->AddThreadAvailableCb(
       bind<void>(mem_fn(&HdfsScanNode::ThreadTokenAvailableCb), this, _1));
   return Status::OK();
@@ -215,28 +216,19 @@ Status HdfsScanNode::AddDiskIoRanges(const vector<ScanRange*>& ranges,
   return Status::OK();
 }
 
-bool HdfsScanNode::EnoughReservationForExtraThread(const unique_lock<mutex>& lock) {
+void HdfsScanNode::ReturnReservationFromScannerThread(const unique_lock<mutex>& lock,
+    int64_t bytes) {
   DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
-  if (spare_reservation_.Load() >= ideal_scan_range_reservation_) return true;
-  int64_t increase = ideal_scan_range_reservation_ - spare_reservation_.Load();
-  if (!buffer_pool_client_.IncreaseReservation(increase)) return false;
-  spare_reservation_.Add(increase);
-  return true;
-}
-
-int64_t HdfsScanNode::DeductReservationForScannerThread(const unique_lock<mutex>& lock,
-    bool first_thread) {
-  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
-  int64_t amount;
-  if (first_thread) {
-    amount = spare_reservation_.Load() >= ideal_scan_range_reservation_ ?
-        ideal_scan_range_reservation_ : resource_profile_.min_reservation;
-  } else {
-    amount = ideal_scan_range_reservation_;
+  int64_t curr_reservation = buffer_pool_client_.GetReservation();
+  DCHECK_GE(curr_reservation, resource_profile_.min_reservation);
+  // Release as much memory as possible. Must hold onto the minimum reservation, though.
+  int64_t reservation_decrease =
+      min(bytes, curr_reservation - resource_profile_.min_reservation);
+  if (reservation_decrease > 0) {
+    Status status =
+        buffer_pool_client_.DecreaseReservationTo(curr_reservation - reservation_decrease);
+    DCHECK(status.ok()) << "Not possible, scans don't unpin pages" << status.GetDetail();
   }
-  int64_t remainder = spare_reservation_.Add(-amount);
-  DCHECK_GE(remainder, 0);
-  return amount;
 }
 
 void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
@@ -250,7 +242,8 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
   //  4. Don't start up if no initial ranges have been issued (see IMPALA-1722).
   //  5. Don't start up a ScannerThread if materialized_row_batches_ is full since
   //     we are not scanner bound.
-  //  6. Don't start up a thread if there isn't enough memory left to run it.
+  //  6. Don't start up a thread if it is an extra thread and we can't reserve another
+  //     minimum reservation's worth of memory for the thread.
   //  7. Don't start up more than maximum number of scanner threads configured.
   //  8. Don't start up if there are no thread tokens.
 
@@ -269,17 +262,21 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
 
     const int64_t num_active_scanner_threads = active_scanner_thread_counter_.value();
     const bool first_thread = num_active_scanner_threads == 0;
+    const int64_t scanner_thread_reservation = resource_profile_.min_reservation;
     // Cases 1, 2, 3.
     if (done_ || all_ranges_started_ ||
         num_active_scanner_threads >= progress_.remaining()) {
       break;
     }
 
-    // Cases 5 and 6.
-    if (!first_thread &&
-        (materialized_row_batches_->Size() >= max_materialized_row_batches_ ||
-         !EnoughReservationForExtraThread(lock))) {
-      break;
+    if (!first_thread) {
+      // Cases 5 and 6.
+      if (materialized_row_batches_->Size() >= max_materialized_row_batches_) break;
+      // The node's min reservation is for the first thread so we don't need to check
+      if (!buffer_pool_client_.IncreaseReservation(scanner_thread_reservation)) {
+        COUNTER_ADD(scanner_thread_reservations_denied_counter_, 1);
+        break;
+      }
     }
 
     // Case 7 and 8.
@@ -288,14 +285,10 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
       pool->AcquireThreadToken();
     } else if (active_scanner_thread_counter_.value() >= max_num_scanner_threads_
         || !pool->TryAcquireThreadToken()) {
+      ReturnReservationFromScannerThread(lock, scanner_thread_reservation);
       break;
     }
 
-    // Deduct the reservation. We haven't dropped the lock since the
-    // first_thread/EnoughReservationForExtraThread() checks so spare reservation
-    // must be available.
-    int64_t scanner_thread_reservation =
-        DeductReservationForScannerThread(lock, first_thread);
     COUNTER_ADD(&active_scanner_thread_counter_, 1);
     string name = Substitute("scanner-thread (finst:$0, plan-node-id:$1, thread-idx:$2)",
         PrintId(runtime_state_->fragment_instance_id()), id(),
@@ -307,7 +300,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
     status =
       Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn, &t, true);
     if (!status.ok()) {
-      ReturnReservationFromScannerThread(scanner_thread_reservation);
+      ReturnReservationFromScannerThread(lock, scanner_thread_reservation);
       COUNTER_ADD(&active_scanner_thread_counter_, -1);
       // Release the token and skip running callbacks to find a replacement. Skipping
       // serves two purposes. First, it prevents a mutual recursion between this function
@@ -331,8 +324,6 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
 void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reservation) {
   SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
   SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
-  DiskIoMgr* io_mgr = runtime_state_->io_mgr();
-
   // Make thread-local copy of filter contexts to prune scan ranges, and to pass to the
   // scanner for finer-grained filtering. Use a thread-local MemPool for the filter
   // contexts as the embedded expression evaluators may allocate from it and MemPool
@@ -377,29 +368,19 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
     // to return if there's an error.
     ranges_issued_barrier_.Wait(SCANNER_THREAD_WAIT_TIME_MS, &unused);
 
-    // Take a snapshot of num_unqueued_files_ before calling GetNextUnstartedRange().
+    // Take a snapshot of num_unqueued_files_ before calling StartNextScanRange().
     // We don't want num_unqueued_files_ to go to zero between the return from
-    // GetNextUnstartedRange() and the check for when all ranges are complete.
+    // StartNextScanRange() and the check for when all ranges are complete.
     int num_unqueued_files = num_unqueued_files_.Load();
     // TODO: the Load() acts as an acquire barrier.  Is this needed? (i.e. any earlier
     // stores that need to complete?)
     AtomicUtil::MemoryBarrier();
     ScanRange* scan_range;
-    bool needs_buffers;
-    Status status =
-        io_mgr->GetNextUnstartedRange(reader_context_.get(), &scan_range, &needs_buffers);
-
+    Status status = StartNextScanRange(&scanner_thread_reservation, &scan_range);
     if (status.ok() && scan_range != nullptr) {
-      if (needs_buffers) {
-        status = io_mgr->AllocateBuffersForRange(
-            reader_context_.get(), &buffer_pool_client_, scan_range,
-            scanner_thread_reservation);
-      }
-      if (status.ok()) {
-        // Got a scan range. Process the range end to end (in this thread).
-        status = ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(),
-            &expr_results_pool, scan_range, scanner_thread_reservation);
-      }
+      // Got a scan range. Process the range end to end (in this thread).
+      status = ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(),
+          &expr_results_pool, scan_range, &scanner_thread_reservation);
     }
 
     if (!status.ok()) {
@@ -429,8 +410,9 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
       // TODO: Based on the usage pattern of all_ranges_started_, it looks like it is not
       // needed to acquire the lock in x86.
       unique_lock<mutex> l(lock_);
-      // All ranges have been queued and GetNextUnstartedRange() returned NULL. This means
-      // that every range is either done or being processed by another thread.
+      // All ranges have been queued and DiskIoMgr has no more new ranges for this scan
+      // node to process. This means that every range is either done or being processed by
+      // another thread.
       all_ranges_started_ = true;
       break;
     }
@@ -438,7 +420,10 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
   COUNTER_ADD(&active_scanner_thread_counter_, -1);
 
 exit:
-  ReturnReservationFromScannerThread(scanner_thread_reservation);
+  {
+    unique_lock<mutex> l(lock_);
+    ReturnReservationFromScannerThread(l, scanner_thread_reservation);
+  }
   runtime_state_->resource_pool()->ReleaseThreadToken(first_thread);
   for (auto& ctx: filter_ctxs) ctx.expr_eval->Close(runtime_state_);
   filter_mem_pool.FreeAll();
@@ -447,7 +432,7 @@ exit:
 
 Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
     MemPool* expr_results_pool, ScanRange* scan_range,
-    int64_t scanner_thread_reservation) {
+    int64_t* scanner_thread_reservation) {
   DCHECK(scan_range != NULL);
 
   ScanRangeMetadata* metadata = static_cast<ScanRangeMetadata*>(scan_range->meta_data());
@@ -472,9 +457,9 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
     return Status::OK();
   }
 
-  ScannerContext context(runtime_state_, this, &buffer_pool_client_, partition,
-      filter_ctxs, expr_results_pool);
-  context.AddStream(scan_range, scanner_thread_reservation);
+  ScannerContext context(runtime_state_, this, &buffer_pool_client_,
+      *scanner_thread_reservation, partition, filter_ctxs, expr_results_pool);
+  context.AddStream(scan_range, *scanner_thread_reservation);
   scoped_ptr<HdfsScanner> scanner;
   Status status = CreateAndOpenScanner(partition, &context, &scanner);
   if (!status.ok()) {
@@ -510,6 +495,9 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
 
   // Transfer remaining resources to a final batch and add it to the row batch queue.
   scanner->Close();
+  // Reservation may have been increased by the scanner, e.g. Parquet may allocate
+  // additional reservation to scan columns.
+  *scanner_thread_reservation = context.total_reservation();
   return status;
 }
 

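The clamp in ReturnReservationFromScannerThread() can be looked at in isolation. The following is a minimal standalone sketch, not the patch's code: the free function name ReservationAfterReturn is hypothetical, and the buffer pool client is replaced by plain integers. It only illustrates that a returning thread gives back at most the amount above the node's minimum reservation.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical helper: computes the node's reservation after a scanner thread
// hands back 'bytes'. Mirrors the clamp in the patch, which never lets the
// reservation drop below 'min_reservation'.
int64_t ReservationAfterReturn(
    int64_t curr_reservation, int64_t min_reservation, int64_t bytes) {
  assert(curr_reservation >= min_reservation);
  // Release as much as requested, but keep the minimum reservation.
  int64_t decrease = std::min(bytes, curr_reservation - min_reservation);
  return decrease > 0 ? curr_reservation - decrease : curr_reservation;
}
```

With a minimum of 8 units, returning 8 units from a reservation of 12 releases only 4, so the last thread to exit leaves the node at exactly its minimum.
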
http://git-wip-us.apache.org/repos/asf/impala/blob/83a70a7a/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 2dfbb10..48684bd 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -64,9 +64,11 @@ class TPlanNode;
 /// ------------------
 /// The different scanner threads all allocate I/O buffers from the node's Buffer Pool
 /// client. The scan node ensures that enough reservation is available to start a
-/// scanner thread before launching each one with (see
-/// EnoughReservationForExtraThread()), after which the scanner thread is responsible
-/// for staying within the reservation handed off to it.
+/// scanner thread before launching each one, after which the scanner thread must
+/// stay within the reservation handed off to it. Scanner threads can try to increase
+/// their reservation if desired (e.g. for scanning columnar formats like Parquet), but
+/// must be able to make progress within the initial reservation handed off from the scan
+/// node.
 ///
 /// TODO: Remove this class once the fragment-based multi-threaded execution is
 /// fully functional.
@@ -150,19 +152,15 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// the number of cores.
   int max_num_scanner_threads_;
 
-  /// Amount of the 'buffer_pool_client_' reservation that is not allocated to scanner
-  /// threads. Doled out to scanner threads when they are started and returned when
-  /// those threads no longer need it. Can be atomically incremented without holding
-  /// 'lock_' but 'lock_' is held when decrementing to ensure that the check for
-  /// reservation and the actual deduction are atomic with respect to other threads
-  /// trying to claim reservation.
-  AtomicInt64 spare_reservation_{0};
+  // Number of times scanner threads were not created because of reservation increase
+  // being denied.
+  RuntimeProfile::Counter* scanner_thread_reservations_denied_counter_ = nullptr;
 
   /// The wait time for fetching a row batch from the row batch queue.
-  RuntimeProfile::Counter* row_batches_get_timer_;
+  RuntimeProfile::Counter* row_batches_get_timer_ = nullptr;
 
   /// The wait time for enqueuing a row batch into the row batch queue.
-  RuntimeProfile::Counter* row_batches_put_timer_;
+  RuntimeProfile::Counter* row_batches_put_timer_ = nullptr;
 
   /// Tries to spin up as many scanner threads as the quota allows. Called explicitly
   /// (e.g., when adding new ranges) or when threads are available for this scan node.
@@ -174,34 +172,24 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// 'first_thread' is true if this was the first scanner thread to start and
   /// it acquired a "required" thread token from ThreadResourceMgr.
   /// The caller must have reserved 'scanner_thread_reservation' bytes of memory for
-  /// this thread with DeductReservationForScannerThread().
+  /// this thread. Before returning, this function releases the reservation with
+  /// ReturnReservationFromScannerThread().
   void ScannerThread(bool first_thread, int64_t scanner_thread_reservation);
 
   /// Process the entire scan range with a new scanner object. Executed in scanner
   /// thread. 'filter_ctxs' is a clone of the class-wide filter_ctxs_, used to filter rows
-  /// in this split.
+  /// in this split. 'scanner_thread_reservation' is an in/out argument that tracks the
+  /// total reservation from 'buffer_pool_client_' that is allotted for this thread's
+  /// use.
   Status ProcessSplit(const std::vector<FilterContext>& filter_ctxs,
       MemPool* expr_results_pool, io::ScanRange* scan_range,
-      int64_t scanner_thread_reservation) WARN_UNUSED_RESULT;
-
-  /// Return true if there is enough reservation to start an extra scanner thread.
-  /// Tries to increase reservation if enough is not already available in
-  /// 'spare_reservation_'. 'lock_' must be held via 'lock'
-  bool EnoughReservationForExtraThread(const boost::unique_lock<boost::mutex>& lock);
-
-  /// Deduct reservation to start a new scanner thread from 'spare_reservation_'. If
-  /// 'first_thread' is true, this is the first thread to be started and only the
-  /// minimum reservation is required to be available. Otherwise
-  /// EnoughReservationForExtra() thread must have returned true in the current
-  /// critical section so that 'ideal_scan_range_bytes_' is available for the extra
-  /// thread. Returns the amount deducted. 'lock_' must be held via 'lock'.
-  int64_t DeductReservationForScannerThread(const boost::unique_lock<boost::mutex>& lock,
-      bool first_thread);
-
-  /// Called by scanner thread to return or all of its reservation that is not needed.
-  void ReturnReservationFromScannerThread(int64_t bytes) {
-    spare_reservation_.Add(bytes);
-  }
+      int64_t* scanner_thread_reservation) WARN_UNUSED_RESULT;
+
+  /// Called by scanner thread to return some or all of its reservation that is not
+  /// needed. Always holds onto at least the minimum reservation to avoid violating the
+  /// invariants of ExecNode::buffer_pool_client_. 'lock_' must be held via 'lock'.
+  void ReturnReservationFromScannerThread(const boost::unique_lock<boost::mutex>& lock,
+      int64_t bytes);
 
   /// Checks for eos conditions and returns batches from materialized_row_batches_.
   Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos)

http://git-wip-us.apache.org/repos/asf/impala/blob/83a70a7a/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index c669e65..280eac9 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -41,12 +41,14 @@ static const int64_t INIT_READ_PAST_SIZE_BYTES = 64 * 1024;
 const int64_t ScannerContext::Stream::OUTPUT_BUFFER_BYTES_LEFT_INIT;
 
 ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
-    BufferPool::ClientHandle* bp_client, HdfsPartitionDescriptor* partition_desc,
+    BufferPool::ClientHandle* bp_client, int64_t total_reservation,
+    HdfsPartitionDescriptor* partition_desc,
     const vector<FilterContext>& filter_ctxs,
     MemPool* expr_results_pool)
   : state_(state),
     scan_node_(scan_node),
     bp_client_(bp_client),
+    total_reservation_(total_reservation),
     partition_desc_(partition_desc),
     filter_ctxs_(filter_ctxs),
     expr_results_pool_(expr_results_pool) {
@@ -56,6 +58,11 @@ ScannerContext::~ScannerContext() {
   DCHECK(streams_.empty());
 }
 
+void ScannerContext::TryIncreaseReservation(int64_t ideal_reservation) {
+  total_reservation_ = scan_node_->IncreaseReservationIncrementally(
+      total_reservation_, ideal_reservation);
+}
+
 void ScannerContext::ReleaseCompletedResources(bool done) {
   for (int i = 0; i < streams_.size(); ++i) {
     streams_[i]->ReleaseCompletedResources(done);

http://git-wip-us.apache.org/repos/asf/impala/blob/83a70a7a/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index 6292486..0bb8d74 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -86,9 +86,10 @@ class ScannerContext {
  public:
   /// Create a scanner context with the parent scan_node (where materialized row batches
   /// get pushed to) and the scan range to process. Buffers are allocated using
-  /// 'bp_client'.
+  /// 'bp_client'. 'total_reservation' bytes of 'bp_client''s reservation have been
+  /// initially allotted for use by this scanner.
   ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
-      BufferPool::ClientHandle* bp_client,
+      BufferPool::ClientHandle* bp_client, int64_t total_reservation,
       HdfsPartitionDescriptor* partition_desc,
       const std::vector<FilterContext>& filter_ctxs,
       MemPool* expr_results_pool);
@@ -350,6 +351,13 @@ class ScannerContext {
 
   int NumStreams() const { return streams_.size(); }
 
+  /// Tries to increase 'total_reservation()' to 'ideal_reservation'. May get
+  /// none, part or all of the requested increase. total_reservation() can be
+  /// checked by the caller to find out the new total reservation. When this
+  /// ScannerContext is destroyed, the scan node takes back ownership of
+  /// total_reservation().
+  void TryIncreaseReservation(int64_t ideal_reservation);
+
   /// Release completed resources for all streams, e.g. the last buffer in each stream if
   /// the current read position is at the end of the buffer. If 'done' is true all
   /// resources are freed, even if the caller has not read that data yet. After calling
@@ -370,8 +378,9 @@ class ScannerContext {
   /// buffers that it needs allocated. 'reservation' is the amount of reservation that
   /// is given to this stream for allocating I/O buffers. The reservation is shared with
   /// 'range', so the context must be careful not to use this until all of 'range's
-  /// buffers have been freed. Must be >= the minimum IoMgr buffer size o allow reading
-  /// past the end of 'range'.
+  /// buffers have been freed. Must be >= the minimum IoMgr buffer size to allow reading
+  /// past the end of 'range'. 'reservation' must be <=
+  /// ScannerContext::total_reservation(), i.e. this reservation is included in the total.
   ///
   /// Returns the added stream. The returned stream is owned by this context.
   Stream* AddStream(io::ScanRange* range, int64_t reservation);
@@ -382,6 +391,7 @@ class ScannerContext {
   bool cancelled() const;
 
   BufferPool::ClientHandle* bp_client() const { return bp_client_; }
+  int64_t total_reservation() const { return total_reservation_; }
   HdfsPartitionDescriptor* partition_descriptor() const { return partition_desc_; }
   const std::vector<FilterContext>& filter_ctxs() const { return filter_ctxs_; }
   MemPool* expr_results_pool() const { return expr_results_pool_; }
@@ -396,6 +406,11 @@ class ScannerContext {
   /// call thread-safe BufferPool methods with this client.
   BufferPool::ClientHandle* const bp_client_;
 
+  /// Total reservation from 'bp_client_' that this scanner is allowed to use.
+  /// TODO: when we remove the multi-threaded scan node, we may be able to just use
+  /// bp_client_->GetReservation().
+  int64_t total_reservation_;
+
   HdfsPartitionDescriptor* const partition_desc_;
 
   /// Vector of streams. Non-columnar formats will always have one stream per context.

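TryIncreaseReservation() is documented as possibly getting "none, part or all" of the requested increase. The body of IncreaseReservationIncrementally() is not shown in this part of the diff, so the following is only a sketch of one policy that would produce that behaviour: grow towards the ideal one max-sized I/O buffer boundary at a time and stop at the first denied request. ToyPool, IncreaseIncrementally and the unit sizes are hypothetical stand-ins, not Impala APIs.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for a buffer pool client: grants an increase only if
// the whole requested amount is still available (all-or-nothing per request).
struct ToyPool {
  int64_t available;
  bool IncreaseReservation(int64_t bytes) {
    if (bytes > available) return false;
    available -= bytes;
    return true;
  }
};

// Assumed policy: grow 'curr' towards 'ideal' in steps that end on multiples
// of 'max_buffer_size', stopping at the first denied request. Returns the
// reservation actually reached, which may be 'curr', 'ideal' or in between.
int64_t IncreaseIncrementally(
    ToyPool* pool, int64_t curr, int64_t ideal, int64_t max_buffer_size) {
  assert(max_buffer_size > 0);
  while (curr < ideal) {
    // Next multiple of max_buffer_size strictly above 'curr', capped at 'ideal'.
    int64_t next_boundary = (curr / max_buffer_size + 1) * max_buffer_size;
    int64_t target = std::min(ideal, next_boundary);
    if (!pool->IncreaseReservation(target - curr)) break;
    curr = target;
  }
  return curr;
}
```

Requesting in steps rather than all at once means a scanner that cannot get its full ideal reservation can still end up with a whole number of max-sized buffers, instead of nothing.
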
http://git-wip-us.apache.org/repos/asf/impala/blob/83a70a7a/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 50f6fc4..170b47e 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -505,6 +505,20 @@ vector<int64_t> DiskIoMgr::ChooseBufferSizes(int64_t scan_range_len, int64_t max
   return buffer_sizes;
 }
 
+int64_t DiskIoMgr::ComputeIdealBufferReservation(int64_t scan_range_len) {
+  if (scan_range_len < max_buffer_size_) {
+    // Round up to nearest power-of-two buffer size - ideally we should do a single read
+    // I/O for this range.
+    return max(min_buffer_size_, BitUtil::RoundUpToPowerOfTwo(scan_range_len));
+  } else {
+    // Round up to the nearest max-sized I/O buffer, capped by
+    // IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE - we should do one or more max-sized read
+    // I/Os for this range.
+    return min(IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE * max_buffer_size_,
+            BitUtil::RoundUpToPowerOf2(scan_range_len, max_buffer_size_));
+  }
+}
+
 // This function gets the next RequestRange to work on for this disk. It checks for
 // cancellation and
 // a) Updates ready_to_start_ranges if there are no scan ranges queued for this disk.

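The rounding rule in ComputeIdealBufferReservation() is easier to follow with concrete numbers. The sketch below reimplements it as a free function under stated assumptions: the 8KB minimum and 8MB maximum buffer sizes are illustrative constants (in the patch they are DiskIoMgr members), and RoundUpToPowerOfTwo/RoundUpToMultiple are local stand-ins for the BitUtil helpers that the diff calls.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Assumed values for illustration only.
constexpr int64_t kMinBufferSize = 8 * 1024;         // hypothetical 8KB
constexpr int64_t kMaxBufferSize = 8 * 1024 * 1024;  // hypothetical 8MB
// Same value as IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE in disk-io-mgr.h.
constexpr int64_t kIdealMaxSizedBuffers = 3;

// Smallest power of two >= v. Stand-in for BitUtil::RoundUpToPowerOfTwo().
int64_t RoundUpToPowerOfTwo(int64_t v) {
  int64_t result = 1;
  while (result < v) result <<= 1;
  return result;
}

// Rounds v up to a multiple of 'factor', which must be a power of two.
int64_t RoundUpToMultiple(int64_t v, int64_t factor) {
  return (v + factor - 1) & ~(factor - 1);
}

int64_t IdealBufferReservation(int64_t scan_range_len) {
  assert(scan_range_len >= 0);
  if (scan_range_len < kMaxBufferSize) {
    // Small range: one power-of-two buffer that covers the whole range.
    return std::max(kMinBufferSize, RoundUpToPowerOfTwo(scan_range_len));
  }
  // Large range: whole max-sized buffers, capped at kIdealMaxSizedBuffers.
  return std::min(kIdealMaxSizedBuffers * kMaxBufferSize,
      RoundUpToMultiple(scan_range_len, kMaxBufferSize));
}
```

Under these assumed sizes, a 100KB footer read gets a 128KB reservation, a 9MB range gets 16MB, and anything above 16MB is capped at 24MB.
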
http://git-wip-us.apache.org/repos/asf/impala/blob/83a70a7a/be/src/runtime/io/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index b6b4b75..cc1bb37 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -386,6 +386,10 @@ class DiskIoMgr : public CacheLineAligned {
     REMOTE_NUM_DISKS
   };
 
+  /// Compute the ideal reservation for processing a scan range of 'scan_range_len' bytes.
+  /// See "Buffer Management" in the class comment for explanation.
+  int64_t ComputeIdealBufferReservation(int64_t scan_range_len);
+
   /// The ideal number of max-sized buffers per scan range to maximise throughput.
   /// See "Buffer Management" in the class comment for explanation.
   static const int64_t IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE = 3;

http://git-wip-us.apache.org/repos/asf/impala/blob/83a70a7a/be/src/util/runtime-profile.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index 5f205a7..c7d50de 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -621,6 +621,15 @@ RuntimeProfile::Counter* RuntimeProfile::GetCounter(const string& name) {
   return NULL;
 }
 
+RuntimeProfile::SummaryStatsCounter* RuntimeProfile::GetSummaryStatsCounter(
+    const string& name) {
+  lock_guard<SpinLock> l(summary_stats_map_lock_);
+  if (summary_stats_map_.find(name) != summary_stats_map_.end()) {
+    return summary_stats_map_[name];
+  }
+  return nullptr;
+}
+
 void RuntimeProfile::GetCounters(const string& name, vector<Counter*>* counters) {
   Counter* c = GetCounter(name);
   if (c != NULL) counters->push_back(c);

http://git-wip-us.apache.org/repos/asf/impala/blob/83a70a7a/be/src/util/runtime-profile.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index 320edf3..a7f9a95 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -201,6 +201,10 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   /// that name.
   Counter* GetCounter(const std::string& name);
 
+  /// Gets the summary stats counter with 'name'. Returns NULL if there is no summary
+  /// stats counter with that name.
+  SummaryStatsCounter* GetSummaryStatsCounter(const std::string& name);
+
   /// Adds all counters with 'name' that are registered either in this or
   /// in any of the child profiles to 'counters'.
   void GetCounters(const std::string& name, std::vector<Counter*>* counters);

http://git-wip-us.apache.org/repos/asf/impala/blob/83a70a7a/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 22198aa..01698ce 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -240,9 +240,6 @@ struct THdfsScanNode {
   // The byte offset of the slot for Parquet metadata if Parquet count star optimization
   // is enabled.
   10: optional i32 parquet_count_star_slot_offset
-
-  // The ideal memory reservation in bytes to process an input split.
-  11: optional i64 ideal_scan_range_reservation
 }
 
 struct TDataSourceScanNode {

http://git-wip-us.apache.org/repos/asf/impala/blob/83a70a7a/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java b/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
index dfaaf66..c798d96 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
@@ -163,6 +163,7 @@ public class ColumnStats {
   public long getMaxSize() { return maxSize_; }
   public boolean hasNulls() { return numNulls_ > 0; }
   public long getNumNulls() { return numNulls_; }
+  public boolean hasAvgSize() { return avgSize_ >= 0; }
   public boolean hasAvgSerializedSize() { return avgSerializedSize_ >= 0; }
   public boolean hasNumDistinctValues() { return numDistinctValues_ >= 0; }
   public boolean hasStats() { return numNulls_ != -1 || numDistinctValues_ != -1; }

http://git-wip-us.apache.org/repos/asf/impala/blob/83a70a7a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 2179346..e0850c6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -194,6 +194,15 @@ public class HdfsPartition implements Comparable<HdfsPartition> {
     public String getFileName() { return fbFileDescriptor_.fileName(); }
     public long getFileLength() { return fbFileDescriptor_.length(); }
 
+    /** Compute the total length of files in fileDescs */
+    public static long computeTotalFileLength(Collection<FileDescriptor> fileDescs) {
+      long totalLength = 0;
+      for (FileDescriptor fileDesc: fileDescs) {
+        totalLength += fileDesc.getFileLength();
+      }
+      return totalLength;
+    }
+
     public HdfsCompression getFileCompression() {
       return HdfsCompression.valueOf(FbCompression.name(fbFileDescriptor_.compression()));
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/83a70a7a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index a1f47aa..398393b 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -48,6 +48,7 @@ import org.apache.impala.analysis.TableSampleClause;
 import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsPartition;
 import org.apache.impala.catalog.HdfsPartition.FileBlock;
@@ -59,7 +60,6 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.NotImplementedException;
-import org.apache.impala.common.Pair;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.fb.FbFileBlock;
@@ -147,6 +147,16 @@ public class HdfsScanNode extends ScanNode {
   // between 128kb and 1.1mb.
   private static final long MIN_MEMORY_ESTIMATE = 1L * 1024L * 1024L;
 
+  // Default reservation in bytes for an IoMgr scan range for a column in columnar
+  // formats like Parquet. Chosen to allow reasonably efficient I/O for all columns
+  // even with only the minimum reservation, but not to use excessive memory for columns
+  // where we overestimate the size.
+  // TODO: is it worth making this a tunable query option?
+  private static final long DEFAULT_COLUMN_SCAN_RANGE_RESERVATION = 4L * 1024L * 1024L;
+
+  // Read size for Parquet and ORC footers. Matches HdfsScanner::FOOTER_SIZE in backend.
+  private static final long FOOTER_SIZE = 100L * 1024L;
+
   private final HdfsTable tbl_;
 
   // List of partitions to be scanned. Partitions have been pruned.
@@ -173,12 +183,7 @@ public class HdfsScanNode extends ScanNode {
 
   // Number of bytes in the largest scan range (i.e. hdfs split). Set in
   // computeScanRangeLocations().
-  private long maxScanRangeBytes_ = 0;
-
-  // The ideal reservation to process a single scan range (i.e. hdfs split), >= the
-  // minimum reservation. Generally provides enough memory to overlap CPU and I/O and
-  // maximize throughput. Set in computeResourceProfile().
-  private long idealScanRangeReservation_ = -1;
+  private long largestScanRangeBytes_ = 0;
 
   // Input cardinality based on the partition row counts or extrapolation. -1 if invalid.
   // Both values can be valid to report them in the explain plan, but only one of them is
@@ -186,6 +191,10 @@ public class HdfsScanNode extends ScanNode {
   private long partitionNumRows_ = -1;
   private long extrapolatedNumRows_ = -1;
 
+  // Estimated row count of the largest scan range. -1 if no stats are available.
+  // Set in computeScanRangeLocations()
+  private long maxScanRangeNumRows_ = -1;
+
   // True if this scan node should use the MT implementation in the backend.
   private boolean useMtScanNode_;
 
@@ -735,13 +744,14 @@ public class HdfsScanNode extends ScanNode {
       sampledFiles = tbl_.getFilesSample(partitions_, percentBytes, 0, randomSeed);
     }
 
-    long maxScanRangeLength = analyzer.getQueryCtx().client_request.getQuery_options()
+    long scanRangeBytesLimit = analyzer.getQueryCtx().client_request.getQuery_options()
         .getMax_scan_range_length();
     scanRanges_ = Lists.newArrayList();
     numPartitions_ = (sampledFiles != null) ? sampledFiles.size() : 
partitions_.size();
     totalFiles_ = 0;
     totalBytes_ = 0;
-    maxScanRangeBytes_ = 0;
+    largestScanRangeBytes_ = 0;
+    maxScanRangeNumRows_ = -1;
     fileFormats_ = Sets.newHashSet();
     for (HdfsPartition partition: partitions_) {
       List<FileDescriptor> fileDescs = partition.getFileDescriptors();
@@ -750,6 +760,7 @@ public class HdfsScanNode extends ScanNode {
         fileDescs = sampledFiles.get(Long.valueOf(partition.getId()));
         if (fileDescs == null) continue;
       }
+      long partitionNumRows = partition.getNumRows();
 
       analyzer.getDescTbl().addReferencedPartition(tbl_, partition.getId());
       fileFormats_.add(partition.getFileFormat());
@@ -764,10 +775,11 @@ public class HdfsScanNode extends ScanNode {
       }
       boolean checkMissingDiskIds = FileSystemUtil.supportsStorageIds(partitionFs);
       boolean partitionMissingDiskIds = false;
-
+      final long partitionBytes = FileDescriptor.computeTotalFileLength(fileDescs);
+      long partitionMaxScanRangeBytes = 0;
+      totalBytes_ += partitionBytes;
       totalFiles_ += fileDescs.size();
       for (FileDescriptor fileDesc: fileDescs) {
-        totalBytes_ += fileDesc.getFileLength();
         boolean fileDescMissingDiskIds = false;
         for (int j = 0; j < fileDesc.getNumFileBlocks(); ++j) {
           FbFileBlock block = fileDesc.getFbFileBlock(j);
@@ -803,8 +815,8 @@ public class HdfsScanNode extends ScanNode {
           long remainingLength = FileBlock.getLength(block);
           while (remainingLength > 0) {
             long currentLength = remainingLength;
-            if (maxScanRangeLength > 0 && remainingLength > maxScanRangeLength) {
-              currentLength = maxScanRangeLength;
+            if (scanRangeBytesLimit > 0 && remainingLength > scanRangeBytesLimit) {
+              currentLength = scanRangeBytesLimit;
             }
             TScanRange scanRange = new TScanRange();
             scanRange.setHdfs_file_split(new THdfsFileSplit(fileDesc.getFileName(),
@@ -815,7 +827,9 @@ public class HdfsScanNode extends ScanNode {
             scanRangeLocations.scan_range = scanRange;
             scanRangeLocations.locations = locations;
             scanRanges_.add(scanRangeLocations);
-            maxScanRangeBytes_ = Math.max(maxScanRangeBytes_, currentLength);
+            largestScanRangeBytes_ = Math.max(largestScanRangeBytes_, currentLength);
+            partitionMaxScanRangeBytes =
+                Math.max(partitionMaxScanRangeBytes, currentLength);
             remainingLength -= currentLength;
             currentOffset += currentLength;
           }
@@ -829,59 +843,41 @@ public class HdfsScanNode extends ScanNode {
         }
       }
       if (partitionMissingDiskIds) ++numPartitionsNoDiskIds_;
+      if (partitionMaxScanRangeBytes > 0 && partitionNumRows >= 0) {
+        updateMaxScanRangeNumRows(
+            partitionNumRows, partitionBytes, partitionMaxScanRangeBytes);
+      }
     }
-  }
-
-  /**
-   * Compute the number of columns that are read from the file, as opposed to
-   * materialised based on metadata. If there are nested collections, counts the
-   * number of leaf scalar slots per collection. This matches Parquet's "shredded"
-   * approach to nested collections, where each nested field is stored as a separate
-   * column. We may need to adjust this logic for non-shredded columnar formats if added.
-   */
-  private int computeNumColumnsReadFromFile() {
-    HdfsTable table = (HdfsTable) desc_.getTable();
-    int numColumns = 0;
-    boolean havePosSlot = false;
-    for (SlotDescriptor slot: desc_.getSlots()) {
-      if (!slot.isMaterialized() || slot == countStarSlot_) continue;
-      if (slot.getColumn() == null ||
-          slot.getColumn().getPosition() >= table.getNumClusteringCols()) {
-        if (slot.isArrayPosRef()) {
-          // Position virtual slots can be materialized by piggybacking on another slot.
-          havePosSlot = true;
-        } else if (slot.getType().isScalarType()) {
-          ++numColumns;
-        } else {
-          numColumns += computeNumColumnsReadForCollection(slot);
-        }
+    if (totalFiles_ == 0) {
+      maxScanRangeNumRows_ = 0;
+    } else {
+      // Also estimate max rows per scan range based on table-level stats, in case some
+      // or all partition-level stats were missing.
+      long tableNumRows = tbl_.getNumRows();
+      if (tableNumRows >= 0) {
+        updateMaxScanRangeNumRows(tableNumRows, totalBytes_, largestScanRangeBytes_);
       }
     }
-    // Must scan something to materialize a position slot.
-    if (havePosSlot) numColumns = Math.max(numColumns, 1);
-    return numColumns;
   }
 
   /**
-   * Compute the number of columns read from disk for materialized scalar slots in
-   * the provided tuple.
+   * Update the estimate of maximum number of rows per scan range based on the fraction
+   * of bytes of the scan range relative to the total bytes per partition or table.
    */
-  private int computeNumColumnsReadForCollection(SlotDescriptor collectionSlot) {
-    Preconditions.checkState(collectionSlot.getType().isCollectionType());
-    int numColumns = 0;
-    for (SlotDescriptor nestedSlot: collectionSlot.getItemTupleDesc().getSlots()) {
-      // Position virtual slots can be materialized by piggybacking on another slot.
-      if (!nestedSlot.isMaterialized() || nestedSlot.isArrayPosRef()) continue;
-      if (nestedSlot.getType().isScalarType()) {
-        ++numColumns;
-      } else {
-        numColumns += computeNumColumnsReadForCollection(nestedSlot);
-      }
+  private void updateMaxScanRangeNumRows(long totalRows, long totalBytes,
+      long maxScanRangeBytes) {
+    Preconditions.checkState(totalRows >= 0);
+    Preconditions.checkState(totalBytes >= 0);
+    Preconditions.checkState(maxScanRangeBytes >= 0);
+    // Check for zeros first to avoid possibility of divide-by-zero below.
+    long estimate;
+    if (maxScanRangeBytes == 0 || totalBytes == 0 || totalRows == 0) {
+      estimate = 0;
+    } else {
+      double divisor = totalBytes / (double) maxScanRangeBytes;
+      estimate = (long)(totalRows / divisor);
     }
-    // Need to scan at least one column to materialize the pos virtual slot and/or
-    // determine the size of the nested array.
-    numColumns = Math.max(numColumns, 1);
-    return numColumns;
+    maxScanRangeNumRows_ = Math.max(maxScanRangeNumRows_, estimate);
   }
 
   /**
@@ -1076,8 +1072,6 @@ public class HdfsScanNode extends ScanNode {
     }
     msg.hdfs_scan_node.setRandom_replica(randomReplica_);
     msg.node_type = TPlanNodeType.HDFS_SCAN_NODE;
-    Preconditions.checkState(idealScanRangeReservation_ >= 0, idealScanRangeReservation_);
-    msg.hdfs_scan_node.setIdeal_scan_range_reservation(idealScanRangeReservation_);
     if (!collectionConjuncts_.isEmpty()) {
       Map<Integer, List<TExpr>> tcollectionConjuncts = Maps.newLinkedHashMap();
       for (Map.Entry<TupleDescriptor, List<Expr>> entry:
@@ -1155,6 +1149,8 @@ public class HdfsScanNode extends ScanNode {
         extrapRows = "unavailable";
       }
       output.append(String.format("%sextrapolated-rows=%s", detailPrefix, extrapRows));
+      output.append(String.format(" max-scan-range-rows=%s",
+          maxScanRangeNumRows_ == -1 ? "unavailable" : maxScanRangeNumRows_));
       output.append("\n");
       if (numScanRangesNoDiskIds_ > 0) {
         output.append(String.format("%smissing disk ids: " +
@@ -1256,21 +1252,26 @@ public class HdfsScanNode extends ScanNode {
     Preconditions.checkNotNull(scanRanges_, "Cost estimation requires scan ranges.");
     if (scanRanges_.isEmpty()) {
       nodeResourceProfile_ = ResourceProfile.noReservation(0);
-      idealScanRangeReservation_ = 0;
       return;
     }
     Preconditions.checkState(0 < numNodes_ && numNodes_ <= scanRanges_.size());
     Preconditions.checkNotNull(desc_);
     Preconditions.checkNotNull(desc_.getTable() instanceof HdfsTable);
     HdfsTable table = (HdfsTable) desc_.getTable();
-    int numColumnsReadFromFile = computeNumColumnsReadFromFile();
+    List<Long> columnReservations = null;
+    if (fileFormats_.contains(HdfsFileFormat.PARQUET)
+        || fileFormats_.contains(HdfsFileFormat.ORC)) {
+      columnReservations = computeMinColumnReservations();
+    }
+
     int perHostScanRanges;
     if (table.getMajorityFormat() == HdfsFileFormat.PARQUET
         || table.getMajorityFormat() == HdfsFileFormat.ORC) {
+      Preconditions.checkNotNull(columnReservations);
       // For the purpose of this estimation, the number of per-host scan ranges for
       // Parquet/ORC files are equal to the number of columns read from the file. I.e.
       // excluding partition columns and columns that are populated from file metadata.
-      perHostScanRanges = numColumnsReadFromFile;
+      perHostScanRanges = columnReservations.size();
     } else {
       perHostScanRanges = (int) Math.ceil((
           (double) scanRanges_.size() / (double) numNodes_) * SCAN_RANGE_SKEW_FACTOR);
@@ -1310,50 +1311,182 @@ public class HdfsScanNode extends ScanNode {
     }
     perInstanceMemEstimate = Math.max(perInstanceMemEstimate, MIN_MEMORY_ESTIMATE);
 
-    Pair<Long, Long> reservation = computeReservation(numColumnsReadFromFile);
     nodeResourceProfile_ = new ResourceProfileBuilder()
         .setMemEstimateBytes(perInstanceMemEstimate)
-        .setMinReservationBytes(reservation.first).build();
-    idealScanRangeReservation_ = reservation.second;
+        .setMinReservationBytes(computeMinReservation(columnReservations)).build();
   }
 
-  /*
-   *  Compute the minimum and ideal memory reservation to process a single scan range
-   *  (i.e. hdfs split). Bound the reservation based on:
-   * - One minimum-sized buffer per IoMgr scan range, which is the absolute minimum
-   *   required to scan the data.
-   * - A maximum of either 1 or 3 max-sized I/O buffers per IoMgr scan range for
-   *   the minimum and ideal reservation respectively. 1 max-sized I/O buffer avoids
-   *   issuing small I/O unnecessarily while 3 max-sized I/O buffers guarantees higher
-   *   throughput by overlapping compute and I/O efficiently.
-   * - A maximum reservation of the hdfs split size, to avoid reserving excessive
-   *   memory for small files or ranges, e.g. small dimension tables with very few
-   *   rows.
+  /**
+   *  Compute the minimum reservation to process a single scan range (i.e. hdfs split).
+   *  We aim to choose a reservation that is as low as possible while still giving OK
+   *  performance when running with only the minimum reservation. The lower bound is one
+   *  minimum-sized buffer per IoMgr scan range - the absolute minimum required to scan
+   *  the data. The upper bounds are:
+   * - One max-sized I/O buffer per IoMgr scan range. One max-sized I/O buffer avoids
+   *   issuing small I/O unnecessarily. The backend can try to increase the reservation
+   *   further if more memory would speed up processing.
+   * - File format-specific calculations, e.g. based on estimated column sizes for
+   *   Parquet.
+   * - The hdfs split size, to avoid reserving excessive memory for small files or ranges,
+   *   e.g. small dimension tables with very few rows.
    */
-  private Pair<Long, Long> computeReservation(int numColumnsReadFromFile) {
-    Preconditions.checkState(maxScanRangeBytes_ >= 0);
+  private long computeMinReservation(List<Long> columnReservations) {
+    Preconditions.checkState(largestScanRangeBytes_ >= 0);
     long maxIoBufferSize =
         BitUtil.roundUpToPowerOf2(BackendConfig.INSTANCE.getReadSize());
-    // Scanners for columnar formats issue one IoMgr scan range for metadata, followed by
-    // one IoMgr scan range per column in parallel. Scanners for row-oriented formats
-    // issue only one IoMgr scan range at a time.
-    int iomgrScanRangesPerSplit = fileFormats_.contains(HdfsFileFormat.PARQUET) ?
-        Math.max(1, numColumnsReadFromFile) : 1;
-    // Need one buffer per IoMgr scan range to execute the scan.
-    long minReservationToExecute =
-        iomgrScanRangesPerSplit * BackendConfig.INSTANCE.getMinBufferSize();
-
-    // Quantize the max scan range (i.e. hdfs split) size to an I/O buffer size.
-    long quantizedMaxScanRangeBytes = maxScanRangeBytes_ < maxIoBufferSize ?
-        BitUtil.roundUpToPowerOf2(maxScanRangeBytes_) :
-        BitUtil.roundUpToPowerOf2Factor(maxScanRangeBytes_, maxIoBufferSize);
-    long minReservationBytes = Math.max(minReservationToExecute,
-        Math.min(iomgrScanRangesPerSplit * maxIoBufferSize,
-            quantizedMaxScanRangeBytes));
-    long idealReservationBytes = Math.max(minReservationToExecute,
-        Math.min(iomgrScanRangesPerSplit * maxIoBufferSize * 3,
-            quantizedMaxScanRangeBytes));
-    return Pair.create(minReservationBytes, idealReservationBytes);
+    long reservationBytes = 0;
+    for (HdfsFileFormat format: fileFormats_) {
+      long formatReservationBytes = 0;
+      // TODO: IMPALA-6875 - ORC should compute total reservation across columns once the
+      // ORC scanner supports reservations. For now it is treated the same as a
+      // row-oriented format because there is no per-column reservation.
+      if (format == HdfsFileFormat.PARQUET) {
+        // With Parquet, we first read the footer then all of the materialized columns in
+        // parallel.
+        for (long columnReservation : columnReservations) {
+          formatReservationBytes += columnReservation;
+        }
+        formatReservationBytes = Math.max(FOOTER_SIZE, formatReservationBytes);
+      } else {
+        // Scanners for row-oriented formats issue only one IoMgr scan range at a time.
+        // Minimum reservation is based on using I/O buffer per IoMgr scan range to get
+        // efficient large I/Os.
+        formatReservationBytes = maxIoBufferSize;
+      }
+      reservationBytes = Math.max(reservationBytes, formatReservationBytes);
+    }
+    reservationBytes = roundUpToIoBuffer(reservationBytes, maxIoBufferSize);
+
+    // Clamp the reservation we computed above to range:
+    // * minimum: <# concurrent io mgr ranges> * <min buffer size>, the absolute minimum
+    //   needed to execute the scan.
+    // * maximum: the maximum scan range (i.e. HDFS split size), rounded up to
+    //   the amount of buffers required to read it all at once.
+    int iomgrScanRangesPerSplit = columnReservations != null ?
+        Math.max(1, columnReservations.size()) : 1;
+    long maxReservationBytes = roundUpToIoBuffer(largestScanRangeBytes_, maxIoBufferSize);
+    return Math.max(iomgrScanRangesPerSplit * BackendConfig.INSTANCE.getMinBufferSize(),
+        Math.min(reservationBytes, maxReservationBytes));
+  }
+
+  /**
+   * Compute minimum memory reservations in bytes per column per scan range for each of
+   * the columns read from disk for a columnar format. Returns the raw estimate for
+   * each column, not quantized to a buffer size.
+
+   * If there are nested collections, returns a size for each of the leaf scalar slots
+   * per collection. This matches Parquet's "shredded" approach to nested collections,
+   * where each nested field is stored as a separate column. We may need to adjust this
+   * logic for nested types in non-shredded columnar formats (e.g. IMPALA-6503 - ORC)
+   * if/when that is added.
+   */
+  private List<Long> computeMinColumnReservations() {
+    List<Long> columnByteSizes = Lists.newArrayList();
+    HdfsTable table = (HdfsTable) desc_.getTable();
+    boolean havePosSlot = false;
+    for (SlotDescriptor slot: desc_.getSlots()) {
+      if (!slot.isMaterialized() || slot == countStarSlot_) continue;
+      if (slot.getColumn() == null ||
+          slot.getColumn().getPosition() >= table.getNumClusteringCols()) {
+        if (slot.isArrayPosRef()) {
+          // Position virtual slots can be materialized by piggybacking on another slot.
+          havePosSlot = true;
+        } else if (slot.getType().isScalarType()) {
+          Column column = slot.getColumn();
+          if (column == null) {
+            // Not a top-level column, e.g. a value from a nested collection that is
+            // being unnested by the scanner. No stats are available for nested
+            // collections.
+            columnByteSizes.add(DEFAULT_COLUMN_SCAN_RANGE_RESERVATION);
+          } else {
+            columnByteSizes.add(computeMinScalarColumnReservation(column));
+          }
+        } else {
+          appendMinColumnReservationsForCollection(slot, columnByteSizes);
+        }
+      }
+    }
+    if (havePosSlot && columnByteSizes.isEmpty()) {
+      // Must scan something to materialize a position slot. We don't know anything about
+      // the column that we're scanning so use the default reservation.
+      columnByteSizes.add(DEFAULT_COLUMN_SCAN_RANGE_RESERVATION);
+    }
+    return columnByteSizes;
+  }
+
+  /**
+   * Helper for computeMinColumnReservations() - compute minimum memory reservations for
+   * all of the scalar columns read from disk when materializing collectionSlot. Appends
+   * one number per scalar column to columnByteSizes.
+   */
+  private void appendMinColumnReservationsForCollection(SlotDescriptor collectionSlot,
+      List<Long> columnByteSizes) {
+    Preconditions.checkState(collectionSlot.getType().isCollectionType());
+    boolean addedColumn = false;
+    for (SlotDescriptor nestedSlot: collectionSlot.getItemTupleDesc().getSlots()) {
+      // Position virtual slots can be materialized by piggybacking on another slot.
+      if (!nestedSlot.isMaterialized() || nestedSlot.isArrayPosRef()) continue;
+      if (nestedSlot.getType().isScalarType()) {
+        // No column stats are available for nested collections so use the default
+        // reservation.
+        columnByteSizes.add(DEFAULT_COLUMN_SCAN_RANGE_RESERVATION);
+        addedColumn = true;
+      } else {
+        appendMinColumnReservationsForCollection(nestedSlot, columnByteSizes);
+      }
+    }
+    // Need to scan at least one column to materialize the pos virtual slot and/or
+    // determine the size of the nested array. Assume it is the size of a single I/O
+    // buffer.
+    if (!addedColumn) columnByteSizes.add(DEFAULT_COLUMN_SCAN_RANGE_RESERVATION);
+  }
+
+  /**
+   * Choose the min bytes to reserve for this scalar column for a scan range. Returns the
+   * raw estimate without quantizing to buffer sizes - the caller should do so if needed.
+   *
+   * Starts with DEFAULT_COLUMN_SCAN_RANGE_RESERVATION and tries different strategies to
+   * infer that the column data is smaller than this starting value (and therefore the
+   * extra memory would not be useful). These estimates are quite conservative so this
+   * will still often overestimate the column size. An overestimate does not necessarily
+   * result in memory being wasted because the Parquet scanner distributes the total
+   * reservation between columns based on actual column size, so if multiple columns are
+   * scanned, memory over-reserved for one column can be used to help scan a different
+   * larger column.
+   */
+  private long computeMinScalarColumnReservation(Column column) {
+    Preconditions.checkNotNull(column);
+    long reservationBytes = DEFAULT_COLUMN_SCAN_RANGE_RESERVATION;
+    ColumnStats stats = column.getStats();
+    if (stats.hasAvgSize() && maxScanRangeNumRows_ != -1) {
+      // Estimate the column's uncompressed data size based on row count and average
+      // size.
+      reservationBytes =
+          (long) Math.min(reservationBytes, stats.getAvgSize() * maxScanRangeNumRows_);
+      if (stats.hasNumDistinctValues()) {
+        // Estimate the data size with dictionary compression, assuming all distinct
+        // values occur in the scan range with the largest number of rows and that each
+        // value can be represented with approximately log2(ndv) bits. Even if Parquet
+        // dictionary compression does not kick in, general-purpose compression
+        // algorithms like Snappy can often find redundancy when there are repeated
+        // values.
+        long dictBytes = (long)(stats.getAvgSize() * stats.getNumDistinctValues());
+        long bitsPerVal = BitUtil.log2Ceiling(stats.getNumDistinctValues());
+        long encodedDataBytes = bitsPerVal * maxScanRangeNumRows_ / 8;
+        reservationBytes = Math.min(reservationBytes, dictBytes + encodedDataBytes);
+      }
+    }
+    return reservationBytes;
+  }
+
+  /**
+   * Calculate the total bytes of I/O buffers that would be allocated to hold bytes,
+   * given that buffers must be a power-of-two size <= maxIoBufferSize bytes.
+   */
+  private static long roundUpToIoBuffer(long bytes, long maxIoBufferSize) {
+    return bytes < maxIoBufferSize ?
+        BitUtil.roundUpToPowerOf2(bytes) :
+        BitUtil.roundUpToPowerOf2Factor(bytes, maxIoBufferSize);
   }
 
   /**
@@ -1362,6 +1495,8 @@ public class HdfsScanNode extends ScanNode {
    * Therefore, this upper bound is independent of the number of concurrent scans and
    * queries and helps to derive a tighter per-host memory estimate for queries with
    * multiple concurrent scans.
+   * TODO: this doesn't accurately describe how the backend works, but it is useful to
+   * have an upper bound. We should rethink and replace this with a different upper bound.
    */
   public static long getPerHostMemUpperBound() {
     // THREADS_PER_CORE each using a default of
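
The row-count and per-column estimates in the HdfsScanNode.java hunks above can be
tried in isolation. The following is a minimal sketch, not the committed code: the
class and method names are hypothetical, log2Ceiling() is a stand-in whose semantics
are assumed from the name of BitUtil.log2Ceiling(), and a negative ndv is a convention
of this sketch meaning "no NDV stats".

```java
// Sketch only: restates the arithmetic from the patch with hypothetical names.
final class ColumnReservationSketch {
  // Same value as DEFAULT_COLUMN_SCAN_RANGE_RESERVATION in the patch (4MB).
  static final long DEFAULT_RESERVATION = 4L * 1024L * 1024L;

  // Rows expected in the largest scan range, assuming rows are spread evenly
  // across the bytes of the partition or table.
  static long estimateMaxScanRangeRows(
      long totalRows, long totalBytes, long maxScanRangeBytes) {
    // Check for zeros first to avoid dividing by zero below.
    if (maxScanRangeBytes == 0 || totalBytes == 0 || totalRows == 0) return 0;
    double divisor = totalBytes / (double) maxScanRangeBytes;
    return (long) (totalRows / divisor);
  }

  // Stand-in for BitUtil.log2Ceiling(): smallest n with 2^n >= value.
  // Assumed semantics; the real helper is not shown in the patch.
  static int log2Ceiling(long value) {
    return value <= 1 ? 0 : 64 - Long.numberOfLeadingZeros(value - 1);
  }

  // Per-column reservation: start at the default, cap it at the uncompressed
  // estimate, then cap it again at the dictionary-encoded estimate.
  // A negative ndv means "no NDV stats" (a convention of this sketch only).
  static long estimateColumnReservation(
      double avgSize, long ndv, long maxScanRangeRows) {
    long reservation =
        (long) Math.min(DEFAULT_RESERVATION, avgSize * maxScanRangeRows);
    if (ndv >= 0) {
      long dictBytes = (long) (avgSize * ndv);
      long bitsPerVal = log2Ceiling(ndv);
      long encodedDataBytes = bitsPerVal * maxScanRangeRows / 8;
      reservation = Math.min(reservation, dictBytes + encodedDataBytes);
    }
    return reservation;
  }

  public static void main(String[] args) {
    long rows = estimateMaxScanRangeRows(1_000_000L, 1L << 30, 1L << 28);
    System.out.println(rows);
    System.out.println(estimateColumnReservation(4.0, 100L, rows));
  }
}
```

With these made-up inputs (1,000,000 rows, 1GB of data, a largest scan range of
256MB) the largest range is estimated at 250,000 rows, and a 4-byte column with 100
distinct values is estimated at 400 + 218,750 = 219,150 bytes, well under the 4MB
default. A wide column with mostly unique values stays at the default.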

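The rounding and clamping at the end of computeMinReservation() can be sketched the
same way. This is an illustration under assumptions, not the committed code: the two
rounding helpers are stand-ins whose behaviour is inferred from the names of
BitUtil.roundUpToPowerOf2() and BitUtil.roundUpToPowerOf2Factor(), and the 8MB maximum
and 8KB minimum buffer sizes used in the note below are example values, since the real
ones come from BackendConfig.

```java
// Sketch only: hypothetical class; helper semantics are assumed from their names.
final class IoBufferRoundingSketch {
  // Stand-in for BitUtil.roundUpToPowerOf2(): next power of two >= value.
  // Expects value >= 0.
  static long roundUpToPowerOf2(long value) {
    if (value <= 1) return value;
    return Long.highestOneBit(value - 1) << 1;
  }

  // Stand-in for BitUtil.roundUpToPowerOf2Factor(): next multiple of factor,
  // where factor must itself be a power of two.
  static long roundUpToPowerOf2Factor(long value, long factor) {
    return (value + factor - 1) & ~(factor - 1);
  }

  // Same shape as roundUpToIoBuffer() in the patch: small reads get one
  // power-of-two buffer, large reads get a whole number of max-sized buffers.
  static long roundUpToIoBuffer(long bytes, long maxIoBufferSize) {
    return bytes < maxIoBufferSize
        ? roundUpToPowerOf2(bytes)
        : roundUpToPowerOf2Factor(bytes, maxIoBufferSize);
  }

  // Round the raw reservation, then clamp it between the absolute minimum
  // needed to execute the scan and the rounded size of the largest scan range.
  static long clampReservation(long reservationBytes, int ioRangesPerSplit,
      long largestScanRangeBytes, long maxIoBufferSize, long minBufferSize) {
    long rounded = roundUpToIoBuffer(reservationBytes, maxIoBufferSize);
    long upper = roundUpToIoBuffer(largestScanRangeBytes, maxIoBufferSize);
    return Math.max(ioRangesPerSplit * minBufferSize, Math.min(rounded, upper));
  }
}
```

For example, three columns at the 4MB default give a raw 12MB, which rounds up to
16MB with an 8MB maximum buffer. If the largest scan range is only 1MB the result is
clamped down to 1MB, and if it is only 100 bytes the lower bound of 3 * 8KB applies
instead.
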
http://git-wip-us.apache.org/repos/asf/impala/blob/83a70a7a/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/testutil/TestUtils.java b/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
index 818c573..629c44b 100644
--- a/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
+++ b/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
@@ -104,6 +104,20 @@ public class TestUtils {
 
   static FileSizeFilter fileSizeFilter_ = new FileSizeFilter();
 
+  // Ignore the exact estimated row count, which depends on the file sizes.
+  static class ScanRangeRowCountFilter implements ResultFilter {
+    private final static String NUMBER_FILTER = "\\d+(\\.\\d+)?";
+    private final static String FILTER_KEY = " max-scan-range-rows=";
+
+    public boolean matches(String input) { return input.contains(FILTER_KEY); }
+
+    public String transform(String input) {
+      return input.replaceAll(FILTER_KEY + NUMBER_FILTER, FILTER_KEY);
+    }
+  }
+
+  static ScanRangeRowCountFilter scanRangeRowCountFilter_ = new ScanRangeRowCountFilter();
+
   /**
    * Do a line-by-line comparison of actual and expected output.
    * Comparison of the individual lines ignores whitespace.
@@ -142,6 +156,11 @@ public class TestUtils {
         expectedStr = fileSizeFilter_.transform(expectedStr);
         actualStr = fileSizeFilter_.transform(actualStr);
       }
+      if (scanRangeRowCountFilter_.matches(expectedStr)) {
+        containsPrefix = true;
+        expectedStr = scanRangeRowCountFilter_.transform(expectedStr);
+        actualStr = scanRangeRowCountFilter_.transform(actualStr);
+      }
 
       boolean ignoreAfter = false;
       for (int j = 0; j < ignoreContentAfter_.length; ++j) {
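
The effect of ScanRangeRowCountFilter on a plan line can be checked with a small
standalone sketch. The class name and the sample plan text are made up for
illustration; the two regex constants are copied from the patch above.

```java
// Sketch only: hypothetical class wrapping the regex from ScanRangeRowCountFilter.
final class ScanRangeRowCountFilterSketch {
  static final String NUMBER_FILTER = "\\d+(\\.\\d+)?";
  static final String FILTER_KEY = " max-scan-range-rows=";

  static boolean matches(String input) {
    return input.contains(FILTER_KEY);
  }

  // Drops the numeric value after the key so that expected and actual plan
  // lines compare equal regardless of the exact estimate.
  static String transform(String input) {
    return input.replaceAll(FILTER_KEY + NUMBER_FILTER, FILTER_KEY);
  }

  public static void main(String[] args) {
    // Made-up plan line for illustration.
    String line = "extrapolated-rows=disabled max-scan-range-rows=310";
    System.out.println(transform(line));
  }
}
```

A non-numeric value such as "unavailable" is left in place, so a plan that loses its
stats would still show up as a difference.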

http://git-wip-us.apache.org/repos/asf/impala/blob/83a70a7a/testdata/bin/compute-table-stats.sh
----------------------------------------------------------------------
diff --git a/testdata/bin/compute-table-stats.sh b/testdata/bin/compute-table-stats.sh
index 98434ee..63eb0da 100755
--- a/testdata/bin/compute-table-stats.sh
+++ b/testdata/bin/compute-table-stats.sh
@@ -39,7 +39,7 @@ if [ "${TARGET_FILESYSTEM}" = "hdfs" ]; then
   ${COMPUTE_STATS_SCRIPT} --db_name=functional_hbase\
     --table_names="alltypessmall,stringids"
 fi
-${COMPUTE_STATS_SCRIPT} --db_names=tpch,tpch_parquet \
+${COMPUTE_STATS_SCRIPT} --db_names=tpch,tpch_parquet,tpch_orc_def \
     --table_names=customer,lineitem,nation,orders,part,partsupp,region,supplier
 ${COMPUTE_STATS_SCRIPT} --db_names=tpch_nested_parquet,tpcds,tpcds_parquet
 
