http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/net/dns_resolver-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/net/dns_resolver-test.cc b/be/src/kudu/util/net/dns_resolver-test.cc new file mode 100644 index 0000000..55be284 --- /dev/null +++ b/be/src/kudu/util/net/dns_resolver-test.cc @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/net/dns_resolver.h" + +#include <boost/bind.hpp> +#include <gtest/gtest.h> +#include <vector> + +#include "kudu/gutil/strings/util.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/net/net_util.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/test_util.h" + +using std::vector; + +namespace kudu { + +class DnsResolverTest : public KuduTest { + protected: + DnsResolver resolver_; +}; + +TEST_F(DnsResolverTest, TestResolution) { + vector<Sockaddr> addrs; + Synchronizer s; + { + HostPort hp("localhost", 12345); + resolver_.ResolveAddresses(hp, &addrs, s.AsStatusCallback()); + } + ASSERT_OK(s.Wait()); + ASSERT_TRUE(!addrs.empty()); + for (const Sockaddr& addr : addrs) { + LOG(INFO) << "Address: " << addr.ToString(); + EXPECT_TRUE(HasPrefixString(addr.ToString(), "127.")); + EXPECT_TRUE(HasSuffixString(addr.ToString(), ":12345")); + } +} + +} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/net/dns_resolver.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/net/dns_resolver.cc b/be/src/kudu/util/net/dns_resolver.cc new file mode 100644 index 0000000..4c37a95 --- /dev/null +++ b/be/src/kudu/util/net/dns_resolver.cc @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/net/dns_resolver.h" + +#include <boost/bind.hpp> +#include <gflags/gflags.h> +#include <glog/logging.h> +#include <vector> + +#include "kudu/util/flag_tags.h" +#include "kudu/util/threadpool.h" +#include "kudu/util/net/net_util.h" +#include "kudu/util/net/sockaddr.h" + +DEFINE_int32(dns_num_resolver_threads, 1, "The number of threads to use for DNS resolution"); +TAG_FLAG(dns_num_resolver_threads, advanced); + +using std::vector; + +namespace kudu { + +DnsResolver::DnsResolver() { + CHECK_OK(ThreadPoolBuilder("dns-resolver") + .set_max_threads(FLAGS_dns_num_resolver_threads) + .Build(&pool_)); +} + +DnsResolver::~DnsResolver() { + pool_->Shutdown(); +} + +namespace { +static void DoResolution(const HostPort &hostport, vector<Sockaddr>* addresses, + StatusCallback cb) { + cb.Run(hostport.ResolveAddresses(addresses)); +} +} // anonymous namespace + +void DnsResolver::ResolveAddresses(const HostPort& hostport, + vector<Sockaddr>* addresses, + const StatusCallback& cb) { + Status s = pool_->SubmitFunc(boost::bind(&DoResolution, hostport, addresses, cb)); + if (!s.ok()) { + cb.Run(s); + } +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/net/dns_resolver.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/net/dns_resolver.h b/be/src/kudu/util/net/dns_resolver.h new file mode 100644 index 0000000..4232174 --- /dev/null +++ b/be/src/kudu/util/net/dns_resolver.h @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef KUDU_UTIL_NET_DNS_RESOLVER_H
#define KUDU_UTIL_NET_DNS_RESOLVER_H

#include <vector>

#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/macros.h"
#include "kudu/util/async_util.h"
#include "kudu/util/status.h"

namespace kudu {

class HostPort;
class Sockaddr;
class ThreadPool;

// DNS Resolver which supports async address resolution.
//
// Each instance owns a private thread pool on which lookups execute.
// Instances are neither copyable nor assignable.
class DnsResolver {
 public:
  // Creates the resolver and builds its worker thread pool.
  DnsResolver();
  // Shuts down the worker thread pool.
  ~DnsResolver();

  // Resolve any addresses corresponding to this host:port pair.
  // Note that a host may resolve to more than one IP address.
  //
  // 'addresses' may be NULL, in which case this function simply checks that
  // the host/port pair can be resolved, without returning anything.
  //
  // When the result is available, or an error occurred, 'cb' is called
  // with the result Status.
  //
  // The current implementation copies 'hostport' and 'cb' into the background
  // task. 'addresses' is written asynchronously, so a non-NULL vector must
  // stay alive until 'cb' has been invoked.
  //
  // NOTE: the callback should be fast since it is called by the DNS
  // resolution thread.
  // NOTE: in some rare cases, the callback may also be called inline
  // from this function call, on the caller's thread. This happens when the
  // task cannot be submitted to the thread pool, and 'cb' then receives the
  // submission error.
  void ResolveAddresses(const HostPort& hostport,
                        std::vector<Sockaddr>* addresses,
                        const StatusCallback& cb);

 private:
  // Worker pool on which resolution tasks run.
  gscoped_ptr<ThreadPool> pool_;

  DISALLOW_COPY_AND_ASSIGN(DnsResolver);
};

} // namespace kudu
#endif /* KUDU_UTIL_NET_DNS_RESOLVER_H */
// http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/net/net_util-test.cc
// ----------------------------------------------------------------------
// diff --git a/be/src/kudu/util/net/net_util-test.cc b/be/src/kudu/util/net/net_util-test.cc
// new file mode 100644
// index 0000000..c77b054
// --- /dev/null
// +++ b/be/src/kudu/util/net/net_util-test.cc
// @@ -0,0 +1,162 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+ +#include <gtest/gtest.h> + +#include <algorithm> +#include <string> +#include <vector> + +#include "kudu/gutil/strings/join.h" +#include "kudu/gutil/strings/util.h" +#include "kudu/util/net/net_util.h" +#include "kudu/util/net/socket.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/status.h" +#include "kudu/util/test_util.h" + +namespace kudu { + +class NetUtilTest : public KuduTest { + protected: + Status DoParseBindAddresses(const string& input, string* result) { + vector<Sockaddr> addrs; + RETURN_NOT_OK(ParseAddressList(input, kDefaultPort, &addrs)); + std::sort(addrs.begin(), addrs.end()); + + vector<string> addr_strs; + for (const Sockaddr& addr : addrs) { + addr_strs.push_back(addr.ToString()); + } + *result = JoinStrings(addr_strs, ","); + return Status::OK(); + } + + static const uint16_t kDefaultPort = 7150; +}; + +TEST(SockaddrTest, Test) { + Sockaddr addr; + ASSERT_OK(addr.ParseString("1.1.1.1:12345", 12345)); + ASSERT_EQ(12345, addr.port()); +} + +TEST_F(NetUtilTest, TestParseAddresses) { + string ret; + ASSERT_OK(DoParseBindAddresses("0.0.0.0:12345", &ret)); + ASSERT_EQ("0.0.0.0:12345", ret); + + ASSERT_OK(DoParseBindAddresses("0.0.0.0", &ret)); + ASSERT_EQ("0.0.0.0:7150", ret); + + ASSERT_OK(DoParseBindAddresses("0.0.0.0:12345, 0.0.0.0:12346", &ret)); + ASSERT_EQ("0.0.0.0:12345,0.0.0.0:12346", ret); + + // Test some invalid addresses. 
+ Status s = DoParseBindAddresses("0.0.0.0:xyz", &ret); + ASSERT_STR_CONTAINS(s.ToString(), "Invalid port"); + + s = DoParseBindAddresses("0.0.0.0:100000", &ret); + ASSERT_STR_CONTAINS(s.ToString(), "Invalid port"); + + s = DoParseBindAddresses("0.0.0.0:", &ret); + ASSERT_STR_CONTAINS(s.ToString(), "Invalid port"); +} + +TEST_F(NetUtilTest, TestResolveAddresses) { + HostPort hp("localhost", 12345); + vector<Sockaddr> addrs; + ASSERT_OK(hp.ResolveAddresses(&addrs)); + ASSERT_TRUE(!addrs.empty()); + for (const Sockaddr& addr : addrs) { + LOG(INFO) << "Address: " << addr.ToString(); + EXPECT_TRUE(HasPrefixString(addr.ToString(), "127.")); + EXPECT_TRUE(HasSuffixString(addr.ToString(), ":12345")); + EXPECT_TRUE(addr.IsAnyLocalAddress()); + } + + ASSERT_OK(hp.ResolveAddresses(nullptr)); +} + +TEST_F(NetUtilTest, TestWithinNetwork) { + Sockaddr addr; + Network network; + + ASSERT_OK(addr.ParseString("10.0.23.0:12345", 0)); + ASSERT_OK(network.ParseCIDRString("10.0.0.0/8")); + EXPECT_TRUE(network.WithinNetwork(addr)); + + ASSERT_OK(addr.ParseString("172.28.3.4:0", 0)); + ASSERT_OK(network.ParseCIDRString("172.16.0.0/12")); + EXPECT_TRUE(network.WithinNetwork(addr)); + + ASSERT_OK(addr.ParseString("192.168.0.23", 0)); + ASSERT_OK(network.ParseCIDRString("192.168.1.14/16")); + EXPECT_TRUE(network.WithinNetwork(addr)); + + ASSERT_OK(addr.ParseString("8.8.8.8:0", 0)); + ASSERT_OK(network.ParseCIDRString("0.0.0.0/0")); + EXPECT_TRUE(network.WithinNetwork(addr)); + + ASSERT_OK(addr.ParseString("192.169.0.23", 0)); + ASSERT_OK(network.ParseCIDRString("192.168.0.0/16")); + EXPECT_FALSE(network.WithinNetwork(addr)); +} + +// Ensure that we are able to do a reverse DNS lookup on various IP addresses. +// The reverse lookups should never fail, but may return numeric strings. 
+TEST_F(NetUtilTest, TestReverseLookup) { + string host; + Sockaddr addr; + HostPort hp; + ASSERT_OK(addr.ParseString("0.0.0.0:12345", 0)); + EXPECT_EQ(12345, addr.port()); + ASSERT_OK(HostPortFromSockaddrReplaceWildcard(addr, &hp)); + EXPECT_NE("0.0.0.0", hp.host()); + EXPECT_NE("", hp.host()); + EXPECT_EQ(12345, hp.port()); + + ASSERT_OK(addr.ParseString("127.0.0.1:12345", 0)); + ASSERT_OK(HostPortFromSockaddrReplaceWildcard(addr, &hp)); + EXPECT_EQ("127.0.0.1", hp.host()); + EXPECT_EQ(12345, hp.port()); +} + +TEST_F(NetUtilTest, TestLsof) { + Socket s; + ASSERT_OK(s.Init(0)); + + Sockaddr addr; // wildcard + ASSERT_OK(s.BindAndListen(addr, 1)); + + ASSERT_OK(s.GetSocketAddress(&addr)); + ASSERT_NE(addr.port(), 0); + vector<string> lsof_lines; + TryRunLsof(addr, &lsof_lines); + SCOPED_TRACE(JoinStrings(lsof_lines, "\n")); + + ASSERT_GE(lsof_lines.size(), 3); + ASSERT_STR_CONTAINS(lsof_lines[2], "net_util-test"); +} + +TEST_F(NetUtilTest, TestGetFQDN) { + string fqdn; + ASSERT_OK(GetFQDN(&fqdn)); + LOG(INFO) << "fqdn is " << fqdn; +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/net/net_util.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/net/net_util.cc b/be/src/kudu/util/net/net_util.cc new file mode 100644 index 0000000..aca1ae2 --- /dev/null +++ b/be/src/kudu/util/net/net_util.cc @@ -0,0 +1,383 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arpa/inet.h> +#include <ifaddrs.h> +#include <netdb.h> +#include <sys/types.h> +#include <sys/socket.h> + +#include <algorithm> +#include <boost/functional/hash.hpp> +#include <gflags/gflags.h> +#include <unordered_set> +#include <utility> +#include <vector> + +#include "kudu/gutil/endian.h" +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/strings/join.h" +#include "kudu/gutil/strings/numbers.h" +#include "kudu/gutil/strings/split.h" +#include "kudu/gutil/strings/strip.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/gutil/strings/util.h" +#include "kudu/util/debug/trace_event.h" +#include "kudu/util/errno.h" +#include "kudu/util/faststring.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/net/net_util.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/scoped_cleanup.h" +#include "kudu/util/stopwatch.h" +#include "kudu/util/subprocess.h" +#include "kudu/util/trace.h" + +// Mac OS 10.9 does not appear to define HOST_NAME_MAX in unistd.h +#ifndef HOST_NAME_MAX +#define HOST_NAME_MAX 64 +#endif + +DEFINE_bool(fail_dns_resolution, false, "Whether to fail all dns resolution, for tests."); +TAG_FLAG(fail_dns_resolution, hidden); + +using std::unordered_set; +using std::vector; +using strings::Substitute; + +namespace kudu { + +namespace { +struct AddrinfoDeleter { + void operator()(struct addrinfo* info) { + freeaddrinfo(info); + } +}; +} + +HostPort::HostPort() + : host_(""), + port_(0) { +} + +HostPort::HostPort(std::string host, uint16_t port) + : 
host_(std::move(host)), port_(port) {} + +HostPort::HostPort(const Sockaddr& addr) + : host_(addr.host()), + port_(addr.port()) { +} + +bool operator==(const HostPort& hp1, const HostPort& hp2) { + return hp1.port() == hp2.port() && hp1.host() == hp2.host(); +} + +size_t HostPort::HashCode() const { + size_t seed = 0; + boost::hash_combine(seed, host_); + boost::hash_combine(seed, port_); + return seed; +} + +Status HostPort::ParseString(const string& str, uint16_t default_port) { + std::pair<string, string> p = strings::Split(str, strings::delimiter::Limit(":", 1)); + + // Strip any whitespace from the host. + StripWhiteSpace(&p.first); + + // Parse the port. + uint32_t port; + if (p.second.empty() && strcount(str, ':') == 0) { + // No port specified. + port = default_port; + } else if (!SimpleAtoi(p.second, &port) || + port > 65535) { + return Status::InvalidArgument("Invalid port", str); + } + + host_.swap(p.first); + port_ = port; + return Status::OK(); +} + +Status HostPort::ResolveAddresses(vector<Sockaddr>* addresses) const { + TRACE_EVENT1("net", "HostPort::ResolveAddresses", + "host", host_); + TRACE_COUNTER_SCOPE_LATENCY_US("dns_us"); + struct addrinfo hints; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + struct addrinfo* res = nullptr; + int rc; + LOG_SLOW_EXECUTION(WARNING, 200, + Substitute("resolving address for $0", host_)) { + rc = getaddrinfo(host_.c_str(), nullptr, &hints, &res); + } + if (rc != 0) { + return Status::NetworkError( + StringPrintf("Unable to resolve address '%s'", host_.c_str()), + gai_strerror(rc)); + } + gscoped_ptr<addrinfo, AddrinfoDeleter> scoped_res(res); + for (; res != nullptr; res = res->ai_next) { + CHECK_EQ(res->ai_family, AF_INET); + struct sockaddr_in* addr = reinterpret_cast<struct sockaddr_in*>(res->ai_addr); + addr->sin_port = htons(port_); + Sockaddr sockaddr(*addr); + if (addresses) { + addresses->push_back(sockaddr); + } + VLOG(2) << "Resolved address " << 
sockaddr.ToString() + << " for host/port " << ToString(); + } + if (PREDICT_FALSE(FLAGS_fail_dns_resolution)) { + return Status::NetworkError("injected DNS resolution failure"); + } + return Status::OK(); +} + +Status HostPort::ParseStrings(const string& comma_sep_addrs, + uint16_t default_port, + vector<HostPort>* res) { + vector<string> addr_strings = strings::Split(comma_sep_addrs, ",", strings::SkipEmpty()); + for (const string& addr_string : addr_strings) { + HostPort host_port; + RETURN_NOT_OK(host_port.ParseString(addr_string, default_port)); + res->push_back(host_port); + } + return Status::OK(); +} + +string HostPort::ToString() const { + return Substitute("$0:$1", host_, port_); +} + +string HostPort::ToCommaSeparatedString(const vector<HostPort>& hostports) { + vector<string> hostport_strs; + for (const HostPort& hostport : hostports) { + hostport_strs.push_back(hostport.ToString()); + } + return JoinStrings(hostport_strs, ","); +} + +Network::Network() + : addr_(0), + netmask_(0) { +} + +Network::Network(uint32_t addr, uint32_t netmask) + : addr_(addr), netmask_(netmask) {} + +bool Network::WithinNetwork(const Sockaddr& addr) const { + return ((addr.addr().sin_addr.s_addr & netmask_) == + (addr_ & netmask_)); +} + +Status Network::ParseCIDRString(const string& addr) { + std::pair<string, string> p = strings::Split(addr, strings::delimiter::Limit("/", 1)); + + kudu::Sockaddr sockaddr; + Status s = sockaddr.ParseString(p.first, 0); + + uint32_t bits; + bool success = SimpleAtoi(p.second, &bits); + + if (!s.ok() || !success || bits > 32) { + return Status::NetworkError("Unable to parse CIDR address", addr); + } + + // Netmask in network byte order + uint32_t netmask = NetworkByteOrder::FromHost32(~(0xffffffff >> bits)); + addr_ = sockaddr.addr().sin_addr.s_addr; + netmask_ = netmask; + return Status::OK(); +} + +Status Network::ParseCIDRStrings(const string& comma_sep_addrs, + vector<Network>* res) { + vector<string> addr_strings = 
strings::Split(comma_sep_addrs, ",", strings::SkipEmpty()); + for (const string& addr_string : addr_strings) { + Network network; + RETURN_NOT_OK(network.ParseCIDRString(addr_string)); + res->push_back(network); + } + return Status::OK(); +} + +bool IsPrivilegedPort(uint16_t port) { + return port <= 1024 && port != 0; +} + +Status ParseAddressList(const std::string& addr_list, + uint16_t default_port, + std::vector<Sockaddr>* addresses) { + vector<HostPort> host_ports; + RETURN_NOT_OK(HostPort::ParseStrings(addr_list, default_port, &host_ports)); + if (host_ports.empty()) return Status::InvalidArgument("No address specified"); + unordered_set<Sockaddr> uniqued; + for (const HostPort& host_port : host_ports) { + vector<Sockaddr> this_addresses; + RETURN_NOT_OK(host_port.ResolveAddresses(&this_addresses)); + + // Only add the unique ones -- the user may have specified + // some IP addresses in multiple ways + for (const Sockaddr& addr : this_addresses) { + if (InsertIfNotPresent(&uniqued, addr)) { + addresses->push_back(addr); + } else { + LOG(INFO) << "Address " << addr.ToString() << " for " << host_port.ToString() + << " duplicates an earlier resolved entry."; + } + } + } + return Status::OK(); +} + +Status GetHostname(string* hostname) { + TRACE_EVENT0("net", "GetHostname"); + char name[HOST_NAME_MAX]; + int ret = gethostname(name, HOST_NAME_MAX); + if (ret != 0) { + return Status::NetworkError("Unable to determine local hostname", + ErrnoToString(errno), + errno); + } + *hostname = name; + return Status::OK(); +} + +Status GetLocalNetworks(std::vector<Network>* net) { + struct ifaddrs *ifap = nullptr; + + int ret = getifaddrs(&ifap); + auto cleanup = MakeScopedCleanup([&]() { + if (ifap) freeifaddrs(ifap); + }); + + if (ret != 0) { + return Status::NetworkError("Unable to determine local network addresses", + ErrnoToString(errno), + errno); + } + + net->clear(); + for (struct ifaddrs *ifa = ifap; ifa; ifa = ifa->ifa_next) { + if (ifa->ifa_addr == nullptr || 
ifa->ifa_netmask == nullptr) continue; + + if (ifa->ifa_addr->sa_family == AF_INET) { + Sockaddr addr(*reinterpret_cast<struct sockaddr_in*>(ifa->ifa_addr)); + Sockaddr netmask(*reinterpret_cast<struct sockaddr_in*>(ifa->ifa_netmask)); + Network network(addr.addr().sin_addr.s_addr, netmask.addr().sin_addr.s_addr); + net->push_back(network); + } + } + + return Status::OK(); +} + +Status GetFQDN(string* hostname) { + TRACE_EVENT0("net", "GetFQDN"); + // Start with the non-qualified hostname + RETURN_NOT_OK(GetHostname(hostname)); + + struct addrinfo hints; + memset(&hints, 0, sizeof(hints)); + hints.ai_socktype = SOCK_DGRAM; + hints.ai_flags = AI_CANONNAME; + + struct addrinfo* result; + LOG_SLOW_EXECUTION(WARNING, 200, + Substitute("looking up canonical hostname for localhost " + "(eventual result was $0)", *hostname)) { + TRACE_EVENT0("net", "getaddrinfo"); + int rc = getaddrinfo(hostname->c_str(), nullptr, &hints, &result); + if (rc != 0) { + return Status::NetworkError("Unable to lookup FQDN", ErrnoToString(errno), errno); + } + } + + *hostname = result->ai_canonname; + freeaddrinfo(result); + return Status::OK(); +} + +Status SockaddrFromHostPort(const HostPort& host_port, Sockaddr* addr) { + vector<Sockaddr> addrs; + RETURN_NOT_OK(host_port.ResolveAddresses(&addrs)); + if (addrs.empty()) { + return Status::NetworkError("Unable to resolve address", host_port.ToString()); + } + *addr = addrs[0]; + if (addrs.size() > 1) { + VLOG(1) << "Hostname " << host_port.host() << " resolved to more than one address. 
" + << "Using address: " << addr->ToString(); + } + return Status::OK(); +} + +Status HostPortFromSockaddrReplaceWildcard(const Sockaddr& addr, HostPort* hp) { + string host; + if (addr.IsWildcard()) { + RETURN_NOT_OK(GetFQDN(&host)); + } else { + host = addr.host(); + } + hp->set_host(host); + hp->set_port(addr.port()); + return Status::OK(); +} + +void TryRunLsof(const Sockaddr& addr, vector<string>* log) { +#if defined(__APPLE__) + string cmd = strings::Substitute( + "lsof -n -i 'TCP:$0' -sTCP:LISTEN ; " + "for pid in $$(lsof -F p -n -i 'TCP:$0' -sTCP:LISTEN | cut -f 2 -dp) ; do" + " pstree $$pid || ps h -p $$pid;" + "done", + addr.port()); +#else + // Little inline bash script prints the full ancestry of any pid listening + // on the same port as 'addr'. We could use 'pstree -s', but that option + // doesn't exist on el6. + string cmd = strings::Substitute( + "export PATH=$$PATH:/usr/sbin ; " + "lsof -n -i 'TCP:$0' -sTCP:LISTEN ; " + "for pid in $$(lsof -F p -n -i 'TCP:$0' -sTCP:LISTEN | grep p | cut -f 2 -dp) ; do" + " while [ $$pid -gt 1 ] ; do" + " ps h -fp $$pid ;" + " stat=($$(</proc/$$pid/stat)) ;" + " pid=$${stat[3]} ;" + " done ; " + "done", + addr.port()); +#endif // defined(__APPLE__) + LOG_STRING(WARNING, log) + << "Trying to use lsof to find any processes listening on " + << addr.ToString(); + LOG_STRING(INFO, log) << "$ " << cmd; + vector<string> argv = { "bash", "-c", cmd }; + string results; + Status s = Subprocess::Call(argv, "", &results); + if (PREDICT_FALSE(!s.ok())) { + LOG_STRING(WARNING, log) << s.ToString(); + } + LOG_STRING(WARNING, log) << results; +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/net/net_util.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/net/net_util.h b/be/src/kudu/util/net/net_util.h new file mode 100644 index 0000000..b246c5e --- /dev/null +++ b/be/src/kudu/util/net/net_util.h @@ -0,0 +1,164 @@ +// 
Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_NET_NET_UTIL_H +#define KUDU_UTIL_NET_NET_UTIL_H + +#include <string> +#include <vector> + +#include "kudu/util/status.h" + +namespace kudu { + +class Sockaddr; + +// A container for a host:port pair. +class HostPort { + public: + HostPort(); + HostPort(std::string host, uint16_t port); + explicit HostPort(const Sockaddr& addr); + + bool Initialized() const { + return !host_.empty(); + } + + // Parse a "host:port" pair into this object. + // If there is no port specified in the string, then 'default_port' is used. + Status ParseString(const std::string& str, uint16_t default_port); + + // Resolve any addresses corresponding to this host:port pair. + // Note that a host may resolve to more than one IP address. + // + // 'addresses' may be NULL, in which case this function simply checks that + // the host/port pair can be resolved, without returning anything. 
+ Status ResolveAddresses(std::vector<Sockaddr>* addresses) const; + + std::string ToString() const; + + const std::string& host() const { return host_; } + void set_host(const std::string& host) { host_ = host; } + + uint16_t port() const { return port_; } + void set_port(uint16_t port) { port_ = port; } + + size_t HashCode() const; + + // Parse a comma separated list of "host:port" pairs into a vector + // HostPort objects. If no port is specified for an entry in the + // comma separated list, 'default_port' is used for that entry's + // pair. + static Status ParseStrings( + const std::string& comma_sep_addrs, uint16_t default_port, std::vector<HostPort>* res); + + // Takes a vector of HostPort objects and returns a comma separated + // string containing of "host:port" pairs. This method is the + // "inverse" of ParseStrings(). + static std::string ToCommaSeparatedString(const std::vector<HostPort>& host_ports); + + private: + std::string host_; + uint16_t port_; +}; + +bool operator==(const HostPort& hp1, const HostPort& hp2); + +// Hasher of HostPort objects for UnorderedAssociativeContainers. +struct HostPortHasher { + size_t operator()(const HostPort& hp) const { + return hp.HashCode(); + } +}; + +// Equality BinaryPredicate of HostPort objects for UnorderedAssociativeContainers. +struct HostPortEqualityPredicate { + bool operator()(const HostPort& hp1, const HostPort& hp2) const { + return hp1 == hp2; + } +}; + +// A container for addr:mask pair. +// Both addr and netmask are in big-endian byte order +// (same as network byte order). +class Network { + public: + Network(); + Network(uint32_t addr, uint32_t netmask); + + uint32_t addr() const { return addr_; } + + uint32_t netmask() const { return netmask_; } + + // Returns true if the address is within network. + bool WithinNetwork(const Sockaddr& addr) const; + + // Parses a "addr/netmask" (CIDR notation) pair into this object. 
+ Status ParseCIDRString(const std::string& addr); + + // Parses a comma separated list of "addr/netmask" (CIDR notation) + // pairs into a vector of Network objects. + static Status ParseCIDRStrings( + const std::string& comma_sep_addrs, std::vector<Network>* res); + private: + uint32_t addr_; + uint32_t netmask_; +}; + +// Parse and resolve the given comma-separated list of addresses. +// +// The resulting addresses will be resolved, made unique, and added to +// the 'addresses' vector. +// +// Any elements which do not include a port will be assigned 'default_port'. +Status ParseAddressList(const std::string& addr_list, + uint16_t default_port, + std::vector<Sockaddr>* addresses); + +// Return true if the given port is likely to need root privileges to bind to. +bool IsPrivilegedPort(uint16_t port); + +// Return the local machine's hostname. +Status GetHostname(std::string* hostname); + +// Returns local subnets of all local network interfaces. +Status GetLocalNetworks(std::vector<Network>* net); + +// Return the local machine's FQDN. +Status GetFQDN(std::string* hostname); + +// Returns a single socket address from a HostPort. +// If the hostname resolves to multiple addresses, returns the first in the +// list and logs a message in verbose mode. +Status SockaddrFromHostPort(const HostPort& host_port, Sockaddr* addr); + +// Converts the given Sockaddr into a HostPort, substituting the FQDN +// in the case that the provided address is the wildcard. +// +// In the case of other addresses, the returned HostPort will contain just the +// stringified form of the IP. +Status HostPortFromSockaddrReplaceWildcard(const Sockaddr& addr, HostPort* hp); + +// Try to run 'lsof' to determine which process is preventing binding to +// the given 'addr'. If pids can be determined, outputs full 'ps' and 'pstree' +// output for that process. +// +// Output is issued to the log at WARNING level, or appended to 'log' if it +// is non-NULL (mostly useful for testing). 
+void TryRunLsof(const Sockaddr& addr, std::vector<std::string>* log = NULL); + +} // namespace kudu +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/net/sockaddr.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/net/sockaddr.cc b/be/src/kudu/util/net/sockaddr.cc new file mode 100644 index 0000000..ed249c7 --- /dev/null +++ b/be/src/kudu/util/net/sockaddr.cc @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/net/sockaddr.h" + +#include <arpa/inet.h> +#include <errno.h> +#include <netdb.h> +#include <stdio.h> +#include <string.h> +#include <string> + +#include "kudu/gutil/endian.h" +#include "kudu/gutil/hash/builtin_type_hash.h" +#include "kudu/gutil/macros.h" +#include "kudu/gutil/stringprintf.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/net/net_util.h" +#include "kudu/util/stopwatch.h" + +namespace kudu { + +using strings::Substitute; + +/// +/// Sockaddr +/// +Sockaddr::Sockaddr() { + memset(&addr_, 0, sizeof(addr_)); + addr_.sin_family = AF_INET; + addr_.sin_addr.s_addr = INADDR_ANY; +} + +Sockaddr::Sockaddr(const struct sockaddr_in& addr) { + memcpy(&addr_, &addr, sizeof(struct sockaddr_in)); +} + +Status Sockaddr::ParseString(const std::string& s, uint16_t default_port) { + HostPort hp; + RETURN_NOT_OK(hp.ParseString(s, default_port)); + + if (inet_pton(AF_INET, hp.host().c_str(), &addr_.sin_addr) != 1) { + return Status::InvalidArgument("Invalid IP address", hp.host()); + } + set_port(hp.port()); + return Status::OK(); +} + +Sockaddr& Sockaddr::operator=(const struct sockaddr_in &addr) { + memcpy(&addr_, &addr, sizeof(struct sockaddr_in)); + return *this; +} + +bool Sockaddr::operator==(const Sockaddr& other) const { + return memcmp(&other.addr_, &addr_, sizeof(addr_)) == 0; +} + +bool Sockaddr::operator<(const Sockaddr &rhs) const { + return addr_.sin_addr.s_addr < rhs.addr_.sin_addr.s_addr; +} + +uint32_t Sockaddr::HashCode() const { + uint32_t hash = Hash32NumWithSeed(addr_.sin_addr.s_addr, 0); + hash = Hash32NumWithSeed(addr_.sin_port, hash); + return hash; +} + +void Sockaddr::set_port(int port) { + addr_.sin_port = htons(port); +} + +int Sockaddr::port() const { + return ntohs(addr_.sin_port); +} + +std::string Sockaddr::host() const { + char str[INET_ADDRSTRLEN]; + ::inet_ntop(AF_INET, &addr_.sin_addr, str, INET_ADDRSTRLEN); + return str; +} + +const struct sockaddr_in& Sockaddr::addr() const { + return addr_; 
+} + +std::string Sockaddr::ToString() const { + char str[INET_ADDRSTRLEN]; + ::inet_ntop(AF_INET, &addr_.sin_addr, str, INET_ADDRSTRLEN); + return StringPrintf("%s:%d", str, port()); +} + +bool Sockaddr::IsWildcard() const { + return addr_.sin_addr.s_addr == 0; +} + +bool Sockaddr::IsAnyLocalAddress() const { + return (NetworkByteOrder::FromHost32(addr_.sin_addr.s_addr) >> 24) == 127; +} + +Status Sockaddr::LookupHostname(string* hostname) const { + char host[NI_MAXHOST]; + int flags = 0; + + int rc; + LOG_SLOW_EXECUTION(WARNING, 200, + Substitute("DNS reverse-lookup for $0", ToString())) { + rc = getnameinfo((struct sockaddr *) &addr_, sizeof(sockaddr_in), + host, NI_MAXHOST, + nullptr, 0, flags); + } + if (PREDICT_FALSE(rc != 0)) { + if (rc == EAI_SYSTEM) { + int errno_saved = errno; + return Status::NetworkError(Substitute("getnameinfo: $0", gai_strerror(rc)), + strerror(errno_saved), errno_saved); + } + return Status::NetworkError("getnameinfo", gai_strerror(rc), rc); + } + *hostname = host; + return Status::OK(); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/net/sockaddr.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/net/sockaddr.h b/be/src/kudu/util/net/sockaddr.h new file mode 100644 index 0000000..09777f3 --- /dev/null +++ b/be/src/kudu/util/net/sockaddr.h @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_NET_SOCKADDR_H +#define KUDU_UTIL_NET_SOCKADDR_H + +#include <netinet/in.h> +#include <iosfwd> +#include <string> + +#include "kudu/util/status.h" + +namespace kudu { + +/// +/// Represents a sockaddr. +/// +/// Currently only IPv4 is implemented. When IPv6 and UNIX domain are +/// implemented, this should become an abstract base class and those should be +/// multiple implementations. +/// +class Sockaddr { + public: + Sockaddr(); + explicit Sockaddr(const struct sockaddr_in &addr); + + // Parse a string IP address of the form "A.B.C.D:port", storing the result + // in this Sockaddr object. If no ':port' is specified, uses 'default_port'. + // Note that this function will not handle resolving hostnames. + // + // Returns a bad Status if the input is malformed. + Status ParseString(const std::string& s, uint16_t default_port); + + Sockaddr& operator=(const struct sockaddr_in &addr); + + bool operator==(const Sockaddr& other) const; + + // Compare the endpoints of two sockaddrs. + // The port number is ignored in this comparison. + bool operator<(const Sockaddr &rhs) const; + + uint32_t HashCode() const; + + std::string host() const; + + void set_port(int port); + int port() const; + const struct sockaddr_in& addr() const; + std::string ToString() const; + + // Returns true if the address is 0.0.0.0 + bool IsWildcard() const; + + // Returns true if the address is 127.*.*.* + bool IsAnyLocalAddress() const; + + // Does reverse DNS lookup of the address and stores it in hostname. 
+ Status LookupHostname(std::string* hostname) const; + + // the default auto-generated copy constructor is fine here + private: + struct sockaddr_in addr_; +}; + +} // namespace kudu + +// Specialize std::hash for Sockaddr +namespace std { +template<> +struct hash<kudu::Sockaddr> { + int operator()(const kudu::Sockaddr& addr) const { + return addr.HashCode(); + } +}; +} // namespace std +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/net/socket.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/net/socket.cc b/be/src/kudu/util/net/socket.cc new file mode 100644 index 0000000..e0bea14 --- /dev/null +++ b/be/src/kudu/util/net/socket.cc @@ -0,0 +1,582 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/net/socket.h" + +#include <errno.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <unistd.h> + +#include <limits> +#include <string> + +#include <glog/logging.h> + +#include "kudu/gutil/basictypes.h" +#include "kudu/gutil/stringprintf.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/debug/trace_event.h" +#include "kudu/util/errno.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/monotime.h" +#include "kudu/util/net/net_util.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/random.h" +#include "kudu/util/random_util.h" +#include "kudu/util/subprocess.h" + +DEFINE_string(local_ip_for_outbound_sockets, "", + "IP to bind to when making outgoing socket connections. " + "This must be an IP address of the form A.B.C.D, not a hostname. " + "Advanced parameter, subject to change."); +TAG_FLAG(local_ip_for_outbound_sockets, experimental); + +DEFINE_bool(socket_inject_short_recvs, false, + "Inject short recv() responses which return less data than " + "requested"); +TAG_FLAG(socket_inject_short_recvs, hidden); +TAG_FLAG(socket_inject_short_recvs, unsafe); + +namespace kudu { + +Socket::Socket() + : fd_(-1) { +} + +Socket::Socket(int fd) + : fd_(fd) { +} + +void Socket::Reset(int fd) { + ignore_result(Close()); + fd_ = fd; +} + +int Socket::Release() { + int fd = fd_; + fd_ = -1; + return fd; +} + +Socket::~Socket() { + ignore_result(Close()); +} + +Status Socket::Close() { + if (fd_ < 0) { + return Status::OK(); + } + int fd = fd_; + if (::close(fd) < 0) { + int err = errno; + return Status::NetworkError(std::string("close error: ") + + ErrnoToString(err), Slice(), err); + } + fd_ = -1; + return Status::OK(); +} + +Status Socket::Shutdown(bool shut_read, bool shut_write) { + DCHECK_GE(fd_, 0); + int flags = 0; + if (shut_read && shut_write) { + flags |= SHUT_RDWR; + } else if (shut_read) { + flags |= SHUT_RD; + } else if (shut_write) 
{ + flags |= SHUT_WR; + } + if (::shutdown(fd_, flags) < 0) { + int err = errno; + return Status::NetworkError(std::string("shutdown error: ") + + ErrnoToString(err), Slice(), err); + } + return Status::OK(); +} + +int Socket::GetFd() const { + return fd_; +} + +bool Socket::IsTemporarySocketError(int err) { + return ((err == EAGAIN) || (err == EWOULDBLOCK) || (err == EINTR)); +} + +#if defined(__linux__) + +Status Socket::Init(int flags) { + int nonblocking_flag = (flags & FLAG_NONBLOCKING) ? SOCK_NONBLOCK : 0; + Reset(::socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC | nonblocking_flag, 0)); + if (fd_ < 0) { + int err = errno; + return Status::NetworkError(std::string("error opening socket: ") + + ErrnoToString(err), Slice(), err); + } + + return Status::OK(); +} + +#else + +Status Socket::Init(int flags) { + Reset(::socket(AF_INET, SOCK_STREAM, 0)); + if (fd_ < 0) { + int err = errno; + return Status::NetworkError(std::string("error opening socket: ") + + ErrnoToString(err), Slice(), err); + } + RETURN_NOT_OK(SetNonBlocking(flags & FLAG_NONBLOCKING)); + RETURN_NOT_OK(SetCloseOnExec()); + + // Disable SIGPIPE. + int set = 1; + if (setsockopt(fd_, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof(set)) == -1) { + int err = errno; + return Status::NetworkError(std::string("failed to set SO_NOSIGPIPE: ") + + ErrnoToString(err), Slice(), err); + } + + return Status::OK(); +} + +#endif // defined(__linux__) + +Status Socket::SetNoDelay(bool enabled) { + int flag = enabled ? 1 : 0; + if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) == -1) { + int err = errno; + return Status::NetworkError(std::string("failed to set TCP_NODELAY: ") + + ErrnoToString(err), Slice(), err); + } + return Status::OK(); +} + +Status Socket::SetTcpCork(bool enabled) { +#if defined(__linux__) + int flag = enabled ? 
1 : 0; + if (setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &flag, sizeof(flag)) == -1) { + int err = errno; + return Status::NetworkError(std::string("failed to set TCP_CORK: ") + + ErrnoToString(err), Slice(), err); + } +#endif // defined(__linux__) + // TODO: Use TCP_NOPUSH for OSX if perf becomes an issue. + return Status::OK(); +} + +Status Socket::SetNonBlocking(bool enabled) { + int curflags = ::fcntl(fd_, F_GETFL, 0); + if (curflags == -1) { + int err = errno; + return Status::NetworkError( + StringPrintf("Failed to get file status flags on fd %d", fd_), + ErrnoToString(err), err); + } + int newflags = (enabled) ? (curflags | O_NONBLOCK) : (curflags & ~O_NONBLOCK); + if (::fcntl(fd_, F_SETFL, newflags) == -1) { + int err = errno; + if (enabled) { + return Status::NetworkError( + StringPrintf("Failed to set O_NONBLOCK on fd %d", fd_), + ErrnoToString(err), err); + } else { + return Status::NetworkError( + StringPrintf("Failed to clear O_NONBLOCK on fd %d", fd_), + ErrnoToString(err), err); + } + } + return Status::OK(); +} + +Status Socket::IsNonBlocking(bool* is_nonblock) const { + int curflags = ::fcntl(fd_, F_GETFL, 0); + if (curflags == -1) { + int err = errno; + return Status::NetworkError( + StringPrintf("Failed to get file status flags on fd %d", fd_), + ErrnoToString(err), err); + } + *is_nonblock = ((curflags & O_NONBLOCK) != 0); + return Status::OK(); +} + +Status Socket::SetCloseOnExec() { + int curflags = fcntl(fd_, F_GETFD, 0); + if (curflags == -1) { + int err = errno; + Reset(-1); + return Status::NetworkError(std::string("fcntl(F_GETFD) error: ") + + ErrnoToString(err), Slice(), err); + } + if (fcntl(fd_, F_SETFD, curflags | FD_CLOEXEC) == -1) { + int err = errno; + Reset(-1); + return Status::NetworkError(std::string("fcntl(F_SETFD) error: ") + + ErrnoToString(err), Slice(), err); + } + return Status::OK(); +} + +Status Socket::SetSendTimeout(const MonoDelta& timeout) { + return SetTimeout(SO_SNDTIMEO, "SO_SNDTIMEO", timeout); +} + +Status 
Socket::SetRecvTimeout(const MonoDelta& timeout) { + return SetTimeout(SO_RCVTIMEO, "SO_RCVTIMEO", timeout); +} + +Status Socket::SetReuseAddr(bool flag) { + int err; + int int_flag = flag ? 1 : 0; + if (setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &int_flag, sizeof(int_flag)) == -1) { + err = errno; + return Status::NetworkError(std::string("failed to set SO_REUSEADDR: ") + + ErrnoToString(err), Slice(), err); + } + return Status::OK(); +} + +Status Socket::BindAndListen(const Sockaddr &sockaddr, + int listenQueueSize) { + RETURN_NOT_OK(SetReuseAddr(true)); + RETURN_NOT_OK(Bind(sockaddr)); + RETURN_NOT_OK(Listen(listenQueueSize)); + return Status::OK(); +} + +Status Socket::Listen(int listen_queue_size) { + if (listen(fd_, listen_queue_size)) { + int err = errno; + return Status::NetworkError("listen() error", ErrnoToString(err)); + } + return Status::OK(); +} + +Status Socket::GetSocketAddress(Sockaddr *cur_addr) const { + struct sockaddr_in sin; + socklen_t len = sizeof(sin); + DCHECK_GE(fd_, 0); + if (::getsockname(fd_, (struct sockaddr *)&sin, &len) == -1) { + int err = errno; + return Status::NetworkError(string("getsockname error: ") + + ErrnoToString(err), Slice(), err); + } + *cur_addr = sin; + return Status::OK(); +} + +Status Socket::GetPeerAddress(Sockaddr *cur_addr) const { + struct sockaddr_in sin; + socklen_t len = sizeof(sin); + DCHECK_GE(fd_, 0); + if (::getpeername(fd_, (struct sockaddr *)&sin, &len) == -1) { + int err = errno; + return Status::NetworkError(string("getpeername error: ") + + ErrnoToString(err), Slice(), err); + } + *cur_addr = sin; + return Status::OK(); +} + +bool Socket::IsLoopbackConnection() const { + Sockaddr local, remote; + if (!GetSocketAddress(&local).ok()) return false; + if (!GetPeerAddress(&remote).ok()) return false; + + // Compare without comparing ports. 
+ local.set_port(0); + remote.set_port(0); + return local == remote; +} + +Status Socket::Bind(const Sockaddr& bind_addr) { + struct sockaddr_in addr = bind_addr.addr(); + + DCHECK_GE(fd_, 0); + if (PREDICT_FALSE(::bind(fd_, (struct sockaddr*) &addr, sizeof(addr)))) { + int err = errno; + Status s = Status::NetworkError( + strings::Substitute("error binding socket to $0: $1", + bind_addr.ToString(), ErrnoToString(err)), + Slice(), err); + + if (s.IsNetworkError() && s.posix_code() == EADDRINUSE && bind_addr.port() != 0) { + TryRunLsof(bind_addr); + } + return s; + } + + return Status::OK(); +} + +Status Socket::Accept(Socket *new_conn, Sockaddr *remote, int flags) { + TRACE_EVENT0("net", "Socket::Accept"); + struct sockaddr_in addr; + socklen_t olen = sizeof(addr); + DCHECK_GE(fd_, 0); +#if defined(__linux__) + int accept_flags = SOCK_CLOEXEC; + if (flags & FLAG_NONBLOCKING) { + accept_flags |= SOCK_NONBLOCK; + } + new_conn->Reset(::accept4(fd_, (struct sockaddr*)&addr, + &olen, accept_flags)); + if (new_conn->GetFd() < 0) { + int err = errno; + return Status::NetworkError(std::string("accept4(2) error: ") + + ErrnoToString(err), Slice(), err); + } +#else + new_conn->Reset(::accept(fd_, (struct sockaddr*)&addr, &olen)); + if (new_conn->GetFd() < 0) { + int err = errno; + return Status::NetworkError(std::string("accept(2) error: ") + + ErrnoToString(err), Slice(), err); + } + RETURN_NOT_OK(new_conn->SetNonBlocking(flags & FLAG_NONBLOCKING)); + RETURN_NOT_OK(new_conn->SetCloseOnExec()); +#endif // defined(__linux__) + + *remote = addr; + TRACE_EVENT_INSTANT1("net", "Accepted", TRACE_EVENT_SCOPE_THREAD, + "remote", remote->ToString()); + return Status::OK(); +} + +Status Socket::BindForOutgoingConnection() { + Sockaddr bind_host; + Status s = bind_host.ParseString(FLAGS_local_ip_for_outbound_sockets, 0); + CHECK(s.ok() && bind_host.port() == 0) + << "Invalid local IP set for 'local_ip_for_outbound_sockets': '" + << FLAGS_local_ip_for_outbound_sockets << "': " << 
s.ToString(); + + RETURN_NOT_OK(Bind(bind_host)); + return Status::OK(); +} + +Status Socket::Connect(const Sockaddr &remote) { + TRACE_EVENT1("net", "Socket::Connect", + "remote", remote.ToString()); + if (PREDICT_FALSE(!FLAGS_local_ip_for_outbound_sockets.empty())) { + RETURN_NOT_OK(BindForOutgoingConnection()); + } + + struct sockaddr_in addr; + memcpy(&addr, &remote.addr(), sizeof(sockaddr_in)); + DCHECK_GE(fd_, 0); + if (::connect(fd_, (const struct sockaddr*)&addr, sizeof(addr)) < 0) { + int err = errno; + return Status::NetworkError(std::string("connect(2) error: ") + + ErrnoToString(err), Slice(), err); + } + return Status::OK(); +} + +Status Socket::GetSockError() const { + int val = 0, ret; + socklen_t val_len = sizeof(val); + DCHECK_GE(fd_, 0); + ret = ::getsockopt(fd_, SOL_SOCKET, SO_ERROR, &val, &val_len); + if (ret) { + int err = errno; + return Status::NetworkError(std::string("getsockopt(SO_ERROR) failed: ") + + ErrnoToString(err), Slice(), err); + } + if (val != 0) { + return Status::NetworkError(ErrnoToString(val), Slice(), val); + } + return Status::OK(); +} + +Status Socket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) { + if (amt <= 0) { + return Status::NetworkError( + StringPrintf("invalid send of %" PRId32 " bytes", + amt), Slice(), EINVAL); + } + DCHECK_GE(fd_, 0); + int res = ::send(fd_, buf, amt, MSG_NOSIGNAL); + if (res < 0) { + int err = errno; + return Status::NetworkError(std::string("write error: ") + + ErrnoToString(err), Slice(), err); + } + *nwritten = res; + return Status::OK(); +} + +Status Socket::Writev(const struct ::iovec *iov, int iov_len, + int32_t *nwritten) { + if (PREDICT_FALSE(iov_len <= 0)) { + return Status::NetworkError( + StringPrintf("writev: invalid io vector length of %d", + iov_len), + Slice(), EINVAL); + } + DCHECK_GE(fd_, 0); + + struct msghdr msg; + memset(&msg, 0, sizeof(struct msghdr)); + msg.msg_iov = const_cast<iovec *>(iov); + msg.msg_iovlen = iov_len; + int res = ::sendmsg(fd_, &msg, 
MSG_NOSIGNAL); + if (PREDICT_FALSE(res < 0)) { + int err = errno; + return Status::NetworkError(std::string("sendmsg error: ") + + ErrnoToString(err), Slice(), err); + } + + *nwritten = res; + return Status::OK(); +} + +// Mostly follows writen() from Stevens (2004) or Kerrisk (2010). +Status Socket::BlockingWrite(const uint8_t *buf, size_t buflen, size_t *nwritten, + const MonoTime& deadline) { + DCHECK_LE(buflen, std::numeric_limits<int32_t>::max()) << "Writes > INT32_MAX not supported"; + DCHECK(nwritten); + + size_t tot_written = 0; + while (tot_written < buflen) { + int32_t inc_num_written = 0; + int32_t num_to_write = buflen - tot_written; + MonoDelta timeout = deadline - MonoTime::Now(); + if (PREDICT_FALSE(timeout.ToNanoseconds() <= 0)) { + return Status::TimedOut("BlockingWrite timed out"); + } + RETURN_NOT_OK(SetSendTimeout(timeout)); + Status s = Write(buf, num_to_write, &inc_num_written); + tot_written += inc_num_written; + buf += inc_num_written; + *nwritten = tot_written; + + if (PREDICT_FALSE(!s.ok())) { + // Continue silently when the syscall is interrupted. + if (s.posix_code() == EINTR) { + continue; + } + if (s.posix_code() == EAGAIN) { + return Status::TimedOut(""); + } + return s.CloneAndPrepend("BlockingWrite error"); + } + if (PREDICT_FALSE(inc_num_written == 0)) { + // Shouldn't happen on Linux with a blocking socket. Maybe other Unices. + break; + } + } + + if (tot_written < buflen) { + return Status::IOError("Wrote zero bytes on a BlockingWrite() call", + StringPrintf("Transferred %zu of %zu bytes", tot_written, buflen)); + } + return Status::OK(); +} + +Status Socket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) { + if (amt <= 0) { + return Status::NetworkError( + StringPrintf("invalid recv of %d bytes", amt), Slice(), EINVAL); + } + + // The recv() call can return fewer than the requested number of bytes. + // Especially when 'amt' is small, this is very unlikely to happen in + // the context of unit tests. 
So, we provide an injection hook which + // simulates the same behavior. + if (PREDICT_FALSE(FLAGS_socket_inject_short_recvs && amt > 1)) { + Random r(GetRandomSeed32()); + amt = 1 + r.Uniform(amt - 1); + } + + DCHECK_GE(fd_, 0); + int res = ::recv(fd_, buf, amt, 0); + if (res <= 0) { + if (res == 0) { + return Status::NetworkError("Recv() got EOF from remote", Slice(), ESHUTDOWN); + } + int err = errno; + return Status::NetworkError(std::string("recv error: ") + + ErrnoToString(err), Slice(), err); + } + *nread = res; + return Status::OK(); +} + +// Mostly follows readn() from Stevens (2004) or Kerrisk (2010). +// One place where we deviate: we consider EOF a failure if < amt bytes are read. +Status Socket::BlockingRecv(uint8_t *buf, size_t amt, size_t *nread, const MonoTime& deadline) { + DCHECK_LE(amt, std::numeric_limits<int32_t>::max()) << "Reads > INT32_MAX not supported"; + DCHECK(nread); + size_t tot_read = 0; + while (tot_read < amt) { + int32_t inc_num_read = 0; + int32_t num_to_read = amt - tot_read; + MonoDelta timeout = deadline - MonoTime::Now(); + if (PREDICT_FALSE(timeout.ToNanoseconds() <= 0)) { + return Status::TimedOut(""); + } + RETURN_NOT_OK(SetRecvTimeout(timeout)); + Status s = Recv(buf, num_to_read, &inc_num_read); + tot_read += inc_num_read; + buf += inc_num_read; + *nread = tot_read; + + if (PREDICT_FALSE(!s.ok())) { + // Continue silently when the syscall is interrupted. + if (s.posix_code() == EINTR) { + continue; + } + if (s.posix_code() == EAGAIN) { + return Status::TimedOut(""); + } + return s.CloneAndPrepend("BlockingRecv error"); + } + if (PREDICT_FALSE(inc_num_read == 0)) { + // EOF. 
+ break; + } + } + + if (PREDICT_FALSE(tot_read < amt)) { + return Status::IOError("Read zero bytes on a blocking Recv() call", + StringPrintf("Transferred %zu of %zu bytes", tot_read, amt)); + } + return Status::OK(); +} + +Status Socket::SetTimeout(int opt, std::string optname, const MonoDelta& timeout) { + if (PREDICT_FALSE(timeout.ToNanoseconds() < 0)) { + return Status::InvalidArgument("Timeout specified as negative to SetTimeout", + timeout.ToString()); + } + struct timeval tv; + timeout.ToTimeVal(&tv); + socklen_t optlen = sizeof(tv); + if (::setsockopt(fd_, SOL_SOCKET, opt, &tv, optlen) == -1) { + int err = errno; + return Status::NetworkError( + StringPrintf("Failed to set %s to %s", optname.c_str(), timeout.ToString().c_str()), + ErrnoToString(err), err); + } + return Status::OK(); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/net/socket.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/net/socket.h b/be/src/kudu/util/net/socket.h new file mode 100644 index 0000000..ce5b7bb --- /dev/null +++ b/be/src/kudu/util/net/socket.h @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#ifndef KUDU_UTIL_NET_SOCKET_H
+#define KUDU_UTIL_NET_SOCKET_H
+
+#include <sys/uio.h>
+#include <string>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class MonoDelta;
+class MonoTime;
+class Sockaddr;
+
+// Owns a single IPv4 TCP socket descriptor (-1 when none) and wraps the
+// common socket syscalls, reporting failures as Status objects. Not copyable.
+class Socket {
+ public:
+  // Flag for Init() and Accept(): request a non-blocking socket.
+  static const int FLAG_NONBLOCKING = 0x1;
+
+  // Create a new invalid Socket object.
+  Socket();
+
+  // Start managing a socket.
+  explicit Socket(int fd);
+
+  // Close the socket.  Errors will be ignored.
+  virtual ~Socket();
+
+  // Close the Socket, checking for errors. Returns OK immediately if no
+  // descriptor is being managed.
+  virtual Status Close();
+
+  // call shutdown() on the socket; 'shut_read' / 'shut_write' select which
+  // direction(s) to shut down.
+  Status Shutdown(bool shut_read, bool shut_write);
+
+  // Start managing a socket. Any previously managed descriptor is closed
+  // first (errors ignored).
+  void Reset(int fd);
+
+  // Stop managing the socket and return it. The descriptor is not closed.
+  int Release();
+
+  // Get the raw file descriptor, or -1 if there is no file descriptor being
+  // managed.
+  int GetFd() const;
+
+  // Returns true if the error is temporary and will go away if we retry on
+  // the socket (EAGAIN, EWOULDBLOCK or EINTR).
+  static bool IsTemporarySocketError(int err);
+
+  // Creates a new AF_INET stream socket with close-on-exec set, replacing
+  // (and closing) any previously managed descriptor.
+  Status Init(int flags); // See FLAG_NONBLOCKING
+
+  // Set or clear TCP_NODELAY
+  Status SetNoDelay(bool enabled);
+
+  // Set or clear TCP_CORK (Linux only; a no-op returning OK elsewhere).
+  Status SetTcpCork(bool enabled);
+
+  // Set or clear O_NONBLOCK
+  Status SetNonBlocking(bool enabled);
+  // Reports whether O_NONBLOCK is currently set.
+  Status IsNonBlocking(bool* is_nonblock) const;
+
+  // Set SO_SNDTIMEO to the specified value. Should only be used for blocking sockets.
+  Status SetSendTimeout(const MonoDelta& timeout);
+
+  // Set SO_RCVTIMEO to the specified value. Should only be used for blocking sockets.
+  Status SetRecvTimeout(const MonoDelta& timeout);
+
+  // Sets SO_REUSEADDR to 'flag'. Should be used prior to Bind().
+  Status SetReuseAddr(bool flag);
+
+  // Convenience method to invoke the common sequence:
+  // 1) SetReuseAddr(true)
+  // 2) Bind()
+  // 3) Listen()
+  Status BindAndListen(const Sockaddr &sockaddr, int listen_queue_size);
+
+  // Start listening for new connections, with the given backlog size.
+  // Requires that the socket has already been bound using Bind().
+  Status Listen(int listen_queue_size);
+
+  // Call getsockname to get the address of this socket.
+  Status GetSocketAddress(Sockaddr *cur_addr) const;
+
+  // Call getpeername to get the address of the connected peer.
+  // It is virtual so that tests can override.
+  virtual Status GetPeerAddress(Sockaddr *cur_addr) const;
+
+  // Return true if this socket is determined to be a loopback connection
+  // (i.e. the local and remote peer share an IP address).
+  //
+  // If any error occurs while determining this, returns false.
+  bool IsLoopbackConnection() const;
+
+  // Call bind() to bind the socket to a given address.
+  // If bind() fails and indicates that the requested port is already in use,
+  // generates an informative log message by calling 'lsof' if available.
+  Status Bind(const Sockaddr& bind_addr);
+
+  // Call accept(2) to get a new connection. 'new_conn' takes ownership of the
+  // accepted descriptor, which is close-on-exec and, if 'flags' contains
+  // FLAG_NONBLOCKING, non-blocking. 'remote' receives the peer address.
+  Status Accept(Socket *new_conn, Sockaddr *remote, int flags);
+
+  // start connecting this socket to a remote address.
+  // If FLAGS_local_ip_for_outbound_sockets is set, binds to that IP first.
+  Status Connect(const Sockaddr &remote);
+
+  // get the error status using getsockopt(2)
+  Status GetSockError() const;
+
+  // Single send(2) call (with MSG_NOSIGNAL). 'amt' must be positive.
+  // '*nwritten' receives the byte count send() returned, which may be less
+  // than 'amt'.
+  virtual Status Write(const uint8_t *buf, int32_t amt, int32_t *nwritten);
+
+  // Single sendmsg(2) call over 'iov_len' (> 0) buffers, with MSG_NOSIGNAL.
+  // '*nwritten' receives the total byte count sent.
+  virtual Status Writev(const struct ::iovec *iov, int iov_len, int32_t *nwritten);
+
+  // Blocking Write call, returns IOError unless full buffer is sent.
+  // Underlying Socket expected to be in blocking mode. Fails if any Write() sends 0 bytes.
+  // Returns OK if buflen bytes were sent, otherwise IOError.
+  // Returns TimedOut if 'deadline' passes before everything is sent.
+  // Upon return, nwritten will contain the number of bytes actually written.
+  // See also writen() from Stevens (2004) or Kerrisk (2010)
+  Status BlockingWrite(const uint8_t *buf, size_t buflen, size_t *nwritten,
+      const MonoTime& deadline);
+
+  // Single recv(2) call. 'amt' must be positive. '*nread' receives the byte
+  // count, which may be less than 'amt'. EOF is reported as a NetworkError
+  // carrying posix code ESHUTDOWN.
+  virtual Status Recv(uint8_t *buf, int32_t amt, int32_t *nread);
+
+  // Blocking Recv call, returns IOError unless requested amt bytes are read.
+  // Underlying Socket expected to be in blocking mode. Fails if any Recv() reads 0 bytes.
+  // Returns OK if amt bytes were read, otherwise IOError.
+  // Returns TimedOut if 'deadline' passes before 'amt' bytes arrive.
+  // Upon return, nread will contain the number of bytes actually read.
+  // See also readn() from Stevens (2004) or Kerrisk (2010)
+  Status BlockingRecv(uint8_t *buf, size_t amt, size_t *nread, const MonoTime& deadline);
+
+ private:
+  // Called internally from SetSend/RecvTimeout(). Rejects negative timeouts.
+  Status SetTimeout(int opt, std::string optname, const MonoDelta& timeout);
+
+  // Called internally during socket setup. Closes the socket on failure.
+  Status SetCloseOnExec();
+
+  // Bind the socket to a local address before making an outbound connection,
+  // based on the value of FLAGS_local_ip_for_outbound_sockets.
+  Status BindForOutgoingConnection();
+
+  // The managed descriptor, or -1.
+  int fd_;
+
+  DISALLOW_COPY_AND_ASSIGN(Socket);
+};
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/nvm_cache.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/nvm_cache.cc b/be/src/kudu/util/nvm_cache.cc
new file mode 100644
index 0000000..678aa87
--- /dev/null
+++ b/be/src/kudu/util/nvm_cache.cc
@@ -0,0 +1,578 @@
+// This file is derived from cache.cc in the LevelDB project:
+//
+//   Some portions copyright (c) 2011 The LevelDB Authors. All rights reserved.
+//   Use of this source code is governed by a BSD-style license that can be
+//   found in the LICENSE file.
+//
+// ------------------------------------------------------------
+// This file implements a cache based on the NVML library (http://pmem.io),
+// specifically its "libvmem" component. This library makes it easy to program
+// against persistent memory hardware by exposing an API which parallels
+// malloc/free, but allocates from persistent memory instead of DRAM.
+//
+// We use this API to implement a cache which treats persistent memory or
+// non-volatile memory as if it were a larger cheaper bank of volatile memory.
We
+// currently make no use of its persistence properties.
+//
+// Currently, we only store key/value in NVM. All other data structures such as the
+// ShardedLRUCache instances, hash table, etc are in DRAM. The assumption is that
+// the ratio of data stored vs overhead is quite high.
+//
+// NOTE(review): NvmLRUCache::FreeEntry() releases the LRUHandle itself with
+// vmem_free(), so the per-entry handle (not only the key/value bytes) appears
+// to be allocated from the NVM pool as well -- confirm and update the note above.
+
+#include "kudu/util/nvm_cache.h"
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <iostream>
+#include <libvmem.h>
+#include <memory>
+#include <mutex>
+#include <stdlib.h>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/atomic_refcount.h"
+#include "kudu/gutil/hash/city.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/cache_metrics.h"
+#include "kudu/util/flags.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"
+
+// Location of the memory pool backing the cache.
+DEFINE_string(nvm_cache_path, "/vmem",
+              "The path at which the NVM cache will try to allocate its memory. "
+              "This can be a tmpfs or ramfs for testing purposes.");
+TAG_FLAG(nvm_cache_path, experimental);
+
+// Bounds the allocate/evict retry loop in NvmLRUCache::AllocateAndRetry().
+DEFINE_int32(nvm_cache_allocation_retry_count, 10,
+             "The number of times that the NVM cache will retry attempts to allocate "
+             "memory for new entries. In between attempts, a cache entry will be "
+             "evicted.");
+TAG_FLAG(nvm_cache_allocation_retry_count, advanced);
+TAG_FLAG(nvm_cache_allocation_retry_count, experimental);
+
+// Test hook: when set, NvmLRUCache::VmemMalloc() returns NULL without calling
+// vmem_malloc().
+DEFINE_bool(nvm_cache_simulate_allocation_failure, false,
+            "If true, the NVM cache will inject failures in calls to vmem_malloc "
+            "for testing.");
+TAG_FLAG(nvm_cache_simulate_allocation_failure, unsafe);
+
+
+namespace kudu {
+
+class MetricEntity;
+
+namespace {
+
+using std::shared_ptr;
+using std::vector;
+
+// Lock type for each cache shard (see NvmLRUCache::mutex_).
+typedef simple_spinlock MutexType;
+
+// LRU cache implementation
+
+// An entry is a variable length structure; NvmLRUCache::FreeEntry() releases
+// it with vmem_free(), i.e. it lives in the vmem pool rather than the regular
+// heap. Entries are kept in a circular doubly linked list ordered by access time.
+struct LRUHandle { + Cache::EvictionCallback* eviction_callback; + LRUHandle* next_hash; + LRUHandle* next; + LRUHandle* prev; + size_t charge; // TODO(opt): Only allow uint32_t? + uint32_t key_length; + uint32_t val_length; + Atomic32 refs; + uint32_t hash; // Hash of key(); used for fast sharding and comparisons + uint8_t* kv_data; + + Slice key() const { + return Slice(kv_data, key_length); + } + + Slice value() const { + return Slice(&kv_data[key_length], val_length); + } + + uint8_t* val_ptr() { + return &kv_data[key_length]; + } +}; + +// We provide our own simple hash table since it removes a whole bunch +// of porting hacks and is also faster than some of the built-in hash +// table implementations in some of the compiler/runtime combinations +// we have tested. E.g., readrandom speeds up by ~5% over the g++ +// 4.4.3's builtin hashtable. +class HandleTable { + public: + HandleTable() : length_(0), elems_(0), list_(NULL) { Resize(); } + ~HandleTable() { delete[] list_; } + + LRUHandle* Lookup(const Slice& key, uint32_t hash) { + return *FindPointer(key, hash); + } + + LRUHandle* Insert(LRUHandle* h) { + LRUHandle** ptr = FindPointer(h->key(), h->hash); + LRUHandle* old = *ptr; + h->next_hash = (old == NULL ? NULL : old->next_hash); + *ptr = h; + if (old == NULL) { + ++elems_; + if (elems_ > length_) { + // Since each cache entry is fairly large, we aim for a small + // average linked list length (<= 1). + Resize(); + } + } + return old; + } + + LRUHandle* Remove(const Slice& key, uint32_t hash) { + LRUHandle** ptr = FindPointer(key, hash); + LRUHandle* result = *ptr; + if (result != NULL) { + *ptr = result->next_hash; + --elems_; + } + return result; + } + + private: + // The table consists of an array of buckets where each bucket is + // a linked list of cache entries that hash into the bucket. + uint32_t length_; + uint32_t elems_; + LRUHandle** list_; + + // Return a pointer to slot that points to a cache entry that + // matches key/hash. 
If there is no such cache entry, return a + // pointer to the trailing slot in the corresponding linked list. + LRUHandle** FindPointer(const Slice& key, uint32_t hash) { + LRUHandle** ptr = &list_[hash & (length_ - 1)]; + while (*ptr != NULL && + ((*ptr)->hash != hash || key != (*ptr)->key())) { + ptr = &(*ptr)->next_hash; + } + return ptr; + } + + void Resize() { + uint32_t new_length = 16; + while (new_length < elems_ * 1.5) { + new_length *= 2; + } + LRUHandle** new_list = new LRUHandle*[new_length]; + memset(new_list, 0, sizeof(new_list[0]) * new_length); + uint32_t count = 0; + for (uint32_t i = 0; i < length_; i++) { + LRUHandle* h = list_[i]; + while (h != NULL) { + LRUHandle* next = h->next_hash; + uint32_t hash = h->hash; + LRUHandle** ptr = &new_list[hash & (new_length - 1)]; + h->next_hash = *ptr; + *ptr = h; + h = next; + count++; + } + } + DCHECK_EQ(elems_, count); + delete[] list_; + list_ = new_list; + length_ = new_length; + } +}; + +// A single shard of sharded cache. +class NvmLRUCache { + public: + explicit NvmLRUCache(VMEM *vmp); + ~NvmLRUCache(); + + // Separate from constructor so caller can easily make an array of LRUCache + void SetCapacity(size_t capacity) { capacity_ = capacity; } + + void SetMetrics(CacheMetrics* metrics) { metrics_ = metrics; } + + Cache::Handle* Insert(LRUHandle* h, Cache::EvictionCallback* eviction_callback); + + // Like Cache::Lookup, but with an extra "hash" parameter. + Cache::Handle* Lookup(const Slice& key, uint32_t hash, bool caching); + void Release(Cache::Handle* handle); + void Erase(const Slice& key, uint32_t hash); + void* AllocateAndRetry(size_t size); + + private: + void NvmLRU_Remove(LRUHandle* e); + void NvmLRU_Append(LRUHandle* e); + // Just reduce the reference count by 1. + // Return true if last reference + bool Unref(LRUHandle* e); + void FreeEntry(LRUHandle* e); + + // Evict the LRU item in the cache, adding it to the linked list + // pointed to by 'to_remove_head'. 
+ void EvictOldestUnlocked(LRUHandle** to_remove_head); + + // Free all of the entries in the linked list that has to_free_head + // as its head. + void FreeLRUEntries(LRUHandle* to_free_head); + + // Wrapper around vmem_malloc which injects failures based on a flag. + void* VmemMalloc(size_t size); + + // Initialized before use. + size_t capacity_; + + // mutex_ protects the following state. + MutexType mutex_; + size_t usage_; + + // Dummy head of LRU list. + // lru.prev is newest entry, lru.next is oldest entry. + LRUHandle lru_; + + HandleTable table_; + + VMEM* vmp_; + + CacheMetrics* metrics_; +}; + +NvmLRUCache::NvmLRUCache(VMEM* vmp) + : usage_(0), + vmp_(vmp), + metrics_(NULL) { + // Make empty circular linked list + lru_.next = &lru_; + lru_.prev = &lru_; +} + +NvmLRUCache::~NvmLRUCache() { + for (LRUHandle* e = lru_.next; e != &lru_; ) { + LRUHandle* next = e->next; + DCHECK_EQ(e->refs, 1); // Error if caller has an unreleased handle + if (Unref(e)) { + FreeEntry(e); + } + e = next; + } +} + +void* NvmLRUCache::VmemMalloc(size_t size) { + if (PREDICT_FALSE(FLAGS_nvm_cache_simulate_allocation_failure)) { + return NULL; + } + return vmem_malloc(vmp_, size); +} + +bool NvmLRUCache::Unref(LRUHandle* e) { + DCHECK_GT(ANNOTATE_UNPROTECTED_READ(e->refs), 0); + return !base::RefCountDec(&e->refs); +} + +void NvmLRUCache::FreeEntry(LRUHandle* e) { + DCHECK_EQ(ANNOTATE_UNPROTECTED_READ(e->refs), 0); + if (e->eviction_callback) { + e->eviction_callback->EvictedEntry(e->key(), e->value()); + } + if (PREDICT_TRUE(metrics_)) { + metrics_->cache_usage->DecrementBy(e->charge); + metrics_->evictions->Increment(); + } + vmem_free(vmp_, e); +} + +// Allocate nvm memory. Try until successful or FLAGS_nvm_cache_allocation_retry_count +// has been exceeded. +void *NvmLRUCache::AllocateAndRetry(size_t size) { + void *tmp; + // There may be times that an allocation fails. With NVM we have + // a fixed size to allocate from. 
If we cannot allocate the size + // that was asked for, we will remove entries from the cache and + // retry up to the configured number of retries. If this fails, we + // return NULL, which will cause the caller to not insert anything + // into the cache. + LRUHandle *to_remove_head = NULL; + tmp = VmemMalloc(size); + + if (tmp == NULL) { + std::unique_lock<MutexType> l(mutex_); + + int retries_remaining = FLAGS_nvm_cache_allocation_retry_count; + while (tmp == NULL && retries_remaining-- > 0 && lru_.next != &lru_) { + EvictOldestUnlocked(&to_remove_head); + + // Unlock while allocating memory. + l.unlock(); + tmp = VmemMalloc(size); + l.lock(); + } + } + + // we free the entries here outside of mutex for + // performance reasons + FreeLRUEntries(to_remove_head); + return tmp; +} + +void NvmLRUCache::NvmLRU_Remove(LRUHandle* e) { + e->next->prev = e->prev; + e->prev->next = e->next; + usage_ -= e->charge; +} + +void NvmLRUCache::NvmLRU_Append(LRUHandle* e) { + // Make "e" newest entry by inserting just before lru_ + e->next = &lru_; + e->prev = lru_.prev; + e->prev->next = e; + e->next->prev = e; + usage_ += e->charge; +} + +Cache::Handle* NvmLRUCache::Lookup(const Slice& key, uint32_t hash, bool caching) { + LRUHandle* e; + { + std::lock_guard<MutexType> l(mutex_); + e = table_.Lookup(key, hash); + if (e != NULL) { + // If an entry exists, remove the old entry from the cache + // and re-add to the end of the linked list. + base::RefCountInc(&e->refs); + NvmLRU_Remove(e); + NvmLRU_Append(e); + } + } + + // Do the metrics outside of the lock. 
+ if (metrics_) { + metrics_->lookups->Increment(); + bool was_hit = (e != NULL); + if (was_hit) { + if (caching) { + metrics_->cache_hits_caching->Increment(); + } else { + metrics_->cache_hits->Increment(); + } + } else { + if (caching) { + metrics_->cache_misses_caching->Increment(); + } else { + metrics_->cache_misses->Increment(); + } + } + } + + return reinterpret_cast<Cache::Handle*>(e); +} + +void NvmLRUCache::Release(Cache::Handle* handle) { + LRUHandle* e = reinterpret_cast<LRUHandle*>(handle); + bool last_reference = Unref(e); + if (last_reference) { + FreeEntry(e); + } +} + +void NvmLRUCache::EvictOldestUnlocked(LRUHandle** to_remove_head) { + LRUHandle* old = lru_.next; + NvmLRU_Remove(old); + table_.Remove(old->key(), old->hash); + if (Unref(old)) { + old->next = *to_remove_head; + *to_remove_head = old; + } +} + +void NvmLRUCache::FreeLRUEntries(LRUHandle* to_free_head) { + while (to_free_head != NULL) { + LRUHandle* next = to_free_head->next; + FreeEntry(to_free_head); + to_free_head = next; + } +} + +Cache::Handle* NvmLRUCache::Insert(LRUHandle* e, + Cache::EvictionCallback* eviction_callback) { + DCHECK(e); + LRUHandle* to_remove_head = NULL; + + e->refs = 2; // One from LRUCache, one for the returned handle + e->eviction_callback = eviction_callback; + if (PREDICT_TRUE(metrics_)) { + metrics_->cache_usage->IncrementBy(e->charge); + metrics_->inserts->Increment(); + } + + { + std::lock_guard<MutexType> l(mutex_); + + NvmLRU_Append(e); + + LRUHandle* old = table_.Insert(e); + if (old != NULL) { + NvmLRU_Remove(old); + if (Unref(old)) { + old->next = to_remove_head; + to_remove_head = old; + } + } + + while (usage_ > capacity_ && lru_.next != &lru_) { + EvictOldestUnlocked(&to_remove_head); + } + } + + // we free the entries here outside of mutex for + // performance reasons + FreeLRUEntries(to_remove_head); + + return reinterpret_cast<Cache::Handle*>(e); +} + +void NvmLRUCache::Erase(const Slice& key, uint32_t hash) { + LRUHandle* e; + bool 
last_reference = false; + { + std::lock_guard<MutexType> l(mutex_); + e = table_.Remove(key, hash); + if (e != NULL) { + NvmLRU_Remove(e); + last_reference = Unref(e); + } + } + // mutex not held here + // last_reference will only be true if e != NULL + if (last_reference) { + FreeEntry(e); + } +} +static const int kNumShardBits = 4; +static const int kNumShards = 1 << kNumShardBits; + +class ShardedLRUCache : public Cache { + private: + gscoped_ptr<CacheMetrics> metrics_; + vector<NvmLRUCache*> shards_; + MutexType id_mutex_; + uint64_t last_id_; + VMEM* vmp_; + + static inline uint32_t HashSlice(const Slice& s) { + return util_hash::CityHash64( + reinterpret_cast<const char *>(s.data()), s.size()); + } + + static uint32_t Shard(uint32_t hash) { + return hash >> (32 - kNumShardBits); + } + + public: + explicit ShardedLRUCache(size_t capacity, const string& id, VMEM* vmp) + : last_id_(0), + vmp_(vmp) { + + const size_t per_shard = (capacity + (kNumShards - 1)) / kNumShards; + for (int s = 0; s < kNumShards; s++) { + gscoped_ptr<NvmLRUCache> shard(new NvmLRUCache(vmp_)); + shard->SetCapacity(per_shard); + shards_.push_back(shard.release()); + } + } + + virtual ~ShardedLRUCache() { + STLDeleteElements(&shards_); + // Per the note at the top of this file, our cache is entirely volatile. + // Hence, when the cache is destructed, we delete the underlying + // VMEM pool. 
+ vmem_delete(vmp_); + } + + virtual Handle* Insert(PendingHandle* handle, + Cache::EvictionCallback* eviction_callback) OVERRIDE { + LRUHandle* h = reinterpret_cast<LRUHandle*>(DCHECK_NOTNULL(handle)); + return shards_[Shard(h->hash)]->Insert(h, eviction_callback); + } + virtual Handle* Lookup(const Slice& key, CacheBehavior caching) OVERRIDE { + const uint32_t hash = HashSlice(key); + return shards_[Shard(hash)]->Lookup(key, hash, caching == EXPECT_IN_CACHE); + } + virtual void Release(Handle* handle) OVERRIDE { + LRUHandle* h = reinterpret_cast<LRUHandle*>(handle); + shards_[Shard(h->hash)]->Release(handle); + } + virtual void Erase(const Slice& key) OVERRIDE { + const uint32_t hash = HashSlice(key); + shards_[Shard(hash)]->Erase(key, hash); + } + virtual Slice Value(Handle* handle) OVERRIDE { + return reinterpret_cast<LRUHandle*>(handle)->value(); + } + virtual uint8_t* MutableValue(PendingHandle* handle) OVERRIDE { + return reinterpret_cast<LRUHandle*>(handle)->val_ptr(); + } + + virtual uint64_t NewId() OVERRIDE { + std::lock_guard<MutexType> l(id_mutex_); + return ++(last_id_); + } + virtual void SetMetrics(const scoped_refptr<MetricEntity>& entity) OVERRIDE { + metrics_.reset(new CacheMetrics(entity)); + for (NvmLRUCache* cache : shards_) { + cache->SetMetrics(metrics_.get()); + } + } + virtual PendingHandle* Allocate(Slice key, int val_len, int charge) OVERRIDE { + int key_len = key.size(); + DCHECK_GE(key_len, 0); + DCHECK_GE(val_len, 0); + LRUHandle* handle = nullptr; + + // Try allocating from each of the shards -- if vmem is tight, + // this can cause eviction, so we might have better luck in different + // shards. 
+ for (NvmLRUCache* cache : shards_) { + uint8_t* buf = static_cast<uint8_t*>(cache->AllocateAndRetry( + sizeof(LRUHandle) + key_len + val_len)); + if (buf) { + handle = reinterpret_cast<LRUHandle*>(buf); + handle->kv_data = &buf[sizeof(LRUHandle)]; + handle->val_length = val_len; + handle->key_length = key_len; + handle->charge = charge + key.size(); + handle->hash = HashSlice(key); + memcpy(handle->kv_data, key.data(), key.size()); + return reinterpret_cast<PendingHandle*>(handle); + } + } + // TODO: increment a metric here on allocation failure. + return nullptr; + } + + virtual void Free(PendingHandle* ph) OVERRIDE { + vmem_free(vmp_, ph); + } +}; + +} // end anonymous namespace + +Cache* NewLRUNvmCache(size_t capacity, const std::string& id) { + // vmem_create() will fail if the capacity is too small, but with + // an inscrutable error. So, we'll check ourselves. + CHECK_GE(capacity, VMEM_MIN_POOL) + << "configured capacity " << capacity << " bytes is less than " + << "the minimum capacity for an NVM cache: " << VMEM_MIN_POOL; + + VMEM* vmp = vmem_create(FLAGS_nvm_cache_path.c_str(), capacity); + // If we cannot create the cache pool we should not retry. + PLOG_IF(FATAL, vmp == NULL) << "Could not initialize NVM cache library in path " + << FLAGS_nvm_cache_path.c_str(); + + return new ShardedLRUCache(capacity, id, vmp); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/nvm_cache.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/nvm_cache.h b/be/src/kudu/util/nvm_cache.h new file mode 100644 index 0000000..38962f2 --- /dev/null +++ b/be/src/kudu/util/nvm_cache.h @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_NVM_CACHE_H_ +#define KUDU_UTIL_NVM_CACHE_H_ + +#include <string> + +namespace kudu { +class Cache; + +// Create a cache in persistent memory with the given capacity. +Cache* NewLRUNvmCache(size_t capacity, const std::string& id); + +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/object_pool-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/object_pool-test.cc b/be/src/kudu/util/object_pool-test.cc new file mode 100644 index 0000000..d6b34ae --- /dev/null +++ b/be/src/kudu/util/object_pool-test.cc @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gtest/gtest.h> +#include "kudu/util/object_pool.h" + +namespace kudu { + +// Simple class which maintains a count of how many objects +// are currently alive. +class MyClass { + public: + MyClass() { + instance_count_++; + } + + ~MyClass() { + instance_count_--; + } + + static int instance_count() { + return instance_count_; + } + + static void ResetCount() { + instance_count_ = 0; + } + + private: + static int instance_count_; +}; +int MyClass::instance_count_ = 0; + +TEST(TestObjectPool, TestPooling) { + MyClass::ResetCount(); + { + ObjectPool<MyClass> pool; + ASSERT_EQ(0, MyClass::instance_count()); + MyClass *a = pool.Construct(); + ASSERT_EQ(1, MyClass::instance_count()); + MyClass *b = pool.Construct(); + ASSERT_EQ(2, MyClass::instance_count()); + ASSERT_TRUE(a != b); + pool.Destroy(b); + ASSERT_EQ(1, MyClass::instance_count()); + MyClass *c = pool.Construct(); + ASSERT_EQ(2, MyClass::instance_count()); + ASSERT_TRUE(c == b) << "should reuse instance"; + pool.Destroy(c); + + ASSERT_EQ(1, MyClass::instance_count()); + } + + ASSERT_EQ(0, MyClass::instance_count()) + << "destructing pool should have cleared instances"; +} + +TEST(TestObjectPool, TestScopedPtr) { + MyClass::ResetCount(); + ASSERT_EQ(0, MyClass::instance_count()); + ObjectPool<MyClass> pool; + { + ObjectPool<MyClass>::scoped_ptr sptr( + pool.make_scoped_ptr(pool.Construct())); + ASSERT_EQ(1, MyClass::instance_count()); + } + ASSERT_EQ(0, MyClass::instance_count()); +} + +} // namespace kudu
