This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 5896202  [tests] added more tests for GetTableLocations()
5896202 is described below

commit 589620257b82fbb9c20fe6d6dfa7f5a97e6c06ae
Author: Alexey Serbin <[email protected]>
AuthorDate: Sun May 10 21:47:08 2020 -0700

    [tests] added more tests for GetTableLocations()
    
    Added a couple more tests for GetTableLocations():
    
      * Direct calls to the CatalogManager::GetTableLocations() method.
    
      * A test to verify whether clients tend to form the thundering herd
        pattern when calling GetTableLocations() upon location refreshes.
        It turned out that even if clients start almost simultaneously,
        their refresh calls are naturally distributed with good enough
        jitter and introducing jitter into the TTL of the location data
        (e.g., https://gerrit.cloudera.org/#/c/15892/) isn't needed.
    
    Change-Id: I61e413533bee2fa22f9e81531aadbea9e59ce6e2
    Reviewed-on: http://gerrit.cloudera.org:8080/15896
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Andrew Wong <[email protected]>
---
 .../integration-tests/table_locations-itest.cc     | 226 +++++++++++++++++++--
 1 file changed, 208 insertions(+), 18 deletions(-)

diff --git a/src/kudu/integration-tests/table_locations-itest.cc 
b/src/kudu/integration-tests/table_locations-itest.cc
index e8da64b..b78bbba 100644
--- a/src/kudu/integration-tests/table_locations-itest.cc
+++ b/src/kudu/integration-tests/table_locations-itest.cc
@@ -15,13 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <algorithm>
 #include <atomic>
+#include <cstddef>
+#include <cstdint>
 #include <memory>
+#include <numeric>
+#include <ostream>
 #include <string>
 #include <thread>
 #include <utility>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
@@ -34,6 +40,8 @@
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/master/catalog_manager.h"
 #include "kudu/master/master.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/master/master.proxy.h"
@@ -57,15 +65,22 @@ using kudu::pb_util::SecureDebugString;
 using kudu::rpc::Messenger;
 using kudu::rpc::MessengerBuilder;
 using kudu::rpc::RpcController;
+using std::ostringstream;
 using std::pair;
 using std::shared_ptr;
 using std::string;
+using std::thread;
 using std::unique_ptr;
 using std::vector;
+using strings::Substitute;
 
-DECLARE_string(location_mapping_cmd);
 DECLARE_int32(max_create_tablets_per_ts);
+DECLARE_int32(rpc_num_service_threads);
+DECLARE_int32(rpc_service_queue_length);
+DECLARE_int32(table_locations_ttl_ms);
+DECLARE_string(location_mapping_cmd);
 
METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTableLocations);
+METRIC_DECLARE_counter(rpcs_queue_overflow);
 
 DEFINE_int32(benchmark_runtime_secs, 5, "Number of seconds to run the 
benchmark");
 DEFINE_int32(benchmark_num_threads, 16, "Number of threads to run the 
benchmark");
@@ -74,8 +89,6 @@ DEFINE_int32(benchmark_num_tablets, 60, "Number of tablets to 
create");
 namespace kudu {
 namespace master {
 
-const int kNumTabletServers = 3;
-
 // Test the master GetTableLocations RPC. This can't be done in master-test,
 // since it requires a running tablet server in order for the catalog manager 
to
 // report tablet locations.
@@ -89,7 +102,7 @@ class TableLocationsTest : public KuduTest {
     SetUpConfig();
 
     InternalMiniClusterOptions opts;
-    opts.num_tablet_servers = kNumTabletServers;
+    opts.num_tablet_servers = 3;
 
     cluster_.reset(new InternalMiniCluster(env_, opts));
     ASSERT_OK(cluster_->Start());
@@ -116,6 +129,8 @@ class TableLocationsTest : public KuduTest {
                      const vector<KuduPartialRow>& split_rows,
                      const vector<pair<KuduPartialRow, KuduPartialRow>>& 
bounds);
 
+  void CreateTable(const string& table_name, int num_splits);
+
   // Check that the master doesn't give back partial results while the table 
is being created.
   void CheckMasterTableCreation(const string &table_name, int 
tablet_locations_size);
 
@@ -175,6 +190,18 @@ void TableLocationsTest::CheckMasterTableCreation(const 
string &table_name,
   }
 }
 
+void TableLocationsTest::CreateTable(const string& table_name, int num_splits) 
{
+  Schema schema({ ColumnSchema("key", INT32) }, 1);
+  KuduPartialRow row(&schema);
+  vector<KuduPartialRow> splits(num_splits, row);
+  for (int i = 0; i < num_splits; i++) {
+    ASSERT_OK(splits[i].SetInt32(0, i*1000));
+  }
+
+  ASSERT_OK(CreateTable(table_name, schema, splits));
+  NO_FATALS(CheckMasterTableCreation(table_name, num_splits + 1));
+}
+
 // Test the tablet server location is properly set in the master 
GetTableLocations RPC.
 class TableLocationsWithTSLocationTest : public TableLocationsTest {
  public:
@@ -182,7 +209,7 @@ class TableLocationsWithTSLocationTest : public 
TableLocationsTest {
     const string location_cmd_path = 
JoinPathSegments(GetTestExecutableDirectory(),
                                                       
"testdata/first_argument.sh");
     const string location = "/foo";
-    FLAGS_location_mapping_cmd = strings::Substitute("$0 $1", 
location_cmd_path, location);
+    FLAGS_location_mapping_cmd = Substitute("$0 $1", location_cmd_path, 
location);
   }
 };
 
@@ -394,17 +421,7 @@ TEST_F(TableLocationsTest, GetTableLocationsBenchmark) {
   const auto kRuntime = MonoDelta::FromSeconds(FLAGS_benchmark_runtime_secs);
 
   const string table_name = "test";
-  Schema schema({ ColumnSchema("key", INT32) }, 1);
-  KuduPartialRow row(&schema);
-
-  vector<KuduPartialRow> splits(kNumSplits, row);
-  for (int i = 0; i < kNumSplits; i++) {
-    ASSERT_OK(splits[i].SetInt32(0, i*1000));
-  }
-
-  ASSERT_OK(CreateTable(table_name, schema, splits));
-
-  NO_FATALS(CheckMasterTableCreation(table_name, kNumSplits + 1));
+  NO_FATALS(CreateTable(table_name, kNumSplits));
 
   // Make one proxy per thread, so each thread gets its own messenger and
   // reactor. If there were only one messenger, then only one reactor thread
@@ -423,7 +440,7 @@ TEST_F(TableLocationsTest, GetTableLocationsBenchmark) {
   }
 
   std::atomic<bool> stop { false };
-  vector<std::thread> threads;
+  vector<thread> threads;
   threads.reserve(kNumThreads);
   for (int i = 0; i < kNumThreads; i++) {
     threads.emplace_back([&, i]() {
@@ -452,7 +469,180 @@ TEST_F(TableLocationsTest, GetTableLocationsBenchmark) {
 
   cluster_->Shutdown();
 
-  hist->histogram()->DumpHumanReadable(&LOG(INFO));
+  LOG(INFO) << Substitute(
+      "GetTableLocations RPC: $0 req/sec",
+      hist->histogram()->TotalCount() / kRuntime.ToSeconds());
+
+  ostringstream ostr;
+  ostr << "Stats on GetTableLocations RPC (times in microseconds): ";
+  hist->histogram()->DumpHumanReadable(&ostr);
+  LOG(INFO) << ostr.str();
+}
+
+// Similar to GetTableLocationsBenchmark, but calls the function directly 
within
+// the mini-cluster process instead of issuing RPC.
+TEST_F(TableLocationsTest, GetTableLocationsBenchmarkFunctionCall) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  const string kUserName = "testuser";
+  const size_t kNumSplits = FLAGS_benchmark_num_tablets - 1;
+  const size_t kNumThreads = FLAGS_benchmark_num_threads;
+  const auto kRuntime = MonoDelta::FromSeconds(FLAGS_benchmark_runtime_secs);
+
+  const string table_name = "test";
+  NO_FATALS(CreateTable(table_name, kNumSplits));
+
+  CatalogManager* cm = cluster_->mini_master()->master()->catalog_manager();
+  const auto username = boost::make_optional<const string&>(kUserName);
+
+  std::atomic<bool> stop(false);
+  vector<thread> threads;
+  threads.reserve(kNumThreads);
+  vector<uint64_t> req_counters(kNumThreads, 0);
+  vector<uint64_t> err_counters(kNumThreads, 0);
+  for (size_t idx = 0; idx < kNumThreads; ++idx) {
+    threads.emplace_back([&, idx]() {
+      while (!stop) {
+        GetTableLocationsRequestPB req;
+        GetTableLocationsResponsePB resp;
+        req.mutable_table()->set_table_name(table_name);
+        req.set_max_returned_locations(1000);
+        req.set_intern_ts_infos_in_response(true);
+        ++req_counters[idx];
+        {
+          CatalogManager::ScopedLeaderSharedLock l(cm);
+          auto s = cm->GetTableLocations(&req, &resp, username);
+          if (!s.ok()) {
+            ++err_counters[idx];
+          }
+        }
+      }
+    });
+  }
+
+  SleepFor(kRuntime);
+  stop = true;
+  for (auto& t : threads) {
+    t.join();
+  }
+  cluster_->Shutdown();
+
+  const auto errors = accumulate(err_counters.begin(), err_counters.end(), 
0UL);
+  if (errors != 0) {
+    FAIL() << Substitute("detected $0 errors", errors);
+  }
+
+  const double total = accumulate(req_counters.begin(), req_counters.end(), 
0UL);
+  LOG(INFO) << Substitute(
+      "GetTableLocations function call: $0 req/sec",
+      total / kRuntime.ToSeconds());
+}
+
+// A small benchmark to see whether multiple clients refreshing table location
+// might create the thundering herd pattern with their GetTableLocations()
+// requests, overflowing the RPC service queue.
+class RefreshTableLocationsTest : public KuduTest {
+ public:
+  void SetUp() override {
+    KuduTest::SetUp();
+    // Set the RPC queue size limit and the number of RPC service threads small
+    // to register RPC queue overlows, if any. The idea is to make this test
+    // use smaller numbers, while in real life there would be more clients
+    // and regular length of the RPC service queue.
+    FLAGS_rpc_num_service_threads = 2;
+    FLAGS_rpc_service_queue_length = std::max(1, FLAGS_benchmark_num_threads / 
3);
+    // Make table location information expire faster so the clients would try
+    // to refresh the information during the scenario, while not spending too
+    // much time to run the scenario.
+    FLAGS_table_locations_ttl_ms = 1500;
+    {
+      InternalMiniClusterOptions opts;
+      opts.num_masters = 1;
+      opts.num_tablet_servers = 3;
+      cluster_.reset(new InternalMiniCluster(env_, std::move(opts)));
+    }
+    ASSERT_OK(cluster_->Start());
+  }
+
+  void TearDown() override {
+    if (cluster_) {
+      cluster_->Shutdown();
+      cluster_.reset();
+    }
+    KuduTest::TearDown();
+  }
+
+ protected:
+  unique_ptr<InternalMiniCluster> cluster_;
+};
+
+TEST_F(RefreshTableLocationsTest, ThunderingHerd) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  const auto kNumThreads = FLAGS_benchmark_num_threads;
+  const auto kRuntime =
+      MonoDelta::FromMilliseconds(5 * FLAGS_table_locations_ttl_ms);
+
+  // Create test tables and populate them with data. These tables are used by
+  // the threads below, each running its own read test workload.
+  for (auto idx = 0; idx < kNumThreads; ++idx) {
+    TestWorkload w(cluster_.get());
+    w.set_num_replicas(3);
+    w.set_num_tablets(10);
+    w.set_table_name(Substitute("test_table_$0", idx));
+    w.set_num_write_threads(1);
+    w.set_num_read_threads(0);
+    w.Setup();
+    w.Start();
+    while (w.rows_inserted() < 100) {
+      SleepFor(MonoDelta::FromMilliseconds(10));
+    }
+    w.StopAndJoin();
+  }
+
+  std::atomic<bool> stop(false);
+  vector<thread> threads;
+  threads.reserve(kNumThreads);
+  for (auto idx = 0; idx < kNumThreads; ++idx) {
+    // It's important to have multiple workloads to simulate multiple clients,
+    // each with its own table locations cache.
+    threads.emplace_back([&, idx]() {
+      TestWorkload w(cluster_.get());
+      w.set_table_name(Substitute("test_table_$0", idx));
+      w.set_num_read_threads(1);
+      w.set_num_write_threads(0);
+      w.Setup();
+      w.Start();
+      while (!stop) {
+        SleepFor(MonoDelta::FromMilliseconds(10));
+      }
+      w.StopAndJoin();
+    });
+  }
+
+  // Make sure the reader threads refresh the expired information on table
+  // locations few times during the scenario.
+  SleepFor(kRuntime);
+  stop = true;
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  const auto& ent = cluster_->mini_master()->master()->metric_entity();
+  auto counter = METRIC_rpcs_queue_overflow.Instantiate(ent);
+  auto hist = 
METRIC_handler_latency_kudu_master_MasterService_GetTableLocations.
+      Instantiate(ent);
+
+  cluster_->Shutdown();
+
+  LOG(INFO) << Substitute(
+      "GetTableLocations RPC: $0 req/sec",
+      hist->histogram()->TotalCount() / kRuntime.ToSeconds());
+
+  ostringstream ostr;
+  ostr << "Stats on GetTableLocations RPC (times in microseconds): ";
+  hist->histogram()->DumpHumanReadable(&ostr);
+  LOG(INFO) << ostr.str();
+
+  LOG(INFO) << "RPC queue overflows: " << counter->value();
 }
 
 } // namespace master

Reply via email to