This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch branch-1.9.x in repository https://gitbox.apache.org/repos/asf/kudu.git
commit ddfcb873eb251e87049f0e5b3aabb417bd27c7e7 Author: Alexey Serbin <[email protected]> AuthorDate: Wed Feb 27 21:53:43 2019 -0800 [master] introduce cache for location mapping assignments This changelist adds a very primitive cache for location assignments. The cache does not prevent running multiple commands for the same location key if an entry is not present in the cache. A unit test is also added. Change-Id: Icb5c436c9433acd87c44c4d81982420f33ebb4a4 Reviewed-on: http://gerrit.cloudera.org:8080/12634 Reviewed-by: Will Berkeley <[email protected]> Tested-by: Kudu Jenkins (cherry picked from commit ae6bbcaabd20955119f1d945d276589538dae928) Reviewed-on: http://gerrit.cloudera.org:8080/12783 Reviewed-by: Andrew Wong <[email protected]> --- src/kudu/master/CMakeLists.txt | 2 + src/kudu/master/location_cache-test.cc | 179 +++++++++++++++++++++++++++++++++ src/kudu/master/location_cache.cc | 152 ++++++++++++++++++++++++++++ src/kudu/master/location_cache.h | 88 ++++++++++++++++ 4 files changed, 421 insertions(+) diff --git a/src/kudu/master/CMakeLists.txt b/src/kudu/master/CMakeLists.txt index 89fd9c6..79f38ce 100644 --- a/src/kudu/master/CMakeLists.txt +++ b/src/kudu/master/CMakeLists.txt @@ -35,6 +35,7 @@ ADD_EXPORTABLE_LIBRARY(master_proto set(MASTER_SRCS catalog_manager.cc hms_notification_log_listener.cc + location_cache.cc master.cc master_cert_authority.cc master_options.cc @@ -79,6 +80,7 @@ SET_KUDU_TEST_LINK_LIBS( ADD_KUDU_TEST(catalog_manager-test) ADD_KUDU_TEST(hms_notification_log_listener-test) +ADD_KUDU_TEST(location_cache-test DATA_FILES ../scripts/first_argument.sh) ADD_KUDU_TEST(master-test RESOURCE_LOCK "master-web-port" DATA_FILES ../scripts/first_argument.sh) ADD_KUDU_TEST(mini_master-test RESOURCE_LOCK "master-web-port") diff --git a/src/kudu/master/location_cache-test.cc b/src/kudu/master/location_cache-test.cc new file mode 100644 index 0000000..96c1bea --- /dev/null +++ b/src/kudu/master/location_cache-test.cc @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/master/location_cache.h" + +#include <cstdint> +#include <string> +#include <thread> +#include <vector> + +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "kudu/gutil/ref_counted.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/metrics.h" +#include "kudu/util/path_util.h" +#include "kudu/util/status.h" +#include "kudu/util/test_macros.h" +#include "kudu/util/test_util.h" + +METRIC_DECLARE_counter(location_mapping_cache_hits); +METRIC_DECLARE_counter(location_mapping_cache_queries); + +using std::string; +using std::thread; +using std::vector; +using strings::Substitute; + +namespace kudu { +namespace master { + +// Targeted test for LocationCache. +class LocationCacheTest : public KuduTest { + protected: + void SetUp() override { + KuduTest::SetUp(); + metric_entity_ = METRIC_ENTITY_server.Instantiate(&metric_registry_, + "LocationCacheTest"); + } + + void CheckMetrics(int64_t expected_queries, int64_t expected_hits) { + scoped_refptr<Counter> cache_queries(metric_entity_->FindOrCreateCounter( + &METRIC_location_mapping_cache_queries)); + ASSERT_NE(nullptr, cache_queries.get()); + ASSERT_EQ(expected_queries, cache_queries->value()); + + scoped_refptr<Counter> cache_hits(metric_entity_->FindOrCreateCounter( + &METRIC_location_mapping_cache_hits)); + ASSERT_NE(nullptr, cache_hits.get()); + ASSERT_EQ(expected_hits, cache_hits->value()); + } + + MetricRegistry metric_registry_; + scoped_refptr<MetricEntity> metric_entity_; +}; + +TEST_F(LocationCacheTest, EmptyMappingCommand) { + LocationCache cache(" ", metric_entity_.get()); + string location; + auto s = cache.GetLocation("na", &location); + ASSERT_TRUE(s.IsRuntimeError()) << s.ToString(); + ASSERT_STR_CONTAINS( + s.ToString(), "invalid empty location mapping command"); + NO_FATALS(CheckMetrics(1, 0)); +} + +TEST_F(LocationCacheTest, MappingCommandFailureExitStatus) { + LocationCache cache("/sbin/nologin", metric_entity_.get()); + string location; + auto s = cache.GetLocation("na", &location); + ASSERT_TRUE(s.IsRuntimeError()) << s.ToString(); + ASSERT_STR_CONTAINS( + s.ToString(), "failed to run location mapping command: "); + NO_FATALS(CheckMetrics(1, 0)); +} + +TEST_F(LocationCacheTest, MappingCommandEmptyOutput) { + LocationCache cache("/bin/cat", metric_entity_.get()); + string location; + auto s = cache.GetLocation("/dev/null", &location); + ASSERT_TRUE(s.IsRuntimeError()) << s.ToString(); + ASSERT_STR_CONTAINS( + s.ToString(), "location mapping command returned invalid empty location"); + NO_FATALS(CheckMetrics(1, 0)); +} + +TEST_F(LocationCacheTest, MappingCommandReturnsInvalidLocation) { + const string cmd_path = JoinPathSegments(GetTestExecutableDirectory(), + "testdata/first_argument.sh"); + const string location_mapping_cmd = Substitute("$0 invalid.location", + cmd_path); + LocationCache cache(location_mapping_cmd, metric_entity_.get()); + string location; + auto s = cache.GetLocation("na", &location); + ASSERT_TRUE(s.IsRuntimeError()) << s.ToString(); + ASSERT_STR_CONTAINS( + s.ToString(), "location mapping command returned invalid location"); + NO_FATALS(CheckMetrics(1, 0)); +} + +TEST_F(LocationCacheTest, HappyPath) { + const string kRefLocation = "/ref_location"; + const string cmd_path = JoinPathSegments(GetTestExecutableDirectory(), + "testdata/first_argument.sh"); + const string location_mapping_cmd = Substitute("$0 $1", + cmd_path, kRefLocation); + LocationCache cache(location_mapping_cmd, metric_entity_.get()); + NO_FATALS(CheckMetrics(0, 0)); + + string location; + auto s = cache.GetLocation("key_0", &location); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(kRefLocation, location); + NO_FATALS(CheckMetrics(1, 0)); + + s = cache.GetLocation("key_1", &location); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(kRefLocation, location); + NO_FATALS(CheckMetrics(2, 0)); + + s = cache.GetLocation("key_1", &location); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(kRefLocation, location); + NO_FATALS(CheckMetrics(3, 1)); + + s = cache.GetLocation("key_0", &location); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(kRefLocation, location); + NO_FATALS(CheckMetrics(4, 2)); +} + +TEST_F(LocationCacheTest, ConcurrentRequests) { + static constexpr auto kNumThreads = 32; + const string kRefLocation = "/ref_location"; + const string cmd_path = JoinPathSegments(GetTestExecutableDirectory(), + "testdata/first_argument.sh"); + const string location_mapping_cmd = Substitute("$0 $1", + cmd_path, kRefLocation); + LocationCache cache(location_mapping_cmd, metric_entity_.get()); + NO_FATALS(CheckMetrics(0, 0)); + + for (auto iter = 0; iter < 2; ++iter) { + vector<thread> threads; + threads.reserve(kNumThreads); + for (auto idx = 0; idx < kNumThreads; ++idx) { + threads.emplace_back([&cache, &kRefLocation, idx]() { + string location; + auto s = cache.GetLocation(Substitute("key_$0", idx), &location); + CHECK(s.ok()) << s.ToString(); + CHECK_EQ(kRefLocation, location); + }); + } + for (auto& t : threads) { + t.join(); + } + // Expecting to account for every query, and the follow-up iteration + // should result in every query hitting the cache. + NO_FATALS(CheckMetrics(kNumThreads * (iter + 1), + kNumThreads * iter)); + } +} + +} // namespace master +} // namespace kudu diff --git a/src/kudu/master/location_cache.cc b/src/kudu/master/location_cache.cc new file mode 100644 index 0000000..b79b259 --- /dev/null +++ b/src/kudu/master/location_cache.cc @@ -0,0 +1,152 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/master/location_cache.h" + +#include <cstdio> +#include <mutex> +#include <string> +#include <unordered_map> +#include <utility> +#include <vector> + +#include <glog/logging.h> + +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/port.h" +#include "kudu/gutil/strings/charset.h" +#include "kudu/gutil/strings/split.h" +#include "kudu/gutil/strings/strip.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/subprocess.h" +#include "kudu/util/trace.h" + +METRIC_DEFINE_counter(server, location_mapping_cache_hits, + "Location Mapping Cache Hits", + kudu::MetricUnit::kCacheHits, + "Number of times location mapping assignment used " + "cached data"); +METRIC_DEFINE_counter(server, location_mapping_cache_queries, + "Location Mapping Cache Queries", + kudu::MetricUnit::kCacheQueries, + "Number of queries to the location mapping cache"); + +using std::string; +using std::vector; +using strings::Substitute; + +namespace kudu { +namespace master { + +namespace { +// Returns if 'location' is a valid location string, i.e. it begins with / +// and consists of /-separated tokens each of which contains only characters +// from the set [a-zA-Z0-9_-.]. +bool IsValidLocation(const string& location) { + if (location.empty() || location[0] != '/') { + return false; + } + const strings::CharSet charset("abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "0123456789" + "_-./"); + for (const auto c : location) { + if (!charset.Test(c)) { + return false; + } + } + return true; +} +} // anonymous namespace + +LocationCache::LocationCache(string location_mapping_cmd, + MetricEntity* metric_entity) + : location_mapping_cmd_(std::move(location_mapping_cmd)) { + if (metric_entity != nullptr) { + location_mapping_cache_hits_ = metric_entity->FindOrCreateCounter( + &METRIC_location_mapping_cache_hits); + location_mapping_cache_queries_ = metric_entity->FindOrCreateCounter( + &METRIC_location_mapping_cache_queries); + } +} + +Status LocationCache::GetLocation(const string& key, string* location) { + if (PREDICT_TRUE(location_mapping_cache_queries_)) { + location_mapping_cache_queries_->Increment(); + } + { + // First check whether the location for the key has already been assigned. + shared_lock<rw_spinlock> l(location_map_lock_); + const auto* value_ptr = FindOrNull(location_map_, key); + if (value_ptr) { + DCHECK(!value_ptr->empty()); + *location = *value_ptr; + if (PREDICT_TRUE(location_mapping_cache_hits_)) { + location_mapping_cache_hits_->Increment(); + } + return Status::OK(); + } + } + string value; + TRACE(Substitute("key $0: assigning location", key)); + Status s = GetLocationFromLocationMappingCmd( + location_mapping_cmd_, key, &value); + TRACE(Substitute("key $0: assigned location '$1'", key, value)); + if (s.ok()) { + CHECK(!value.empty()); + std::lock_guard<rw_spinlock> l(location_map_lock_); + // This simple implementation doesn't protect against multiple runs of the + // location-mapping command for the same key. + InsertIfNotPresent(&location_map_, key, value); + *location = value; + } + return s; +} + +Status LocationCache::GetLocationFromLocationMappingCmd(const string& cmd, + const string& key, + string* location) { + DCHECK(location); + vector<string> argv = strings::Split(cmd, " ", strings::SkipEmpty()); + if (argv.empty()) { + return Status::RuntimeError("invalid empty location mapping command"); + } + argv.push_back(key); + string stderr, location_temp; + Status s = Subprocess::Call(argv, /*stdin_in=*/"", &location_temp, &stderr); + if (!s.ok()) { + return Status::RuntimeError( + Substitute("failed to run location mapping command: $0", s.ToString()), + stderr); + } + StripWhiteSpace(&location_temp); + // Special case an empty location for a better error. + if (location_temp.empty()) { + return Status::RuntimeError( + "location mapping command returned invalid empty location"); + } + if (!IsValidLocation(location_temp)) { + return Status::RuntimeError( + "location mapping command returned invalid location", + location_temp); + } + *location = std::move(location_temp); + return Status::OK(); +} + +} // namespace master +} // namespace kudu diff --git a/src/kudu/master/location_cache.h b/src/kudu/master/location_cache.h new file mode 100644 index 0000000..b89f2d9 --- /dev/null +++ b/src/kudu/master/location_cache.h @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include <string> +#include <unordered_map> + +#include "kudu/gutil/macros.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/util/locks.h" +#include "kudu/util/metrics.h" +#include "kudu/util/status.h" + +namespace kudu { +namespace master { + +// A primitive cache of unlimited capacity to store assigned locations for +// a key. The cache entries are kept in the cache for the lifetime of the cache +// itself. +class LocationCache { + public: + // The location assignment command is specified by the 'location_mapping_cmd' + // parameter (the command might be a script or an executable). The + // 'metric_entity' is used to register standard cache counters: total number + // of queries and number of cache hits during the cache's lifetime. + explicit LocationCache(std::string location_mapping_cmd, + MetricEntity* metric_entity = nullptr); + ~LocationCache() = default; + + // Get the location for the specified key. The key is treated as an opaque + // identifier. + // + // If no cached location is found, the location mapping command is run, + // caching the result for the lifetime of the cache. + // + // This method returns an error if there was an issue running the location + // assignment command. + Status GetLocation(const std::string& key, std::string* location); + + private: + // Resolves an opaque 'key' into a location using the command 'cmd'. + // The result will be stored in 'location', which must not be null. If there + // is an error running the command or the output is invalid, an error Status + // will be returned. + // + // TODO(wdberkeley): Eventually we may want to get multiple locations at once + // by giving the location mapping command multiple arguments (like Hadoop). + static Status GetLocationFromLocationMappingCmd(const std::string& cmd, + const std::string& key, + std::string* location); + + // The executable to run when assigning locations for keys which are not yet + // in the cache. + const std::string location_mapping_cmd_; + + // Counter to track cache hits, i.e. when it was not necessary to run + // the location assignment command. + scoped_refptr<Counter> location_mapping_cache_hits_; + + // Counter to track overall cache queries, i.e. hits plus misses. Every miss + // results in the location assignment command being run. + scoped_refptr<Counter> location_mapping_cache_queries_; + + // Spinlock to protect the location assignment map (location_map_). + rw_spinlock location_map_lock_; + + // The location assignment map: dictionary of key --> location. + std::unordered_map<std::string, std::string> location_map_; + + DISALLOW_COPY_AND_ASSIGN(LocationCache); +}; + +} // namespace master +} // namespace kudu
