Repository: impala
Updated Branches:
  refs/heads/2.x 8d6142fd2 -> d3362bd43


IMPALA-5931: Generates scan ranges in planner for s3/adls

Currently, for filesystems that do not include physical
block information (e.g., block replica locations, caching),
synthetic blocks are generated and stored in the catalog
when metadata is loaded. Example file systems for which this is done
include S3, ADLS, and local fs.

This change avoids generating these blocks when metadata is loaded.
Instead, scan ranges are directly generated from such files by the
backend coordinator. Previously, all scan ranges were produced by
the planner in HDFSScanNode in the frontend. Now, those files without
block information are sent to the coordinator represented by a split
specification that determines how the coordinator will create scan ranges
to send to executors.

This change reduces the space needed in the catalog and reduces the
scan range data structures that are passed from the frontend to the
backend when planning and coordinating a query.
In addition, this change avoids a bug where non-splittable files were
split anyway to support the query parameter that places a limit on
scan ranges.

Testing:
- added backend scheduler tests
- mixed-filesystems test covers tables/queries with multiple fs's.
- local fs tests cover the code paths in this change
- all core tests pass when configured with s3
- manually tried larger local filesystem tables (tpch) with multiple
  partitions and observed the same scan ranges.
    - TODO: adls testing

Change-Id: I326065adbb2f7e632814113aae85cb51ca4779a5
Reviewed-on: http://gerrit.cloudera.org:8080/8523
Reviewed-by: Vuk Ercegovac <vercego...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/10692
Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/11554a17
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/11554a17
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/11554a17

Branch: refs/heads/2.x
Commit: 11554a17c75b242767d5a50d66bc2874aa545c77
Parents: 8d6142f
Author: Vuk Ercegovac <vercego...@cloudera.com>
Authored: Wed May 23 23:23:31 2018 -0700
Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Committed: Wed Jun 13 02:30:16 2018 +0000

----------------------------------------------------------------------
 CMakeLists.txt                                  |   2 +-
 be/src/scheduling/query-schedule.h              |   4 +-
 be/src/scheduling/scheduler-test-util.cc        |  71 ++++-
 be/src/scheduling/scheduler-test-util.h         |  50 ++-
 be/src/scheduling/scheduler-test.cc             |  92 +++++-
 be/src/scheduling/scheduler.cc                  |  62 +++-
 be/src/scheduling/scheduler.h                   |  11 +-
 be/src/util/CMakeLists.txt                      |   1 +
 be/src/util/flat_buffer.cc                      |  68 ++++
 be/src/util/flat_buffer.h                       |  33 ++
 common/thrift/Frontend.thrift                   |   4 +-
 common/thrift/PlanNodes.thrift                  |  23 +-
 common/thrift/Planner.thrift                    |   8 +
 .../apache/impala/catalog/HdfsPartition.java    |  58 +---
 .../org/apache/impala/catalog/HdfsTable.java    |  72 ++---
 .../impala/planner/DataSourceScanNode.java      |   7 +-
 .../apache/impala/planner/HBaseScanNode.java    |  10 +-
 .../org/apache/impala/planner/HdfsScanNode.java | 308 ++++++++++++-------
 .../org/apache/impala/planner/KuduScanNode.java |   5 +-
 .../org/apache/impala/planner/ScanNode.java     |  14 +-
 .../org/apache/impala/service/Frontend.java     |   2 +-
 .../apache/impala/planner/PlannerTestBase.java  |  17 +-
 22 files changed, 677 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/11554a17/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 43cd258..4ea3579 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -370,7 +370,7 @@ add_subdirectory(impala-parent)
 add_subdirectory(ext-data-source)
 
 # Build target for all generated files which most backend code depends on
-add_custom_target(gen-deps ALL DEPENDS thrift-deps proto-deps)
+add_custom_target(gen-deps ALL DEPENDS thrift-deps proto-deps fb-deps)
 
 add_custom_target(tarballs ALL DEPENDS shell_tarball)
 

http://git-wip-us.apache.org/repos/asf/impala/blob/11554a17/be/src/scheduling/query-schedule.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.h 
b/be/src/scheduling/query-schedule.h
index e5d8f6a..5425ada 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -74,7 +74,7 @@ struct BackendExecParams {
 /// map from an impalad host address to the list of assigned fragment instance 
params.
 typedef std::map<TNetworkAddress, BackendExecParams> PerBackendExecParams;
 
-/// execution parameters for a single fragment instance; used to assemble the
+/// Execution parameters for a single fragment instance; used to assemble the
 /// TPlanFragmentInstanceCtx
 struct FInstanceExecParams {
   TUniqueId instance_id;
@@ -88,7 +88,7 @@ struct FInstanceExecParams {
   /// uniquely identify it to a receiver. -1 = invalid.
   int sender_id;
 
-  /// the parent FragmentExecParams
+  /// The parent FragmentExecParams
   const FragmentExecParams& fragment_exec_params;
   const TPlanFragment& fragment() const;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/11554a17/be/src/scheduling/scheduler-test-util.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test-util.cc 
b/be/src/scheduling/scheduler-test-util.cc
index 7363cd3..56cf813 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -20,10 +20,13 @@
 #include <boost/unordered_set.hpp>
 
 #include "common/names.h"
+#include "flatbuffers/flatbuffers.h"
+#include "gen-cpp/CatalogObjects_generated.h"
 #include "scheduling/scheduler.h"
 
 using namespace impala;
 using namespace impala::test;
+using namespace org::apache::impala::fb;
 
 DECLARE_int32(krpc_port);
 
@@ -61,6 +64,10 @@ const string Cluster::IP_PREFIX = "10";
 
 /// Default size for new blocks is 1MB.
 const int64_t Block::DEFAULT_BLOCK_SIZE = 1 << 20;
+/// Default size for files is 4MB.
+const int64_t FileSplitGeneratorSpec::DEFAULT_FILE_SIZE = 1 << 22;
+/// Default size for file splits is 1 MB.
+const int64_t FileSplitGeneratorSpec::DEFAULT_BLOCK_SIZE = 1 << 20;
 
 int Cluster::AddHost(bool has_backend, bool has_datanode, bool is_executor) {
   int host_idx = hosts_.size();
@@ -193,6 +200,17 @@ void Schema::AddMultiBlockTable(const TableName& 
table_name, int num_blocks,
   tables_[table_name] = table;
 }
 
+void Schema::AddFileSplitGeneratorSpecs(
+    const TableName& table_name, const std::vector<FileSplitGeneratorSpec>& 
specs) {
+  Table* table = &tables_[table_name];
+  table->specs.insert(table->specs.end(), specs.begin(), specs.end());
+}
+
+void Schema::AddFileSplitGeneratorDefaultSpecs(const TableName& table_name, 
int num) {
+  Table* table = &tables_[table_name];
+  for (int i = 0; i < num; ++i) 
table->specs.push_back(FileSplitGeneratorSpec());
+}
+
 const Table& Schema::GetTable(const TableName& table_name) const {
   auto it = tables_.find(table_name);
   DCHECK(it != tables_.end());
@@ -207,8 +225,8 @@ const vector<TNetworkAddress>& Plan::referenced_datanodes() 
const {
   return referenced_datanodes_;
 }
 
-const vector<TScanRangeLocationList>& Plan::scan_range_locations() const {
-  return scan_range_locations_;
+const TScanRangeSpec& Plan::scan_range_specs() const {
+  return scan_range_specs_;
 }
 
 void Plan::AddTableScan(const TableName& table_name) {
@@ -218,7 +236,14 @@ void Plan::AddTableScan(const TableName& table_name) {
     const Block& block = blocks[i];
     TScanRangeLocationList scan_range_locations;
     BuildTScanRangeLocationList(table_name, block, i, &scan_range_locations);
-    scan_range_locations_.push_back(scan_range_locations);
+    scan_range_specs_.concrete_ranges.push_back(scan_range_locations);
+  }
+  const vector<FileSplitGeneratorSpec>& specs = table.specs;
+  for (int i = 0; i < specs.size(); ++i) {
+    const FileSplitGeneratorSpec& file_spec = specs[i];
+    TFileSplitGeneratorSpec spec;
+    BuildScanRangeSpec(table_name, file_spec, i, &spec);
+    scan_range_specs_.split_specs.push_back(spec);
   }
 }
 
@@ -254,6 +279,27 @@ void Plan::BuildScanRange(const TableName& table_name, 
const Block& block, int b
   scan_range->__set_hdfs_file_split(file_split);
 }
 
+void Plan::BuildScanRangeSpec(const TableName& table_name,
+    const FileSplitGeneratorSpec& spec, int spec_idx,
+    TFileSplitGeneratorSpec* thrift_spec) {
+  // Serialize an FbFileDesc that carries only a synthetic file name and the length.
+  flatbuffers::FlatBufferBuilder builder;
+  const auto name_offset =
+      builder.CreateString(table_name + "_spec_" + std::to_string(spec_idx));
+  builder.Finish(CreateFbFileDesc(builder, name_offset, spec.length));
+  const char* fb_bytes = reinterpret_cast<const char*>(builder.GetBufferPointer());
+
+  // Hand the serialized bytes to the Thrift file descriptor.
+  THdfsFileDesc file_desc;
+  file_desc.__set_file_desc_data(string(fb_bytes, builder.GetSize()));
+
+  // Every spec built here is attributed to partition id 0.
+  thrift_spec->__set_partition_id(0);
+  thrift_spec->__set_file_desc(file_desc);
+  thrift_spec->__set_max_block_size(spec.block_size);
+  thrift_spec->__set_is_splittable(spec.is_splittable);
+}
+
 int Plan::FindOrInsertDatanodeIndex(int cluster_datanode_idx) {
   const Host& host = schema_.cluster().hosts()[cluster_datanode_idx];
   auto ret = host_idx_to_datanode_idx_.emplace(
@@ -459,9 +505,24 @@ Status SchedulerWrapper::Compute(bool exec_at_coord, 
Result* result) {
 
   // Compute Assignment.
   FragmentScanRangeAssignment* assignment = result->AddAssignment();
+  const vector<TScanRangeLocationList>* locations = nullptr;
+  vector<TScanRangeLocationList> expanded_locations;
+  if (plan_.scan_range_specs().split_specs.empty()) {
+    // directly use the concrete ranges.
+    locations = &plan_.scan_range_specs().concrete_ranges;
+  } else {
+    // union concrete ranges and expanded specs.
+    for (const TScanRangeLocationList& range : 
plan_.scan_range_specs().concrete_ranges) {
+      expanded_locations.push_back(range);
+    }
+    RETURN_IF_ERROR(scheduler_->GenerateScanRanges(
+        plan_.scan_range_specs().split_specs, &expanded_locations));
+    locations = &expanded_locations;
+  }
+  DCHECK(locations != nullptr);
   return 
scheduler_->ComputeScanRangeAssignment(*scheduler_->GetExecutorsConfig(), 0,
-      nullptr, false, plan_.scan_range_locations(), 
plan_.referenced_datanodes(),
-      exec_at_coord, plan_.query_options(), nullptr, assignment);
+      nullptr, false, *locations, plan_.referenced_datanodes(), exec_at_coord,
+      plan_.query_options(), nullptr, assignment);
 }
 
 void SchedulerWrapper::AddBackend(const Host& host) {

http://git-wip-us.apache.org/repos/asf/impala/blob/11554a17/be/src/scheduling/scheduler-test-util.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test-util.h 
b/be/src/scheduling/scheduler-test-util.h
index c3d6b1c..8f1e917 100644
--- a/be/src/scheduling/scheduler-test-util.h
+++ b/be/src/scheduling/scheduler-test-util.h
@@ -180,8 +180,29 @@ struct Block {
   static const int64_t DEFAULT_BLOCK_SIZE;
 };
 
+struct FileSplitGeneratorSpec {
+  FileSplitGeneratorSpec() {}
+  FileSplitGeneratorSpec(int64_t length, int64_t block, bool splittable)
+    : length(length), block_size(block), is_splittable(splittable) {}
+
+  /// Length of file for which to generate file splits.
+  int64_t length = DEFAULT_FILE_SIZE;
+
+  /// Size of each split.
+  int64_t block_size = DEFAULT_BLOCK_SIZE;
+
+  bool is_splittable = true;
+
+  static const int64_t DEFAULT_FILE_SIZE;
+  static const int64_t DEFAULT_BLOCK_SIZE;
+};
+
+/// A table models multiple partitions whose files are represented either explicitly
+/// with Blocks or with FileSplitGeneratorSpecs. A single table can contain both
+/// representations.
 struct Table {
   std::vector<Block> blocks;
+  std::vector<FileSplitGeneratorSpec> specs;
 };
 
 class Schema {
@@ -213,6 +234,16 @@ class Schema {
   void AddMultiBlockTable(const TableName& table_name, int num_blocks,
       ReplicaPlacement replica_placement, int num_replicas, int 
num_cached_replicas);
 
+  /// Adds FileSplitGeneratorSpecs to table named 'table_name'. If the table 
does not
+  /// exist, creates a new table. Otherwise, adds the 'specs' to an existing 
table.
+  void AddFileSplitGeneratorSpecs(
+      const TableName& table_name, const std::vector<FileSplitGeneratorSpec>& 
specs);
+
+  /// Adds 'num' default FileSplitGeneratorSpecs to table named 'table_name'. If the
+  /// table does not exist, creates a new table. Otherwise, adds the default specs to
+  /// the existing table.
+  void AddFileSplitGeneratorDefaultSpecs(const TableName& table_name, int num);
+
   const Table& GetTable(const TableName& table_name) const;
 
   const Cluster& cluster() const { return cluster_; }
@@ -241,11 +272,11 @@ class Plan {
 
   const std::vector<TNetworkAddress>& referenced_datanodes() const;
 
-  const std::vector<TScanRangeLocationList>& scan_range_locations() const;
+  const TScanRangeSpec& scan_range_specs() const;
 
   /// Add a scan of table 'table_name' to the plan. This method will populate 
the internal
-  /// list of TScanRangeLocationList and can be called multiple times for the 
same table
-  /// to schedule additional scans.
+  /// TScanRangeSpecs and can be called multiple times for the same table to 
schedule
+  /// additional scans.
   void AddTableScan(const TableName& table_name);
 
  private:
@@ -261,15 +292,20 @@ class Plan {
   /// Map from plan host index to an index in 'referenced_datanodes_'.
   std::unordered_map<int, int> host_idx_to_datanode_idx_;
 
-  /// List of all scan range locations, which can be passed to the Scheduler.
-  std::vector<TScanRangeLocationList> scan_range_locations_;
+  /// Scan range specs that are scheduled by the Scheduler.
+  TScanRangeSpec scan_range_specs_;
 
   /// Initialize a TScanRangeLocationList object in place.
   void BuildTScanRangeLocationList(const TableName& table_name, const Block& 
block,
       int block_idx, TScanRangeLocationList* scan_range_locations);
 
-  void BuildScanRange(const TableName& table_name, const Block& block, int 
block_idx,
-      TScanRange* scan_range);
+  /// Initializes a scan range for a Block.
+  void BuildScanRange(
+      const TableName& table_name, const Block& block, int block_idx, 
TScanRange* range);
+
+  /// Initializes a TFileSplitGeneratorSpec from a FileSplitGeneratorSpec.
+  void BuildScanRangeSpec(const TableName& table_name, const 
FileSplitGeneratorSpec& spec,
+      int spec_idx, TFileSplitGeneratorSpec* thrift_spec);
 
   /// Look up the plan-local host index of 'cluster_datanode_idx'. If the host 
has not
   /// been added to the plan before, it will add it to 'referenced_datanodes_' 
and return

http://git-wip-us.apache.org/repos/asf/impala/blob/11554a17/be/src/scheduling/scheduler-test.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test.cc 
b/be/src/scheduling/scheduler-test.cc
index c70e54f..d87305a 100644
--- a/be/src/scheduling/scheduler-test.cc
+++ b/be/src/scheduling/scheduler-test.cc
@@ -115,7 +115,8 @@ TEST_F(SchedulerTest, ScanTableTwice) {
   EXPECT_EQ(0, result.NumCachedAssignedBytes());
 }
 
-// TODO: This test can be removed once we have the non-random backend 
round-robin by rank.
+/// TODO: This test can be removed once we have the non-random backend 
round-robin by
+/// rank.
 /// Schedule randomly over 3 backends and ensure that each backend is at least 
used once.
 TEST_F(SchedulerTest, RandomReads) {
   Cluster cluster;
@@ -382,6 +383,95 @@ TEST_F(SchedulerTest, TestSendUpdates) {
   EXPECT_EQ(0, result.NumTotalAssignedBytes(1));
 }
 
+TEST_F(SchedulerTest, TestGeneratedSingleSplit) {
+  Cluster cluster;
+
+  cluster.AddHosts(3, true, true);
+
+  Schema schema(cluster);
+  schema.AddFileSplitGeneratorDefaultSpecs("T", 1);
+
+  Plan plan(schema);
+  plan.AddTableScan("T");
+  plan.SetRandomReplica(true);
+
+  Result result(plan);
+  SchedulerWrapper scheduler(plan);
+  ASSERT_OK(scheduler.Compute(&result));
+
+  EXPECT_EQ(FileSplitGeneratorSpec::DEFAULT_FILE_SIZE
+          / FileSplitGeneratorSpec::DEFAULT_BLOCK_SIZE,
+      result.NumTotalAssignments());
+  EXPECT_EQ(
+      1 * FileSplitGeneratorSpec::DEFAULT_FILE_SIZE, 
result.NumTotalAssignedBytes());
+}
+
+TEST_F(SchedulerTest, TestGeneratedMultiSplit) {
+  Cluster cluster;
+
+  cluster.AddHosts(3, true, true);
+
+  Schema schema(cluster);
+  schema.AddFileSplitGeneratorDefaultSpecs("T", 100);
+
+  Plan plan(schema);
+  plan.AddTableScan("T");
+  plan.SetRandomReplica(true);
+
+  Result result(plan);
+  SchedulerWrapper scheduler(plan);
+  ASSERT_OK(scheduler.Compute(&result));
+
+  EXPECT_EQ(100 * FileSplitGeneratorSpec::DEFAULT_FILE_SIZE
+          / FileSplitGeneratorSpec::DEFAULT_BLOCK_SIZE,
+      result.NumTotalAssignments());
+  EXPECT_EQ(
+      100 * FileSplitGeneratorSpec::DEFAULT_FILE_SIZE, 
result.NumTotalAssignedBytes());
+}
+
+TEST_F(SchedulerTest, TestGeneratedVariableSizeSplit) {
+  Cluster cluster;
+
+  cluster.AddHosts(3, true, true);
+
+  Schema schema(cluster);
+  schema.AddFileSplitGeneratorSpecs(
+      "T", {{100, 100, true}, {100, 1, false}, {100, 10, true}});
+
+  Plan plan(schema);
+  plan.AddTableScan("T");
+  plan.SetRandomReplica(true);
+
+  Result result(plan);
+  SchedulerWrapper scheduler(plan);
+  ASSERT_OK(scheduler.Compute(&result));
+
+  EXPECT_EQ(12, result.NumTotalAssignments());
+  EXPECT_EQ(300, result.NumTotalAssignedBytes());
+}
+
+TEST_F(SchedulerTest, TestBlockAndGenerateSplit) {
+  Cluster cluster;
+
+  cluster.AddHosts(3, true, true);
+  Schema schema(cluster);
+  schema.AddMultiBlockTable("T", 2, ReplicaPlacement::LOCAL_ONLY, 3);
+  schema.AddFileSplitGeneratorSpecs(
+      "T", {{100, 100, true}, {100, 1, false}, {100, 10, true}});
+
+  Plan plan(schema);
+  plan.AddTableScan("T");
+
+  Result result(plan);
+  SchedulerWrapper scheduler(plan);
+  ASSERT_OK(scheduler.Compute(&result));
+
+  EXPECT_EQ(14, result.NumTotalAssignments());
+  EXPECT_EQ(300 + 2 * Block::DEFAULT_BLOCK_SIZE, 
result.NumTotalAssignedBytes());
+  EXPECT_EQ(2 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes());
+  EXPECT_EQ(0, result.NumCachedAssignedBytes());
+}
+
 /// IMPALA-4329: Test scheduling with no backends.
 /// With the fix for IMPALA-5058, the scheduler is no longer responsible for
 /// registering the local backend with itself. This functionality is moved to

http://git-wip-us.apache.org/repos/asf/impala/blob/11554a17/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 20c3210..0c9d800 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -28,11 +28,13 @@
 #include <gutil/strings/substitute.h>
 
 #include "common/logging.h"
+#include "flatbuffers/flatbuffers.h"
 #include "gen-cpp/ImpalaInternalService_constants.h"
 #include "gen-cpp/Types_types.h"
 #include "runtime/exec-env.h"
 #include "statestore/statestore-subscriber.h"
 #include "util/container-util.h"
+#include "util/flat_buffer.h"
 #include "util/metrics.h"
 #include "util/network-util.h"
 #include "util/runtime-profile-counters.h"
@@ -41,6 +43,7 @@
 
 using boost::algorithm::join;
 using namespace apache::thrift;
+using namespace org::apache::impala::fb;
 using namespace strings;
 
 DECLARE_bool(use_krpc);
@@ -222,6 +225,45 @@ const TBackendDescriptor& Scheduler::LookUpBackendDesc(
   return *desc;
 }
 
+Status Scheduler::GenerateScanRanges(const vector<TFileSplitGeneratorSpec>& specs,
+    vector<TScanRangeLocationList>* generated_scan_ranges) {
+  for (const auto& spec : specs) {
+    // Converts the spec to one or more scan ranges.
+    const FbFileDesc* fb_desc =
+        flatbuffers::GetRoot<FbFileDesc>(spec.file_desc.file_desc_data.c_str());
+    DCHECK(fb_desc->file_blocks() == nullptr || fb_desc->file_blocks()->size() == 0);
+    const int64_t file_length = fb_desc->length();
+    if (file_length <= 0) continue; // Empty files produce no scan ranges.
+    // A non-positive split size would never advance the offset in the loop below.
+    if (spec.is_splittable && spec.max_block_size <= 0) {
+      return Status(Substitute("Invalid max_block_size $0 for file $1",
+          spec.max_block_size, fb_desc->file_name()->str()));
+    }
+    // Non-splittable files are always covered by a single range.
+    const int64_t max_range_length =
+        spec.is_splittable ? std::min(spec.max_block_size, file_length) : file_length;
+    THdfsCompression::type compression;
+    RETURN_IF_ERROR(FromFbCompression(fb_desc->compression(), &compression));
+    for (int64_t offset = 0; offset < file_length; offset += max_range_length) {
+      THdfsFileSplit hdfs_scan_range;
+      hdfs_scan_range.__set_file_compression(compression);
+      hdfs_scan_range.__set_file_length(file_length);
+      hdfs_scan_range.__set_file_name(fb_desc->file_name()->str());
+      // The final range is shortened to end exactly at the end of the file.
+      hdfs_scan_range.__set_length(std::min(max_range_length, file_length - offset));
+      hdfs_scan_range.__set_mtime(fb_desc->last_modification_time());
+      hdfs_scan_range.__set_offset(offset);
+      hdfs_scan_range.__set_partition_id(spec.partition_id);
+      TScanRange scan_range;
+      scan_range.__set_hdfs_file_split(hdfs_scan_range);
+      TScanRangeLocationList scan_range_list;
+      scan_range_list.__set_scan_range(scan_range);
+      generated_scan_ranges->push_back(scan_range_list);
+    }
+  }
+  return Status::OK();
+}
+
 Status Scheduler::ComputeScanRangeAssignment(
     const BackendConfig& executor_config, QuerySchedule* schedule) {
   RuntimeProfile::Counter* total_assignment_timer =
@@ -247,11 +289,26 @@ Status Scheduler::ComputeScanRangeAssignment(
 
       FragmentScanRangeAssignment* assignment =
           
&schedule->GetFragmentExecParams(fragment.idx)->scan_range_assignment;
+
+      const vector<TScanRangeLocationList>* locations = nullptr;
+      vector<TScanRangeLocationList> expanded_locations;
+      if (entry.second.split_specs.empty()) {
+        // directly use the concrete ranges.
+        locations = &entry.second.concrete_ranges;
+      } else {
+        // union concrete ranges and expanded specs.
+        expanded_locations.insert(expanded_locations.end(),
+            entry.second.concrete_ranges.begin(), 
entry.second.concrete_ranges.end());
+        RETURN_IF_ERROR(
+            GenerateScanRanges(entry.second.split_specs, &expanded_locations));
+        locations = &expanded_locations;
+      }
+      DCHECK(locations != nullptr);
       RETURN_IF_ERROR(
           ComputeScanRangeAssignment(executor_config, node_id, 
node_replica_preference,
-              node_random_replica, entry.second, exec_request.host_list, 
exec_at_coord,
+              node_random_replica, *locations, exec_request.host_list, 
exec_at_coord,
               schedule->query_options(), total_assignment_timer, assignment));
-      schedule->IncNumScanRanges(entry.second.size());
+      schedule->IncNumScanRanges(locations->size());
     }
   }
   return Status::OK();
@@ -526,6 +583,7 @@ Status Scheduler::ComputeScanRangeAssignment(const 
BackendConfig& executor_confi
       exec_at_coord ? coord_only_backend_config_ : executor_config, 
total_assignments_,
       total_local_assignments_);
 
+  // Holds scan ranges that must be assigned for remote reads.
   vector<const TScanRangeLocationList*> remote_scan_range_locations;
 
   // Loop over all scan ranges, select an executor for those with local 
impalads and

http://git-wip-us.apache.org/repos/asf/impala/blob/11554a17/be/src/scheduling/scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index d0302e6..a182689 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -28,6 +28,7 @@
 
 #include "common/global-types.h"
 #include "common/status.h"
+#include "gen-cpp/CatalogObjects_generated.h"
 #include "gen-cpp/PlanNodes_types.h"
 #include "gen-cpp/StatestoreService_types.h"
 #include "gen-cpp/Types_types.h" // for TNetworkAddress
@@ -334,6 +335,10 @@ class Scheduler {
   Status GetRequestPool(const std::string& user, const TQueryOptions& 
query_options,
       std::string* pool) const;
 
+  /// Generates scan ranges from 'specs' and places them in 
'generated_scan_ranges'.
+  Status GenerateScanRanges(const std::vector<TFileSplitGeneratorSpec>& specs,
+      std::vector<TScanRangeLocationList>* generated_scan_ranges);
+
   /// Compute the assignment of scan ranges to hosts for each scan node in
   /// the schedule's TQueryExecRequest.plan_exec_info.
   /// Unpartitioned fragments are assigned to the coordinator. Populate the 
schedule's
@@ -345,7 +350,8 @@ class Scheduler {
 
   /// Process the list of scan ranges of a single plan node and compute scan 
range
   /// assignments (returned in 'assignment'). The result is a mapping from 
hosts to their
-  /// assigned scan ranges per plan node.
+  /// assigned scan ranges per plan node. Inputs that are scan range specs are 
used to
+  /// generate scan ranges.
   ///
   /// If exec_at_coord is true, all scan ranges will be assigned to the 
coordinator host.
   /// Otherwise the assignment is computed for each scan range as follows:
@@ -444,8 +450,7 @@ class Scheduler {
   /// The maximum number of instances is the value of query option mt_dop.
   /// For HDFS, this attempts to load balance among instances by computing the 
average
   /// number of bytes per instances and then in a single pass assigning scan 
ranges to
-  /// each
-  /// instances to roughly meet that average.
+  /// each instance to roughly meet that average.
   /// For all other storage mgrs, it load-balances the number of splits per 
instance.
   void CreateScanInstances(
       PlanNodeId scan_id, FragmentExecParams* fragment_params, QuerySchedule* 
schedule);

http://git-wip-us.apache.org/repos/asf/impala/blob/11554a17/be/src/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index d9092ce..8e6c7c1 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -47,6 +47,7 @@ add_library(Util
   disk-info.cc
   error-util.cc
   filesystem-util.cc
+  flat_buffer.cc
   hdfs-util.cc
   hdfs-bulk-ops.cc
   hdr-histogram.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/11554a17/be/src/util/flat_buffer.cc
----------------------------------------------------------------------
diff --git a/be/src/util/flat_buffer.cc b/be/src/util/flat_buffer.cc
new file mode 100644
index 0000000..46448e9
--- /dev/null
+++ b/be/src/util/flat_buffer.cc
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/flat_buffer.h"
+
+#include <gutil/strings/substitute.h>
+
+using namespace apache::thrift;
+using namespace impala;
+using namespace org::apache::impala::fb;
+
+namespace impala {
+
+Status FromFbCompression(
+    FbCompression fb_compression, THdfsCompression::type* thrift_compression) {
+  // Known codes map one-to-one; anything else leaves the output untouched.
+  switch (fb_compression) {
+    case FbCompression_NONE:
+      *thrift_compression = THdfsCompression::NONE;
+      return Status::OK();
+    case FbCompression_DEFAULT:
+      *thrift_compression = THdfsCompression::DEFAULT;
+      return Status::OK();
+    case FbCompression_GZIP:
+      *thrift_compression = THdfsCompression::GZIP;
+      return Status::OK();
+    case FbCompression_DEFLATE:
+      *thrift_compression = THdfsCompression::DEFLATE;
+      return Status::OK();
+    case FbCompression_BZIP2:
+      *thrift_compression = THdfsCompression::BZIP2;
+      return Status::OK();
+    case FbCompression_SNAPPY:
+      *thrift_compression = THdfsCompression::SNAPPY;
+      return Status::OK();
+    case FbCompression_SNAPPY_BLOCKED:
+      *thrift_compression = THdfsCompression::SNAPPY_BLOCKED;
+      return Status::OK();
+    case FbCompression_LZO:
+      *thrift_compression = THdfsCompression::LZO;
+      return Status::OK();
+    case FbCompression_LZ4:
+      *thrift_compression = THdfsCompression::LZ4;
+      return Status::OK();
+    case FbCompression_ZLIB:
+      *thrift_compression = THdfsCompression::ZLIB;
+      return Status::OK();
+    default: break;
+  }
+  return Status(strings::Substitute(
+      "Invalid file descriptor compression code: $0", fb_compression));
+}
+
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/11554a17/be/src/util/flat_buffer.h
----------------------------------------------------------------------
diff --git a/be/src/util/flat_buffer.h b/be/src/util/flat_buffer.h
new file mode 100644
index 0000000..916db4e
--- /dev/null
+++ b/be/src/util/flat_buffer.h
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_UTIL_FLAT_BUFFER_H
+#define IMPALA_UTIL_FLAT_BUFFER_H
+
+#include "common/status.h"
+
+#include "gen-cpp/CatalogObjects_generated.h"
+#include "gen-cpp/ImpalaInternalService_constants.h"
+
+namespace impala {
+
+/// Converts a FlatBuffer representation for compression to the corresponding 
thrift
+/// representation. Returns an error for invalid mappings.
+Status FromFbCompression(org::apache::impala::fb::FbCompression fb_compression,
+    THdfsCompression::type* thrift_compression);
+} // namespace impala
+#endif // IMPALA_UTIL_FLAT_BUFFER_H

http://git-wip-us.apache.org/repos/asf/impala/blob/11554a17/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 64531ce..9c7777f 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -366,9 +366,9 @@ struct TPlanExecInfo {
   // it is unpartitioned.
   1: required list<Planner.TPlanFragment> fragments
 
-  // A map from scan node ids to a list of scan range locations.
+  // A map from scan node ids to a scan range specification.
   // The node ids refer to scan nodes in fragments[].plan
-  2: optional map<Types.TPlanNodeId, list<Planner.TScanRangeLocationList>>
+  2: optional map<Types.TPlanNodeId, Planner.TScanRangeSpec>
       per_node_scan_ranges
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/11554a17/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 01698ce..b45625f 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -195,8 +195,29 @@ struct THBaseKeyRange {
   2: optional string stopKey
 }
 
+// Specifies how THdfsFileSplits can be generated from HDFS files.
+// Currently used for files that do not have block locations,
+// such as S3, ADLS, and Local. The Frontend creates these and the
+// coordinator's scheduler expands them into THdfsFileSplits.
+// The plan is to use TFileSplitGeneratorSpec as well for HDFS
+// files with block information. Doing so will permit the FlatBuffer
+// representation used to represent block information to pass from the
+// Frontend to the coordinator without transforming to a heavier-weight
+// Thrift representation. See IMPALA-6458.
+struct TFileSplitGeneratorSpec {
+  1: required CatalogObjects.THdfsFileDesc file_desc
+
+  // Maximum length of a file split to generate.
+  2: required i64 max_block_size
+
+  3: required bool is_splittable
+
+  // ID of partition within the THdfsTable associated with this scan node.
+  4: required i64 partition_id
+}
+
 // Specification of an individual data range which is held in its entirety
-// by a storage server
+// by a storage server.
 struct TScanRange {
   // one of these must be set for every TScanRange
   1: optional THdfsFileSplit hdfs_file_split

http://git-wip-us.apache.org/repos/asf/impala/blob/11554a17/common/thrift/Planner.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Planner.thrift b/common/thrift/Planner.thrift
index 9d2fc9d..2619b44 100644
--- a/common/thrift/Planner.thrift
+++ b/common/thrift/Planner.thrift
@@ -101,6 +101,14 @@ struct TScanRangeLocationList {
   2: list<TScanRangeLocation> locations
 }
 
+// A specification for scan ranges. Scan ranges can be
+// concrete or specs, which are used to generate concrete ranges.
+// Each type is stored in a separate list.
+struct TScanRangeSpec {
+   1: optional list<TScanRangeLocationList> concrete_ranges
+   2: optional list<PlanNodes.TFileSplitGeneratorSpec> split_specs
+}
+
 // A plan: tree of plan fragments that materializes either a query result or 
the build
 // side of a join used by another plan; it consists of a sequence of plan 
fragments.
 // TODO: rename both this and PlanNodes.TPlan (TPlan should be something like 
TExecPlan

http://git-wip-us.apache.org/repos/asf/impala/blob/11554a17/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 1b05804..5765902 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -86,7 +86,7 @@ public class HdfsPartition implements 
Comparable<HdfsPartition> {
 
     // Minimum block size in bytes allowed for synthetic file blocks (other 
than the last
     // block, which may be shorter).
-    private final static long MIN_SYNTHETIC_BLOCK_SIZE = 1024 * 1024;
+    public final static long MIN_SYNTHETIC_BLOCK_SIZE = 1024 * 1024;
 
     // Internal representation of a file descriptor using a FlatBuffer.
     private final FbFileDesc fbFileDescriptor_;
@@ -123,35 +123,35 @@ public class HdfsPartition implements 
Comparable<HdfsPartition> {
     /**
      * Creates the file descriptor of a file represented by 'fileStatus' that
      * resides in a filesystem that doesn't support the BlockLocation API 
(e.g. S3).
-     * fileFormat' is the file format of the partition where this file resides 
and
-     * 'hostIndex' stores the network addresses of the hosts that store blocks 
of
-     * the parent HdfsTable.
+     * 'fileFormat' is the file format of the partition where this file resides.
      */
-    public static FileDescriptor createWithSynthesizedBlockMd(FileStatus 
fileStatus,
-        HdfsFileFormat fileFormat, ListMap<TNetworkAddress> hostIndex) {
+    public static FileDescriptor createWithNoBlocks(
+        FileStatus fileStatus, HdfsFileFormat fileFormat) {
       FlatBufferBuilder fbb = new FlatBufferBuilder(1);
-      int[] fbFileBlockOffets =
-          synthesizeFbBlockMd(fbb, fileStatus, fileFormat, hostIndex);
-      return new FileDescriptor(createFbFileDesc(fbb, fileStatus, 
fbFileBlockOffets));
+      return new FileDescriptor(createFbFileDesc(fbb, fileStatus, null));
     }
 
     /**
      * Serializes the metadata of a file descriptor represented by 
'fileStatus' into a
-     * FlatBuffer using 'fbb' and returns the associated FbFileDesc object. 
'blockOffsets'
-     * are the offsets of the serialized block metadata of this file in the 
underlying
-     * buffer.
+     * FlatBuffer using 'fbb' and returns the associated FbFileDesc object.
+     * 'fbFileBlockOffsets' are the offsets of the serialized block metadata of this file
+     * in the underlying buffer. Can be null if there are no blocks.
      */
     private static FbFileDesc createFbFileDesc(FlatBufferBuilder fbb,
         FileStatus fileStatus, int[] fbFileBlockOffets) {
       int fileNameOffset = fbb.createString(fileStatus.getPath().getName());
-      int blockVectorOffset = FbFileDesc.createFileBlocksVector(fbb, 
fbFileBlockOffets);
+      // A negative block vector offset is used when no block offsets are specified.
+      int blockVectorOffset = -1;
+      if (fbFileBlockOffets != null) {
+        blockVectorOffset = FbFileDesc.createFileBlocksVector(fbb, 
fbFileBlockOffets);
+      }
       FbFileDesc.startFbFileDesc(fbb);
       FbFileDesc.addFileName(fbb, fileNameOffset);
       FbFileDesc.addLength(fbb, fileStatus.getLen());
       FbFileDesc.addLastModificationTime(fbb, 
fileStatus.getModificationTime());
       HdfsCompression comp = 
HdfsCompression.fromFileName(fileStatus.getPath().getName());
       FbFileDesc.addCompression(fbb, comp.toFb());
-      FbFileDesc.addFileBlocks(fbb, blockVectorOffset);
+      if (blockVectorOffset >= 0) FbFileDesc.addFileBlocks(fbb, 
blockVectorOffset);
       fbb.finish(FbFileDesc.endFbFileDesc(fbb));
       // To eliminate memory fragmentation, copy the contents of the 
FlatBuffer to the
       // smallest possible ByteBuffer.
@@ -161,36 +161,6 @@ public class HdfsPartition implements 
Comparable<HdfsPartition> {
       return FbFileDesc.getRootAsFbFileDesc((ByteBuffer)compressedBb.flip());
     }
 
-    /**
-     * Synthesizes the block metadata of a file represented by 'fileStatus' 
that resides
-     * in a filesystem that doesn't support the BlockLocation API. The block 
metadata
-     * consist of the length and offset of each file block. It serializes the
-     * block metadata into a FlatBuffer using 'fbb' and returns their offsets 
in the
-     * underlying buffer. 'fileFormat' is the file format of the underlying 
partition and
-     * 'hostIndex' stores the network addresses of the hosts that store the 
blocks of the
-     * parent HdfsTable.
-     */
-    private static int[] synthesizeFbBlockMd(FlatBufferBuilder fbb, FileStatus 
fileStatus,
-        HdfsFileFormat fileFormat, ListMap<TNetworkAddress> hostIndex) {
-      long start = 0;
-      long remaining = fileStatus.getLen();
-      long blockSize = fileStatus.getBlockSize();
-      if (blockSize < MIN_SYNTHETIC_BLOCK_SIZE) blockSize = 
MIN_SYNTHETIC_BLOCK_SIZE;
-      if (!fileFormat.isSplittable(HdfsCompression.fromFileName(
-          fileStatus.getPath().getName()))) {
-        blockSize = remaining;
-      }
-      List<Integer> fbFileBlockOffets = Lists.newArrayList();
-      while (remaining > 0) {
-        long len = Math.min(remaining, blockSize);
-        fbFileBlockOffets.add(FileBlock.createFbFileBlock(fbb, start, len,
-            (short) hostIndex.getIndex(REMOTE_NETWORK_ADDRESS)));
-        remaining -= len;
-        start += len;
-      }
-      return Ints.toArray(fbFileBlockOffets);
-    }
-
     public String getFileName() { return fbFileDescriptor_.fileName(); }
     public long getFileLength() { return fbFileDescriptor_.length(); }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/11554a17/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 9910eb8..0362625 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -37,7 +37,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -324,8 +323,8 @@ public class HdfsTable extends Table {
     // have changed since last load (more details in hasFileChanged()).
     private final boolean reuseFileMd_;
 
-    public FileMetadataLoadRequest(Path path, List<HdfsPartition> partitions,
-       boolean reuseFileMd) {
+    public FileMetadataLoadRequest(
+        Path path, List<HdfsPartition> partitions, boolean reuseFileMd) {
       hdfsPath_ = path;
       partitionList_ = partitions;
       reuseFileMd_ = reuseFileMd;
@@ -340,7 +339,7 @@ public class HdfsTable extends Table {
     }
 
     public String debugString() {
-      String loadType = reuseFileMd_? "Refreshed" : "Loaded";
+      String loadType = reuseFileMd_ ? "Refreshed" : "Loaded";
       return String.format("%s file metadata for path: %s", loadType,
           hdfsPath_.toString());
     }
@@ -404,12 +403,8 @@ public class HdfsTable extends Table {
    *   for each file under it.
    * - For every valid data file, enumerate all its blocks and their 
corresponding hosts
    *   and disk IDs if the underlying file system supports the block locations 
API
-   *   (for ex: HDFS). For other file systems (like S3) we synthesize the file 
metadata
-   *   manually by splitting the file ranges into fixed size blocks.
-   * For filesystems that don't support BlockLocation API, synthesize file 
blocks
-   * by manually splitting the file range into fixed-size blocks.  That way, 
scan
-   * ranges can be derived from file blocks as usual.  All synthesized blocks 
are given
-   * an invalid network address so that the scheduler will treat them as 
remote.
+   *   (e.g., HDFS). For other file systems (like S3), per-block information is not
+   *   available, so it is not stored.
    */
   private FileMetadataLoadStats resetAndLoadFileMetadata(
       Path partDir, List<HdfsPartition> partitions) throws IOException {
@@ -421,7 +416,7 @@ public class HdfsTable extends Table {
     }
 
     FileSystem fs = partDir.getFileSystem(CONF);
-    boolean synthesizeFileMd = !FileSystemUtil.supportsStorageIds(fs);
+    boolean supportsBlocks = FileSystemUtil.supportsStorageIds(fs);
     RemoteIterator<LocatedFileStatus> fileStatusIter =
         FileSystemUtil.listFiles(fs, partDir, false);
     if (fileStatusIter == null) return loadStats;
@@ -434,14 +429,12 @@ public class HdfsTable extends Table {
         continue;
       }
       FileDescriptor fd;
-      // Block locations are manually synthesized if the underlying fs does 
not support
-      // the block location API.
-      if (synthesizeFileMd) {
-        fd = FileDescriptor.createWithSynthesizedBlockMd(fileStatus,
-            partitions.get(0).getFileFormat(), hostIndex_);
+      if (supportsBlocks) {
+        fd = FileDescriptor.create(fileStatus, fileStatus.getBlockLocations(), 
fs,
+            hostIndex_, numUnknownDiskIds);
       } else {
-        fd = FileDescriptor.create(fileStatus,
-            fileStatus.getBlockLocations(), fs, hostIndex_, numUnknownDiskIds);
+        fd = FileDescriptor.createWithNoBlocks(
+            fileStatus, partitions.get(0).getFileFormat());
       }
       newFileDescs.add(fd);
       ++loadStats.loadedFiles;
@@ -485,10 +478,9 @@ public class HdfsTable extends Table {
     FileSystem fs = partDir.getFileSystem(CONF);
     FileStatus[] fileStatuses = FileSystemUtil.listStatus(fs, partDir);
     if (fileStatuses == null) return loadStats;
-    boolean synthesizeFileMd = !FileSystemUtil.supportsStorageIds(fs);
+    boolean supportsBlocks = FileSystemUtil.supportsStorageIds(fs);
     Reference<Long> numUnknownDiskIds = new Reference<Long>(Long.valueOf(0));
     List<FileDescriptor> newFileDescs = Lists.newArrayList();
-    HdfsFileFormat fileFormat = partitions.get(0).getFileFormat();
     // If there is a cached partition mapped to this path, we recompute the 
block
     // locations even if the underlying files have not changed 
(hasFileChanged()).
     // This is done to keep the cached block metadata up to date.
@@ -507,14 +499,14 @@ public class HdfsTable extends Table {
       String fileName = fileStatus.getPath().getName().toString();
       FileDescriptor fd = fileDescsByName.get(fileName);
       if (isPartitionMarkedCached || hasFileChanged(fd, fileStatus)) {
-        if (synthesizeFileMd) {
-          fd = FileDescriptor.createWithSynthesizedBlockMd(fileStatus,
-              fileFormat, hostIndex_);
-        } else {
+        if (supportsBlocks) {
           BlockLocation[] locations =
-            fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+              fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
           fd = FileDescriptor.create(fileStatus, locations, fs, hostIndex_,
               numUnknownDiskIds);
+        } else {
+          fd = FileDescriptor.createWithNoBlocks(
+              fileStatus, partitions.get(0).getFileFormat());
         }
         ++loadStats.loadedFiles;
       } else {
@@ -565,7 +557,6 @@ public class HdfsTable extends Table {
     return TCatalogObjectType.TABLE;
   }
   public boolean isMarkedCached() { return isMarkedCached_; }
-
   public Collection<HdfsPartition> getPartitions() { return 
partitionMap_.values(); }
   public Map<Long, HdfsPartition> getPartitionMap() { return partitionMap_; }
   public Set<Long> getNullPartitionIds(int i) { return 
nullPartitionIds_.get(i); }
@@ -625,8 +616,7 @@ public class HdfsTable extends Table {
    * Gets the HdfsPartition matching the given partition spec. Returns null if 
no match
    * was found.
    */
-  public HdfsPartition getPartition(
-      List<PartitionKeyValue> partitionSpec) {
+  public HdfsPartition getPartition(List<PartitionKeyValue> partitionSpec) {
     List<TPartitionKeyValue> partitionKeyValues = Lists.newArrayList();
     for (PartitionKeyValue kv: partitionSpec) {
       String value = PartitionKeyValue.getPartitionKeyValueString(
@@ -688,9 +678,7 @@ public class HdfsTable extends Table {
           break;
         }
       }
-      if (matchFound) {
-        return partition;
-      }
+      if (matchFound) return partition;
     }
     return null;
   }
@@ -702,8 +690,7 @@ public class HdfsTable extends Table {
       List<List<TPartitionKeyValue>> partitionSet) {
     List<HdfsPartition> partitions = Lists.newArrayList();
     for (List<TPartitionKeyValue> kv : partitionSet) {
-      HdfsPartition partition =
-          getPartitionFromThriftPartitionSpec(kv);
+      HdfsPartition partition = getPartitionFromThriftPartitionSpec(kv);
       if (partition != null) partitions.add(partition);
     }
     return partitions;
@@ -876,7 +863,7 @@ public class HdfsTable extends Table {
     Preconditions.checkState(numPaths > 0);
     FileSystem tableFs;
     try {
-      tableFs  = (new Path(getLocation())).getFileSystem(CONF);
+      tableFs = (new Path(getLocation())).getFileSystem(CONF);
     } catch (IOException e) {
       throw new CatalogException("Invalid table path for table: " + 
getFullName(), e);
     }
@@ -896,7 +883,7 @@ public class HdfsTable extends Table {
       boolean reuseFileMd) throws CatalogException {
     int numPathsToLoad = partsByPath.size();
     // For tables without partitions we have no file metadata to load.
-    if (numPathsToLoad == 0)  return;
+    if (numPathsToLoad == 0) return;
 
     int threadPoolSize = getLoadingThreadPoolSize(numPathsToLoad);
     LOG.info(String.format("Loading file and block metadata for %s paths for 
table %s " +
@@ -1031,7 +1018,6 @@ public class HdfsTable extends Table {
     return addedParts;
   }
 
-
   /**
    * Creates a new HdfsPartition from a specified StorageDescriptor and an HMS 
partition
    * object.
@@ -1203,8 +1189,7 @@ public class HdfsTable extends Table {
     // refer to this to understand how to create new partitions.
     HdfsStorageDescriptor hdfsStorageDescriptor =
         HdfsStorageDescriptor.fromStorageDescriptor(this.name_, 
storageDescriptor);
-    HdfsPartition partition = HdfsPartition.defaultPartition(this,
-        hdfsStorageDescriptor);
+    HdfsPartition partition = HdfsPartition.defaultPartition(this, 
hdfsStorageDescriptor);
     partitionMap_.put(partition.getId(), partition);
   }
 
@@ -1350,8 +1335,7 @@ public class HdfsTable extends Table {
     // in the Hive Metastore. Note: This is a relatively "cheap" operation
     // (~.3 secs for 30K partitions).
     Set<String> msPartitionNames = Sets.newHashSet();
-    msPartitionNames.addAll(
-        client.listPartitionNames(db_.getName(), name_, (short) -1));
+    msPartitionNames.addAll(client.listPartitionNames(db_.getName(), name_, 
(short) -1));
     // Names of loaded partitions in this table
     Set<String> partitionNames = Sets.newHashSet();
     // Partitions for which file metadata must be loaded, grouped by partition 
paths.
@@ -1682,13 +1666,13 @@ public class HdfsTable extends Table {
     }
     avroSchema_ = hdfsTable.isSetAvroSchema() ? hdfsTable.getAvroSchema() : 
null;
     isMarkedCached_ =
-      HdfsCachingUtil.validateCacheParams(getMetaStoreTable().getParameters());
+        
HdfsCachingUtil.validateCacheParams(getMetaStoreTable().getParameters());
   }
 
   @Override
   public TTableDescriptor toThriftDescriptor(int tableId,
       Set<Long> referencedPartitions) {
-    // Create thrift descriptors to send to the BE.  The BE does not
+    // Create thrift descriptors to send to the BE. The BE does not
     // need any information below the THdfsPartition level.
     TTableDescriptor tableDesc = new TTableDescriptor(tableId, 
TTableType.HDFS_TABLE,
         getTColumnDescriptors(), numClusteringCols_, name_, db_.getName());
@@ -1856,7 +1840,7 @@ public class HdfsTable extends Table {
    *
    * path e.g. c1=1/c2=2/c3=3
    * partitionKeys The ordered partition keys. e.g.("c1", "c2", "c3")
-   * depth The start position in partitionKeys to match the path name.
+   * depth The start position in partitionKeys to match the path name.
    * partitionValues The partition values used to create a partition.
    * partitionExprs The list of LiteralExprs which is used to avoid duplicate 
partitions.
    * E.g. Having /c1=0001 and /c1=01, we should make sure only one partition
@@ -1923,7 +1907,7 @@ public class HdfsTable extends Table {
         expr = LiteralExpr.create(value, type);
         // Skip large value which exceeds the MAX VALUE of specified Type.
         if (expr instanceof NumericLiteral) {
-          if (NumericLiteral.isOverflow(((NumericLiteral)expr).getValue(), 
type)) {
+          if (NumericLiteral.isOverflow(((NumericLiteral) expr).getValue(), 
type)) {
             LOG.warn(String.format("Skip the overflow value (%s) for Type 
(%s).",
                 value, type.toSql()));
             return null;

http://git-wip-us.apache.org/repos/asf/impala/blob/11554a17/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
index 5615b6e..9175fbb 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
@@ -55,6 +55,7 @@ import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TScanRange;
 import org.apache.impala.thrift.TScanRangeLocation;
 import org.apache.impala.thrift.TScanRangeLocationList;
+import org.apache.impala.thrift.TScanRangeSpec;
 import org.apache.impala.thrift.TStatus;
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
@@ -324,9 +325,9 @@ public class DataSourceScanNode extends ScanNode {
     // TODO: Does the port matter?
     TNetworkAddress networkAddress = 
addressToTNetworkAddress("localhost:12345");
     Integer hostIndex = analyzer.getHostIndex().getIndex(networkAddress);
-    scanRanges_ = Lists.newArrayList(
-        new TScanRangeLocationList(
-            new TScanRange(), Lists.newArrayList(new 
TScanRangeLocation(hostIndex))));
+    scanRangeSpecs_ = new TScanRangeSpec();
+    scanRangeSpecs_.addToConcrete_ranges(new TScanRangeLocationList(
+        new TScanRange(), Lists.newArrayList(new 
TScanRangeLocation(hostIndex))));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/impala/blob/11554a17/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
index c0b81ca..881c4ae 100644
--- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
@@ -52,6 +52,7 @@ import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TScanRange;
 import org.apache.impala.thrift.TScanRangeLocation;
 import org.apache.impala.thrift.TScanRangeLocationList;
+import org.apache.impala.thrift.TScanRangeSpec;
 import org.apache.impala.util.MembershipSnapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -122,6 +123,7 @@ public class HBaseScanNode extends ScanNode {
     analyzer.materializeSlots(conjuncts_);
     computeMemLayout(analyzer);
     computeScanRangeLocations(analyzer);
+    Preconditions.checkState(!scanRangeSpecs_.isSetSplit_specs());
 
     // Call computeStats() after materializing slots and computing the mem 
layout.
     computeStats(analyzer);
@@ -219,7 +221,8 @@ public class HBaseScanNode extends ScanNode {
     // Assume that each node in the cluster gets a scan range, unless there 
are fewer
     // scan ranges than nodes.
     numNodes_ = Math.max(1,
-        Math.min(scanRanges_.size(), 
MembershipSnapshot.getCluster().numNodes()));
+        Math.min(scanRangeSpecs_.getConcrete_rangesSize(),
+            MembershipSnapshot.getCluster().numNodes()));
     if (LOG.isTraceEnabled()) {
       LOG.trace("computeStats HbaseScan: #nodes=" + 
Integer.toString(numNodes_));
     }
@@ -291,7 +294,7 @@ public class HBaseScanNode extends ScanNode {
    * of that region server.
    */
   private void computeScanRangeLocations(Analyzer analyzer) {
-    scanRanges_ = Lists.newArrayList();
+    scanRangeSpecs_ = new TScanRangeSpec();
 
     // For empty scan node, return an empty list.
     if (isEmpty_) return;
@@ -348,11 +351,12 @@ public class HBaseScanNode extends ScanNode {
           TNetworkAddress networkAddress = 
addressToTNetworkAddress(locEntry.getKey());
           scanRangeLocation.addToLocations(
               new 
TScanRangeLocation(analyzer.getHostIndex().getIndex(networkAddress)));
-          scanRanges_.add(scanRangeLocation);
 
           TScanRange scanRange = new TScanRange();
           scanRange.setHbase_key_range(keyRange);
           scanRangeLocation.setScan_range(scanRange);
+
+          scanRangeSpecs_.addToConcrete_ranges(scanRangeLocation);
         }
         prevEndKey = curRegEndKey;
       }

http://git-wip-us.apache.org/repos/asf/impala/blob/11554a17/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 0d9d24c..2c127f0 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -49,6 +49,7 @@ import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.ColumnStats;
+import org.apache.impala.catalog.HdfsCompression;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsPartition;
 import org.apache.impala.catalog.HdfsPartition.FileBlock;
@@ -60,6 +61,7 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.NotImplementedException;
+import org.apache.impala.common.Pair;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.fb.FbFileBlock;
@@ -67,6 +69,7 @@ import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TExpr;
 import org.apache.impala.thrift.THdfsFileSplit;
+import org.apache.impala.thrift.TFileSplitGeneratorSpec;
 import org.apache.impala.thrift.THdfsScanNode;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TPlanNode;
@@ -76,6 +79,7 @@ import org.apache.impala.thrift.TReplicaPreference;
 import org.apache.impala.thrift.TScanRange;
 import org.apache.impala.thrift.TScanRangeLocation;
 import org.apache.impala.thrift.TScanRangeLocationList;
+import org.apache.impala.thrift.TScanRangeSpec;
 import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.util.BitUtil;
 import org.apache.impala.util.MembershipSnapshot;
@@ -191,6 +195,9 @@ public class HdfsScanNode extends ScanNode {
   private long partitionNumRows_ = -1;
   private long extrapolatedNumRows_ = -1;
 
+  // Number of scan ranges that will be generated for all TFileSplitGeneratorSpecs.
+  private long generatedScanRangeCount_ = 0;
+
   // Estimated row count of the largest scan range. -1 if no stats are 
available.
   // Set in computeScanRangeLocations()
   private long maxScanRangeNumRows_ = -1;
@@ -746,7 +753,7 @@ public class HdfsScanNode extends ScanNode {
 
     long scanRangeBytesLimit = 
analyzer.getQueryCtx().client_request.getQuery_options()
         .getMax_scan_range_length();
-    scanRanges_ = Lists.newArrayList();
+    scanRangeSpecs_ = new TScanRangeSpec();
     numPartitions_ = (sampledFiles != null) ? sampledFiles.size() : 
partitions_.size();
     totalFiles_ = 0;
     totalBytes_ = 0;
@@ -773,73 +780,35 @@ public class HdfsScanNode extends ScanNode {
       } catch (IOException e) {
         throw new ImpalaRuntimeException("Error determining partition fs 
type", e);
       }
-      boolean checkMissingDiskIds = 
FileSystemUtil.supportsStorageIds(partitionFs);
-      boolean partitionMissingDiskIds = false;
+      boolean fsHasBlocks = FileSystemUtil.supportsStorageIds(partitionFs);
+      if (!fsHasBlocks) {
+        // Limit the scan range length if generating scan ranges.
+        long maxBlockSize =
+            
Math.max(partitionFs.getDefaultBlockSize(partition.getLocationPath()),
+                HdfsPartition.FileDescriptor.MIN_SYNTHETIC_BLOCK_SIZE);
+        if (scanRangeBytesLimit > 0) {
+          scanRangeBytesLimit = Math.min(scanRangeBytesLimit, maxBlockSize);
+        } else {
+          scanRangeBytesLimit = maxBlockSize;
+        }
+      }
       final long partitionBytes = 
FileDescriptor.computeTotalFileLength(fileDescs);
       long partitionMaxScanRangeBytes = 0;
+      boolean partitionMissingDiskIds = false;
       totalBytes_ += partitionBytes;
       totalFiles_ += fileDescs.size();
       for (FileDescriptor fileDesc: fileDescs) {
-        boolean fileDescMissingDiskIds = false;
-        for (int j = 0; j < fileDesc.getNumFileBlocks(); ++j) {
-          FbFileBlock block = fileDesc.getFbFileBlock(j);
-          int replicaHostCount = FileBlock.getNumReplicaHosts(block);
-          if (replicaHostCount == 0) {
-            // we didn't get locations for this block; for now, just ignore 
the block
-            // TODO: do something meaningful with that
-            continue;
-          }
-          // Collect the network address and volume ID of all replicas of this 
block.
-          List<TScanRangeLocation> locations = Lists.newArrayList();
-          for (int i = 0; i < replicaHostCount; ++i) {
-            TScanRangeLocation location = new TScanRangeLocation();
-            // Translate from the host index (local to the HdfsTable) to 
network address.
-            int replicaHostIdx = FileBlock.getReplicaHostIdx(block, i);
-            TNetworkAddress networkAddress =
-                partition.getTable().getHostIndex().getEntry(replicaHostIdx);
-            Preconditions.checkNotNull(networkAddress);
-            // Translate from network address to the global (to this request) 
host index.
-            Integer globalHostIdx = 
analyzer.getHostIndex().getIndex(networkAddress);
-            location.setHost_idx(globalHostIdx);
-            if (checkMissingDiskIds && FileBlock.getDiskId(block, i) == -1) {
-              ++numScanRangesNoDiskIds_;
-              partitionMissingDiskIds = true;
-              fileDescMissingDiskIds = true;
-            }
-            location.setVolume_id(FileBlock.getDiskId(block, i));
-            location.setIs_cached(FileBlock.isReplicaCached(block, i));
-            locations.add(location);
-          }
-          // create scan ranges, taking into account maxScanRangeLength
-          long currentOffset = FileBlock.getOffset(block);
-          long remainingLength = FileBlock.getLength(block);
-          while (remainingLength > 0) {
-            long currentLength = remainingLength;
-            if (scanRangeBytesLimit > 0 && remainingLength > 
scanRangeBytesLimit) {
-              currentLength = scanRangeBytesLimit;
-            }
-            TScanRange scanRange = new TScanRange();
-            scanRange.setHdfs_file_split(new 
THdfsFileSplit(fileDesc.getFileName(),
-                currentOffset, currentLength, partition.getId(), 
fileDesc.getFileLength(),
-                fileDesc.getFileCompression().toThrift(),
-                fileDesc.getModificationTime()));
-            TScanRangeLocationList scanRangeLocations = new 
TScanRangeLocationList();
-            scanRangeLocations.scan_range = scanRange;
-            scanRangeLocations.locations = locations;
-            scanRanges_.add(scanRangeLocations);
-            largestScanRangeBytes_ = Math.max(largestScanRangeBytes_, 
currentLength);
-            partitionMaxScanRangeBytes =
-                Math.max(partitionMaxScanRangeBytes, currentLength);
-            remainingLength -= currentLength;
-            currentOffset += currentLength;
-          }
-        }
-        if (fileDescMissingDiskIds) {
-          ++numFilesNoDiskIds_;
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("File blocks mapping to unknown disk ids. Dir: " +
-                partition.getLocation() + " File:" + fileDesc.toString());
-          }
+        if (!fsHasBlocks) {
+          Preconditions.checkState(fileDesc.getNumFileBlocks() == 0);
+          generateScanRangeSpecs(partition, fileDesc, scanRangeBytesLimit);
+        } else {
+          // Skips files that have no associated blocks.
+          if (fileDesc.getNumFileBlocks() == 0) continue;
+          Pair<Boolean, Long> result = transformBlocksToScanRanges(
+              partition, fileDesc, fsHasBlocks, scanRangeBytesLimit, analyzer);
+          partitionMaxScanRangeBytes =
+              Math.max(partitionMaxScanRangeBytes, result.second);
+          if (result.first) partitionMissingDiskIds = true;
         }
       }
       if (partitionMissingDiskIds) ++numPartitionsNoDiskIds_;
@@ -881,13 +850,115 @@ public class HdfsScanNode extends ScanNode {
   }
 
   /**
+   * Given a fileDesc of partition, generates a split specification rather than actual
+   * TScanRanges. Defers generating the TScanRanges to the backend.
+   * Used for file systems that do not have any physical attributes associated 
with
+   * blocks (e.g., replica locations, caching, etc.). 'maxBlockSize' determines how large
+   * the scan ranges can be (may be ignored if the file is not splittable).
+   */
+  private void generateScanRangeSpecs(
+      HdfsPartition partition, HdfsPartition.FileDescriptor fileDesc, long 
maxBlockSize) {
+    Preconditions.checkArgument(fileDesc.getNumFileBlocks() == 0);
+    Preconditions.checkArgument(maxBlockSize > 0);
+    if (fileDesc.getFileLength() <= 0) return;
+    boolean splittable = partition.getFileFormat().isSplittable(
+        HdfsCompression.fromFileName(fileDesc.getFileName()));
+    TFileSplitGeneratorSpec splitSpec = new TFileSplitGeneratorSpec(
+        fileDesc.toThrift(), maxBlockSize, splittable, partition.getId());
+    scanRangeSpecs_.addToSplit_specs(splitSpec);
+    long scanRangeBytes = Math.min(maxBlockSize, fileDesc.getFileLength());
+    if (splittable) {
+      generatedScanRangeCount_ +=
+          Math.ceil((double) fileDesc.getFileLength() / (double) maxBlockSize);
+    } else {
+      ++generatedScanRangeCount_;
+      scanRangeBytes = fileDesc.getFileLength();
+    }
+    largestScanRangeBytes_ = Math.max(largestScanRangeBytes_, scanRangeBytes);
+  }
+
+  /**
+   * Given a fileDesc of partition, transforms the blocks into TScanRanges. 
Each range
+   * is paired with information about where the block is located so that the 
backend
+   * coordinator can assign ranges to workers to avoid remote reads. These
+   * TScanRangeLocationLists are added to scanRanges_. A pair is returned that 
indicates
+   * whether the file has a missing disk id and the maximum scan range (in 
bytes) found.
+   */
+  private Pair<Boolean, Long> transformBlocksToScanRanges(HdfsPartition 
partition,
+      HdfsPartition.FileDescriptor fileDesc, boolean fsHasBlocks,
+      long scanRangeBytesLimit, Analyzer analyzer) {
+    Preconditions.checkArgument(fileDesc.getNumFileBlocks() > 0);
+    boolean fileDescMissingDiskIds = false;
+    long fileMaxScanRangeBytes = 0;
+    for (int i = 0; i < fileDesc.getNumFileBlocks(); ++i) {
+      FbFileBlock block = fileDesc.getFbFileBlock(i);
+      int replicaHostCount = FileBlock.getNumReplicaHosts(block);
+      if (replicaHostCount == 0) {
+        // we didn't get locations for this block; for now, just ignore the 
block
+        // TODO: do something meaningful with that
+        continue;
+      }
+      // Collect the network address and volume ID of all replicas of this 
block.
+      List<TScanRangeLocation> locations = Lists.newArrayList();
+      for (int j = 0; j < replicaHostCount; ++j) {
+        TScanRangeLocation location = new TScanRangeLocation();
+        // Translate from the host index (local to the HdfsTable) to network 
address.
+        int replicaHostIdx = FileBlock.getReplicaHostIdx(block, j);
+        TNetworkAddress networkAddress =
+            partition.getTable().getHostIndex().getEntry(replicaHostIdx);
+        Preconditions.checkNotNull(networkAddress);
+        // Translate from network address to the global (to this request) host 
index.
+        Integer globalHostIdx = 
analyzer.getHostIndex().getIndex(networkAddress);
+        location.setHost_idx(globalHostIdx);
+        if (fsHasBlocks && FileBlock.getDiskId(block, j) == -1) {
+          ++numScanRangesNoDiskIds_;
+          fileDescMissingDiskIds = true;
+        }
+        location.setVolume_id(FileBlock.getDiskId(block, j));
+        location.setIs_cached(FileBlock.isReplicaCached(block, j));
+        locations.add(location);
+      }
+      // Create scan ranges, taking into account scanRangeBytesLimit.
+      long currentOffset = FileBlock.getOffset(block);
+      long remainingLength = FileBlock.getLength(block);
+      while (remainingLength > 0) {
+        long currentLength = remainingLength;
+        if (scanRangeBytesLimit > 0 && remainingLength > scanRangeBytesLimit) {
+          currentLength = scanRangeBytesLimit;
+        }
+        TScanRange scanRange = new TScanRange();
+        scanRange.setHdfs_file_split(new THdfsFileSplit(fileDesc.getFileName(),
+            currentOffset, currentLength, partition.getId(), 
fileDesc.getFileLength(),
+            fileDesc.getFileCompression().toThrift(), 
fileDesc.getModificationTime()));
+        TScanRangeLocationList scanRangeLocations = new 
TScanRangeLocationList();
+        scanRangeLocations.scan_range = scanRange;
+        scanRangeLocations.locations = locations;
+        scanRangeSpecs_.addToConcrete_ranges(scanRangeLocations);
+        largestScanRangeBytes_ = Math.max(largestScanRangeBytes_, 
currentLength);
+        fileMaxScanRangeBytes = Math.max(fileMaxScanRangeBytes, currentLength);
+        remainingLength -= currentLength;
+        currentOffset += currentLength;
+      }
+    }
+    if (fileDescMissingDiskIds) {
+      ++numFilesNoDiskIds_;
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("File blocks mapping to unknown disk ids. Dir: "
+            + partition.getLocation() + " File:" + fileDesc.toString());
+      }
+    }
+
+    return new Pair<Boolean, Long>(fileDescMissingDiskIds, 
fileMaxScanRangeBytes);
+  }
+
+  /**
    * Computes the average row size, input and output cardinalities, and 
estimates the
    * number of nodes.
    * Requires that computeScanRangeLocations() has been called.
    */
   @Override
   public void computeStats(Analyzer analyzer) {
-    Preconditions.checkNotNull(scanRanges_);
+    Preconditions.checkNotNull(scanRangeSpecs_);
     super.computeStats(analyzer);
     computeCardinalities();
     computeNumNodes(analyzer, cardinality_);
@@ -1013,54 +1084,67 @@ public class HdfsScanNode extends ScanNode {
    * ranges that cannot will be round-robined across the cluster.
    */
   protected void computeNumNodes(Analyzer analyzer, long cardinality) {
-    Preconditions.checkNotNull(scanRanges_);
+    Preconditions.checkNotNull(scanRangeSpecs_);
     MembershipSnapshot cluster = MembershipSnapshot.getCluster();
     HashSet<TNetworkAddress> localHostSet = Sets.newHashSet();
     int totalNodes = 0;
     int numLocalRanges = 0;
     int numRemoteRanges = 0;
-    for (TScanRangeLocationList range: scanRanges_) {
-      boolean anyLocal = false;
-      for (TScanRangeLocation loc: range.locations) {
-        TNetworkAddress dataNode = 
analyzer.getHostIndex().getEntry(loc.getHost_idx());
-        if (cluster.contains(dataNode)) {
-          anyLocal = true;
-          // Use the full datanode address (including port) to account for the 
test
-          // minicluster where there are multiple datanodes and impalads on a 
single
-          // host.  This assumes that when an impalad is colocated with a 
datanode,
-          // there are the same number of impalads as datanodes on this host 
in this
-          // cluster.
-          localHostSet.add(dataNode);
+    if (scanRangeSpecs_.isSetConcrete_ranges()) {
+      for (TScanRangeLocationList range : scanRangeSpecs_.concrete_ranges) {
+        boolean anyLocal = false;
+        if (range.isSetLocations()) {
+          for (TScanRangeLocation loc : range.locations) {
+            TNetworkAddress dataNode =
+                analyzer.getHostIndex().getEntry(loc.getHost_idx());
+            if (cluster.contains(dataNode)) {
+              anyLocal = true;
+              // Use the full datanode address (including port) to account for 
the test
+              // minicluster where there are multiple datanodes and impalads 
on a single
+              // host.  This assumes that when an impalad is colocated with a 
datanode,
+              // there are the same number of impalads as datanodes on this 
host in this
+              // cluster.
+              localHostSet.add(dataNode);
+            }
+          }
         }
+        // This range has at least one replica with a colocated impalad, so 
assume it
+        // will be scheduled on one of those nodes.
+        if (anyLocal) {
+          ++numLocalRanges;
+        } else {
+          ++numRemoteRanges;
+        }
+        // Approximate the number of nodes that will execute locally assigned 
ranges to
+        // be the smaller of the number of locally assigned ranges and the 
number of
+        // hosts that hold block replicas for those ranges.
+        int numLocalNodes = Math.min(numLocalRanges, localHostSet.size());
+        // The remote ranges are round-robined across all the impalads.
+        int numRemoteNodes = Math.min(numRemoteRanges, cluster.numNodes());
+        // The local and remote assignments may overlap, but we don't know by 
how much so
+        // conservatively assume no overlap.
+        totalNodes = Math.min(numLocalNodes + numRemoteNodes, 
cluster.numNodes());
+        // Exit early if all hosts have a scan range assignment, to avoid 
extraneous work
+        // in case the number of scan ranges dominates the number of nodes.
+        if (totalNodes == cluster.numNodes()) break;
       }
-      // This range has at least one replica with a colocated impalad, so 
assume it
-      // will be scheduled on one of those nodes.
-      if (anyLocal) {
-        ++numLocalRanges;
-      } else {
-        ++numRemoteRanges;
-      }
-      // Approximate the number of nodes that will execute locally assigned 
ranges to
-      // be the smaller of the number of locally assigned ranges and the 
number of
-      // hosts that hold block replica for those ranges.
-      int numLocalNodes = Math.min(numLocalRanges, localHostSet.size());
-      // The remote ranges are round-robined across all the impalads.
-      int numRemoteNodes = Math.min(numRemoteRanges, cluster.numNodes());
-      // The local and remote assignments may overlap, but we don't know by 
how much so
-      // conservatively assume no overlap.
-      totalNodes = Math.min(numLocalNodes + numRemoteNodes, 
cluster.numNodes());
-      // Exit early if all hosts have a scan range assignment, to avoid 
extraneous work
-      // in case the number of scan ranges dominates the number of nodes.
-      if (totalNodes == cluster.numNodes()) break;
+    }
+    // Handle the generated range specifications.
+    if (totalNodes < cluster.numNodes() && scanRangeSpecs_.isSetSplit_specs()) 
{
+      Preconditions.checkState(
+          generatedScanRangeCount_ >= scanRangeSpecs_.getSplit_specsSize());
+      numRemoteRanges += generatedScanRangeCount_;
+      totalNodes = Math.min(numRemoteRanges, cluster.numNodes());
     }
     // Tables can reside on 0 nodes (empty table), but a plan node must always 
be
     // executed on at least one node.
     numNodes_ = (cardinality == 0 || totalNodes == 0) ? 1 : totalNodes;
     if (LOG.isTraceEnabled()) {
-      LOG.trace("computeNumNodes totalRanges=" + scanRanges_.size() +
-          " localRanges=" + numLocalRanges + " remoteRanges=" + 
numRemoteRanges +
-          " localHostSet.size=" + localHostSet.size() +
-          " clusterNodes=" + cluster.numNodes());
+      LOG.trace("computeNumNodes totalRanges="
+          + (scanRangeSpecs_.getConcrete_rangesSize() + 
generatedScanRangeCount_)
+          + " localRanges=" + numLocalRanges + " remoteRanges=" + 
numRemoteRanges
+          + " localHostSet.size=" + localHostSet.size()
+          + " clusterNodes=" + cluster.numNodes());
     }
   }
 
@@ -1153,10 +1237,11 @@ public class HdfsScanNode extends ScanNode {
           maxScanRangeNumRows_ == -1 ? "unavailable" : maxScanRangeNumRows_));
       output.append("\n");
       if (numScanRangesNoDiskIds_ > 0) {
-        output.append(String.format("%smissing disk ids: " +
-            "partitions=%s/%s files=%s/%s scan ranges %s/%s\n", detailPrefix,
-            numPartitionsNoDiskIds_, numPartitions_, numFilesNoDiskIds_,
-            totalFiles_, numScanRangesNoDiskIds_, scanRanges_.size()));
+        output.append(String.format("%smissing disk ids: "
+                + "partitions=%s/%s files=%s/%s scan ranges %s/%s\n",
+            detailPrefix, numPartitionsNoDiskIds_, numPartitions_, 
numFilesNoDiskIds_,
+            totalFiles_, numScanRangesNoDiskIds_,
+            scanRangeSpecs_.getConcrete_rangesSize() + 
generatedScanRangeCount_));
       }
       // Groups the min max original conjuncts by tuple descriptor.
       output.append(getMinMaxOriginalConjunctsExplainString(detailPrefix));
@@ -1249,12 +1334,15 @@ public class HdfsScanNode extends ScanNode {
 
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
-    Preconditions.checkNotNull(scanRanges_, "Cost estimation requires scan 
ranges.");
-    if (scanRanges_.isEmpty()) {
+    Preconditions.checkNotNull(scanRangeSpecs_, "Cost estimation requires scan 
ranges.");
+    long scanRangeSize =
+        scanRangeSpecs_.getConcrete_rangesSize() + generatedScanRangeCount_;
+    if (scanRangeSize == 0) {
       nodeResourceProfile_ = ResourceProfile.noReservation(0);
       return;
     }
-    Preconditions.checkState(0 < numNodes_ && numNodes_ <= scanRanges_.size());
+
+    Preconditions.checkState(0 < numNodes_ && numNodes_ <= scanRangeSize);
     Preconditions.checkNotNull(desc_);
     Preconditions.checkState(desc_.getTable() instanceof HdfsTable);
     HdfsTable table = (HdfsTable) desc_.getTable();
@@ -1273,8 +1361,8 @@ public class HdfsScanNode extends ScanNode {
       // excluding partition columns and columns that are populated from file 
metadata.
       perHostScanRanges = columnReservations.size();
     } else {
-      perHostScanRanges = (int) Math.ceil((
-          (double) scanRanges_.size() / (double) numNodes_) * 
SCAN_RANGE_SKEW_FACTOR);
+      perHostScanRanges = (int) Math.ceil(
+          ((double) scanRangeSize / (double) numNodes_) * 
SCAN_RANGE_SKEW_FACTOR);
     }
 
     // The non-MT scan node requires at least one scanner thread.
@@ -1292,7 +1380,7 @@ public class HdfsScanNode extends ScanNode {
       }
     }
 
-    long avgScanRangeBytes = (long) Math.ceil(totalBytes_ / (double) 
scanRanges_.size());
+    long avgScanRangeBytes = (long) Math.ceil(totalBytes_ / (double) 
scanRangeSize);
     // The +1 accounts for an extra I/O buffer to read past the scan range due 
to a
     // trailing record spanning Hdfs blocks.
     long maxIoBufferSize =

http://git-wip-us.apache.org/repos/asf/impala/blob/11554a17/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 22de3e2..f6d89ad 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -48,6 +48,7 @@ import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TScanRange;
 import org.apache.impala.thrift.TScanRangeLocation;
 import org.apache.impala.thrift.TScanRangeLocationList;
+import org.apache.impala.thrift.TScanRangeSpec;
 import org.apache.impala.util.KuduUtil;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
@@ -193,7 +194,7 @@ public class KuduScanNode extends ScanNode {
   private void computeScanRangeLocations(Analyzer analyzer,
       KuduClient client, org.apache.kudu.client.KuduTable rpcTable)
       throws ImpalaRuntimeException {
-    scanRanges_ = Lists.newArrayList();
+    scanRangeSpecs_ = new TScanRangeSpec();
 
     List<KuduScanToken> scanTokens = createScanTokens(client, rpcTable);
     for (KuduScanToken token: scanTokens) {
@@ -225,7 +226,7 @@ public class KuduScanNode extends ScanNode {
       TScanRangeLocationList locs = new TScanRangeLocationList();
       locs.setScan_range(scanRange);
       locs.locations = locations;
-      scanRanges_.add(locs);
+      scanRangeSpecs_.addToConcrete_ranges(locs);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/11554a17/fe/src/main/java/org/apache/impala/planner/ScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
index eea9c50..e23ea93 100644
--- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
@@ -26,7 +26,7 @@ import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.NotImplementedException;
 import org.apache.impala.thrift.TNetworkAddress;
-import org.apache.impala.thrift.TScanRangeLocationList;
+import org.apache.impala.thrift.TScanRangeSpec;
 import org.apache.impala.thrift.TTableStats;
 
 import com.google.common.base.Joiner;
@@ -43,8 +43,8 @@ abstract public class ScanNode extends PlanNode {
   // Total number of rows this node is expected to process
   protected long inputCardinality_ = -1;
 
-  // List of scan-range locations. Populated in init().
-  protected List<TScanRangeLocationList> scanRanges_;
+  // Scan-range specs. Populated in init().
+  protected TScanRangeSpec scanRangeSpecs_;
 
   public ScanNode(PlanNodeId id, TupleDescriptor desc, String displayName) {
     super(id, desc.getId().asList(), displayName);
@@ -79,11 +79,11 @@ abstract public class ScanNode extends PlanNode {
   }
 
   /**
-   * Returns all scan ranges plus their locations.
+   * Returns all scan range specs.
    */
-  public List<TScanRangeLocationList> getScanRangeLocations() {
-    Preconditions.checkNotNull(scanRanges_, "Need to call init() first.");
-    return scanRanges_;
+  public TScanRangeSpec getScanRangeSpecs() {
+    Preconditions.checkNotNull(scanRangeSpecs_, "Need to call init() first.");
+    return scanRangeSpecs_;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/impala/blob/11554a17/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java 
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 5e51c08..8b0d210 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -835,7 +835,7 @@ public class Frontend {
     Set<TTableName> tablesWithMissingDiskIds = Sets.newTreeSet();
     for (ScanNode scanNode: scanNodes) {
       result.putToPer_node_scan_ranges(
-          scanNode.getId().asInt(), scanNode.getScanRangeLocations());
+          scanNode.getId().asInt(), scanNode.getScanRangeSpecs());
 
       TTableName tableName = scanNode.getTupleDesc().getTableName().toThrift();
       if (scanNode.isTableMissingStats()) tablesMissingStats.add(tableName);

http://git-wip-us.apache.org/repos/asf/impala/blob/11554a17/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java 
b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index 5e6d549..54ad57f 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -66,6 +66,7 @@ import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TQueryExecRequest;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TScanRangeLocationList;
+import org.apache.impala.thrift.TScanRangeSpec;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableSink;
 import org.apache.impala.thrift.TTupleDescriptor;
@@ -218,12 +219,12 @@ public class PlannerTestBase extends FrontendTestBase {
     Set<THdfsPartition> scanRangePartitions = Sets.newHashSet();
     for (TPlanExecInfo execInfo: execRequest.plan_exec_info) {
       if (execInfo.per_node_scan_ranges != null) {
-        for (Map.Entry<Integer, List<TScanRangeLocationList>> entry:
-             execInfo.per_node_scan_ranges.entrySet()) {
-          if (entry.getValue() == null) {
+        for (Map.Entry<Integer, TScanRangeSpec> entry :
+            execInfo.per_node_scan_ranges.entrySet()) {
+          if (entry.getValue() == null || 
!entry.getValue().isSetConcrete_ranges()) {
             continue;
           }
-          for (TScanRangeLocationList locationList: entry.getValue()) {
+          for (TScanRangeLocationList locationList : 
entry.getValue().concrete_ranges) {
             if (locationList.scan_range.isSetHdfs_file_split()) {
               THdfsFileSplit split = 
locationList.scan_range.getHdfs_file_split();
               THdfsPartition partition = findPartition(entry.getKey(), split);
@@ -270,12 +271,14 @@ public class PlannerTestBase extends FrontendTestBase {
     StringBuilder result = new StringBuilder();
     for (TPlanExecInfo execInfo: execRequest.plan_exec_info) {
       if (execInfo.per_node_scan_ranges == null) continue;
-      for (Map.Entry<Integer, List<TScanRangeLocationList>> entry:
+      for (Map.Entry<Integer, TScanRangeSpec> entry :
           execInfo.per_node_scan_ranges.entrySet()) {
         result.append("NODE " + entry.getKey().toString() + ":\n");
-        if (entry.getValue() == null) continue;
+        if (entry.getValue() == null || 
!entry.getValue().isSetConcrete_ranges()) {
+          continue;
+        }
 
-        for (TScanRangeLocationList locations: entry.getValue()) {
+        for (TScanRangeLocationList locations : 
entry.getValue().concrete_ranges) {
           // print scan range
           result.append("  ");
           if (locations.scan_range.isSetHdfs_file_split()) {

Reply via email to