This is an automated email from the ASF dual-hosted git repository. todd pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit fefb433917252ead0c78f7f6c674050514505cba Author: Todd Lipcon <[email protected]> AuthorDate: Thu Mar 5 16:09:33 2020 -0800 rpc: use a lighter weight completion for sync RPCs This adds a new Notification class which is a special purpose CountDownLatch with count 1, implemented on top of futex. This ends up being a bit more efficient than the pthread-based mutex and condition variable. Benchmarked with rpc-bench 40 times each before and after and ran t-tests on the reported metrics: User CPU (statistically significant 4-8% reduction): data: subset(d, V1 == "with")$V2 and subset(d, V1 == "without")$V2 t = -6.1821, df = 74.559, p-value = 3.081e-08 alternative hypothesis: true difference in means is not equal to 0 95 percent confidence interval: -1.5546221 -0.7968279 sample estimates: mean of x mean of y 16.96979 18.14551 System CPU (no significant difference): data: subset(d, V1 == "with")$V2 and subset(d, V1 == "without")$V2 t = -0.23148, df = 66.883, p-value = 0.8176 alternative hypothesis: true difference in means is not equal to 0 95 percent confidence interval: -0.6114441 0.4843641 sample estimates: mean of x mean of y 41.03802 41.10156 Context switches (statistically significant 1.6-2.3% reduction): data: subset(d, V1 == "with")$V2 and subset(d, V1 == "without")$V2 t = -11.198, df = 77.282, p-value < 2.2e-16 alternative hypothesis: true difference in means is not equal to 0 95 percent confidence interval: -0.0820182 -0.0572533 sample estimates: mean of x mean of y 3.551491 3.621127 Change-Id: I1b65cce8bd48ee7edf6b2d08e96d00681c32aa97 Reviewed-on: http://gerrit.cloudera.org:8080/15441 Tested-by: Kudu Jenkins Reviewed-by: Bankim Bhavsar <[email protected]> Reviewed-by: Adar Dembo <[email protected]> --- src/kudu/rpc/proxy.cc | 11 ++-- src/kudu/util/notification.h | 137 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 142 insertions(+), 6 deletions(-) diff --git a/src/kudu/rpc/proxy.cc b/src/kudu/rpc/proxy.cc index 24668ab..1dfac01 100644 --- a/src/kudu/rpc/proxy.cc +++ b/src/kudu/rpc/proxy.cc @@ -26,14 +26,14 @@ #include <glog/logging.h> #include "kudu/gutil/strings/substitute.h" -#include "kudu/rpc/outbound_call.h" #include "kudu/rpc/messenger.h" +#include "kudu/rpc/outbound_call.h" #include "kudu/rpc/remote_method.h" #include "kudu/rpc/response_callback.h" #include "kudu/rpc/rpc_controller.h" #include "kudu/rpc/user_credentials.h" #include "kudu/util/net/sockaddr.h" -#include "kudu/util/countdown_latch.h" +#include "kudu/util/notification.h" #include "kudu/util/status.h" #include "kudu/util/user.h" @@ -94,11 +94,10 @@ Status Proxy::SyncRequest(const string& method, const google::protobuf::Message& req, google::protobuf::Message* resp, RpcController* controller) const { - CountDownLatch latch(1); + Notification note; AsyncRequest(method, req, DCHECK_NOTNULL(resp), controller, - boost::bind(&CountDownLatch::CountDown, boost::ref(latch))); - - latch.Wait(); + boost::bind(&Notification::Notify, boost::ref(note))); + note.WaitForNotification(); return controller->status(); } diff --git a/src/kudu/util/notification.h b/src/kudu/util/notification.h new file mode 100644 index 0000000..b2d523d --- /dev/null +++ b/src/kudu/util/notification.h @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include "kudu/gutil/macros.h" + +#ifdef __linux__ +#include "kudu/gutil/atomicops.h" +#include "kudu/gutil/linux_syscall_support.h" +#else +#include "kudu/util/countdown_latch.h" +#endif + +namespace kudu { + +// This class defines a `Notification` abstraction, which allows threads +// to receive notification of a single occurrence of a single event. +// +// NOTE: this class is modeled after absl::Notification but re-implemented +// to not have dependencies on other absl-specific code. If absl is ever +// imported, this can be removed. +// +// The `Notification` object maintains a private boolean "notified" state that +// transitions to `true` at most once. The `Notification` class provides the +// following primary member functions: +// * `HasBeenNotified() `to query its state +// * `WaitForNotification*()` to have threads wait until the "notified" state +// is `true`. +// * `Notify()` to set the notification's "notified" state to `true` and +// notify all waiting threads that the event has occurred. +// This method may only be called once. +// +// Note that while `Notify()` may only be called once, it is perfectly valid to +// call any of the `WaitForNotification*()` methods multiple times, from +// multiple threads -- even after the notification's "notified" state has been +// set -- in which case those methods will immediately return. +// +// Note that the lifetime of a `Notification` requires careful consideration; +// it might not be safe to destroy a notification after calling `Notify()` since +// it is still legal for other threads to call `WaitForNotification*()` methods +// on the notification. However, observers responding to a "notified" state of +// `true` can safely delete the notification without interfering with the call +// to `Notify()` in the other thread. +// +// Memory ordering: For any threads X and Y, if X calls `Notify()`, then any +// action taken by X before it calls `Notify()` is visible to thread Y after: +// * Y returns from `WaitForNotification()`, or +// * Y receives a `true` return value from `HasBeenNotified()`. +#ifdef __linux__ +class Notification { + public: + Notification() : state_(NOT_NOTIFIED_NO_WAITERS) {} + ~Notification() = default; + + bool HasBeenNotified() const { + return base::subtle::Acquire_Load(&state_) == NOTIFIED; + } + + void WaitForNotification() const { + while (true) { + auto s = base::subtle::Acquire_Load(&state_); + if (s == NOT_NOTIFIED_NO_WAITERS) { + s = base::subtle::Acquire_CompareAndSwap( + &state_, NOT_NOTIFIED_NO_WAITERS, NOT_NOTIFIED_HAS_WAITERS); + if (s == NOT_NOTIFIED_NO_WAITERS) { + // We succeeded in the CAS -- sets 's' to be the new value of the + // state rather than the previous value. + s = NOT_NOTIFIED_HAS_WAITERS; + } + } + if (s == NOTIFIED) return; + DCHECK_EQ(s, NOT_NOTIFIED_HAS_WAITERS); + sys_futex(&state_, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, NOT_NOTIFIED_HAS_WAITERS, + /* timeout */ nullptr); + } + } + + void Notify() { + auto s = base::subtle::Release_AtomicExchange(&state_, NOTIFIED); + DCHECK_NE(s, NOTIFIED) << "may only notify once"; + if (s == NOT_NOTIFIED_HAS_WAITERS) { + sys_futex(&state_, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT_MAX, + nullptr /* ignored */); + } + } + + private: + enum { + NOT_NOTIFIED_NO_WAITERS = 1, + NOT_NOTIFIED_HAS_WAITERS = 2, + NOTIFIED = 3 + }; + mutable Atomic32 state_; + + DISALLOW_COPY_AND_ASSIGN(Notification); +}; +#else +// macOS doesn't have futex, so we just use the mutex-based latch instead. +class Notification { + public: + Notification() : latch_(1) { } + ~Notification() = default; + + bool HasBeenNotified() const { + return latch_.count() == 0; + } + + void WaitForNotification() const { + latch_.Wait(); + } + + void Notify() { + latch_.CountDown(); + } + + private: + mutable CountDownLatch latch_; + + DISALLOW_COPY_AND_ASSIGN(Notification); +}; + +#endif +} // namespace kudu
