Repository: incubator-impala Updated Branches: refs/heads/master 1b9d9ea7c -> 0d0c93ec8
IMPALA-4117: Factor simple scheduler test code into own files This change merely splits the helper classes from simple-scheduler-test.cc into separate .h and .cc files. It does not change the semantics of the code. Whitespace formatting has been done with git-clang-format for any changed lines. Change-Id: Id3a6b3336db175eb095cbeb7ec623a5957b77ccc Reviewed-on: http://gerrit.cloudera.org:8080/4486 Reviewed-by: Matthew Jacobs <m...@cloudera.com> Reviewed-by: Dan Hecht <dhe...@cloudera.com> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d76a2b22 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d76a2b22 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d76a2b22 Branch: refs/heads/master Commit: d76a2b2272ea4b2d2b9e5636c7f4e2e8da18961c Parents: 1b9d9ea Author: Lars Volker <l...@cloudera.com> Authored: Wed Sep 14 17:17:51 2016 -0700 Committer: Internal Jenkins <cloudera-hud...@gerrit.cloudera.org> Committed: Thu Sep 22 05:19:26 2016 +0000 ---------------------------------------------------------------------- be/src/scheduling/CMakeLists.txt | 1 + be/src/scheduling/simple-scheduler-test-util.cc | 543 ++++++++++++ be/src/scheduling/simple-scheduler-test-util.h | 465 ++++++++++ be/src/scheduling/simple-scheduler-test.cc | 852 +------------------ be/src/scheduling/simple-scheduler.h | 4 +- 5 files changed, 1015 insertions(+), 850 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d76a2b22/be/src/scheduling/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/scheduling/CMakeLists.txt b/be/src/scheduling/CMakeLists.txt index 9cfb672..ef49a14 100644 --- a/be/src/scheduling/CMakeLists.txt +++ b/be/src/scheduling/CMakeLists.txt @@ -28,6 +28,7 @@ add_library(Scheduling STATIC backend-config.cc query-schedule.cc request-pool-service.cc + simple-scheduler-test-util.cc simple-scheduler.cc ) add_dependencies(Scheduling thrift-deps) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d76a2b22/be/src/scheduling/simple-scheduler-test-util.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/simple-scheduler-test-util.cc b/be/src/scheduling/simple-scheduler-test-util.cc new file mode 100644 index 0000000..3e14ea7 --- /dev/null +++ b/be/src/scheduling/simple-scheduler-test-util.cc @@ -0,0 +1,543 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "simple-scheduler-test-util.h" +#include "simple-scheduler.h" + +#include "common/names.h" + +using namespace impala; +using namespace impala::test; + +/// Sample 'n' elements without replacement from the set [0..N-1]. +/// This is an implementation of "Algorithm R" by J. Vitter. +void SampleN(int n, int N, vector<int>* out) { + if (n == 0) return; + DCHECK(n <= N); + out->reserve(n); + out->clear(); + for (int i = 0; i < n; ++i) out->push_back(i); + for (int i = n; i < N; ++i) { + // Accept element with probability n/i. Place at random position. + int r = rand() % i; + if (r < n) (*out)[r] = i; + } +} + +/// Sample a set of 'n' elements from 'in' without replacement and copy them to +/// 'out'. +template <typename T> +void SampleNElements(int n, const vector<T>& in, vector<T>* out) { + vector<int> idxs; + SampleN(n, in.size(), &idxs); + DCHECK_EQ(n, idxs.size()); + out->reserve(n); + for (int idx : idxs) out->push_back(in[idx]); +} + +/// Define constants from simple-scheduler-test-util.h here. +const int Cluster::BACKEND_PORT = 1000; +const int Cluster::DATANODE_PORT = 2000; +const string Cluster::HOSTNAME_PREFIX = "host_"; +const string Cluster::IP_PREFIX = "10"; + +/// Default size for new blocks is 1MB. +const int64_t Block::DEFAULT_BLOCK_SIZE = 1 << 20; + +int Cluster::AddHost(bool has_backend, bool has_datanode) { + int host_idx = hosts_.size(); + int be_port = has_backend ? BACKEND_PORT : -1; + int dn_port = has_datanode ? DATANODE_PORT : -1; + IpAddr ip = HostIdxToIpAddr(host_idx); + DCHECK(ip_to_idx_.find(ip) == ip_to_idx_.end()); + ip_to_idx_[ip] = host_idx; + hosts_.push_back(Host(HostIdxToHostname(host_idx), ip, be_port, dn_port)); + // Add host to lists of backend indexes per type. + if (has_backend) backend_host_idxs_.push_back(host_idx); + if (has_datanode) { + datanode_host_idxs_.push_back(host_idx); + if (has_backend) { + datanode_with_backend_host_idxs_.push_back(host_idx); + } else { + datanode_only_host_idxs_.push_back(host_idx); + } + } + return host_idx; +} + +void Cluster::AddHosts(int num_hosts, bool has_backend, bool has_datanode) { + for (int i = 0; i < num_hosts; ++i) AddHost(has_backend, has_datanode); +} + +Hostname Cluster::HostIdxToHostname(int host_idx) { + return HOSTNAME_PREFIX + std::to_string(host_idx); +} + +void Cluster::GetBackendAddress(int host_idx, TNetworkAddress* addr) const { + DCHECK_LT(host_idx, hosts_.size()); + addr->hostname = hosts_[host_idx].ip; + addr->port = hosts_[host_idx].be_port; +} + +const vector<int>& Cluster::datanode_with_backend_host_idxs() const { + return datanode_with_backend_host_idxs_; +} + +const vector<int>& Cluster::datanode_only_host_idxs() const { + return datanode_only_host_idxs_; +} + +IpAddr Cluster::HostIdxToIpAddr(int host_idx) { + DCHECK_LT(host_idx, (1 << 24)); + string suffix; + for (int i = 0; i < 3; ++i) { + suffix = "." + std::to_string(host_idx % 256) + suffix; // prepend + host_idx /= 256; + } + DCHECK_EQ(0, host_idx); + return IP_PREFIX + suffix; +} + +void Schema::AddSingleBlockTable( + const TableName& table_name, const vector<int>& non_cached_replica_host_idxs) { + AddSingleBlockTable(table_name, non_cached_replica_host_idxs, {}); +} + +void Schema::AddSingleBlockTable(const TableName& table_name, + const vector<int>& non_cached_replica_host_idxs, + const vector<int>& cached_replica_host_idxs) { + DCHECK(tables_.find(table_name) == tables_.end()); + Block block; + int num_replicas = + non_cached_replica_host_idxs.size() + cached_replica_host_idxs.size(); + block.replica_host_idxs = non_cached_replica_host_idxs; + block.replica_host_idxs.insert(block.replica_host_idxs.end(), + cached_replica_host_idxs.begin(), cached_replica_host_idxs.end()); + // Initialize for non-cached replicas first. + block.replica_host_idx_is_cached.resize(non_cached_replica_host_idxs.size(), false); + // Fill up to final size for cached replicas. + block.replica_host_idx_is_cached.insert( + block.replica_host_idx_is_cached.end(), cached_replica_host_idxs.size(), true); + DCHECK_EQ(block.replica_host_idxs.size(), block.replica_host_idx_is_cached.size()); + DCHECK_EQ(block.replica_host_idxs.size(), num_replicas); + // Create table + Table table; + table.blocks.push_back(block); + // Insert table + tables_.emplace(table_name, table); +} + +void Schema::AddMultiBlockTable(const TableName& table_name, int num_blocks, + ReplicaPlacement replica_placement, int num_replicas) { + AddMultiBlockTable(table_name, num_blocks, replica_placement, num_replicas, 0); +} + +void Schema::AddMultiBlockTable(const TableName& table_name, int num_blocks, + ReplicaPlacement replica_placement, int num_replicas, int num_cached_replicas) { + DCHECK_GT(num_replicas, 0); + DCHECK(num_cached_replicas <= num_replicas); + Table table; + for (int i = 0; i < num_blocks; ++i) { + Block block; + vector<int>& replica_idxs = block.replica_host_idxs; + + // Determine replica host indexes. + switch (replica_placement) { + case ReplicaPlacement::RANDOM: + SampleNElements(num_replicas, cluster_.datanode_host_idxs(), &replica_idxs); + break; + case ReplicaPlacement::LOCAL_ONLY: + DCHECK(num_replicas <= cluster_.datanode_with_backend_host_idxs().size()); + SampleNElements( + num_replicas, cluster_.datanode_with_backend_host_idxs(), &replica_idxs); + break; + case ReplicaPlacement::REMOTE_ONLY: + DCHECK(num_replicas <= cluster_.datanode_only_host_idxs().size()); + SampleNElements(num_replicas, cluster_.datanode_only_host_idxs(), &replica_idxs); + break; + default: + DCHECK(false) << "Unsupported replica placement: " << (int)replica_placement; + } + + // Determine cached replicas. + vector<int> cached_replicas; + vector<bool>& is_cached = block.replica_host_idx_is_cached; + is_cached.resize(num_replicas, false); + SampleN(num_cached_replicas, num_replicas, &cached_replicas); + // Flag cached entries. + for (const int idx : cached_replicas) is_cached[idx] = true; + + DCHECK_EQ(replica_idxs.size(), is_cached.size()); + table.blocks.push_back(block); + } + // Insert table + tables_[table_name] = table; +} + +const Table& Schema::GetTable(const TableName& table_name) const { + auto it = tables_.find(table_name); + DCHECK(it != tables_.end()); + return it->second; +} + +void Plan::SetReplicaPreference(TReplicaPreference::type p) { + query_options_.replica_preference = p; +} + +const vector<TNetworkAddress>& Plan::referenced_datanodes() const { + return referenced_datanodes_; +} + +const vector<TScanRangeLocations>& Plan::scan_range_locations() const { + return scan_range_locations_; +} + +void Plan::AddTableScan(const TableName& table_name) { + const Table& table = schema_.GetTable(table_name); + const vector<Block>& blocks = table.blocks; + for (int i = 0; i < blocks.size(); ++i) { + const Block& block = blocks[i]; + TScanRangeLocations scan_range_locations; + BuildTScanRangeLocations(table_name, block, i, &scan_range_locations); + scan_range_locations_.push_back(scan_range_locations); + } +} + +void Plan::BuildTScanRangeLocations(const TableName& table_name, const Block& block, + int block_idx, TScanRangeLocations* scan_range_locations) { + const vector<int>& replica_idxs = block.replica_host_idxs; + const vector<bool>& is_cached = block.replica_host_idx_is_cached; + DCHECK_EQ(replica_idxs.size(), is_cached.size()); + int num_replicas = replica_idxs.size(); + BuildScanRange(table_name, block, block_idx, &scan_range_locations->scan_range); + scan_range_locations->locations.resize(num_replicas); + for (int i = 0; i < num_replicas; ++i) { + TScanRangeLocation& location = scan_range_locations->locations[i]; + location.host_idx = FindOrInsertDatanodeIndex(replica_idxs[i]); + location.__set_is_cached(is_cached[i]); + } +} + +void Plan::BuildScanRange(const TableName& table_name, const Block& block, int block_idx, + TScanRange* scan_range) { + // Initialize locations.scan_range correctly. + THdfsFileSplit file_split; + // 'length' is the only member considered by the scheduler. + file_split.length = block.length; + // Encoding the table name and block index in the file helps debugging. + file_split.file_name = table_name + "_block_" + std::to_string(block_idx); + file_split.offset = 0; + file_split.partition_id = 0; + // For now, we model each file by a single block. + file_split.file_length = block.length; + file_split.file_compression = THdfsCompression::NONE; + file_split.mtime = 1; + scan_range->__set_hdfs_file_split(file_split); +} + +int Plan::FindOrInsertDatanodeIndex(int cluster_datanode_idx) { + const Host& host = schema_.cluster().hosts()[cluster_datanode_idx]; + auto ret = host_idx_to_datanode_idx_.emplace( + cluster_datanode_idx, referenced_datanodes_.size()); + bool inserted_new_element = ret.second; + if (inserted_new_element) { + TNetworkAddress datanode; + datanode.hostname = host.ip; + datanode.port = host.dn_port; + referenced_datanodes_.push_back(datanode); + } + return ret.first->second; +} + +int Result::NumTotalAssignments(int host_idx) const { + return CountAssignmentsIf(IsHost(host_idx)); +} + +int Result::NumTotalAssignedBytes(int host_idx) const { + return CountAssignedBytesIf(IsHost(host_idx)); +} + +int Result::NumCachedAssignments(int host_idx) const { + return CountAssignmentsIf(IsCached(IsHost(host_idx))); +} + +int Result::NumCachedAssignedBytes(int host_idx) const { + return CountAssignedBytesIf(IsCached(IsHost(host_idx))); +} + +int Result::NumDiskAssignments(int host_idx) const { + return CountAssignmentsIf(IsDisk(IsHost(host_idx))); +} + +int Result::NumDiskAssignedBytes(int host_idx) const { + return CountAssignedBytesIf(IsDisk(IsHost(host_idx))); +} + +int Result::NumRemoteAssignments(int host_idx) const { + return CountAssignmentsIf(IsRemote(IsHost(host_idx))); +} + +int Result::NumRemoteAssignedBytes(int host_idx) const { + return CountAssignedBytesIf(IsRemote(IsHost(host_idx))); +} + +int Result::MaxNumAssignmentsPerHost() const { + NumAssignmentsPerBackend num_assignments_per_backend; + CountAssignmentsPerBackend(&num_assignments_per_backend); + int max_count = 0; + for (const auto& elem : num_assignments_per_backend) { + max_count = max(max_count, elem.second); + } + return max_count; +} + +int64_t Result::MaxNumAssignedBytesPerHost() const { + NumAssignedBytesPerBackend num_assigned_bytes_per_backend; + CountAssignedBytesPerBackend(&num_assigned_bytes_per_backend); + int64_t max_assigned_bytes = 0; + for (const auto& elem : num_assigned_bytes_per_backend) { + max_assigned_bytes = max(max_assigned_bytes, elem.second); + } + return max_assigned_bytes; +} + +int Result::MinNumAssignmentsPerHost() const { + NumAssignmentsPerBackend num_assignments_per_backend; + CountAssignmentsPerBackend(&num_assignments_per_backend); + int min_count = numeric_limits<int>::max(); + for (const auto& elem : num_assignments_per_backend) { + min_count = min(min_count, elem.second); + } + DCHECK_GT(min_count, 0); + return min_count; +} + +int64_t Result::MinNumAssignedBytesPerHost() const { + NumAssignedBytesPerBackend num_assigned_bytes_per_backend; + CountAssignedBytesPerBackend(&num_assigned_bytes_per_backend); + int64_t min_assigned_bytes = 0; + for (const auto& elem : num_assigned_bytes_per_backend) { + min_assigned_bytes = max(min_assigned_bytes, elem.second); + } + DCHECK_GT(min_assigned_bytes, 0); + return min_assigned_bytes; +} + +int Result::NumDistinctBackends() const { + unordered_set<IpAddr> backends; + AssignmentCallback cb = [&backends]( + const AssignmentInfo& assignment) { backends.insert(assignment.addr.hostname); }; + ProcessAssignments(cb); + return backends.size(); +} + +const FragmentScanRangeAssignment& Result::GetAssignment(int index) const { + DCHECK_GT(assignments_.size(), index); + return assignments_[index]; +} + +FragmentScanRangeAssignment* Result::AddAssignment() { + assignments_.push_back(FragmentScanRangeAssignment()); + return &assignments_.back(); +} + +Result::AssignmentFilter Result::Any() const { + return [](const AssignmentInfo& assignment) { return true; }; +} + +Result::AssignmentFilter Result::IsHost(int host_idx) const { + TNetworkAddress expected_addr; + plan_.cluster().GetBackendAddress(host_idx, &expected_addr); + return [expected_addr]( + const AssignmentInfo& assignment) { return assignment.addr == expected_addr; }; +} + +Result::AssignmentFilter Result::IsCached(AssignmentFilter filter) const { + return [filter](const AssignmentInfo& assignment) { + return filter(assignment) && assignment.is_cached; + }; +} + +Result::AssignmentFilter Result::IsDisk(AssignmentFilter filter) const { + return [filter](const AssignmentInfo& assignment) { + return filter(assignment) && !assignment.is_cached && !assignment.is_remote; + }; +} + +Result::AssignmentFilter Result::IsRemote(AssignmentFilter filter) const { + return [filter](const AssignmentInfo& assignment) { + return filter(assignment) && assignment.is_remote; + }; +} + +void Result::ProcessAssignments(const AssignmentCallback& cb) const { + for (const FragmentScanRangeAssignment& assignment : assignments_) { + for (const auto& assignment_elem : assignment) { + const TNetworkAddress& addr = assignment_elem.first; + const PerNodeScanRanges& per_node_ranges = assignment_elem.second; + for (const auto& per_node_ranges_elem : per_node_ranges) { + const vector<TScanRangeParams> scan_range_params_vector = + per_node_ranges_elem.second; + for (const TScanRangeParams& scan_range_params : scan_range_params_vector) { + const TScanRange& scan_range = scan_range_params.scan_range; + DCHECK(scan_range.__isset.hdfs_file_split); + const THdfsFileSplit& hdfs_file_split = scan_range.hdfs_file_split; + bool is_cached = + scan_range_params.__isset.is_cached ? scan_range_params.is_cached : false; + bool is_remote = + scan_range_params.__isset.is_remote ? scan_range_params.is_remote : false; + cb({addr, hdfs_file_split, is_cached, is_remote}); + } + } + } + } +} + +int Result::CountAssignmentsIf(const AssignmentFilter& filter) const { + int count = 0; + AssignmentCallback cb = [&count, filter](const AssignmentInfo& assignment) { + if (filter(assignment)) ++count; + }; + ProcessAssignments(cb); + return count; +} + +int64_t Result::CountAssignedBytesIf(const AssignmentFilter& filter) const { + int64_t assigned_bytes = 0; + AssignmentCallback cb = [&assigned_bytes, filter](const AssignmentInfo& assignment) { + if (filter(assignment)) assigned_bytes += assignment.hdfs_file_split.length; + }; + ProcessAssignments(cb); + return assigned_bytes; +} + +void Result::CountAssignmentsPerBackend( + NumAssignmentsPerBackend* num_assignments_per_backend) const { + AssignmentCallback cb = [&num_assignments_per_backend]( + const AssignmentInfo& assignment) { + ++(*num_assignments_per_backend)[assignment.addr.hostname]; + }; + ProcessAssignments(cb); +} + +void Result::CountAssignedBytesPerBackend( + NumAssignedBytesPerBackend* num_assignments_per_backend) const { + AssignmentCallback cb = [&num_assignments_per_backend]( + const AssignmentInfo& assignment) { + (*num_assignments_per_backend)[assignment.addr.hostname] += + assignment.hdfs_file_split.length; + }; + ProcessAssignments(cb); +} + +SchedulerWrapper::SchedulerWrapper(const Plan& plan) + : plan_(plan), metrics_("TestMetrics") { + InitializeScheduler(); +} + +void SchedulerWrapper::Compute(bool exec_at_coord, Result* result) { + DCHECK(scheduler_ != NULL); + + // Compute Assignment. + FragmentScanRangeAssignment* assignment = result->AddAssignment(); + scheduler_->ComputeScanRangeAssignment(*scheduler_->GetBackendConfig(), 0, NULL, false, + plan_.scan_range_locations(), plan_.referenced_datanodes(), exec_at_coord, + plan_.query_options(), NULL, assignment); +} + +void SchedulerWrapper::AddBackend(const Host& host) { + // Add to topic delta + TTopicDelta delta; + delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC; + delta.is_delta = true; + AddHostToTopicDelta(host, &delta); + SendTopicDelta(delta); +} + +void SchedulerWrapper::RemoveBackend(const Host& host) { + // Add deletion to topic delta + TTopicDelta delta; + delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC; + delta.is_delta = true; + delta.topic_deletions.push_back(host.ip); + SendTopicDelta(delta); +} + +void SchedulerWrapper::SendFullMembershipMap() { + TTopicDelta delta; + delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC; + delta.is_delta = false; + for (const Host& host : plan_.cluster().hosts()) { + if (host.be_port >= 0) AddHostToTopicDelta(host, &delta); + } + SendTopicDelta(delta); +} + +void SchedulerWrapper::SendEmptyUpdate() { + TTopicDelta delta; + delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC; + delta.is_delta = true; + SendTopicDelta(delta); +} + +void SchedulerWrapper::InitializeScheduler() { + DCHECK(scheduler_ == NULL); + DCHECK_GT(plan_.cluster().NumHosts(), 0) << "Cannot initialize scheduler with 0 " + << "hosts."; + const Host& scheduler_host = plan_.cluster().hosts()[0]; + string scheduler_backend_id = scheduler_host.ip; + TNetworkAddress scheduler_backend_address; + scheduler_backend_address.hostname = scheduler_host.ip; + scheduler_backend_address.port = scheduler_host.be_port; + + scheduler_.reset(new SimpleScheduler( + NULL, scheduler_backend_id, scheduler_backend_address, &metrics_, NULL, NULL)); + scheduler_->Init(); + // Initialize the scheduler backend maps. + SendFullMembershipMap(); +} + +void SchedulerWrapper::AddHostToTopicDelta(const Host& host, TTopicDelta* delta) const { + DCHECK_GT(host.be_port, 0) << "Host cannot be added to scheduler without a running " + << "backend"; + // Build backend descriptor. + TBackendDescriptor be_desc; + be_desc.address.hostname = host.ip; + be_desc.address.port = host.be_port; + be_desc.ip_address = host.ip; + + // Build topic item. + TTopicItem item; + item.key = host.ip; + ThriftSerializer serializer(false); + Status status = serializer.Serialize(&be_desc, &item.value); + DCHECK(status.ok()); + + // Add to topic delta. + delta->topic_entries.push_back(item); +} + +void SchedulerWrapper::SendTopicDelta(const TTopicDelta& delta) { + DCHECK(scheduler_ != NULL); + // Wrap in topic delta map. + StatestoreSubscriber::TopicDeltaMap delta_map; + delta_map.emplace(SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC, delta); + + // Send to the scheduler. + vector<TTopicDelta> dummy_result; + scheduler_->UpdateMembership(delta_map, &dummy_result); +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d76a2b22/be/src/scheduling/simple-scheduler-test-util.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/simple-scheduler-test-util.h b/be/src/scheduling/simple-scheduler-test-util.h new file mode 100644 index 0000000..85bb1a5 --- /dev/null +++ b/be/src/scheduling/simple-scheduler-test-util.h @@ -0,0 +1,465 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <string> +#include <unordered_map> +#include <vector> + +#include <boost/scoped_ptr.hpp> + +#include "gen-cpp/ImpalaInternalService.h" // for TQueryOptions +#include "scheduling/query-schedule.h" +#include "util/metrics.h" + +namespace impala { + +class SimpleScheduler; +class TTopicDelta; + +namespace test { + +typedef std::string TableName; + +/// Helper classes to be used by the scheduler tests. + +/// Overall testing approach: Each test builds a list of hosts and a plan, both to which +/// elements can be added using various helper methods. Then scheduling can be tested +/// by instantiating SchedulerWrapper and calling Compute(...). The result can be verified +/// using a set of helper methods. There are also helper methods to modify the internal +/// state of the scheduler between subsequent calls to SchedulerWrapper::Compute(). +/// +/// The model currently comes with some known limitations: +/// +/// - Files map 1:1 to blocks and to scan ranges. +/// - All files have the same size (1 block of 1M). Tables that differ in size can be +/// expressed as having a different number of blocks. +/// - We don't support multiple backends on a single host. +/// - Ports are assigned to hosts automatically and are not configurable by the test. + +// TODO: Extend the model to support files with multiple blocks. +// TODO: Test more methods of the scheduler. +// TODO: Add support to add skewed table scans with multiple scan ranges: often there are +// 3 replicas where there may be skew for 1 of the replicas (e.g. after a single +// node insert) but the other 2 are random. +// TODO: Make use of the metrics provided by the scheduler. +// TODO: Add checks for MinNumAssignmentsPerHost() to all tests where applicable. +// TODO: Add post-condition checks that have to hold for all successful scheduler runs. +// TODO: Add possibility to explicitly specify the replica location per file. +// TODO: Add methods to retrieve and verify generated file placements from plan. +// TODO: Extend the model to specify a physical schema independently of a plan (ie, +// tables/files, blocks, replicas and cached replicas exist independently of the +// queries that run against them). + +/// File blocks store a list of all datanodes that have a replica of the block. When +/// defining tables you can specify the desired replica placement among all available +/// datanodes in the cluster. +/// +/// - RANDOM means that any datanode can be picked. +/// - LOCAL_ONLY means that only datanodes with a backend will be picked. +/// - REMOTE_ONLY means that only datanodes without a backend will be picked. +/// +/// Whether replicas will be cached or not is not determined by this value, but by +/// additional function arguments when adding tables to the schema. +enum class ReplicaPlacement { + RANDOM, + LOCAL_ONLY, + REMOTE_ONLY, +}; + +/// Host model. Each host can have either a backend, a datanode, or both. To specify that +/// a host should not act as a backend or datanode specify '-1' as the respective port. +struct Host { + Host(const Hostname& name, const IpAddr& ip, int be_port, int dn_port) + : name(name), ip(ip), be_port(be_port), dn_port(dn_port) {} + Hostname name; + IpAddr ip; + int be_port; // Backend port + int dn_port; // Datanode port +}; + +/// A cluster stores a list of hosts and provides various methods to add hosts to the +/// cluster. All hosts are guaranteed to have unique IP addresses and hostnames. +class Cluster { + public: + /// Add a host and return the host's index. 'hostname' and 'ip' of the new host will be + /// generated and are guaranteed to be unique. + int AddHost(bool has_backend, bool has_datanode); + + /// Add a number of hosts with the same properties by repeatedly calling AddHost(..). + void AddHosts(int num_hosts, bool has_backend, bool has_datanode); + + /// Convert a host index to a hostname. + static Hostname HostIdxToHostname(int host_idx); + + /// Return the backend address (ip, port) for the host with index 'host_idx'. + void GetBackendAddress(int host_idx, TNetworkAddress* addr) const; + + const std::vector<Host>& hosts() const { return hosts_; } + int NumHosts() const { return hosts_.size(); } + + /// These methods return lists of host indexes, grouped by their type, which can be used + /// to draw samples of random sets of hosts. + /// TODO: Think of a nicer abstraction to expose this information. + const std::vector<int>& backend_host_idxs() const { return backend_host_idxs_; } + const std::vector<int>& datanode_host_idxs() const { return datanode_host_idxs_; } + const std::vector<int>& datanode_with_backend_host_idxs() const; + const std::vector<int>& datanode_only_host_idxs() const; + + private: + /// Port for all backends. + static const int BACKEND_PORT; + + /// Port for all datanodes. + static const int DATANODE_PORT; + + /// Prefix for all generated hostnames. + static const std::string HOSTNAME_PREFIX; + + /// First octet for all generated IP addresses. + static const std::string IP_PREFIX; + + /// List of hosts in this cluster. + std::vector<Host> hosts_; + + /// Lists of indexes of hosts, grouped by their type. The lists reference hosts in + /// 'hosts_' by index and are used for random sampling. + /// + /// All hosts with a backend. + std::vector<int> backend_host_idxs_; + /// All hosts with a datanode. + std::vector<int> datanode_host_idxs_; + /// All hosts with a datanode and a backend. + std::vector<int> datanode_with_backend_host_idxs_; + /// All hosts with a datanode but no backend. + std::vector<int> datanode_only_host_idxs_; + + /// Map from IP addresses to host indexes. + std::unordered_map<IpAddr, int> ip_to_idx_; + + /// Convert a host index to an IP address. The host index must be smaller than 2^24 and + /// will specify the lower 24 bits of the IPv4 address (the lower 3 octets). + static IpAddr HostIdxToIpAddr(int host_idx); +}; + +struct Block { + /// By default all blocks are of the same size. + int64_t length = DEFAULT_BLOCK_SIZE; + + /// Index into the cluster that owns the table that owns this block. + std::vector<int> replica_host_idxs; + + /// Flag for each entry in replica_host_idxs whether it is a cached replica or not. + std::vector<bool> replica_host_idx_is_cached; + + /// Default size for new blocks. + static const int64_t DEFAULT_BLOCK_SIZE; +}; + +struct Table { + std::vector<Block> blocks; +}; + +class Schema { + public: + Schema(const Cluster& cluster) : cluster_(cluster) {} + + /// Add a table consisting of a single block to the schema with explicitly specified + /// replica indexes for non-cached replicas and without any cached replicas. Replica + /// indexes must refer to hosts in cluster_.hosts() by index. + void AddSingleBlockTable( + const TableName& table_name, const std::vector<int>& non_cached_replica_host_idxs); + + /// Add a table consisting of a single block to the schema with explicitly specified + /// replica indexes for both non-cached and cached replicas. Values in both lists must + /// refer to hosts in cluster_.hosts() by index. Both lists must be disjoint, i.e., a + /// replica can either be cached or not. + void AddSingleBlockTable(const TableName& table_name, + const std::vector<int>& non_cached_replica_host_idxs, + const std::vector<int>& cached_replica_host_idxs); + + /// Add a table to the schema, selecting replica hosts according to the given replica + /// placement preference. All replicas will be non-cached. + void AddMultiBlockTable(const TableName& table_name, int num_blocks, + ReplicaPlacement replica_placement, int num_replicas); + + /// Add a table to the schema, selecting replica hosts according to the given replica + /// placement preference. After replica selection has been done, 'num_cached_replicas' + /// of them are marked as cached. + void AddMultiBlockTable(const TableName& table_name, int num_blocks, + ReplicaPlacement replica_placement, int num_replicas, int num_cached_replicas); + + const Table& GetTable(const TableName& table_name) const; + + const Cluster& cluster() const { return cluster_; } + + private: + /// Store a reference to the cluster, from which hosts are sampled. Test results will + /// use the cluster to resolve host indexes to hostnames and IP addresses. + const Cluster& cluster_; + + std::unordered_map<TableName, Table> tables_; +}; + +/// Plan model. A plan contains a list of tables to scan and the query options to be used +/// during scheduling. +class Plan { + public: + Plan(const Schema& schema) : schema_(schema) {} + + const TQueryOptions& query_options() const { return query_options_; } + + void SetReplicaPreference(TReplicaPreference::type p); + + void SetRandomReplica(bool b) { query_options_.schedule_random_replica = b; } + void SetDisableCachedReads(bool b) { query_options_.disable_cached_reads = b; } + const Cluster& cluster() const { return schema_.cluster(); } + + const std::vector<TNetworkAddress>& referenced_datanodes() const; + + const std::vector<TScanRangeLocations>& scan_range_locations() const; + + /// Add a scan of table 'table_name' to the plan. This method will populate the internal + /// list of TScanRangeLocations and can be called multiple times for the same table to + /// schedule additional scans. + void AddTableScan(const TableName& table_name); + + private: + /// Store a reference to the schema, from which scanned tables will be read. + const Schema& schema_; + + TQueryOptions query_options_; + + /// List of all datanodes that are referenced by this plan. Only hosts that have an + /// assigned scan range are added here. + std::vector<TNetworkAddress> referenced_datanodes_; + + /// Map from plan host index to an index in 'referenced_datanodes_'. + std::unordered_map<int, int> host_idx_to_datanode_idx_; + + /// List of all scan range locations, which can be passed to the SimpleScheduler. + std::vector<TScanRangeLocations> scan_range_locations_; + + /// Initialize a TScanRangeLocations object in place. + void BuildTScanRangeLocations(const TableName& table_name, const Block& block, + int block_idx, TScanRangeLocations* scan_range_locations); + + void BuildScanRange(const TableName& table_name, const Block& block, int block_idx, + TScanRange* scan_range); + + /// Look up the plan-local host index of 'cluster_datanode_idx'. If the host has not + /// been added to the plan before, it will add it to 'referenced_datanodes_' and return + /// the new index. + int FindOrInsertDatanodeIndex(int cluster_datanode_idx); +}; + +class Result { + private: + /// Map to count the number of assignments per backend. + typedef std::unordered_map<IpAddr, int> NumAssignmentsPerBackend; + + /// Map to count the number of assigned bytes per backend. + typedef std::unordered_map<IpAddr, int64_t> NumAssignedBytesPerBackend; + + /// Parameter type for callbacks, which are used to filter scheduling results. + struct AssignmentInfo { + const TNetworkAddress& addr; + const THdfsFileSplit& hdfs_file_split; + bool is_cached; + bool is_remote; + }; + + /// These functions are used as callbacks when processing the scheduling result. They + /// will be called once per assigned scan range. + typedef std::function<bool(const AssignmentInfo& assignment)> AssignmentFilter; + typedef std::function<void(const AssignmentInfo& assignment)> AssignmentCallback; + + public: + Result(const Plan& plan) : plan_(plan) {} + + /// Return the total number of scheduled assignments. + int NumTotalAssignments() const { return CountAssignmentsIf(Any()); } + + /// Return the total number of assigned bytes. + int NumTotalAssignedBytes() const { return CountAssignedBytesIf(Any()); } + + /// Return the number of scheduled assignments for a single host. + int NumTotalAssignments(int host_idx) const; + + /// Return the number of assigned bytes for a single host. + int NumTotalAssignedBytes(int host_idx) const; + + /// Return the total number of assigned cached reads. + int NumCachedAssignments() const { return CountAssignmentsIf(IsCached(Any())); } + + /// Return the total number of assigned bytes for cached reads. + int NumCachedAssignedBytes() const { return CountAssignedBytesIf(IsCached(Any())); } + + /// Return the total number of assigned cached reads for a single host. + int NumCachedAssignments(int host_idx) const; + + /// Return the total number of assigned bytes for cached reads for a single host. + int NumCachedAssignedBytes(int host_idx) const; + + /// Return the total number of assigned non-cached reads. + int NumDiskAssignments() const { return CountAssignmentsIf(IsDisk(Any())); } + + /// Return the total number of assigned bytes for non-cached reads. + int NumDiskAssignedBytes() const { return CountAssignedBytesIf(IsDisk(Any())); } + + /// Return the total number of assigned non-cached reads for a single host. + int NumDiskAssignments(int host_idx) const; + + /// Return the total number of assigned bytes for non-cached reads for a single host. + int NumDiskAssignedBytes(int host_idx) const; + + /// Return the total number of assigned remote reads. + int NumRemoteAssignments() const { return CountAssignmentsIf(IsRemote(Any())); } + + /// Return the total number of assigned bytes for remote reads. + int NumRemoteAssignedBytes() const { return CountAssignedBytesIf(IsRemote(Any())); } + + /// Return the total number of assigned remote reads for a single host. + int NumRemoteAssignments(int host_idx) const; + + /// Return the total number of assigned bytes for remote reads for a single host. + int NumRemoteAssignedBytes(int host_idx) const; + + /// Return the maximum number of assigned reads over all hosts. + int MaxNumAssignmentsPerHost() const; + + /// Return the maximum number of assigned reads over all hosts. + int64_t MaxNumAssignedBytesPerHost() const; + + /// Return the minimum number of assigned reads over all hosts. + /// NOTE: This is computed by traversing all recorded assignments and thus will not + /// consider hosts without any assignments. Hence the minimum value to expect is 1 (not + /// 0). + int MinNumAssignmentsPerHost() const; + + /// Return the minimum number of assigned bytes over all hosts. + /// NOTE: This is computed by traversing all recorded assignments and thus will not + /// consider hosts without any assignments. Hence the minimum value to expect is 1 (not + /// 0). + int64_t MinNumAssignedBytesPerHost() const; + + /// Return the number of scan range assignments stored in this result. + int NumAssignments() const { return assignments_.size(); } + + /// Return the number of distinct backends that have been picked by the scheduler so + /// far. + int NumDistinctBackends() const; + + /// Return the full assignment for manual matching. + const FragmentScanRangeAssignment& GetAssignment(int index = 0) const; + + /// Add an assignment to the result and return a reference, which can then be passed on + /// to the scheduler. + FragmentScanRangeAssignment* AddAssignment(); + + /// Reset the result to an empty state. + void Reset() { assignments_.clear(); } + + private: + /// Vector to store results of consecutive scheduler runs. + std::vector<FragmentScanRangeAssignment> assignments_; + + /// Reference to the plan, needed to look up hosts. + const Plan& plan_; + + /// Dummy filter matching any assignment. + AssignmentFilter Any() const; + + /// Filter to only match assignments of a particular host. + AssignmentFilter IsHost(int host_idx) const; + + /// Filter to only match assignments of cached reads. + AssignmentFilter IsCached(AssignmentFilter filter) const; + + /// Filter to only match assignments of non-cached, local disk reads. + AssignmentFilter IsDisk(AssignmentFilter filter) const; + + /// Filter to only match assignments of remote reads. + AssignmentFilter IsRemote(AssignmentFilter filter) const; + + /// Process all recorded assignments and call the supplied callback on each tuple of IP + /// address and scan_range it iterates over. + void ProcessAssignments(const AssignmentCallback& cb) const; + + /// Count all assignments matching the supplied filter callback. + int CountAssignmentsIf(const AssignmentFilter& filter) const; + + /// Count all assignments matching the supplied filter callback. + int64_t CountAssignedBytesIf(const AssignmentFilter& filter) const; + + /// Create a map containing the number of assigned scan ranges per node. + void CountAssignmentsPerBackend( + NumAssignmentsPerBackend* num_assignments_per_backend) const; + + /// Create a map containing the number of assigned bytes per node. + void CountAssignedBytesPerBackend( + NumAssignedBytesPerBackend* num_assignments_per_backend) const; +}; + +/// This class wraps the SimpleScheduler and provides helper for easier instrumentation +/// during tests. +class SchedulerWrapper { + public: + SchedulerWrapper(const Plan& plan); + + /// Call ComputeScanRangeAssignment() with exec_at_coord set to false. + void Compute(Result* result) { Compute(false, result); } + + /// Call ComputeScanRangeAssignment(). + void Compute(bool exec_at_coord, Result* result); + + /// Reset the state of the scheduler by re-creating and initializing it. + void Reset() { InitializeScheduler(); } + + /// Methods to modify the internal lists of backends maintained by the scheduler. + + /// Add a backend to the scheduler. + void AddBackend(const Host& host); + + /// Remove a backend from the scheduler. + void RemoveBackend(const Host& host); + + /// Send a full map of the backends to the scheduler instead of deltas. + void SendFullMembershipMap(); + + /// Send an empty update message to the scheduler. + void SendEmptyUpdate(); + + private: + const Plan& plan_; + boost::scoped_ptr<SimpleScheduler> scheduler_; + MetricGroup metrics_; + + /// Initialize the internal scheduler object. The method uses the 'real' constructor + /// used in the rest of the codebase, in contrast to the one that takes a list of + /// backends, which is only used for testing purposes. This allows us to properly + /// initialize the scheduler and exercise the UpdateMembership() method in tests. + void InitializeScheduler(); + + /// Add a single host to the given TTopicDelta. + void AddHostToTopicDelta(const Host& host, TTopicDelta* delta) const; + + /// Send the given topic delta to the scheduler. + void SendTopicDelta(const TTopicDelta& delta); +}; + +} // end namespace test +} // end namespace impala http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d76a2b22/be/src/scheduling/simple-scheduler-test.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/simple-scheduler-test.cc b/be/src/scheduling/simple-scheduler-test.cc index 7116e21..d9964af 100644 --- a/be/src/scheduling/simple-scheduler-test.cc +++ b/be/src/scheduling/simple-scheduler-test.cc @@ -15,862 +15,16 @@ // specific language governing permissions and limitations // under the License. -#include <set> -#include <vector> - -#include <boost/scoped_ptr.hpp> - #include "simple-scheduler.h" +#include "common/logging.h" +#include "simple-scheduler-test-util.h" #include "testutil/gtest-util.h" -#include "util/runtime-profile.h" - -#include "common/names.h" using namespace impala; - -DECLARE_string(pool_conf_file); +using namespace impala::test; namespace impala { -// Typedefs to make the rest of the code more readable. -typedef string Hostname; -typedef string IpAddr; -typedef string TableName; - -/// Sample 'n' elements without replacement from the set [0..N-1]. -/// This is an implementation of "Algorithm R" by J. Vitter. -void SampleN(int n, int N, vector<int>* out) { - if (n == 0) return; - DCHECK(n <= N); - out->reserve(n); - out->clear(); - for (int i = 0; i < n; ++i) out->push_back(i); - for (int i = n; i < N; ++i) { - // Accept element with probability n/i. Place at random position. - int r = rand() % i; - if (r < n) (*out)[r] = i; - } -} - -/// Sample a set of 'n' elements from 'in' without replacement and copy them to -/// 'out'. -template <typename T> -void SampleNElements(int n, const vector<T>& in, vector<T>* out) { - vector<int> idxs; - SampleN(n, in.size(), &idxs); - DCHECK_EQ(n, idxs.size()); - out->reserve(n); - for (int idx: idxs) out->push_back(in[idx]); -} - -/// Helper classes to be used by the scheduler tests. - -/// Overall testing approach: Each test builds a list of hosts and a plan, both to which -/// elements can be added using various helper methods. Then scheduling can be tested -/// by instantiating SchedulerWrapper and calling Compute(...). The result can be verified -/// using a set of helper methods. There are also helper methods to modify the internal -/// state of the scheduler between subsequent calls to SchedulerWrapper::Compute(). -/// -/// The model currently comes with some known limitations: -/// -/// - Files map 1:1 to blocks and to scan ranges. -/// - All files have the same size (1 block of 1M). Tables that differ in size can be -/// expressed as having a different number of blocks. -/// - We don't support multiple backends on a single host. -/// - Ports are assigned to hosts automatically and are not configurable by the test. - -// TODO: Extend the model to support files with multiple blocks. -// TODO: Test more methods of the scheduler. -// TODO: Add support to add skewed table scans with multiple scan ranges: often there are -// 3 replicas where there may be skew for 1 of the replicas (e.g. after a single -// node insert) but the other 2 are random. -// TODO: Make use of the metrics provided by the scheduler. -// TODO: Add checks for MinNumAssignmentsPerHost() to all tests where applicable. -// TODO: Add post-condition checks that have to hold for all successful scheduler runs. -// TODO: Add possibility to explicitly specify the replica location per file. -// TODO: Add methods to retrieve and verify generated file placements from plan. -// TODO: Extend the model to specify a physical schema independently of a plan (ie, -// tables/files, blocks, replicas and cached replicas exist independently of the -// queries that run against them). - -/// File blocks store a list of all datanodes that have a replica of the block. When -/// defining tables you can specify the desired replica placement among all available -/// datanodes in the cluster. -/// -/// - RANDOM means that any datanode can be picked. -/// - LOCAL_ONLY means that only datanodes with a backend will be picked. -/// - REMOTE_ONLY means that only datanodes without a backend will be picked. -/// -/// Whether replicas will be cached or not is not determined by this value, but by -/// additional function arguments when adding tables to the schema. -enum class ReplicaPlacement { - RANDOM, - LOCAL_ONLY, - REMOTE_ONLY, -}; - -/// Host model. Each host can have either a backend, a datanode, or both. To specify that -/// a host should not act as a backend or datanode specify '-1' as the respective port. -struct Host { - Host(const Hostname& name, const IpAddr& ip, int be_port, int dn_port) - : name(name), ip(ip), be_port(be_port), dn_port(dn_port) {} - Hostname name; - IpAddr ip; - int be_port; // Backend port - int dn_port; // Datanode port -}; - -/// A cluster stores a list of hosts and provides various methods to add hosts to the -/// cluster. All hosts are guaranteed to have unique IP addresses and hostnames. -class Cluster { - public: - /// Add a host and return the host's index. 'hostname' and 'ip' of the new host will be - /// generated and are guaranteed to be unique. - int AddHost(bool has_backend, bool has_datanode) { - int host_idx = hosts_.size(); - int be_port = has_backend ? BACKEND_PORT : -1; - int dn_port = has_datanode ? DATANODE_PORT : -1; - IpAddr ip = HostIdxToIpAddr(host_idx); - DCHECK(ip_to_idx_.find(ip) == ip_to_idx_.end()); - ip_to_idx_[ip] = host_idx; - hosts_.push_back(Host(HostIdxToHostname(host_idx), ip, be_port, dn_port)); - // Add host to lists of backend indexes per type. - if (has_backend) backend_host_idxs_.push_back(host_idx); - if (has_datanode) { - datanode_host_idxs_.push_back(host_idx); - if (has_backend) { - datanode_with_backend_host_idxs_.push_back(host_idx); - } else { - datanode_only_host_idxs_.push_back(host_idx); - } - } - return host_idx; - } - - /// Add a number of hosts with the same properties by repeatedly calling AddHost(..). - void AddHosts(int num_hosts, bool has_backend, bool has_datanode) { - for (int i = 0; i < num_hosts; ++i) AddHost(has_backend, has_datanode); - } - - /// Convert a host index to a hostname. - static Hostname HostIdxToHostname(int host_idx) { - return HOSTNAME_PREFIX + std::to_string(host_idx); - } - - /// Return the backend address (ip, port) for the host with index 'host_idx'. - void GetBackendAddress(int host_idx, TNetworkAddress* addr) const { - DCHECK_LT(host_idx, hosts_.size()); - addr->hostname = hosts_[host_idx].ip; - addr->port = hosts_[host_idx].be_port; - } - - const vector<Host>& hosts() const { return hosts_; } - int NumHosts() const { return hosts_.size(); } - - /// These methods return lists of host indexes, grouped by their type, which can be used - /// to draw samples of random sets of hosts. - /// TODO: Think of a nicer abstraction to expose this information. - const vector<int>& backend_host_idxs() const { return backend_host_idxs_; } - const vector<int>& datanode_host_idxs() const { return datanode_host_idxs_; } - - const vector<int>& datanode_with_backend_host_idxs() const { - return datanode_with_backend_host_idxs_; - } - - const vector<int>& datanode_only_host_idxs() const { return datanode_only_host_idxs_; } - - private: - /// Port for all backends. - static const int BACKEND_PORT; - - /// Port for all datanodes. - static const int DATANODE_PORT; - - /// Prefix for all generated hostnames. - static const string HOSTNAME_PREFIX; - - /// First octet for all generated IP addresses. - static const string IP_PREFIX; - - /// List of hosts in this cluster. - vector<Host> hosts_; - - /// Lists of indexes of hosts, grouped by their type. The lists reference hosts in - /// 'hosts_' by index and are used for random sampling. - /// - /// All hosts with a backend. - vector<int> backend_host_idxs_; - /// All hosts with a datanode. - vector<int> datanode_host_idxs_; - /// All hosts with a datanode and a backend. - vector<int> datanode_with_backend_host_idxs_; - /// All hosts with a datanode but no backend. - vector<int> datanode_only_host_idxs_; - - /// Map from IP addresses to host indexes. - unordered_map<IpAddr, int> ip_to_idx_; - - /// Convert a host index to an IP address. The host index must be smaller than 2^24 and - /// will specify the lower 24 bits of the IPv4 address (the lower 3 octets). - static IpAddr HostIdxToIpAddr(int host_idx) { - DCHECK_LT(host_idx, (1 << 24)); - string suffix; - for (int i = 0; i < 3; ++i) { - suffix = "." + std::to_string(host_idx % 256) + suffix; // prepend - host_idx /= 256; - } - DCHECK_EQ(0, host_idx); - return IP_PREFIX + suffix; - } -}; - -const int Cluster::BACKEND_PORT = 1000; -const int Cluster::DATANODE_PORT = 2000; -const string Cluster::HOSTNAME_PREFIX = "host_"; -const string Cluster::IP_PREFIX = "10"; - -struct Block { - /// By default all blocks are of the same size. - int64_t length = DEFAULT_BLOCK_SIZE; - - /// Index into the cluster that owns the table that owns this block. - vector<int> replica_host_idxs; - - /// Flag for each entry in replica_host_idxs whether it is a cached replica or not. - vector<bool> replica_host_idx_is_cached; - - /// Default size for new blocks. - static const int64_t DEFAULT_BLOCK_SIZE; -}; -/// Default size for new blocks is 1MB. -const int64_t Block::DEFAULT_BLOCK_SIZE = 1 << 20; - -struct Table { - vector<Block> blocks; -}; - -class Schema { - public: - Schema(const Cluster& cluster) : cluster_(cluster) {} - - /// Add a table consisting of a single block to the schema with explicitly specified - /// replica indexes for non-cached replicas and without any cached replicas. Replica - /// indexes must refer to hosts in cluster_.hosts() by index. - void AddSingleBlockTable(const TableName& table_name, - const vector<int>& non_cached_replica_host_idxs) { - AddSingleBlockTable(table_name, non_cached_replica_host_idxs, {}); - } - - /// Add a table consisting of a single block to the schema with explicitly specified - /// replica indexes for both non-cached and cached replicas. Values in both lists must - /// refer to hosts in cluster_.hosts() by index. Both lists must be disjoint, i.e., a - /// replica can either be cached or not. - void AddSingleBlockTable(const TableName& table_name, - const vector<int>& non_cached_replica_host_idxs, - const vector<int>& cached_replica_host_idxs) { - DCHECK(tables_.find(table_name) == tables_.end()); - Block block; - int num_replicas = non_cached_replica_host_idxs.size() + - cached_replica_host_idxs.size(); - block.replica_host_idxs = non_cached_replica_host_idxs; - block.replica_host_idxs.insert(block.replica_host_idxs.end(), - cached_replica_host_idxs.begin(), cached_replica_host_idxs.end()); - // Initialize for non-cached replicas first. - block.replica_host_idx_is_cached.resize(non_cached_replica_host_idxs.size(), false); - // Fill up to final size for cached replicas. - block.replica_host_idx_is_cached.insert(block.replica_host_idx_is_cached.end(), - cached_replica_host_idxs.size(), true); - DCHECK_EQ(block.replica_host_idxs.size(), block.replica_host_idx_is_cached.size()); - DCHECK_EQ(block.replica_host_idxs.size(), num_replicas); - // Create table - Table table; - table.blocks.push_back(block); - // Insert table - tables_.emplace(table_name, table); - } - - /// Add a table to the schema, selecting replica hosts according to the given replica - /// placement preference. All replicas will be non-cached. - void AddMultiBlockTable(const TableName& table_name, int num_blocks, - ReplicaPlacement replica_placement, int num_replicas) { - AddMultiBlockTable(table_name, num_blocks, replica_placement, num_replicas, 0); - } - - /// Add a table to the schema, selecting replica hosts according to the given replica - /// placement preference. After replica selection has been done, 'num_cached_replicas' - /// of them are marked as cached. - void AddMultiBlockTable(const TableName& table_name, int num_blocks, - ReplicaPlacement replica_placement, int num_replicas, int num_cached_replicas) { - DCHECK_GT(num_replicas, 0); - DCHECK(num_cached_replicas <= num_replicas); - Table table; - for (int i = 0; i < num_blocks; ++i) { - Block block; - vector<int>& replica_idxs = block.replica_host_idxs; - - // Determine replica host indexes. - switch (replica_placement) { - case ReplicaPlacement::RANDOM: - SampleNElements(num_replicas, cluster_.datanode_host_idxs(), &replica_idxs); - break; - case ReplicaPlacement::LOCAL_ONLY: - DCHECK(num_replicas <= cluster_.datanode_with_backend_host_idxs().size()); - SampleNElements(num_replicas, cluster_.datanode_with_backend_host_idxs(), - &replica_idxs); - break; - case ReplicaPlacement::REMOTE_ONLY: - DCHECK(num_replicas <= cluster_.datanode_only_host_idxs().size()); - SampleNElements(num_replicas, cluster_.datanode_only_host_idxs(), - &replica_idxs); - break; - default: - DCHECK(false) << "Unsupported replica placement: " - << (int)replica_placement; - } - - // Determine cached replicas. - vector<int> cached_replicas; - vector<bool>& is_cached = block.replica_host_idx_is_cached; - is_cached.resize(num_replicas, false); - SampleN(num_cached_replicas, num_replicas, &cached_replicas); - // Flag cached entries. - for (const int idx: cached_replicas) is_cached[idx] = true; - - DCHECK_EQ(replica_idxs.size(), is_cached.size()); - table.blocks.push_back(block); - } - // Insert table - tables_[table_name] = table; - } - - const Table& GetTable(const TableName& table_name) const { - auto it = tables_.find(table_name); - DCHECK(it != tables_.end()); - return it->second; - } - - const Cluster& cluster() const { return cluster_; } - - private: - /// Store a reference to the cluster, from which hosts are sampled. Test results will - /// use the cluster to resolve host indexes to hostnames and IP addresses. - const Cluster& cluster_; - - unordered_map<TableName, Table> tables_; -}; - -/// Plan model. A plan contains a list of tables to scan and the query options to be used -/// during scheduling. -class Plan { - public: - Plan(const Schema& schema) : schema_(schema) {} - - const TQueryOptions& query_options() const { return query_options_; } - - void SetReplicaPreference(TReplicaPreference::type p) { - query_options_.replica_preference = p; - } - - void SetRandomReplica(bool b) { query_options_.schedule_random_replica = b; } - void SetDisableCachedReads(bool b) { query_options_.disable_cached_reads = b; } - const Cluster& cluster() const { return schema_.cluster(); } - - const vector<TNetworkAddress>& referenced_datanodes() const { - return referenced_datanodes_; - } - - const vector<TScanRangeLocations>& scan_range_locations() const { - return scan_range_locations_; - } - - /// Add a scan of table 'table_name' to the plan. This method will populate the internal - /// list of TScanRangeLocations and can be called multiple times for the same table to - /// schedule additional scans. - void AddTableScan(const TableName& table_name) { - const Table& table = schema_.GetTable(table_name); - const vector<Block>& blocks = table.blocks; - for (int i = 0; i < blocks.size(); ++i) { - const Block& block = blocks[i]; - TScanRangeLocations scan_range_locations; - BuildTScanRangeLocations(table_name, block, i, &scan_range_locations); - scan_range_locations_.push_back(scan_range_locations); - } - } - - private: - /// Store a reference to the schema, from which scanned tables will be read. - const Schema& schema_; - - TQueryOptions query_options_; - - /// List of all datanodes that are referenced by this plan. Only hosts that have an - /// assigned scan range are added here. - vector<TNetworkAddress> referenced_datanodes_; - - /// Map from plan host index to an index in 'referenced_datanodes_'. - boost::unordered_map<int, int> host_idx_to_datanode_idx_; - - /// List of all scan range locations, which can be passed to the SimpleScheduler. - vector<TScanRangeLocations> scan_range_locations_; - - /// Initialize a TScanRangeLocations object in place. - void BuildTScanRangeLocations(const TableName& table_name, const Block& block, - int block_idx, TScanRangeLocations* scan_range_locations) { - const vector<int>& replica_idxs = block.replica_host_idxs; - const vector<bool>& is_cached = block.replica_host_idx_is_cached; - DCHECK_EQ(replica_idxs.size(), is_cached.size()); - int num_replicas = replica_idxs.size(); - BuildScanRange(table_name, block, block_idx, &scan_range_locations->scan_range); - scan_range_locations->locations.resize(num_replicas); - for (int i = 0; i < num_replicas; ++i) { - TScanRangeLocation& location = scan_range_locations->locations[i]; - location.host_idx = FindOrInsertDatanodeIndex(replica_idxs[i]); - location.__set_is_cached(is_cached[i]); - } - } - - void BuildScanRange(const TableName& table_name, const Block& block, int block_idx, - TScanRange* scan_range) { - // Initialize locations.scan_range correctly. - THdfsFileSplit file_split; - // 'length' is the only member considered by the scheduler. - file_split.length = block.length; - // Encoding the table name and block index in the file helps debugging. - file_split.file_name = table_name + "_block_" + std::to_string(block_idx); - file_split.offset = 0; - file_split.partition_id = 0; - // For now, we model each file by a single block. - file_split.file_length = block.length; - file_split.file_compression = THdfsCompression::NONE; - file_split.mtime = 1; - scan_range->__set_hdfs_file_split(file_split); - } - - /// Look up the plan-local host index of 'cluster_datanode_idx'. If the host has not - /// been added to the plan before, it will add it to 'referenced_datanodes_' and return - /// the new index. - int FindOrInsertDatanodeIndex(int cluster_datanode_idx) { - const Host& host = schema_.cluster().hosts()[cluster_datanode_idx]; - auto ret = host_idx_to_datanode_idx_.emplace( - cluster_datanode_idx, referenced_datanodes_.size()); - bool inserted_new_element = ret.second; - if (inserted_new_element) { - TNetworkAddress datanode; - datanode.hostname = host.ip; - datanode.port = host.dn_port; - referenced_datanodes_.push_back(datanode); - } - return ret.first->second; - } -}; - -class Result { - private: - /// Map to count the number of assignments per backend. - typedef unordered_map<IpAddr, int> NumAssignmentsPerBackend; - - /// Map to count the number of assigned bytes per backend. - typedef unordered_map<IpAddr, int64_t> NumAssignedBytesPerBackend; - - /// Parameter type for callbacks, which are used to filter scheduling results. - struct AssignmentInfo { - const TNetworkAddress& addr; - const THdfsFileSplit& hdfs_file_split; - bool is_cached; - bool is_remote; - }; - - /// These functions are used as callbacks when processing the scheduling result. They - /// will be called once per assigned scan range. - typedef std::function<bool(const AssignmentInfo& assignment)> AssignmentFilter; - typedef std::function<void(const AssignmentInfo& assignment)> AssignmentCallback; - - public: - Result(const Plan& plan) : plan_(plan) {} - - /// Return the total number of scheduled assignments. - int NumTotalAssignments() const { return CountAssignmentsIf(Any()); } - - /// Return the total number of assigned bytes. - int NumTotalAssignedBytes() const { return CountAssignedBytesIf(Any()); } - - /// Return the number of scheduled assignments for a single host. - int NumTotalAssignments(int host_idx) const { - return CountAssignmentsIf(IsHost(host_idx)); - } - - /// Return the number of assigned bytes for a single host. - int NumTotalAssignedBytes(int host_idx) const { - return CountAssignedBytesIf(IsHost(host_idx)); - } - - /// Return the total number of assigned cached reads. - int NumCachedAssignments() const { return CountAssignmentsIf(IsCached(Any())); } - - /// Return the total number of assigned bytes for cached reads. - int NumCachedAssignedBytes() const { return CountAssignedBytesIf(IsCached(Any())); } - - /// Return the total number of assigned cached reads for a single host. - int NumCachedAssignments(int host_idx) const { - return CountAssignmentsIf(IsCached(IsHost(host_idx))); - } - - /// Return the total number of assigned bytes for cached reads for a single host. - int NumCachedAssignedBytes(int host_idx) const { - return CountAssignedBytesIf(IsCached(IsHost(host_idx))); - } - - /// Return the total number of assigned non-cached reads. - int NumDiskAssignments() const { return CountAssignmentsIf(IsDisk(Any())); } - - /// Return the total number of assigned bytes for non-cached reads. - int NumDiskAssignedBytes() const { return CountAssignedBytesIf(IsDisk(Any())); } - - /// Return the total number of assigned non-cached reads for a single host. - int NumDiskAssignments(int host_idx) const { - return CountAssignmentsIf(IsDisk(IsHost(host_idx))); - } - - /// Return the total number of assigned bytes for non-cached reads for a single host. - int NumDiskAssignedBytes(int host_idx) const { - return CountAssignedBytesIf(IsDisk(IsHost(host_idx))); - } - - /// Return the total number of assigned remote reads. - int NumRemoteAssignments() const { return CountAssignmentsIf(IsRemote(Any())); } - - /// Return the total number of assigned bytes for remote reads. - int NumRemoteAssignedBytes() const { return CountAssignedBytesIf(IsRemote(Any())); } - - /// Return the total number of assigned remote reads for a single host. - int NumRemoteAssignments(int host_idx) const { - return CountAssignmentsIf(IsRemote(IsHost(host_idx))); - } - - /// Return the total number of assigned bytes for remote reads for a single host. - int NumRemoteAssignedBytes(int host_idx) const { - return CountAssignedBytesIf(IsRemote(IsHost(host_idx))); - } - - /// Return the maximum number of assigned reads over all hosts. - int MaxNumAssignmentsPerHost() const { - NumAssignmentsPerBackend num_assignments_per_backend; - CountAssignmentsPerBackend(&num_assignments_per_backend); - int max_count = 0; - for (const auto& elem: num_assignments_per_backend) { - max_count = max(max_count, elem.second); - } - return max_count; - } - - /// Return the maximum number of assigned reads over all hosts. - int64_t MaxNumAssignedBytesPerHost() const { - NumAssignedBytesPerBackend num_assigned_bytes_per_backend; - CountAssignedBytesPerBackend(&num_assigned_bytes_per_backend); - int64_t max_assigned_bytes = 0; - for (const auto& elem: num_assigned_bytes_per_backend) { - max_assigned_bytes = max(max_assigned_bytes, elem.second); - } - return max_assigned_bytes; - } - - /// Return the minimum number of assigned reads over all hosts. - /// NOTE: This is computed by traversing all recorded assignments and thus will not - /// consider hosts without any assignments. Hence the minimum value to expect is 1 (not - /// 0). - int MinNumAssignmentsPerHost() const { - NumAssignmentsPerBackend num_assignments_per_backend; - CountAssignmentsPerBackend(&num_assignments_per_backend); - int min_count = numeric_limits<int>::max(); - for (const auto& elem: num_assignments_per_backend) { - min_count = min(min_count, elem.second); - } - DCHECK_GT(min_count, 0); - return min_count; - } - - /// Return the minimum number of assigned bytes over all hosts. - /// NOTE: This is computed by traversing all recorded assignments and thus will not - /// consider hosts without any assignments. Hence the minimum value to expect is 1 (not - /// 0). - int64_t MinNumAssignedBytesPerHost() const { - NumAssignedBytesPerBackend num_assigned_bytes_per_backend; - CountAssignedBytesPerBackend(&num_assigned_bytes_per_backend); - int64_t min_assigned_bytes = 0; - for (const auto& elem: num_assigned_bytes_per_backend) { - min_assigned_bytes = max(min_assigned_bytes, elem.second); - } - DCHECK_GT(min_assigned_bytes, 0); - return min_assigned_bytes; - } - - /// Return the number of scan range assignments stored in this result. - int NumAssignments() const { return assignments_.size(); } - - /// Return the number of distinct backends that have been picked by the scheduler so - /// far. - int NumDistinctBackends() const { - unordered_set<IpAddr> backends; - AssignmentCallback cb = [&backends](const AssignmentInfo& assignment) { - backends.insert(assignment.addr.hostname); - }; - ProcessAssignments(cb); - return backends.size(); - } - - /// Return the full assignment for manual matching. - const FragmentScanRangeAssignment& GetAssignment(int index = 0) const { - DCHECK_GT(assignments_.size(), index); - return assignments_[index]; - } - - /// Add an assignment to the result and return a reference, which can then be passed on - /// to the scheduler. - FragmentScanRangeAssignment* AddAssignment() { - assignments_.push_back(FragmentScanRangeAssignment()); - return &assignments_.back(); - } - - /// Reset the result to an empty state. - void Reset() { assignments_.clear(); } - - private: - /// Vector to store results of consecutive scheduler runs. - vector<FragmentScanRangeAssignment> assignments_; - - /// Reference to the plan, needed to look up hosts. - const Plan& plan_; - - /// Dummy filter matching any assignment. - AssignmentFilter Any() const { - return [](const AssignmentInfo& assignment) { return true; }; - } - - /// Filter to only match assignments of a particular host. - AssignmentFilter IsHost(int host_idx) const { - TNetworkAddress expected_addr; - plan_.cluster().GetBackendAddress(host_idx, &expected_addr); - return [expected_addr](const AssignmentInfo& assignment) { - return assignment.addr == expected_addr; - }; - } - - /// Filter to only match assignments of cached reads. - AssignmentFilter IsCached(AssignmentFilter filter) const { - return [filter](const AssignmentInfo& assignment) { - return filter(assignment) && assignment.is_cached; - }; - } - - /// Filter to only match assignments of non-cached, local disk reads. - AssignmentFilter IsDisk(AssignmentFilter filter) const { - return [filter](const AssignmentInfo& assignment) { - return filter(assignment) && !assignment.is_cached && !assignment.is_remote; - }; - } - - /// Filter to only match assignments of remote reads. - AssignmentFilter IsRemote(AssignmentFilter filter) const { - return [filter](const AssignmentInfo& assignment) { - return filter(assignment) && assignment.is_remote; - }; - } - - /// Process all recorded assignments and call the supplied callback on each tuple of IP - /// address and scan_range it iterates over. - void ProcessAssignments(const AssignmentCallback& cb) const { - for (const FragmentScanRangeAssignment& assignment: assignments_) { - for (const auto& assignment_elem: assignment) { - const TNetworkAddress& addr = assignment_elem.first; - const PerNodeScanRanges& per_node_ranges = assignment_elem.second; - for (const auto& per_node_ranges_elem: per_node_ranges) { - const vector<TScanRangeParams> scan_range_params_vector = - per_node_ranges_elem.second; - for (const TScanRangeParams& scan_range_params: scan_range_params_vector) { - const TScanRange& scan_range = scan_range_params.scan_range; - DCHECK(scan_range.__isset.hdfs_file_split); - const THdfsFileSplit& hdfs_file_split = scan_range.hdfs_file_split; - bool is_cached = scan_range_params.__isset.is_cached - ? scan_range_params.is_cached : false; - bool is_remote = scan_range_params.__isset.is_remote - ? scan_range_params.is_remote : false; - cb({addr, hdfs_file_split, is_cached, is_remote}); - } - } - } - } - } - - /// Count all assignments matching the supplied filter callback. - int CountAssignmentsIf(const AssignmentFilter& filter) const { - int count = 0; - AssignmentCallback cb = [&count, filter](const AssignmentInfo& assignment) { - if (filter(assignment)) ++count; - }; - ProcessAssignments(cb); - return count; - } - - /// Count all assignments matching the supplied filter callback. - int64_t CountAssignedBytesIf(const AssignmentFilter& filter) const { - int64_t assigned_bytes = 0; - AssignmentCallback cb = [&assigned_bytes, filter](const AssignmentInfo& assignment) { - if (filter(assignment)) assigned_bytes += assignment.hdfs_file_split.length; - }; - ProcessAssignments(cb); - return assigned_bytes; - } - - /// Create a map containing the number of assigned scan ranges per node. - void CountAssignmentsPerBackend( - NumAssignmentsPerBackend* num_assignments_per_backend) const { - AssignmentCallback cb = [&num_assignments_per_backend]( - const AssignmentInfo& assignment) { - ++(*num_assignments_per_backend)[assignment.addr.hostname]; - }; - ProcessAssignments(cb); - } - - /// Create a map containing the number of assigned bytes per node. - void CountAssignedBytesPerBackend( - NumAssignedBytesPerBackend* num_assignments_per_backend) const { - AssignmentCallback cb = [&num_assignments_per_backend]( - const AssignmentInfo& assignment) { - (*num_assignments_per_backend)[assignment.addr.hostname] += - assignment.hdfs_file_split.length; - }; - ProcessAssignments(cb); - } -}; - -/// This class wraps the SimpleScheduler and provides helper for easier instrumentation -/// during tests. -class SchedulerWrapper { - public: - SchedulerWrapper(const Plan& plan) : plan_(plan), metrics_("TestMetrics") { - InitializeScheduler(); - } - - /// Call ComputeScanRangeAssignment() with exec_at_coord set to false. - void Compute(Result* result) { - Compute(false, result); - } - - /// Call ComputeScanRangeAssignment(). - void Compute(bool exec_at_coord, Result* result) { - DCHECK(scheduler_ != NULL); - - // Compute Assignment. - FragmentScanRangeAssignment* assignment = result->AddAssignment(); - scheduler_->ComputeScanRangeAssignment(*scheduler_->GetBackendConfig(), 0, NULL, - false, plan_.scan_range_locations(), plan_.referenced_datanodes(), exec_at_coord, - plan_.query_options(), NULL, assignment); - } - - /// Reset the state of the scheduler by re-creating and initializing it. - void Reset() { InitializeScheduler(); } - - /// Methods to modify the internal lists of backends maintained by the scheduler. - - /// Add a backend to the scheduler. - void AddBackend(const Host& host) { - // Add to topic delta - TTopicDelta delta; - delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC; - delta.is_delta = true; - AddHostToTopicDelta(host, &delta); - SendTopicDelta(delta); - } - - /// Remove a backend from the scheduler. - void RemoveBackend(const Host& host) { - // Add deletion to topic delta - TTopicDelta delta; - delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC; - delta.is_delta = true; - delta.topic_deletions.push_back(host.ip); - SendTopicDelta(delta); - } - - /// Send a full map of the backends to the scheduler instead of deltas. - void SendFullMembershipMap() { - TTopicDelta delta; - delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC; - delta.is_delta = false; - for (const Host& host: plan_.cluster().hosts()) { - if (host.be_port >= 0) AddHostToTopicDelta(host, &delta); - } - SendTopicDelta(delta); - } - - /// Send an empty update message to the scheduler. - void SendEmptyUpdate() { - TTopicDelta delta; - delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC; - delta.is_delta = true; - SendTopicDelta(delta); - } - - private: - const Plan& plan_; - boost::scoped_ptr<SimpleScheduler> scheduler_; - MetricGroup metrics_; - - /// Initialize the internal scheduler object. The method uses the 'real' constructor - /// used in the rest of the codebase, in contrast to the one that takes a list of - /// backends, which is only used for testing purposes. This allows us to properly - /// initialize the scheduler and exercise the UpdateMembership() method in tests. - void InitializeScheduler() { - DCHECK(scheduler_ == NULL); - DCHECK_GT(plan_.cluster().NumHosts(), 0) << "Cannot initialize scheduler with 0 " - << "hosts."; - const Host& scheduler_host = plan_.cluster().hosts()[0]; - string scheduler_backend_id = scheduler_host.ip; - TNetworkAddress scheduler_backend_address; - scheduler_backend_address.hostname = scheduler_host.ip; - scheduler_backend_address.port = scheduler_host.be_port; - - scheduler_.reset(new SimpleScheduler( - NULL, scheduler_backend_id, scheduler_backend_address, &metrics_, NULL, NULL)); - scheduler_->Init(); - // Initialize the scheduler backend maps. - SendFullMembershipMap(); - } - - /// Add a single host to the given TTopicDelta. - void AddHostToTopicDelta(const Host& host, TTopicDelta* delta) const { - DCHECK_GT(host.be_port, 0) << "Host cannot be added to scheduler without a running " - << "backend"; - // Build backend descriptor. - TBackendDescriptor be_desc; - be_desc.address.hostname = host.ip; - be_desc.address.port = host.be_port; - be_desc.ip_address = host.ip; - - // Build topic item. - TTopicItem item; - item.key = host.ip; - ThriftSerializer serializer(false); - Status status = serializer.Serialize(&be_desc, &item.value); - DCHECK(status.ok()); - - // Add to topic delta. - delta->topic_entries.push_back(item); - } - - /// Send the given topic delta to the scheduler. - void SendTopicDelta(const TTopicDelta& delta) { - DCHECK(scheduler_ != NULL); - // Wrap in topic delta map. - StatestoreSubscriber::TopicDeltaMap delta_map; - delta_map.emplace(SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC, delta); - - // Send to the scheduler. - vector<TTopicDelta> dummy_result; - scheduler_->UpdateMembership(delta_map, &dummy_result); - } -}; - class SchedulerTest : public testing::Test { protected: SchedulerTest() { srand(0); }; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d76a2b22/be/src/scheduling/simple-scheduler.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/simple-scheduler.h b/be/src/scheduling/simple-scheduler.h index 8c96dde..9b51269 100644 --- a/be/src/scheduling/simple-scheduler.h +++ b/be/src/scheduling/simple-scheduler.h @@ -42,7 +42,9 @@ namespace impala { class Coordinator; +namespace test { class SchedulerWrapper; +} /// Performs simple scheduling by matching between a list of backends configured /// either from the statestore, or from a static list of addresses, and a list @@ -434,7 +436,7 @@ class SimpleScheduler : public Scheduler { int FindSenderFragment(TPlanNodeId exch_id, int fragment_idx, const TQueryExecRequest& exec_request); - friend class impala::SchedulerWrapper; + friend class impala::test::SchedulerWrapper; FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentDeterministicNonCached); FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomNonCached); FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomDiskLocal);