Repository: incubator-impala
Updated Branches:
  refs/heads/master 1b9d9ea7c -> 0d0c93ec8


IMPALA-4117: Factor simple scheduler test code into own files

This change merely splits the helper classes from
simple-scheduler-test.cc into separate .h and .cc files. It does not
change the semantics of the code.  Whitespace formatting has been done
with git-clang-format for any changed lines.

Change-Id: Id3a6b3336db175eb095cbeb7ec623a5957b77ccc
Reviewed-on: http://gerrit.cloudera.org:8080/4486
Reviewed-by: Matthew Jacobs <m...@cloudera.com>
Reviewed-by: Dan Hecht <dhe...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d76a2b22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d76a2b22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d76a2b22

Branch: refs/heads/master
Commit: d76a2b2272ea4b2d2b9e5636c7f4e2e8da18961c
Parents: 1b9d9ea
Author: Lars Volker <l...@cloudera.com>
Authored: Wed Sep 14 17:17:51 2016 -0700
Committer: Internal Jenkins <cloudera-hud...@gerrit.cloudera.org>
Committed: Thu Sep 22 05:19:26 2016 +0000

----------------------------------------------------------------------
 be/src/scheduling/CMakeLists.txt                |   1 +
 be/src/scheduling/simple-scheduler-test-util.cc | 543 ++++++++++++
 be/src/scheduling/simple-scheduler-test-util.h  | 465 ++++++++++
 be/src/scheduling/simple-scheduler-test.cc      | 852 +------------------
 be/src/scheduling/simple-scheduler.h            |   4 +-
 5 files changed, 1015 insertions(+), 850 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d76a2b22/be/src/scheduling/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/scheduling/CMakeLists.txt b/be/src/scheduling/CMakeLists.txt
index 9cfb672..ef49a14 100644
--- a/be/src/scheduling/CMakeLists.txt
+++ b/be/src/scheduling/CMakeLists.txt
@@ -28,6 +28,7 @@ add_library(Scheduling STATIC
   backend-config.cc
   query-schedule.cc
   request-pool-service.cc
+  simple-scheduler-test-util.cc
   simple-scheduler.cc
 )
 add_dependencies(Scheduling thrift-deps)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d76a2b22/be/src/scheduling/simple-scheduler-test-util.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler-test-util.cc b/be/src/scheduling/simple-scheduler-test-util.cc
new file mode 100644
index 0000000..3e14ea7
--- /dev/null
+++ b/be/src/scheduling/simple-scheduler-test-util.cc
@@ -0,0 +1,543 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "simple-scheduler-test-util.h"
+#include "simple-scheduler.h"
+
+#include "common/names.h"
+
+using namespace impala;
+using namespace impala::test;
+
+/// Sample 'n' elements without replacement from the set [0..N-1].
+/// This is an implementation of "Algorithm R" by J. Vitter.
+void SampleN(int n, int N, vector<int>* out) {
+  if (n == 0) return;
+  DCHECK(n <= N);
+  out->reserve(n);
+  out->clear();
+  for (int i = 0; i < n; ++i) out->push_back(i);
+  for (int i = n; i < N; ++i) {
+    // Accept element with probability n/i. Place at random position.
+    int r = rand() % i;
+    if (r < n) (*out)[r] = i;
+  }
+}
+
+/// Sample a set of 'n' elements from 'in' without replacement and copy them to
+/// 'out'.
+template <typename T>
+void SampleNElements(int n, const vector<T>& in, vector<T>* out) {
+  vector<int> idxs;
+  SampleN(n, in.size(), &idxs);
+  DCHECK_EQ(n, idxs.size());
+  out->reserve(n);
+  for (int idx : idxs) out->push_back(in[idx]);
+}
+
+/// Define constants from simple-scheduler-test-util.h here.
+const int Cluster::BACKEND_PORT = 1000;
+const int Cluster::DATANODE_PORT = 2000;
+const string Cluster::HOSTNAME_PREFIX = "host_";
+const string Cluster::IP_PREFIX = "10";
+
+/// Default size for new blocks is 1MB.
+const int64_t Block::DEFAULT_BLOCK_SIZE = 1 << 20;
+
+int Cluster::AddHost(bool has_backend, bool has_datanode) {
+  int host_idx = hosts_.size();
+  int be_port = has_backend ? BACKEND_PORT : -1;
+  int dn_port = has_datanode ? DATANODE_PORT : -1;
+  IpAddr ip = HostIdxToIpAddr(host_idx);
+  DCHECK(ip_to_idx_.find(ip) == ip_to_idx_.end());
+  ip_to_idx_[ip] = host_idx;
+  hosts_.push_back(Host(HostIdxToHostname(host_idx), ip, be_port, dn_port));
+  // Add host to lists of backend indexes per type.
+  if (has_backend) backend_host_idxs_.push_back(host_idx);
+  if (has_datanode) {
+    datanode_host_idxs_.push_back(host_idx);
+    if (has_backend) {
+      datanode_with_backend_host_idxs_.push_back(host_idx);
+    } else {
+      datanode_only_host_idxs_.push_back(host_idx);
+    }
+  }
+  return host_idx;
+}
+
+void Cluster::AddHosts(int num_hosts, bool has_backend, bool has_datanode) {
+  for (int i = 0; i < num_hosts; ++i) AddHost(has_backend, has_datanode);
+}
+
+Hostname Cluster::HostIdxToHostname(int host_idx) {
+  return HOSTNAME_PREFIX + std::to_string(host_idx);
+}
+
+void Cluster::GetBackendAddress(int host_idx, TNetworkAddress* addr) const {
+  DCHECK_LT(host_idx, hosts_.size());
+  addr->hostname = hosts_[host_idx].ip;
+  addr->port = hosts_[host_idx].be_port;
+}
+
+const vector<int>& Cluster::datanode_with_backend_host_idxs() const {
+  return datanode_with_backend_host_idxs_;
+}
+
+const vector<int>& Cluster::datanode_only_host_idxs() const {
+  return datanode_only_host_idxs_;
+}
+
+IpAddr Cluster::HostIdxToIpAddr(int host_idx) {
+  DCHECK_LT(host_idx, (1 << 24));
+  string suffix;
+  for (int i = 0; i < 3; ++i) {
+    suffix = "." + std::to_string(host_idx % 256) + suffix; // prepend
+    host_idx /= 256;
+  }
+  DCHECK_EQ(0, host_idx);
+  return IP_PREFIX + suffix;
+}
+
+void Schema::AddSingleBlockTable(
+    const TableName& table_name, const vector<int>& non_cached_replica_host_idxs) {
+  AddSingleBlockTable(table_name, non_cached_replica_host_idxs, {});
+}
+
+void Schema::AddSingleBlockTable(const TableName& table_name,
+    const vector<int>& non_cached_replica_host_idxs,
+    const vector<int>& cached_replica_host_idxs) {
+  DCHECK(tables_.find(table_name) == tables_.end());
+  Block block;
+  int num_replicas =
+      non_cached_replica_host_idxs.size() + cached_replica_host_idxs.size();
+  block.replica_host_idxs = non_cached_replica_host_idxs;
+  block.replica_host_idxs.insert(block.replica_host_idxs.end(),
+      cached_replica_host_idxs.begin(), cached_replica_host_idxs.end());
+  // Initialize for non-cached replicas first.
+  block.replica_host_idx_is_cached.resize(non_cached_replica_host_idxs.size(), false);
+  // Fill up to final size for cached replicas.
+  block.replica_host_idx_is_cached.insert(
+      block.replica_host_idx_is_cached.end(), cached_replica_host_idxs.size(), true);
+  DCHECK_EQ(block.replica_host_idxs.size(), block.replica_host_idx_is_cached.size());
+  DCHECK_EQ(block.replica_host_idxs.size(), num_replicas);
+  // Create table
+  Table table;
+  table.blocks.push_back(block);
+  // Insert table
+  tables_.emplace(table_name, table);
+}
+
+void Schema::AddMultiBlockTable(const TableName& table_name, int num_blocks,
+    ReplicaPlacement replica_placement, int num_replicas) {
+  AddMultiBlockTable(table_name, num_blocks, replica_placement, num_replicas, 0);
+}
+
+void Schema::AddMultiBlockTable(const TableName& table_name, int num_blocks,
+    ReplicaPlacement replica_placement, int num_replicas, int num_cached_replicas) {
+  DCHECK_GT(num_replicas, 0);
+  DCHECK(num_cached_replicas <= num_replicas);
+  Table table;
+  for (int i = 0; i < num_blocks; ++i) {
+    Block block;
+    vector<int>& replica_idxs = block.replica_host_idxs;
+
+    // Determine replica host indexes.
+    switch (replica_placement) {
+      case ReplicaPlacement::RANDOM:
+        SampleNElements(num_replicas, cluster_.datanode_host_idxs(), &replica_idxs);
+        break;
+      case ReplicaPlacement::LOCAL_ONLY:
+        DCHECK(num_replicas <= cluster_.datanode_with_backend_host_idxs().size());
+        SampleNElements(
+            num_replicas, cluster_.datanode_with_backend_host_idxs(), &replica_idxs);
+        break;
+      case ReplicaPlacement::REMOTE_ONLY:
+        DCHECK(num_replicas <= cluster_.datanode_only_host_idxs().size());
+        SampleNElements(num_replicas, cluster_.datanode_only_host_idxs(), &replica_idxs);
+        break;
+      default:
+        DCHECK(false) << "Unsupported replica placement: " << (int)replica_placement;
+    }
+
+    // Determine cached replicas.
+    vector<int> cached_replicas;
+    vector<bool>& is_cached = block.replica_host_idx_is_cached;
+    is_cached.resize(num_replicas, false);
+    SampleN(num_cached_replicas, num_replicas, &cached_replicas);
+    // Flag cached entries.
+    for (const int idx : cached_replicas) is_cached[idx] = true;
+
+    DCHECK_EQ(replica_idxs.size(), is_cached.size());
+    table.blocks.push_back(block);
+  }
+  // Insert table
+  tables_[table_name] = table;
+}
+
+const Table& Schema::GetTable(const TableName& table_name) const {
+  auto it = tables_.find(table_name);
+  DCHECK(it != tables_.end());
+  return it->second;
+}
+
+void Plan::SetReplicaPreference(TReplicaPreference::type p) {
+  query_options_.replica_preference = p;
+}
+
+const vector<TNetworkAddress>& Plan::referenced_datanodes() const {
+  return referenced_datanodes_;
+}
+
+const vector<TScanRangeLocations>& Plan::scan_range_locations() const {
+  return scan_range_locations_;
+}
+
+void Plan::AddTableScan(const TableName& table_name) {
+  const Table& table = schema_.GetTable(table_name);
+  const vector<Block>& blocks = table.blocks;
+  for (int i = 0; i < blocks.size(); ++i) {
+    const Block& block = blocks[i];
+    TScanRangeLocations scan_range_locations;
+    BuildTScanRangeLocations(table_name, block, i, &scan_range_locations);
+    scan_range_locations_.push_back(scan_range_locations);
+  }
+}
+
+void Plan::BuildTScanRangeLocations(const TableName& table_name, const Block& block,
+    int block_idx, TScanRangeLocations* scan_range_locations) {
+  const vector<int>& replica_idxs = block.replica_host_idxs;
+  const vector<bool>& is_cached = block.replica_host_idx_is_cached;
+  DCHECK_EQ(replica_idxs.size(), is_cached.size());
+  int num_replicas = replica_idxs.size();
+  BuildScanRange(table_name, block, block_idx, &scan_range_locations->scan_range);
+  scan_range_locations->locations.resize(num_replicas);
+  for (int i = 0; i < num_replicas; ++i) {
+    TScanRangeLocation& location = scan_range_locations->locations[i];
+    location.host_idx = FindOrInsertDatanodeIndex(replica_idxs[i]);
+    location.__set_is_cached(is_cached[i]);
+  }
+}
+
+void Plan::BuildScanRange(const TableName& table_name, const Block& block, int block_idx,
+    TScanRange* scan_range) {
+  // Initialize locations.scan_range correctly.
+  THdfsFileSplit file_split;
+  // 'length' is the only member considered by the scheduler.
+  file_split.length = block.length;
+  // Encoding the table name and block index in the file helps debugging.
+  file_split.file_name = table_name + "_block_" + std::to_string(block_idx);
+  file_split.offset = 0;
+  file_split.partition_id = 0;
+  // For now, we model each file by a single block.
+  file_split.file_length = block.length;
+  file_split.file_compression = THdfsCompression::NONE;
+  file_split.mtime = 1;
+  scan_range->__set_hdfs_file_split(file_split);
+}
+
+int Plan::FindOrInsertDatanodeIndex(int cluster_datanode_idx) {
+  const Host& host = schema_.cluster().hosts()[cluster_datanode_idx];
+  auto ret = host_idx_to_datanode_idx_.emplace(
+      cluster_datanode_idx, referenced_datanodes_.size());
+  bool inserted_new_element = ret.second;
+  if (inserted_new_element) {
+    TNetworkAddress datanode;
+    datanode.hostname = host.ip;
+    datanode.port = host.dn_port;
+    referenced_datanodes_.push_back(datanode);
+  }
+  return ret.first->second;
+}
+
+int Result::NumTotalAssignments(int host_idx) const {
+  return CountAssignmentsIf(IsHost(host_idx));
+}
+
+int Result::NumTotalAssignedBytes(int host_idx) const {
+  return CountAssignedBytesIf(IsHost(host_idx));
+}
+
+int Result::NumCachedAssignments(int host_idx) const {
+  return CountAssignmentsIf(IsCached(IsHost(host_idx)));
+}
+
+int Result::NumCachedAssignedBytes(int host_idx) const {
+  return CountAssignedBytesIf(IsCached(IsHost(host_idx)));
+}
+
+int Result::NumDiskAssignments(int host_idx) const {
+  return CountAssignmentsIf(IsDisk(IsHost(host_idx)));
+}
+
+int Result::NumDiskAssignedBytes(int host_idx) const {
+  return CountAssignedBytesIf(IsDisk(IsHost(host_idx)));
+}
+
+int Result::NumRemoteAssignments(int host_idx) const {
+  return CountAssignmentsIf(IsRemote(IsHost(host_idx)));
+}
+
+int Result::NumRemoteAssignedBytes(int host_idx) const {
+  return CountAssignedBytesIf(IsRemote(IsHost(host_idx)));
+}
+
+int Result::MaxNumAssignmentsPerHost() const {
+  NumAssignmentsPerBackend num_assignments_per_backend;
+  CountAssignmentsPerBackend(&num_assignments_per_backend);
+  int max_count = 0;
+  for (const auto& elem : num_assignments_per_backend) {
+    max_count = max(max_count, elem.second);
+  }
+  return max_count;
+}
+
+int64_t Result::MaxNumAssignedBytesPerHost() const {
+  NumAssignedBytesPerBackend num_assigned_bytes_per_backend;
+  CountAssignedBytesPerBackend(&num_assigned_bytes_per_backend);
+  int64_t max_assigned_bytes = 0;
+  for (const auto& elem : num_assigned_bytes_per_backend) {
+    max_assigned_bytes = max(max_assigned_bytes, elem.second);
+  }
+  return max_assigned_bytes;
+}
+
+int Result::MinNumAssignmentsPerHost() const {
+  NumAssignmentsPerBackend num_assignments_per_backend;
+  CountAssignmentsPerBackend(&num_assignments_per_backend);
+  int min_count = numeric_limits<int>::max();
+  for (const auto& elem : num_assignments_per_backend) {
+    min_count = min(min_count, elem.second);
+  }
+  DCHECK_GT(min_count, 0);
+  return min_count;
+}
+
+int64_t Result::MinNumAssignedBytesPerHost() const {
+  NumAssignedBytesPerBackend num_assigned_bytes_per_backend;
+  CountAssignedBytesPerBackend(&num_assigned_bytes_per_backend);
+  int64_t min_assigned_bytes = numeric_limits<int64_t>::max();
+  for (const auto& elem : num_assigned_bytes_per_backend) {
+    min_assigned_bytes = min(min_assigned_bytes, elem.second);
+  }
+  DCHECK_GT(min_assigned_bytes, 0);
+  return min_assigned_bytes;
+}
+
+int Result::NumDistinctBackends() const {
+  unordered_set<IpAddr> backends;
+  AssignmentCallback cb = [&backends](
+      const AssignmentInfo& assignment) { backends.insert(assignment.addr.hostname); };
+  ProcessAssignments(cb);
+  return backends.size();
+}
+
+const FragmentScanRangeAssignment& Result::GetAssignment(int index) const {
+  DCHECK_GT(assignments_.size(), index);
+  return assignments_[index];
+}
+
+FragmentScanRangeAssignment* Result::AddAssignment() {
+  assignments_.push_back(FragmentScanRangeAssignment());
+  return &assignments_.back();
+}
+
+Result::AssignmentFilter Result::Any() const {
+  return [](const AssignmentInfo& assignment) { return true; };
+}
+
+Result::AssignmentFilter Result::IsHost(int host_idx) const {
+  TNetworkAddress expected_addr;
+  plan_.cluster().GetBackendAddress(host_idx, &expected_addr);
+  return [expected_addr](
+      const AssignmentInfo& assignment) { return assignment.addr == expected_addr; };
+}
+
+Result::AssignmentFilter Result::IsCached(AssignmentFilter filter) const {
+  return [filter](const AssignmentInfo& assignment) {
+    return filter(assignment) && assignment.is_cached;
+  };
+}
+
+Result::AssignmentFilter Result::IsDisk(AssignmentFilter filter) const {
+  return [filter](const AssignmentInfo& assignment) {
+    return filter(assignment) && !assignment.is_cached && !assignment.is_remote;
+  };
+}
+
+Result::AssignmentFilter Result::IsRemote(AssignmentFilter filter) const {
+  return [filter](const AssignmentInfo& assignment) {
+    return filter(assignment) && assignment.is_remote;
+  };
+}
+
+void Result::ProcessAssignments(const AssignmentCallback& cb) const {
+  for (const FragmentScanRangeAssignment& assignment : assignments_) {
+    for (const auto& assignment_elem : assignment) {
+      const TNetworkAddress& addr = assignment_elem.first;
+      const PerNodeScanRanges& per_node_ranges = assignment_elem.second;
+      for (const auto& per_node_ranges_elem : per_node_ranges) {
+        const vector<TScanRangeParams> scan_range_params_vector =
+            per_node_ranges_elem.second;
+        for (const TScanRangeParams& scan_range_params : scan_range_params_vector) {
+          const TScanRange& scan_range = scan_range_params.scan_range;
+          DCHECK(scan_range.__isset.hdfs_file_split);
+          const THdfsFileSplit& hdfs_file_split = scan_range.hdfs_file_split;
+          bool is_cached =
+              scan_range_params.__isset.is_cached ? scan_range_params.is_cached : false;
+          bool is_remote =
+              scan_range_params.__isset.is_remote ? scan_range_params.is_remote : false;
+          cb({addr, hdfs_file_split, is_cached, is_remote});
+        }
+      }
+    }
+  }
+}
+
+int Result::CountAssignmentsIf(const AssignmentFilter& filter) const {
+  int count = 0;
+  AssignmentCallback cb = [&count, filter](const AssignmentInfo& assignment) {
+    if (filter(assignment)) ++count;
+  };
+  ProcessAssignments(cb);
+  return count;
+}
+
+int64_t Result::CountAssignedBytesIf(const AssignmentFilter& filter) const {
+  int64_t assigned_bytes = 0;
+  AssignmentCallback cb = [&assigned_bytes, filter](const AssignmentInfo& assignment) {
+    if (filter(assignment)) assigned_bytes += assignment.hdfs_file_split.length;
+  };
+  ProcessAssignments(cb);
+  return assigned_bytes;
+}
+
+void Result::CountAssignmentsPerBackend(
+    NumAssignmentsPerBackend* num_assignments_per_backend) const {
+  AssignmentCallback cb = [&num_assignments_per_backend](
+      const AssignmentInfo& assignment) {
+    ++(*num_assignments_per_backend)[assignment.addr.hostname];
+  };
+  ProcessAssignments(cb);
+}
+
+void Result::CountAssignedBytesPerBackend(
+    NumAssignedBytesPerBackend* num_assignments_per_backend) const {
+  AssignmentCallback cb = [&num_assignments_per_backend](
+      const AssignmentInfo& assignment) {
+    (*num_assignments_per_backend)[assignment.addr.hostname] +=
+        assignment.hdfs_file_split.length;
+  };
+  ProcessAssignments(cb);
+}
+
+SchedulerWrapper::SchedulerWrapper(const Plan& plan)
+  : plan_(plan), metrics_("TestMetrics") {
+  InitializeScheduler();
+}
+
+void SchedulerWrapper::Compute(bool exec_at_coord, Result* result) {
+  DCHECK(scheduler_ != NULL);
+
+  // Compute Assignment.
+  FragmentScanRangeAssignment* assignment = result->AddAssignment();
+  scheduler_->ComputeScanRangeAssignment(*scheduler_->GetBackendConfig(), 0, NULL, false,
+      plan_.scan_range_locations(), plan_.referenced_datanodes(), exec_at_coord,
+      plan_.query_options(), NULL, assignment);
+}
+
+void SchedulerWrapper::AddBackend(const Host& host) {
+  // Add to topic delta
+  TTopicDelta delta;
+  delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC;
+  delta.is_delta = true;
+  AddHostToTopicDelta(host, &delta);
+  SendTopicDelta(delta);
+}
+
+void SchedulerWrapper::RemoveBackend(const Host& host) {
+  // Add deletion to topic delta
+  TTopicDelta delta;
+  delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC;
+  delta.is_delta = true;
+  delta.topic_deletions.push_back(host.ip);
+  SendTopicDelta(delta);
+}
+
+void SchedulerWrapper::SendFullMembershipMap() {
+  TTopicDelta delta;
+  delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC;
+  delta.is_delta = false;
+  for (const Host& host : plan_.cluster().hosts()) {
+    if (host.be_port >= 0) AddHostToTopicDelta(host, &delta);
+  }
+  SendTopicDelta(delta);
+}
+
+void SchedulerWrapper::SendEmptyUpdate() {
+  TTopicDelta delta;
+  delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC;
+  delta.is_delta = true;
+  SendTopicDelta(delta);
+}
+
+void SchedulerWrapper::InitializeScheduler() {
+  DCHECK(scheduler_ == NULL);
+  DCHECK_GT(plan_.cluster().NumHosts(), 0) << "Cannot initialize scheduler with 0 "
+                                           << "hosts.";
+  const Host& scheduler_host = plan_.cluster().hosts()[0];
+  string scheduler_backend_id = scheduler_host.ip;
+  TNetworkAddress scheduler_backend_address;
+  scheduler_backend_address.hostname = scheduler_host.ip;
+  scheduler_backend_address.port = scheduler_host.be_port;
+
+  scheduler_.reset(new SimpleScheduler(
+      NULL, scheduler_backend_id, scheduler_backend_address, &metrics_, NULL, NULL));
+  scheduler_->Init();
+  // Initialize the scheduler backend maps.
+  SendFullMembershipMap();
+}
+
+void SchedulerWrapper::AddHostToTopicDelta(const Host& host, TTopicDelta* delta) const {
+  DCHECK_GT(host.be_port, 0) << "Host cannot be added to scheduler without a running "
+                             << "backend";
+  // Build backend descriptor.
+  TBackendDescriptor be_desc;
+  be_desc.address.hostname = host.ip;
+  be_desc.address.port = host.be_port;
+  be_desc.ip_address = host.ip;
+
+  // Build topic item.
+  TTopicItem item;
+  item.key = host.ip;
+  ThriftSerializer serializer(false);
+  Status status = serializer.Serialize(&be_desc, &item.value);
+  DCHECK(status.ok());
+
+  // Add to topic delta.
+  delta->topic_entries.push_back(item);
+}
+
+void SchedulerWrapper::SendTopicDelta(const TTopicDelta& delta) {
+  DCHECK(scheduler_ != NULL);
+  // Wrap in topic delta map.
+  StatestoreSubscriber::TopicDeltaMap delta_map;
+  delta_map.emplace(SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC, delta);
+
+  // Send to the scheduler.
+  vector<TTopicDelta> dummy_result;
+  scheduler_->UpdateMembership(delta_map, &dummy_result);
+}
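
For reference, the sampling helpers above can be exercised in isolation. The
following is a minimal standalone sketch (not part of the commit; the seed and
the sample sizes are illustrative values):

  #include <cstdlib>
  #include <iostream>
  #include <vector>

  // Same reservoir-sampling scheme as SampleN() above: keep the first 'n'
  // indexes, then replace a random slot with decreasing probability.
  void SampleN(int n, int N, std::vector<int>* out) {
    if (n == 0) return;
    out->clear();
    out->reserve(n);
    for (int i = 0; i < n; ++i) out->push_back(i);
    for (int i = n; i < N; ++i) {
      int r = rand() % i;
      if (r < n) (*out)[r] = i;
    }
  }

  int main() {
    srand(1234);  // Fixed seed so the sketch is repeatable.
    std::vector<int> sample;
    SampleN(3, 10, &sample);  // E.g., pick 3 replica hosts out of 10 datanodes.
    for (int idx : sample) std::cout << idx << "\n";
    return 0;
  }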

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d76a2b22/be/src/scheduling/simple-scheduler-test-util.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler-test-util.h b/be/src/scheduling/simple-scheduler-test-util.h
new file mode 100644
index 0000000..85bb1a5
--- /dev/null
+++ b/be/src/scheduling/simple-scheduler-test-util.h
@@ -0,0 +1,465 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <boost/scoped_ptr.hpp>
+
+#include "gen-cpp/ImpalaInternalService.h" // for TQueryOptions
+#include "scheduling/query-schedule.h"
+#include "util/metrics.h"
+
+namespace impala {
+
+class SimpleScheduler;
+class TTopicDelta;
+
+namespace test {
+
+typedef std::string TableName;
+
+/// Helper classes to be used by the scheduler tests.
+
+/// Overall testing approach: Each test builds a list of hosts and a plan, to both of
+/// which elements can be added using various helper methods. Then scheduling can be
+/// tested by instantiating SchedulerWrapper and calling Compute(...). The result can be
+/// verified using a set of helper methods. There are also helper methods to modify the
+/// internal state of the scheduler between subsequent calls to
+/// SchedulerWrapper::Compute().
+///
+/// The model currently comes with some known limitations:
+///
+/// - Files map 1:1 to blocks and to scan ranges.
+/// - All files have the same size (1 block of 1M). Tables that differ in size can be
+///   expressed as having a different number of blocks.
+/// - We don't support multiple backends on a single host.
+/// - Ports are assigned to hosts automatically and are not configurable by the test.
+
+// TODO: Extend the model to support files with multiple blocks.
+// TODO: Test more methods of the scheduler.
+// TODO: Add support to add skewed table scans with multiple scan ranges: often there are
+//       3 replicas where there may be skew for 1 of the replicas (e.g. after a single
+//       node insert) but the other 2 are random.
+// TODO: Make use of the metrics provided by the scheduler.
+// TODO: Add checks for MinNumAssignmentsPerHost() to all tests where applicable.
+// TODO: Add post-condition checks that have to hold for all successful scheduler runs.
+// TODO: Add possibility to explicitly specify the replica location per file.
+// TODO: Add methods to retrieve and verify generated file placements from plan.
+// TODO: Extend the model to specify a physical schema independently of a plan (ie,
+//       tables/files, blocks, replicas and cached replicas exist independently of the
+//       queries that run against them).
+
+/// File blocks store a list of all datanodes that have a replica of the block. When
+/// defining tables you can specify the desired replica placement among all available
+/// datanodes in the cluster.
+///
+/// - RANDOM means that any datanode can be picked.
+/// - LOCAL_ONLY means that only datanodes with a backend will be picked.
+/// - REMOTE_ONLY means that only datanodes without a backend will be picked.
+///
+/// Whether replicas will be cached or not is not determined by this value, but by
+/// additional function arguments when adding tables to the schema.
+enum class ReplicaPlacement {
+  RANDOM,
+  LOCAL_ONLY,
+  REMOTE_ONLY,
+};
+
+/// Host model. Each host can have either a backend, a datanode, or both. To specify that
+/// a host should not act as a backend or datanode specify '-1' as the respective port.
+struct Host {
+  Host(const Hostname& name, const IpAddr& ip, int be_port, int dn_port)
+    : name(name), ip(ip), be_port(be_port), dn_port(dn_port) {}
+  Hostname name;
+  IpAddr ip;
+  int be_port; // Backend port
+  int dn_port; // Datanode port
+};
+
+/// A cluster stores a list of hosts and provides various methods to add hosts to the
+/// cluster. All hosts are guaranteed to have unique IP addresses and hostnames.
+class Cluster {
+ public:
+  /// Add a host and return the host's index. 'hostname' and 'ip' of the new host will be
+  /// generated and are guaranteed to be unique.
+  int AddHost(bool has_backend, bool has_datanode);
+
+  /// Add a number of hosts with the same properties by repeatedly calling AddHost(..).
+  void AddHosts(int num_hosts, bool has_backend, bool has_datanode);
+
+  /// Convert a host index to a hostname.
+  static Hostname HostIdxToHostname(int host_idx);
+
+  /// Return the backend address (ip, port) for the host with index 'host_idx'.
+  void GetBackendAddress(int host_idx, TNetworkAddress* addr) const;
+
+  const std::vector<Host>& hosts() const { return hosts_; }
+  int NumHosts() const { return hosts_.size(); }
+
+  /// These methods return lists of host indexes, grouped by their type, which can be used
+  /// to draw samples of random sets of hosts.
+  /// TODO: Think of a nicer abstraction to expose this information.
+  const std::vector<int>& backend_host_idxs() const { return backend_host_idxs_; }
+  const std::vector<int>& datanode_host_idxs() const { return datanode_host_idxs_; }
+  const std::vector<int>& datanode_with_backend_host_idxs() const;
+  const std::vector<int>& datanode_only_host_idxs() const;
+
+ private:
+  /// Port for all backends.
+  static const int BACKEND_PORT;
+
+  /// Port for all datanodes.
+  static const int DATANODE_PORT;
+
+  /// Prefix for all generated hostnames.
+  static const std::string HOSTNAME_PREFIX;
+
+  /// First octet for all generated IP addresses.
+  static const std::string IP_PREFIX;
+
+  /// List of hosts in this cluster.
+  std::vector<Host> hosts_;
+
+  /// Lists of indexes of hosts, grouped by their type. The lists reference hosts in
+  /// 'hosts_' by index and are used for random sampling.
+  ///
+  /// All hosts with a backend.
+  std::vector<int> backend_host_idxs_;
+  /// All hosts with a datanode.
+  std::vector<int> datanode_host_idxs_;
+  /// All hosts with a datanode and a backend.
+  std::vector<int> datanode_with_backend_host_idxs_;
+  /// All hosts with a datanode but no backend.
+  std::vector<int> datanode_only_host_idxs_;
+
+  /// Map from IP addresses to host indexes.
+  std::unordered_map<IpAddr, int> ip_to_idx_;
+
+  /// Convert a host index to an IP address. The host index must be smaller than 2^24 and
+  /// will specify the lower 24 bits of the IPv4 address (the lower 3 octets).
+  static IpAddr HostIdxToIpAddr(int host_idx);
+};
+
+struct Block {
+  /// By default all blocks are of the same size.
+  int64_t length = DEFAULT_BLOCK_SIZE;
+
+  /// Indexes of the hosts in the owning cluster that store a replica of this block.
+  std::vector<int> replica_host_idxs;
+
+  /// Flag for each entry in replica_host_idxs whether it is a cached replica or not.
+  std::vector<bool> replica_host_idx_is_cached;
+
+  /// Default size for new blocks.
+  static const int64_t DEFAULT_BLOCK_SIZE;
+};
+
+struct Table {
+  std::vector<Block> blocks;
+};
+
+class Schema {
+ public:
+  Schema(const Cluster& cluster) : cluster_(cluster) {}
+
+  /// Add a table consisting of a single block to the schema with explicitly specified
+  /// replica indexes for non-cached replicas and without any cached replicas. Replica
+  /// indexes must refer to hosts in cluster_.hosts() by index.
+  void AddSingleBlockTable(
+      const TableName& table_name, const std::vector<int>& non_cached_replica_host_idxs);
+
+  /// Add a table consisting of a single block to the schema with explicitly specified
+  /// replica indexes for both non-cached and cached replicas. Values in both lists must
+  /// refer to hosts in cluster_.hosts() by index. Both lists must be disjoint, i.e., a
+  /// replica can either be cached or not.
+  void AddSingleBlockTable(const TableName& table_name,
+      const std::vector<int>& non_cached_replica_host_idxs,
+      const std::vector<int>& cached_replica_host_idxs);
+
+  /// Add a table to the schema, selecting replica hosts according to the given replica
+  /// placement preference. All replicas will be non-cached.
+  void AddMultiBlockTable(const TableName& table_name, int num_blocks,
+      ReplicaPlacement replica_placement, int num_replicas);
+
+  /// Add a table to the schema, selecting replica hosts according to the given replica
+  /// placement preference. After replica selection has been done, 'num_cached_replicas'
+  /// of them are marked as cached.
+  void AddMultiBlockTable(const TableName& table_name, int num_blocks,
+      ReplicaPlacement replica_placement, int num_replicas, int num_cached_replicas);
+
+  const Table& GetTable(const TableName& table_name) const;
+
+  const Cluster& cluster() const { return cluster_; }
+
+ private:
+  /// Store a reference to the cluster, from which hosts are sampled. Test results will
+  /// use the cluster to resolve host indexes to hostnames and IP addresses.
+  const Cluster& cluster_;
+
+  std::unordered_map<TableName, Table> tables_;
+};
+
+/// Plan model. A plan contains a list of tables to scan and the query options to be used
+/// during scheduling.
+class Plan {
+ public:
+  Plan(const Schema& schema) : schema_(schema) {}
+
+  const TQueryOptions& query_options() const { return query_options_; }
+
+  void SetReplicaPreference(TReplicaPreference::type p);
+
+  void SetRandomReplica(bool b) { query_options_.schedule_random_replica = b; }
+  void SetDisableCachedReads(bool b) { query_options_.disable_cached_reads = b; }
+  const Cluster& cluster() const { return schema_.cluster(); }
+
+  const std::vector<TNetworkAddress>& referenced_datanodes() const;
+
+  const std::vector<TScanRangeLocations>& scan_range_locations() const;
+
+  /// Add a scan of table 'table_name' to the plan. This method will populate the internal
+  /// list of TScanRangeLocations and can be called multiple times for the same table to
+  /// schedule additional scans.
+  void AddTableScan(const TableName& table_name);
+
+ private:
+  /// Store a reference to the schema, from which scanned tables will be read.
+  const Schema& schema_;
+
+  TQueryOptions query_options_;
+
+  /// List of all datanodes that are referenced by this plan. Only hosts that have an
+  /// assigned scan range are added here.
+  std::vector<TNetworkAddress> referenced_datanodes_;
+
+  /// Map from plan host index to an index in 'referenced_datanodes_'.
+  std::unordered_map<int, int> host_idx_to_datanode_idx_;
+
+  /// List of all scan range locations, which can be passed to the SimpleScheduler.
+  std::vector<TScanRangeLocations> scan_range_locations_;
+
+  /// Initialize a TScanRangeLocations object in place.
+  void BuildTScanRangeLocations(const TableName& table_name, const Block& block,
+      int block_idx, TScanRangeLocations* scan_range_locations);
+
+  void BuildScanRange(const TableName& table_name, const Block& block, int block_idx,
+      TScanRange* scan_range);
+
+  /// Look up the plan-local host index of 'cluster_datanode_idx'. If the host has not
+  /// been added to the plan before, it will add it to 'referenced_datanodes_' and return
+  /// the new index.
+  int FindOrInsertDatanodeIndex(int cluster_datanode_idx);
+};
+
+class Result {
+ private:
+  /// Map to count the number of assignments per backend.
+  typedef std::unordered_map<IpAddr, int> NumAssignmentsPerBackend;
+
+  /// Map to count the number of assigned bytes per backend.
+  typedef std::unordered_map<IpAddr, int64_t> NumAssignedBytesPerBackend;
+
+  /// Parameter type for callbacks, which are used to filter scheduling results.
+  struct AssignmentInfo {
+    const TNetworkAddress& addr;
+    const THdfsFileSplit& hdfs_file_split;
+    bool is_cached;
+    bool is_remote;
+  };
+
+  /// These functions are used as callbacks when processing the scheduling result. They
+  /// will be called once per assigned scan range.
+  typedef std::function<bool(const AssignmentInfo& assignment)> AssignmentFilter;
+  typedef std::function<void(const AssignmentInfo& assignment)> AssignmentCallback;
+
+ public:
+  Result(const Plan& plan) : plan_(plan) {}
+
+  /// Return the total number of scheduled assignments.
+  int NumTotalAssignments() const { return CountAssignmentsIf(Any()); }
+
+  /// Return the total number of assigned bytes.
+  int NumTotalAssignedBytes() const { return CountAssignedBytesIf(Any()); }
+
+  /// Return the number of scheduled assignments for a single host.
+  int NumTotalAssignments(int host_idx) const;
+
+  /// Return the number of assigned bytes for a single host.
+  int NumTotalAssignedBytes(int host_idx) const;
+
+  /// Return the total number of assigned cached reads.
+  int NumCachedAssignments() const { return CountAssignmentsIf(IsCached(Any())); }
+
+  /// Return the total number of assigned bytes for cached reads.
+  int NumCachedAssignedBytes() const { return CountAssignedBytesIf(IsCached(Any())); }
+
+  /// Return the total number of assigned cached reads for a single host.
+  int NumCachedAssignments(int host_idx) const;
+
+  /// Return the total number of assigned bytes for cached reads for a single host.
+  int NumCachedAssignedBytes(int host_idx) const;
+
+  /// Return the total number of assigned non-cached reads.
+  int NumDiskAssignments() const { return CountAssignmentsIf(IsDisk(Any())); }
+
+  /// Return the total number of assigned bytes for non-cached reads.
+  int NumDiskAssignedBytes() const { return CountAssignedBytesIf(IsDisk(Any())); }
+
+  /// Return the total number of assigned non-cached reads for a single host.
+  int NumDiskAssignments(int host_idx) const;
+
+  /// Return the total number of assigned bytes for non-cached reads for a single host.
+  int NumDiskAssignedBytes(int host_idx) const;
+
+  /// Return the total number of assigned remote reads.
+  int NumRemoteAssignments() const { return CountAssignmentsIf(IsRemote(Any())); }
+
+  /// Return the total number of assigned bytes for remote reads.
+  int NumRemoteAssignedBytes() const { return CountAssignedBytesIf(IsRemote(Any())); }
+
+  /// Return the total number of assigned remote reads for a single host.
+  int NumRemoteAssignments(int host_idx) const;
+
+  /// Return the total number of assigned bytes for remote reads for a single host.
+  int NumRemoteAssignedBytes(int host_idx) const;
+
+  /// Return the maximum number of assigned reads over all hosts.
+  int MaxNumAssignmentsPerHost() const;
+
+  /// Return the maximum number of assigned bytes over all hosts.
+  int64_t MaxNumAssignedBytesPerHost() const;
+
+  /// Return the minimum number of assigned reads over all hosts.
+  /// NOTE: This is computed by traversing all recorded assignments and thus will not
+  /// consider hosts without any assignments. Hence the minimum value to expect is 1 (not
+  /// 0).
+  int MinNumAssignmentsPerHost() const;
+
+  /// Return the minimum number of assigned bytes over all hosts.
+  /// NOTE: This is computed by traversing all recorded assignments and thus will not
+  /// consider hosts without any assignments. Hence the minimum value to expect is 1 (not
+  /// 0).
+  int64_t MinNumAssignedBytesPerHost() const;
+
+  /// Return the number of scan range assignments stored in this result.
+  int NumAssignments() const { return assignments_.size(); }
+
+  /// Return the number of distinct backends that have been picked by the scheduler so
+  /// far.
+  int NumDistinctBackends() const;
+
+  /// Return the full assignment for manual matching.
+  const FragmentScanRangeAssignment& GetAssignment(int index = 0) const;
+
+  /// Add an assignment to the result and return a pointer, which can then be passed on
+  /// to the scheduler.
+  FragmentScanRangeAssignment* AddAssignment();
+
+  /// Reset the result to an empty state.
+  void Reset() { assignments_.clear(); }
+
+ private:
+  /// Vector to store results of consecutive scheduler runs.
+  std::vector<FragmentScanRangeAssignment> assignments_;
+
+  /// Reference to the plan, needed to look up hosts.
+  const Plan& plan_;
+
+  /// Dummy filter matching any assignment.
+  AssignmentFilter Any() const;
+
+  /// Filter to only match assignments of a particular host.
+  AssignmentFilter IsHost(int host_idx) const;
+
+  /// Filter to only match assignments of cached reads.
+  AssignmentFilter IsCached(AssignmentFilter filter) const;
+
+  /// Filter to only match assignments of non-cached, local disk reads.
+  AssignmentFilter IsDisk(AssignmentFilter filter) const;
+
+  /// Filter to only match assignments of remote reads.
+  AssignmentFilter IsRemote(AssignmentFilter filter) const;
+
+  /// Process all recorded assignments and call the supplied callback on each tuple of IP
+  /// address and scan_range it iterates over.
+  void ProcessAssignments(const AssignmentCallback& cb) const;
+
+  /// Count all assignments matching the supplied filter callback.
+  int CountAssignmentsIf(const AssignmentFilter& filter) const;
+
+  /// Sum the bytes of all assignments matching the supplied filter callback.
+  int64_t CountAssignedBytesIf(const AssignmentFilter& filter) const;
+
+  /// Create a map containing the number of assigned scan ranges per node.
+  void CountAssignmentsPerBackend(
+      NumAssignmentsPerBackend* num_assignments_per_backend) const;
+
+  /// Create a map containing the number of assigned bytes per node.
+  void CountAssignedBytesPerBackend(
+      NumAssignedBytesPerBackend* num_assignments_per_backend) const;
+};
+
+/// This class wraps the SimpleScheduler and provides helpers for easier instrumentation
+/// during tests.
+class SchedulerWrapper {
+ public:
+  SchedulerWrapper(const Plan& plan);
+
+  /// Call ComputeScanRangeAssignment() with exec_at_coord set to false.
+  void Compute(Result* result) { Compute(false, result); }
+
+  /// Call ComputeScanRangeAssignment().
+  void Compute(bool exec_at_coord, Result* result);
+
+  /// Reset the state of the scheduler by re-creating and initializing it.
+  void Reset() { InitializeScheduler(); }
+
+  /// Methods to modify the internal lists of backends maintained by the scheduler.
+
+  /// Add a backend to the scheduler.
+  void AddBackend(const Host& host);
+
+  /// Remove a backend from the scheduler.
+  void RemoveBackend(const Host& host);
+
+  /// Send a full map of the backends to the scheduler instead of deltas.
+  void SendFullMembershipMap();
+
+  /// Send an empty update message to the scheduler.
+  void SendEmptyUpdate();
+
+ private:
+  const Plan& plan_;
+  boost::scoped_ptr<SimpleScheduler> scheduler_;
+  MetricGroup metrics_;
+
+  /// Initialize the internal scheduler object. The method uses the 'real' constructor
+  /// used in the rest of the codebase, in contrast to the one that takes a list of
+  /// backends, which is only used for testing purposes. This allows us to properly
+  /// initialize the scheduler and exercise the UpdateMembership() method in tests.
+  void InitializeScheduler();
+
+  /// Add a single host to the given TTopicDelta.
+  void AddHostToTopicDelta(const Host& host, TTopicDelta* delta) const;
+
+  /// Send the given topic delta to the scheduler.
+  void SendTopicDelta(const TTopicDelta& delta);
+};
+
+} // end namespace test
+} // end namespace impala
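
To see how the pieces fit together: a minimal sketch of a test written against
this header (the table name, host counts, and expected counts are hypothetical;
the real tests live in simple-scheduler-test.cc):

  using namespace impala::test;

  // Ten hosts, each running a backend and a datanode; one table with four
  // blocks and three replicas per block, placed on backend hosts only.
  TEST(SimpleSchedulerTestSketch, LocalOnlyPlacement) {
    Cluster cluster;
    cluster.AddHosts(10, true, true);

    Schema schema(cluster);
    schema.AddMultiBlockTable("T1", 4, ReplicaPlacement::LOCAL_ONLY, 3);

    Plan plan(schema);
    plan.AddTableScan("T1");

    Result result(plan);
    SchedulerWrapper scheduler(plan);
    scheduler.Compute(&result);

    // One assignment per block; with local-only placement we expect local
    // (disk) reads and no remote reads.
    EXPECT_EQ(4, result.NumTotalAssignments());
    EXPECT_EQ(4, result.NumDiskAssignments());
    EXPECT_EQ(0, result.NumRemoteAssignments());
  }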

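The SchedulerWrapper membership helpers make it possible to test scheduling
across cluster changes; continuing the sketch above (again hypothetical, with
host index 1 chosen arbitrarily):

  // Remove one backend via a statestore-style topic delta, then verify that
  // subsequent scheduler runs no longer assign ranges to it.
  scheduler.RemoveBackend(cluster.hosts()[1]);
  result.Reset();
  scheduler.Compute(&result);
  EXPECT_EQ(0, result.NumTotalAssignments(1));
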
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d76a2b22/be/src/scheduling/simple-scheduler-test.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler-test.cc b/be/src/scheduling/simple-scheduler-test.cc
index 7116e21..d9964af 100644
--- a/be/src/scheduling/simple-scheduler-test.cc
+++ b/be/src/scheduling/simple-scheduler-test.cc
@@ -15,862 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <set>
-#include <vector>
-
-#include <boost/scoped_ptr.hpp>
-
 #include "simple-scheduler.h"
+#include "common/logging.h"
+#include "simple-scheduler-test-util.h"
 #include "testutil/gtest-util.h"
-#include "util/runtime-profile.h"
-
-#include "common/names.h"
 
 using namespace impala;
-
-DECLARE_string(pool_conf_file);
+using namespace impala::test;
 
 namespace impala {
 
-// Typedefs to make the rest of the code more readable.
-typedef string Hostname;
-typedef string IpAddr;
-typedef string TableName;
-
-/// Sample 'n' elements without replacement from the set [0..N-1].
-/// This is an implementation of "Algorithm R" by J. Vitter.
-void SampleN(int n, int N, vector<int>* out) {
-  if (n == 0) return;
-  DCHECK(n <= N);
-  out->reserve(n);
-  out->clear();
-  for (int i = 0; i < n; ++i) out->push_back(i);
-  for (int i = n; i < N; ++i) {
-    // Accept element with probability n/i. Place at random position.
-    int r = rand() % i;
-    if (r < n) (*out)[r] = i;
-  }
-}
-
-/// Sample a set of 'n' elements from 'in' without replacement and copy them to
-/// 'out'.
-template <typename T>
-void SampleNElements(int n, const vector<T>& in, vector<T>* out) {
-  vector<int> idxs;
-  SampleN(n, in.size(), &idxs);
-  DCHECK_EQ(n, idxs.size());
-  out->reserve(n);
-  for (int idx: idxs) out->push_back(in[idx]);
-}
-
-/// Helper classes to be used by the scheduler tests.
-
-/// Overall testing approach: Each test builds a list of hosts and a plan, both to which
-/// elements can be added using various helper methods. Then scheduling can be tested
-/// by instantiating SchedulerWrapper and calling Compute(...). The result can be verified
-/// using a set of helper methods. There are also helper methods to modify the internal
-/// state of the scheduler between subsequent calls to SchedulerWrapper::Compute().
-///
-/// The model currently comes with some known limitations:
-///
-/// - Files map 1:1 to blocks and to scan ranges.
-/// - All files have the same size (1 block of 1M). Tables that differ in size can be
-///   expressed as having a different number of blocks.
-/// - We don't support multiple backends on a single host.
-/// - Ports are assigned to hosts automatically and are not configurable by the test.
-
-// TODO: Extend the model to support files with multiple blocks.
-// TODO: Test more methods of the scheduler.
-// TODO: Add support to add skewed table scans with multiple scan ranges: often there are
-//       3 replicas where there may be skew for 1 of the replicas (e.g. after a single
-//       node insert) but the other 2 are random.
-// TODO: Make use of the metrics provided by the scheduler.
-// TODO: Add checks for MinNumAssignmentsPerHost() to all tests where applicable.
-// TODO: Add post-condition checks that have to hold for all successful scheduler runs.
-// TODO: Add possibility to explicitly specify the replica location per file.
-// TODO: Add methods to retrieve and verify generated file placements from plan.
-// TODO: Extend the model to specify a physical schema independently of a plan (ie,
-//       tables/files, blocks, replicas and cached replicas exist independently of the
-//       queries that run against them).
-
-/// File blocks store a list of all datanodes that have a replica of the block. When
-/// defining tables you can specify the desired replica placement among all available
-/// datanodes in the cluster.
-///
-/// - RANDOM means that any datanode can be picked.
-/// - LOCAL_ONLY means that only datanodes with a backend will be picked.
-/// - REMOTE_ONLY means that only datanodes without a backend will be picked.
-///
-/// Whether replicas will be cached or not is not determined by this value, but by
-/// additional function arguments when adding tables to the schema.
-enum class ReplicaPlacement {
-  RANDOM,
-  LOCAL_ONLY,
-  REMOTE_ONLY,
-};
-
-/// Host model. Each host can have either a backend, a datanode, or both. To specify that
-/// a host should not act as a backend or datanode specify '-1' as the respective port.
-struct Host {
-  Host(const Hostname& name, const IpAddr& ip, int be_port, int dn_port)
-      : name(name), ip(ip), be_port(be_port), dn_port(dn_port) {}
-  Hostname name;
-  IpAddr ip;
-  int be_port;  // Backend port
-  int dn_port;  // Datanode port
-};
-
-/// A cluster stores a list of hosts and provides various methods to add hosts 
to the
-/// cluster. All hosts are guaranteed to have unique IP addresses and 
hostnames.
-class Cluster {
- public:
-  /// Add a host and return the host's index. 'hostname' and 'ip' of the new host will be
-  /// generated and are guaranteed to be unique.
-  int AddHost(bool has_backend, bool has_datanode) {
-    int host_idx = hosts_.size();
-    int be_port = has_backend ? BACKEND_PORT : -1;
-    int dn_port = has_datanode ? DATANODE_PORT : -1;
-    IpAddr ip = HostIdxToIpAddr(host_idx);
-    DCHECK(ip_to_idx_.find(ip) == ip_to_idx_.end());
-    ip_to_idx_[ip] = host_idx;
-    hosts_.push_back(Host(HostIdxToHostname(host_idx), ip, be_port, dn_port));
-    // Add host to lists of backend indexes per type.
-    if (has_backend) backend_host_idxs_.push_back(host_idx);
-    if (has_datanode) {
-      datanode_host_idxs_.push_back(host_idx);
-      if (has_backend) {
-        datanode_with_backend_host_idxs_.push_back(host_idx);
-      } else {
-        datanode_only_host_idxs_.push_back(host_idx);
-      }
-    }
-    return host_idx;
-  }
-
-  /// Add a number of hosts with the same properties by repeatedly calling AddHost(..).
-  void AddHosts(int num_hosts, bool has_backend, bool has_datanode) {
-    for (int i = 0; i < num_hosts; ++i) AddHost(has_backend, has_datanode);
-  }
-
-  /// Convert a host index to a hostname.
-  static Hostname HostIdxToHostname(int host_idx) {
-    return HOSTNAME_PREFIX + std::to_string(host_idx);
-  }
-
-  /// Return the backend address (ip, port) for the host with index 'host_idx'.
-  void GetBackendAddress(int host_idx, TNetworkAddress* addr) const {
-    DCHECK_LT(host_idx, hosts_.size());
-    addr->hostname = hosts_[host_idx].ip;
-    addr->port = hosts_[host_idx].be_port;
-  }
-
-  const vector<Host>& hosts() const { return hosts_; }
-  int NumHosts() const { return hosts_.size(); }
-
-  /// These methods return lists of host indexes, grouped by their type, which can be used
-  /// to draw samples of random sets of hosts.
-  /// TODO: Think of a nicer abstraction to expose this information.
-  const vector<int>& backend_host_idxs() const { return backend_host_idxs_; }
-  const vector<int>& datanode_host_idxs() const { return datanode_host_idxs_; }
-
-  const vector<int>& datanode_with_backend_host_idxs() const {
-    return datanode_with_backend_host_idxs_;
-  }
-
-  const vector<int>& datanode_only_host_idxs() const { return datanode_only_host_idxs_; }
-
- private:
-  /// Port for all backends.
-  static const int BACKEND_PORT;
-
-  /// Port for all datanodes.
-  static const int DATANODE_PORT;
-
-  /// Prefix for all generated hostnames.
-  static const string HOSTNAME_PREFIX;
-
-  /// First octet for all generated IP addresses.
-  static const string IP_PREFIX;
-
-  /// List of hosts in this cluster.
-  vector<Host> hosts_;
-
-  /// Lists of indexes of hosts, grouped by their type. The lists reference hosts in
-  /// 'hosts_' by index and are used for random sampling.
-  ///
-  /// All hosts with a backend.
-  vector<int> backend_host_idxs_;
-  /// All hosts with a datanode.
-  vector<int> datanode_host_idxs_;
-  /// All hosts with a datanode and a backend.
-  vector<int> datanode_with_backend_host_idxs_;
-  /// All hosts with a datanode but no backend.
-  vector<int> datanode_only_host_idxs_;
-
-  /// Map from IP addresses to host indexes.
-  unordered_map<IpAddr, int> ip_to_idx_;
-
-  /// Convert a host index to an IP address. The host index must be smaller than 2^24 and
-  /// will specify the lower 24 bits of the IPv4 address (the lower 3 octets).
-  static IpAddr HostIdxToIpAddr(int host_idx) {
-    DCHECK_LT(host_idx, (1 << 24));
-    string suffix;
-    for (int i = 0; i < 3; ++i) {
-      suffix = "." + std::to_string(host_idx % 256) + suffix;  // prepend
-      host_idx /= 256;
-    }
-    DCHECK_EQ(0, host_idx);
-    return IP_PREFIX + suffix;
-  }
-};
-
-const int Cluster::BACKEND_PORT = 1000;
-const int Cluster::DATANODE_PORT = 2000;
-const string Cluster::HOSTNAME_PREFIX = "host_";
-const string Cluster::IP_PREFIX = "10";
-
-struct Block {
-  /// By default all blocks are of the same size.
-  int64_t length = DEFAULT_BLOCK_SIZE;
-
-  /// Index into the cluster that owns the table that owns this block.
-  vector<int> replica_host_idxs;
-
-  /// Flag for each entry in replica_host_idxs whether it is a cached replica or not.
-  vector<bool> replica_host_idx_is_cached;
-
-  /// Default size for new blocks.
-  static const int64_t DEFAULT_BLOCK_SIZE;
-};
-/// Default size for new blocks is 1MB.
-const int64_t Block::DEFAULT_BLOCK_SIZE = 1 << 20;
-
-struct Table {
-  vector<Block> blocks;
-};
-
-class Schema {
- public:
-  Schema(const Cluster& cluster) : cluster_(cluster) {}
-
-  /// Add a table consisting of a single block to the schema with explicitly specified
-  /// replica indexes for non-cached replicas and without any cached replicas. Replica
-  /// indexes must refer to hosts in cluster_.hosts() by index.
-  void AddSingleBlockTable(const TableName& table_name,
-      const vector<int>& non_cached_replica_host_idxs) {
-    AddSingleBlockTable(table_name, non_cached_replica_host_idxs, {});
-  }
-
-  /// Add a table consisting of a single block to the schema with explicitly specified
-  /// replica indexes for both non-cached and cached replicas. Values in both lists must
-  /// refer to hosts in cluster_.hosts() by index. Both lists must be disjoint, i.e., a
-  /// replica can either be cached or not.
-  void AddSingleBlockTable(const TableName& table_name,
-      const vector<int>& non_cached_replica_host_idxs,
-      const vector<int>& cached_replica_host_idxs) {
-    DCHECK(tables_.find(table_name) == tables_.end());
-    Block block;
-    int num_replicas = non_cached_replica_host_idxs.size() +
-      cached_replica_host_idxs.size();
-    block.replica_host_idxs = non_cached_replica_host_idxs;
-    block.replica_host_idxs.insert(block.replica_host_idxs.end(),
-        cached_replica_host_idxs.begin(), cached_replica_host_idxs.end());
-    // Initialize for non-cached replicas first.
-    block.replica_host_idx_is_cached.resize(non_cached_replica_host_idxs.size(), false);
-    // Fill up to final size for cached replicas.
-    block.replica_host_idx_is_cached.insert(block.replica_host_idx_is_cached.end(),
-        cached_replica_host_idxs.size(), true);
-    DCHECK_EQ(block.replica_host_idxs.size(), block.replica_host_idx_is_cached.size());
-    DCHECK_EQ(block.replica_host_idxs.size(), num_replicas);
-    // Create table
-    Table table;
-    table.blocks.push_back(block);
-    // Insert table
-    tables_.emplace(table_name, table);
-  }
-
-  /// Add a table to the schema, selecting replica hosts according to the given replica
-  /// placement preference. All replicas will be non-cached.
-  void AddMultiBlockTable(const TableName& table_name, int num_blocks,
-      ReplicaPlacement replica_placement, int num_replicas) {
-    AddMultiBlockTable(table_name, num_blocks, replica_placement, num_replicas, 0);
-  }
-
-  /// Add a table to the schema, selecting replica hosts according to the given replica
-  /// placement preference. After replica selection has been done, 'num_cached_replicas'
-  /// of them are marked as cached.
-  void AddMultiBlockTable(const TableName& table_name, int num_blocks,
-      ReplicaPlacement replica_placement, int num_replicas, int num_cached_replicas) {
-    DCHECK_GT(num_replicas, 0);
-    DCHECK(num_cached_replicas <= num_replicas);
-    Table table;
-    for (int i = 0; i < num_blocks; ++i) {
-      Block block;
-      vector<int>& replica_idxs = block.replica_host_idxs;
-
-      // Determine replica host indexes.
-      switch (replica_placement) {
-        case ReplicaPlacement::RANDOM:
-          SampleNElements(num_replicas, cluster_.datanode_host_idxs(), &replica_idxs);
-          break;
-        case ReplicaPlacement::LOCAL_ONLY:
-          DCHECK(num_replicas <= cluster_.datanode_with_backend_host_idxs().size());
-          SampleNElements(num_replicas, cluster_.datanode_with_backend_host_idxs(),
-              &replica_idxs);
-          break;
-        case ReplicaPlacement::REMOTE_ONLY:
-          DCHECK(num_replicas <= cluster_.datanode_only_host_idxs().size());
-          SampleNElements(num_replicas, cluster_.datanode_only_host_idxs(),
-              &replica_idxs);
-          break;
-        default:
-          DCHECK(false) << "Unsupported replica placement: "
-              << (int)replica_placement;
-      }
-
-      // Determine cached replicas.
-      vector<int> cached_replicas;
-      vector<bool>& is_cached = block.replica_host_idx_is_cached;
-      is_cached.resize(num_replicas, false);
-      SampleN(num_cached_replicas, num_replicas, &cached_replicas);
-      // Flag cached entries.
-      for (const int idx: cached_replicas) is_cached[idx] = true;
-
-      DCHECK_EQ(replica_idxs.size(), is_cached.size());
-      table.blocks.push_back(block);
-    }
-    // Insert table
-    tables_[table_name] = table;
-  }
-
-  const Table& GetTable(const TableName& table_name) const {
-    auto it = tables_.find(table_name);
-    DCHECK(it != tables_.end());
-    return it->second;
-  }
-
-  const Cluster& cluster() const { return cluster_; }
-
- private:
-  /// Store a reference to the cluster, from which hosts are sampled. Test results will
-  /// use the cluster to resolve host indexes to hostnames and IP addresses.
-  const Cluster& cluster_;
-
-  unordered_map<TableName, Table> tables_;
-};
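
The Schema helper above composes with the Cluster model, so a test can describe its
data layout in a couple of lines. A minimal sketch, assuming Schema's constructor
takes the cluster by const reference (as the cluster_ member suggests) and that the
Cluster helper defined earlier in this file offers an AddHosts(num_hosts,
has_backend, has_datanode) convenience method (not shown in this hunk):

  // Sketch only: a 10-host cluster where every host runs a backend and a
  // datanode, plus a table of 4 blocks with 3 replicas each, one of which
  // is flagged as cached.
  Cluster cluster;
  cluster.AddHosts(10, true, true);
  Schema schema(cluster);
  schema.AddMultiBlockTable("T1", 4, ReplicaPlacement::RANDOM, 3, 1);
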
-
-/// Plan model. A plan contains a list of tables to scan and the query options to be used
-/// during scheduling.
-class Plan {
- public:
-  Plan(const Schema& schema) : schema_(schema) {}
-
-  const TQueryOptions& query_options() const { return query_options_; }
-
-  void SetReplicaPreference(TReplicaPreference::type p) {
-    query_options_.replica_preference = p;
-  }
-
-  void SetRandomReplica(bool b) { query_options_.schedule_random_replica = b; }
-  void SetDisableCachedReads(bool b) { query_options_.disable_cached_reads = b; }
-  const Cluster& cluster() const { return schema_.cluster(); }
-
-  const vector<TNetworkAddress>& referenced_datanodes() const {
-    return referenced_datanodes_;
-  }
-
-  const vector<TScanRangeLocations>& scan_range_locations() const {
-    return scan_range_locations_;
-  }
-
-  /// Add a scan of table 'table_name' to the plan. This method will populate the internal
-  /// list of TScanRangeLocations and can be called multiple times for the same table to
-  /// schedule additional scans.
-  void AddTableScan(const TableName& table_name) {
-    const Table& table = schema_.GetTable(table_name);
-    const vector<Block>& blocks = table.blocks;
-    for (int i = 0; i < blocks.size(); ++i) {
-      const Block& block = blocks[i];
-      TScanRangeLocations scan_range_locations;
-      BuildTScanRangeLocations(table_name, block, i, &scan_range_locations);
-      scan_range_locations_.push_back(scan_range_locations);
-    }
-  }
-
- private:
-  /// Store a reference to the schema, from which scanned tables will be read.
-  const Schema& schema_;
-
-  TQueryOptions query_options_;
-
-  /// List of all datanodes that are referenced by this plan. Only hosts that have an
-  /// assigned scan range are added here.
-  vector<TNetworkAddress> referenced_datanodes_;
-
-  /// Map from plan host index to an index in 'referenced_datanodes_'.
-  boost::unordered_map<int, int> host_idx_to_datanode_idx_;
-
-  /// List of all scan range locations, which can be passed to the SimpleScheduler.
-  vector<TScanRangeLocations> scan_range_locations_;
-
-  /// Initialize a TScanRangeLocations object in place.
-  void BuildTScanRangeLocations(const TableName& table_name, const Block& block,
-      int block_idx, TScanRangeLocations* scan_range_locations) {
-    const vector<int>& replica_idxs = block.replica_host_idxs;
-    const vector<bool>& is_cached = block.replica_host_idx_is_cached;
-    DCHECK_EQ(replica_idxs.size(), is_cached.size());
-    int num_replicas = replica_idxs.size();
-    BuildScanRange(table_name, block, block_idx, &scan_range_locations->scan_range);
-    scan_range_locations->locations.resize(num_replicas);
-    for (int i = 0; i < num_replicas; ++i) {
-      TScanRangeLocation& location = scan_range_locations->locations[i];
-      location.host_idx = FindOrInsertDatanodeIndex(replica_idxs[i]);
-      location.__set_is_cached(is_cached[i]);
-    }
-  }
-
-  void BuildScanRange(const TableName& table_name, const Block& block, int block_idx,
-      TScanRange* scan_range) {
-    // Initialize locations.scan_range correctly.
-    THdfsFileSplit file_split;
-    // 'length' is the only member considered by the scheduler.
-    file_split.length = block.length;
-    // Encoding the table name and block index in the file name helps debugging.
-    file_split.file_name = table_name + "_block_" + std::to_string(block_idx);
-    file_split.offset = 0;
-    file_split.partition_id = 0;
-    // For now, we model each file by a single block.
-    file_split.file_length = block.length;
-    file_split.file_compression = THdfsCompression::NONE;
-    file_split.mtime = 1;
-    scan_range->__set_hdfs_file_split(file_split);
-  }
-
-  /// Look up the plan-local host index of 'cluster_datanode_idx'. If the host has not
-  /// been added to the plan before, it will add it to 'referenced_datanodes_' and return
-  /// the new index.
-  int FindOrInsertDatanodeIndex(int cluster_datanode_idx) {
-    const Host& host = schema_.cluster().hosts()[cluster_datanode_idx];
-    auto ret = host_idx_to_datanode_idx_.emplace(
-        cluster_datanode_idx, referenced_datanodes_.size());
-    bool inserted_new_element = ret.second;
-    if (inserted_new_element) {
-      TNetworkAddress datanode;
-      datanode.hostname = host.ip;
-      datanode.port = host.dn_port;
-      referenced_datanodes_.push_back(datanode);
-    }
-    return ret.first->second;
-  }
-};
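
To make the Plan model concrete, here is a hedged sketch of how a test could
describe a scan with a disk-local replica preference, continuing the schema sketch
above (TReplicaPreference::DISK_LOCAL is one of the standard Impala replica
preference values):

  // Sketch only: scan table "T1" twice, preferring local disk reads and
  // letting the scheduler pick a random replica among equivalent ones.
  Plan plan(schema);
  plan.SetReplicaPreference(TReplicaPreference::DISK_LOCAL);
  plan.SetRandomReplica(true);
  plan.AddTableScan("T1");
  plan.AddTableScan("T1");
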
-
-class Result {
- private:
-  /// Map to count the number of assignments per backend.
-  typedef unordered_map<IpAddr, int> NumAssignmentsPerBackend;
-
-  /// Map to count the number of assigned bytes per backend.
-  typedef unordered_map<IpAddr, int64_t> NumAssignedBytesPerBackend;
-
-  /// Parameter type for callbacks, which are used to filter scheduling results.
-  struct AssignmentInfo {
-    const TNetworkAddress& addr;
-    const THdfsFileSplit& hdfs_file_split;
-    bool is_cached;
-    bool is_remote;
-  };
-
-  /// These functions are used as callbacks when processing the scheduling result. They
-  /// will be called once per assigned scan range.
-  typedef std::function<bool(const AssignmentInfo& assignment)> AssignmentFilter;
-  typedef std::function<void(const AssignmentInfo& assignment)> AssignmentCallback;
-
- public:
-  Result(const Plan& plan) : plan_(plan) {}
-
-  /// Return the total number of scheduled assignments.
-  int NumTotalAssignments() const { return CountAssignmentsIf(Any()); }
-
-  /// Return the total number of assigned bytes.
-  int64_t NumTotalAssignedBytes() const { return CountAssignedBytesIf(Any()); }
-
-  /// Return the number of scheduled assignments for a single host.
-  int NumTotalAssignments(int host_idx) const {
-    return CountAssignmentsIf(IsHost(host_idx));
-  }
-
-  /// Return the number of assigned bytes for a single host.
-  int64_t NumTotalAssignedBytes(int host_idx) const {
-    return CountAssignedBytesIf(IsHost(host_idx));
-  }
-
-  /// Return the total number of assigned cached reads.
-  int NumCachedAssignments() const { return CountAssignmentsIf(IsCached(Any())); }
-
-  /// Return the total number of assigned bytes for cached reads.
-  int64_t NumCachedAssignedBytes() const {
-    return CountAssignedBytesIf(IsCached(Any()));
-  }
-
-  /// Return the total number of assigned cached reads for a single host.
-  int NumCachedAssignments(int host_idx) const {
-    return CountAssignmentsIf(IsCached(IsHost(host_idx)));
-  }
-
-  /// Return the total number of assigned bytes for cached reads for a single host.
-  int64_t NumCachedAssignedBytes(int host_idx) const {
-    return CountAssignedBytesIf(IsCached(IsHost(host_idx)));
-  }
-
-  /// Return the total number of assigned non-cached reads.
-  int NumDiskAssignments() const { return CountAssignmentsIf(IsDisk(Any())); }
-
-  /// Return the total number of assigned bytes for non-cached reads.
-  int64_t NumDiskAssignedBytes() const { return CountAssignedBytesIf(IsDisk(Any())); }
-
-  /// Return the total number of assigned non-cached reads for a single host.
-  int NumDiskAssignments(int host_idx) const {
-    return CountAssignmentsIf(IsDisk(IsHost(host_idx)));
-  }
-
-  /// Return the total number of assigned bytes for non-cached reads for a single host.
-  int64_t NumDiskAssignedBytes(int host_idx) const {
-    return CountAssignedBytesIf(IsDisk(IsHost(host_idx)));
-  }
-
-  /// Return the total number of assigned remote reads.
-  int NumRemoteAssignments() const { return CountAssignmentsIf(IsRemote(Any())); }
-
-  /// Return the total number of assigned bytes for remote reads.
-  int64_t NumRemoteAssignedBytes() const {
-    return CountAssignedBytesIf(IsRemote(Any()));
-  }
-
-  /// Return the total number of assigned remote reads for a single host.
-  int NumRemoteAssignments(int host_idx) const {
-    return CountAssignmentsIf(IsRemote(IsHost(host_idx)));
-  }
-
-  /// Return the total number of assigned bytes for remote reads for a single host.
-  int64_t NumRemoteAssignedBytes(int host_idx) const {
-    return CountAssignedBytesIf(IsRemote(IsHost(host_idx)));
-  }
-
-  /// Return the maximum number of assigned reads over all hosts.
-  int MaxNumAssignmentsPerHost() const {
-    NumAssignmentsPerBackend num_assignments_per_backend;
-    CountAssignmentsPerBackend(&num_assignments_per_backend);
-    int max_count = 0;
-    for (const auto& elem: num_assignments_per_backend) {
-      max_count = max(max_count, elem.second);
-    }
-    return max_count;
-  }
-
-  /// Return the maximum number of assigned bytes over all hosts.
-  int64_t MaxNumAssignedBytesPerHost() const {
-    NumAssignedBytesPerBackend num_assigned_bytes_per_backend;
-    CountAssignedBytesPerBackend(&num_assigned_bytes_per_backend);
-    int64_t max_assigned_bytes = 0;
-    for (const auto& elem: num_assigned_bytes_per_backend) {
-      max_assigned_bytes = max(max_assigned_bytes, elem.second);
-    }
-    return max_assigned_bytes;
-  }
-
-  /// Return the minimum number of assigned reads over all hosts.
-  /// NOTE: This is computed by traversing all recorded assignments and thus will not
-  /// consider hosts without any assignments. Hence the minimum value to expect is 1 (not
-  /// 0).
-  int MinNumAssignmentsPerHost() const {
-    NumAssignmentsPerBackend num_assignments_per_backend;
-    CountAssignmentsPerBackend(&num_assignments_per_backend);
-    int min_count = numeric_limits<int>::max();
-    for (const auto& elem: num_assignments_per_backend) {
-      min_count = min(min_count, elem.second);
-    }
-    DCHECK_GT(min_count, 0);
-    return min_count;
-  }
-
-  /// Return the minimum number of assigned bytes over all hosts.
-  /// NOTE: This is computed by traversing all recorded assignments and thus will not
-  /// consider hosts without any assignments. Hence the minimum value to expect is 1 (not
-  /// 0).
-  int64_t MinNumAssignedBytesPerHost() const {
-    NumAssignedBytesPerBackend num_assigned_bytes_per_backend;
-    CountAssignedBytesPerBackend(&num_assigned_bytes_per_backend);
-    int64_t min_assigned_bytes = numeric_limits<int64_t>::max();
-    for (const auto& elem: num_assigned_bytes_per_backend) {
-      min_assigned_bytes = min(min_assigned_bytes, elem.second);
-    }
-    DCHECK_GT(min_assigned_bytes, 0);
-    return min_assigned_bytes;
-  }
-
-  /// Return the number of scan range assignments stored in this result.
-  int NumAssignments() const { return assignments_.size(); }
-
-  /// Return the number of distinct backends that have been picked by the scheduler so
-  /// far.
-  int NumDistinctBackends() const {
-    unordered_set<IpAddr> backends;
-    AssignmentCallback cb = [&backends](const AssignmentInfo& assignment) {
-      backends.insert(assignment.addr.hostname);
-    };
-    ProcessAssignments(cb);
-    return backends.size();
-  }
-
-  /// Return the full assignment for manual matching.
-  const FragmentScanRangeAssignment& GetAssignment(int index = 0) const {
-    DCHECK_GT(assignments_.size(), index);
-    return assignments_[index];
-  }
-
-  /// Add an assignment to the result and return a pointer to it, which can then be
-  /// passed on to the scheduler.
-  FragmentScanRangeAssignment* AddAssignment() {
-    assignments_.push_back(FragmentScanRangeAssignment());
-    return &assignments_.back();
-  }
-
-  /// Reset the result to an empty state.
-  void Reset() { assignments_.clear(); }
-
- private:
-  /// Vector to store results of consecutive scheduler runs.
-  vector<FragmentScanRangeAssignment> assignments_;
-
-  /// Reference to the plan, needed to look up hosts.
-  const Plan& plan_;
-
-  /// Dummy filter matching any assignment.
-  AssignmentFilter Any() const {
-    return [](const AssignmentInfo& assignment) { return true; };
-  }
-
-  /// Filter to only match assignments of a particular host.
-  AssignmentFilter IsHost(int host_idx) const {
-    TNetworkAddress expected_addr;
-    plan_.cluster().GetBackendAddress(host_idx, &expected_addr);
-    return [expected_addr](const AssignmentInfo& assignment) {
-      return assignment.addr == expected_addr;
-    };
-  }
-
-  /// Filter to only match assignments of cached reads.
-  AssignmentFilter IsCached(AssignmentFilter filter) const {
-    return [filter](const AssignmentInfo& assignment) {
-        return filter(assignment) && assignment.is_cached;
-    };
-  }
-
-  /// Filter to only match assignments of non-cached, local disk reads.
-  AssignmentFilter IsDisk(AssignmentFilter filter) const {
-    return [filter](const AssignmentInfo& assignment) {
-        return filter(assignment) && !assignment.is_cached && !assignment.is_remote;
-    };
-  }
-
-  /// Filter to only match assignments of remote reads.
-  AssignmentFilter IsRemote(AssignmentFilter filter) const {
-    return [filter](const AssignmentInfo& assignment) {
-        return filter(assignment) && assignment.is_remote;
-    };
-  }
-
-  /// Process all recorded assignments and call the supplied callback on each tuple of IP
-  /// address and scan_range it iterates over.
-  void ProcessAssignments(const AssignmentCallback& cb) const {
-    for (const FragmentScanRangeAssignment& assignment: assignments_) {
-      for (const auto& assignment_elem: assignment) {
-        const TNetworkAddress& addr = assignment_elem.first;
-        const PerNodeScanRanges& per_node_ranges = assignment_elem.second;
-        for (const auto& per_node_ranges_elem: per_node_ranges) {
-          const vector<TScanRangeParams>& scan_range_params_vector =
-              per_node_ranges_elem.second;
-          for (const TScanRangeParams& scan_range_params: scan_range_params_vector) {
-            const TScanRange& scan_range = scan_range_params.scan_range;
-            DCHECK(scan_range.__isset.hdfs_file_split);
-            const THdfsFileSplit& hdfs_file_split = scan_range.hdfs_file_split;
-            bool is_cached = scan_range_params.__isset.is_cached
-                ? scan_range_params.is_cached : false;
-            bool is_remote = scan_range_params.__isset.is_remote
-                ? scan_range_params.is_remote : false;
-            cb({addr, hdfs_file_split, is_cached, is_remote});
-          }
-        }
-      }
-    }
-  }
-
-  /// Count all assignments matching the supplied filter callback.
-  int CountAssignmentsIf(const AssignmentFilter& filter) const {
-    int count = 0;
-    AssignmentCallback cb = [&count, filter](const AssignmentInfo& assignment) {
-      if (filter(assignment)) ++count;
-    };
-    ProcessAssignments(cb);
-    return count;
-  }
-
-  /// Sum the assigned bytes over all assignments matching the supplied filter callback.
-  int64_t CountAssignedBytesIf(const AssignmentFilter& filter) const {
-    int64_t assigned_bytes = 0;
-    AssignmentCallback cb = [&assigned_bytes, filter](const AssignmentInfo& assignment) {
-      if (filter(assignment)) assigned_bytes += assignment.hdfs_file_split.length;
-    };
-    ProcessAssignments(cb);
-    return assigned_bytes;
-  }
-
-  /// Create a map containing the number of assigned scan ranges per node.
-  void CountAssignmentsPerBackend(
-      NumAssignmentsPerBackend* num_assignments_per_backend) const {
-    AssignmentCallback cb = [&num_assignments_per_backend](
-        const AssignmentInfo& assignment) {
-      ++(*num_assignments_per_backend)[assignment.addr.hostname];
-    };
-    ProcessAssignments(cb);
-  }
-
-  /// Create a map containing the number of assigned bytes per node.
-  void CountAssignedBytesPerBackend(
-      NumAssignedBytesPerBackend* num_assigned_bytes_per_backend) const {
-    AssignmentCallback cb = [&num_assigned_bytes_per_backend](
-        const AssignmentInfo& assignment) {
-      (*num_assigned_bytes_per_backend)[assignment.addr.hostname] +=
-        assignment.hdfs_file_split.length;
-    };
-    ProcessAssignments(cb);
-  }
-};
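
All of the counting helpers above reduce to ProcessAssignments() plus a composed
AssignmentFilter, so assertions in tests stay one-liners. An illustrative sketch
against a populated Result (the expected values are placeholders, not output from
a real run):

  // Sketch only: typical assertions once the scheduler has filled 'result'.
  EXPECT_GT(result.NumTotalAssignments(), 0);
  EXPECT_EQ(0, result.NumRemoteAssignments());
  // A common balance check: no host received more than one extra scan range.
  EXPECT_LE(result.MaxNumAssignmentsPerHost() - result.MinNumAssignmentsPerHost(), 1);
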
-
-/// This class wraps the SimpleScheduler and provides helpers for easier instrumentation
-/// during tests.
-class SchedulerWrapper {
- public:
-  SchedulerWrapper(const Plan& plan) : plan_(plan), metrics_("TestMetrics") {
-    InitializeScheduler();
-  }
-
-  /// Call ComputeScanRangeAssignment() with exec_at_coord set to false.
-  void Compute(Result* result) {
-    Compute(false, result);
-  }
-
-  /// Call ComputeScanRangeAssignment().
-  void Compute(bool exec_at_coord, Result* result) {
-    DCHECK(scheduler_ != NULL);
-
-    // Compute Assignment.
-    FragmentScanRangeAssignment* assignment = result->AddAssignment();
-    scheduler_->ComputeScanRangeAssignment(*scheduler_->GetBackendConfig(), 0, NULL,
-        false, plan_.scan_range_locations(), plan_.referenced_datanodes(), exec_at_coord,
-        plan_.query_options(), NULL, assignment);
-  }
-
-  /// Reset the state of the scheduler by re-creating and initializing it.
-  void Reset() { InitializeScheduler(); }
-
-  /// Methods to modify the internal lists of backends maintained by the scheduler.
-
-  /// Add a backend to the scheduler.
-  void AddBackend(const Host& host) {
-    // Add to topic delta
-    TTopicDelta delta;
-    delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC;
-    delta.is_delta = true;
-    AddHostToTopicDelta(host, &delta);
-    SendTopicDelta(delta);
-  }
-
-  /// Remove a backend from the scheduler.
-  void RemoveBackend(const Host& host) {
-    // Add deletion to topic delta
-    TTopicDelta delta;
-    delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC;
-    delta.is_delta = true;
-    delta.topic_deletions.push_back(host.ip);
-    SendTopicDelta(delta);
-  }
-
-  /// Send a full map of the backends to the scheduler instead of deltas.
-  void SendFullMembershipMap() {
-    TTopicDelta delta;
-    delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC;
-    delta.is_delta = false;
-    for (const Host& host: plan_.cluster().hosts()) {
-      if (host.be_port >= 0) AddHostToTopicDelta(host, &delta);
-    }
-    SendTopicDelta(delta);
-  }
-
-  /// Send an empty update message to the scheduler.
-  void SendEmptyUpdate() {
-    TTopicDelta delta;
-    delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC;
-    delta.is_delta = true;
-    SendTopicDelta(delta);
-  }
-
- private:
-  const Plan& plan_;
-  boost::scoped_ptr<SimpleScheduler> scheduler_;
-  MetricGroup metrics_;
-
-  /// Initialize the internal scheduler object. The method uses the 'real' constructor
-  /// used in the rest of the codebase, in contrast to the one that takes a list of
-  /// backends, which is only used for testing purposes. This allows us to properly
-  /// initialize the scheduler and exercise the UpdateMembership() method in tests.
-  void InitializeScheduler() {
-    DCHECK(scheduler_ == NULL);
-    DCHECK_GT(plan_.cluster().NumHosts(), 0) << "Cannot initialize scheduler with 0 "
-        << "hosts.";
-    const Host& scheduler_host = plan_.cluster().hosts()[0];
-    string scheduler_backend_id = scheduler_host.ip;
-    TNetworkAddress scheduler_backend_address;
-    scheduler_backend_address.hostname = scheduler_host.ip;
-    scheduler_backend_address.port = scheduler_host.be_port;
-
-    scheduler_.reset(new SimpleScheduler(
-        NULL, scheduler_backend_id, scheduler_backend_address, &metrics_, NULL, NULL));
-    scheduler_->Init();
-    // Initialize the scheduler backend maps.
-    SendFullMembershipMap();
-  }
-
-  /// Add a single host to the given TTopicDelta.
-  void AddHostToTopicDelta(const Host& host, TTopicDelta* delta) const {
-    DCHECK_GT(host.be_port, 0) << "Host cannot be added to scheduler without a running "
-      << "backend";
-    // Build backend descriptor.
-    TBackendDescriptor be_desc;
-    be_desc.address.hostname = host.ip;
-    be_desc.address.port = host.be_port;
-    be_desc.ip_address = host.ip;
-
-    // Build topic item.
-    TTopicItem item;
-    item.key = host.ip;
-    ThriftSerializer serializer(false);
-    Status status = serializer.Serialize(&be_desc, &item.value);
-    DCHECK(status.ok());
-
-    // Add to topic delta.
-    delta->topic_entries.push_back(item);
-  }
-
-  /// Send the given topic delta to the scheduler.
-  void SendTopicDelta(const TTopicDelta& delta) {
-    DCHECK(scheduler_ != NULL);
-    // Wrap in topic delta map.
-    StatestoreSubscriber::TopicDeltaMap delta_map;
-    delta_map.emplace(SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC, delta);
-
-    // Send to the scheduler.
-    vector<TTopicDelta> dummy_result;
-    scheduler_->UpdateMembership(delta_map, &dummy_result);
-  }
-};
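
The membership helpers above feed TTopicDelta updates straight into
SimpleScheduler::UpdateMembership(), so tests can simulate backends joining and
leaving between scheduling runs. A brief sketch, illustrative only:

  // Sketch only: schedule, drop a backend, schedule again, then restore it.
  SchedulerWrapper scheduler(plan);
  Result result(plan);
  scheduler.Compute(&result);
  scheduler.RemoveBackend(plan.cluster().hosts()[1]);
  scheduler.Compute(&result);
  scheduler.AddBackend(plan.cluster().hosts()[1]);
  scheduler.Compute(&result);
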
-
 class SchedulerTest : public testing::Test {
  protected:
   SchedulerTest() { srand(0); };
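
Putting the pieces together, a complete test built on this fixture follows a short
pipeline: cluster, schema, plan, wrapper, result. A hedged end-to-end sketch (the
test name, host counts, and the Cluster::AddHosts() call are illustrative
assumptions, not code from this patch):

  TEST_F(SchedulerTest, LocalOnlyPlacementSketch) {
    Cluster cluster;
    cluster.AddHosts(10, true, true);  // Assumed convenience API; see above.
    Schema schema(cluster);
    schema.AddMultiBlockTable("T1", 2, ReplicaPlacement::LOCAL_ONLY, 3);
    Plan plan(schema);
    plan.AddTableScan("T1");
    Result result(plan);
    SchedulerWrapper scheduler(plan);
    scheduler.Compute(&result);
    // With LOCAL_ONLY placement every read should be local, i.e. non-remote.
    EXPECT_EQ(0, result.NumRemoteAssignments());
  }
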

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d76a2b22/be/src/scheduling/simple-scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.h b/be/src/scheduling/simple-scheduler.h
index 8c96dde..9b51269 100644
--- a/be/src/scheduling/simple-scheduler.h
+++ b/be/src/scheduling/simple-scheduler.h
@@ -42,7 +42,9 @@ namespace impala {
 
 class Coordinator;
 
+namespace test {
 class SchedulerWrapper;
+}
 
 /// Performs simple scheduling by matching between a list of backends configured
 /// either from the statestore, or from a static list of addresses, and a list
@@ -434,7 +436,7 @@ class SimpleScheduler : public Scheduler {
   int FindSenderFragment(TPlanNodeId exch_id, int fragment_idx,
       const TQueryExecRequest& exec_request);
 
-  friend class impala::SchedulerWrapper;
+  friend class impala::test::SchedulerWrapper;
   FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentDeterministicNonCached);
   FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomNonCached);
   FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomDiskLocal);
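
The simple-scheduler.h hunk above moves the friend declaration into a nested
impala::test namespace, keeping the test-only helper out of the main impala
namespace. For readers unfamiliar with the idiom, a self-contained sketch of the
pattern (the names here are generic, not Impala's):

  namespace outer {
  namespace test {
  class Helper;  // Forward declaration; the definition lives in test code.
  }  // namespace test

  class Widget {
   private:
    int secret_ = 0;
    // Grant the test-only helper access to private members. The qualified
    // friend declaration requires the forward declaration above.
    friend class outer::test::Helper;
  };
  }  // namespace outer
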
