http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/net/dns_resolver-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/dns_resolver-test.cc 
b/be/src/kudu/util/net/dns_resolver-test.cc
new file mode 100644
index 0000000..55be284
--- /dev/null
+++ b/be/src/kudu/util/net/dns_resolver-test.cc
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/net/dns_resolver.h"
+
+#include <boost/bind.hpp>
+#include <gtest/gtest.h>
+#include <vector>
+
+#include "kudu/gutil/strings/util.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/test_util.h"
+
+using std::vector;
+
+namespace kudu {
+
+// Test fixture that provides a DnsResolver instance to the test cases below.
+class DnsResolverTest : public KuduTest {
+ protected:
+  DnsResolver resolver_;
+};
+
+// Resolves "localhost" through the asynchronous resolver and checks that every
+// address handed back starts with "127." and carries the requested port.
+TEST_F(DnsResolverTest, TestResolution) {
+  vector<Sockaddr> resolved;
+  Synchronizer sync;
+  {
+    // The HostPort is scoped so that it is destroyed before we wait for the
+    // result of the lookup.
+    HostPort target("localhost", 12345);
+    resolver_.ResolveAddresses(target, &resolved, sync.AsStatusCallback());
+  }
+  ASSERT_OK(sync.Wait());
+  ASSERT_FALSE(resolved.empty());
+  for (const Sockaddr& sa : resolved) {
+    const auto text = sa.ToString();
+    LOG(INFO) << "Address: " << text;
+    EXPECT_TRUE(HasPrefixString(text, "127."));
+    EXPECT_TRUE(HasSuffixString(text, ":12345"));
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/net/dns_resolver.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/dns_resolver.cc 
b/be/src/kudu/util/net/dns_resolver.cc
new file mode 100644
index 0000000..4c37a95
--- /dev/null
+++ b/be/src/kudu/util/net/dns_resolver.cc
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/net/dns_resolver.h"
+
+#include <boost/bind.hpp>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <vector>
+
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/threadpool.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
+
+// Upper bound on the size of the thread pool that DnsResolver builds.
+DEFINE_int32(dns_num_resolver_threads, 1, "The number of threads to use for 
DNS resolution");
+TAG_FLAG(dns_num_resolver_threads, advanced);
+
+using std::vector;
+
+namespace kudu {
+
+// Builds the "dns-resolver" thread pool, capped at --dns_num_resolver_threads
+// threads. A failure to build the pool is fatal (CHECK_OK).
+DnsResolver::DnsResolver() {
+  ThreadPoolBuilder builder("dns-resolver");
+  builder.set_max_threads(FLAGS_dns_num_resolver_threads);
+  CHECK_OK(builder.Build(&pool_));
+}
+
+// Shuts down the resolver's thread pool.
+DnsResolver::~DnsResolver() {
+  pool_->Shutdown();
+}
+
+namespace {
+// Task body run on the resolver thread pool: resolves 'hostport' into
+// 'addresses' and reports the outcome by invoking 'cb' with the Status.
+static void DoResolution(const HostPort &hostport, vector<Sockaddr>* addresses,
+                         StatusCallback cb) {
+  const Status outcome = hostport.ResolveAddresses(addresses);
+  cb.Run(outcome);
+}
+} // anonymous namespace
+
+// Queues a lookup of 'hostport' on the thread pool. If the task cannot be
+// submitted, 'cb' is invoked immediately, on the calling thread, with the
+// submission error.
+void DnsResolver::ResolveAddresses(const HostPort& hostport,
+                                   vector<Sockaddr>* addresses,
+                                   const StatusCallback& cb) {
+  const Status submit_status =
+      pool_->SubmitFunc(boost::bind(&DoResolution, hostport, addresses, cb));
+  if (submit_status.ok()) {
+    return;
+  }
+  cb.Run(submit_status);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/net/dns_resolver.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/dns_resolver.h 
b/be/src/kudu/util/net/dns_resolver.h
new file mode 100644
index 0000000..4232174
--- /dev/null
+++ b/be/src/kudu/util/net/dns_resolver.h
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_NET_DNS_RESOLVER_H
+#define KUDU_UTIL_NET_DNS_RESOLVER_H
+
+#include <vector>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/util/async_util.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class HostPort;
+class Sockaddr;
+class ThreadPool;
+
+// DNS Resolver which supports async address resolution.
+//
+// Lookups are run on a thread pool owned by the resolver.
+class DnsResolver {
+ public:
+  DnsResolver();
+  ~DnsResolver();
+
+  // Resolve any addresses corresponding to this host:port pair.
+  // Note that a host may resolve to more than one IP address.
+  //
+  // 'addresses' may be NULL, in which case this function simply checks that
+  // the host/port pair can be resolved, without returning anything.
+  // If non-NULL, 'addresses' is written to from the resolution thread, so it
+  // must remain valid until 'cb' has been called.
+  //
+  // When the result is available, or an error occurred, 'cb' is called
+  // with the result Status.
+  //
+  // NOTE: the callback should be fast since it is called by the DNS
+  // resolution thread.
+  // NOTE: in some rare cases, the callback may also be called inline
+  // from this function call, on the caller's thread.
+  void ResolveAddresses(const HostPort& hostport,
+                        std::vector<Sockaddr>* addresses,
+                        const StatusCallback& cb);
+
+ private:
+  // Thread pool on which the lookups are executed.
+  gscoped_ptr<ThreadPool> pool_;
+
+  DISALLOW_COPY_AND_ASSIGN(DnsResolver);
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_NET_DNS_RESOLVER_H */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/net/net_util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/net_util-test.cc 
b/be/src/kudu/util/net/net_util-test.cc
new file mode 100644
index 0000000..c77b054
--- /dev/null
+++ b/be/src/kudu/util/net/net_util-test.cc
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+
+// Fixture for the net_util tests.
+class NetUtilTest : public KuduTest {
+ protected:
+  // Parses and resolves the comma-separated address list 'input' (entries
+  // without a port get kDefaultPort), sorts the resulting addresses, and
+  // stores them in '*result' as a comma-joined string.
+  Status DoParseBindAddresses(const string& input, string* result) {
+    vector<Sockaddr> parsed;
+    RETURN_NOT_OK(ParseAddressList(input, kDefaultPort, &parsed));
+    std::sort(parsed.begin(), parsed.end());
+
+    vector<string> rendered(parsed.size());
+    std::transform(parsed.begin(), parsed.end(), rendered.begin(),
+                   [](const Sockaddr& sa) { return sa.ToString(); });
+    *result = JoinStrings(rendered, ",");
+    return Status::OK();
+  }
+
+  static const uint16_t kDefaultPort = 7150;
+};
+
+// Checks that the port of a parsed "ip:port" string is readable via port().
+TEST(SockaddrTest, Test) {
+  Sockaddr addr;
+  ASSERT_OK(addr.ParseString("1.1.1.1:12345", 12345));
+  ASSERT_EQ(12345, addr.port());
+}
+
+// Exercises address-list parsing with well-formed inputs (explicit port,
+// defaulted port, multiple entries) and with malformed ports.
+TEST_F(NetUtilTest, TestParseAddresses) {
+  string ret;
+
+  // Inputs that must parse, paired with the normalized output expected.
+  const struct {
+    const char* input;
+    const char* expected;
+  } kAccepted[] = {
+    { "0.0.0.0:12345", "0.0.0.0:12345" },
+    { "0.0.0.0", "0.0.0.0:7150" },
+    { "0.0.0.0:12345, 0.0.0.0:12346", "0.0.0.0:12345,0.0.0.0:12346" },
+  };
+  for (const auto& c : kAccepted) {
+    ASSERT_OK(DoParseBindAddresses(c.input, &ret));
+    ASSERT_EQ(c.expected, ret);
+  }
+
+  // Test some invalid addresses.
+  const char* const kRejected[] = {
+    "0.0.0.0:xyz",
+    "0.0.0.0:100000",
+    "0.0.0.0:",
+  };
+  for (const char* input : kRejected) {
+    Status s = DoParseBindAddresses(input, &ret);
+    ASSERT_STR_CONTAINS(s.ToString(), "Invalid port");
+  }
+}
+
+// Resolves "localhost" synchronously and checks that each result starts with
+// "127.", carries the requested port and is reported as a local address.
+TEST_F(NetUtilTest, TestResolveAddresses) {
+  const HostPort target("localhost", 12345);
+  vector<Sockaddr> resolved;
+  ASSERT_OK(target.ResolveAddresses(&resolved));
+  ASSERT_FALSE(resolved.empty());
+  for (const Sockaddr& sa : resolved) {
+    const auto text = sa.ToString();
+    LOG(INFO) << "Address: " << text;
+    EXPECT_TRUE(HasPrefixString(text, "127."));
+    EXPECT_TRUE(HasSuffixString(text, ":12345"));
+    EXPECT_TRUE(sa.IsAnyLocalAddress());
+  }
+
+  // Passing no output vector must also succeed.
+  ASSERT_OK(target.ResolveAddresses(nullptr));
+}
+
+// Checks Network::WithinNetwork() against a handful of address/CIDR pairs.
+TEST_F(NetUtilTest, TestWithinNetwork) {
+  Sockaddr addr;
+  Network network;
+
+  const struct {
+    const char* address;
+    const char* cidr;
+    bool within;
+  } kCases[] = {
+    { "10.0.23.0:12345", "10.0.0.0/8", true },
+    { "172.28.3.4:0", "172.16.0.0/12", true },
+    { "192.168.0.23", "192.168.1.14/16", true },
+    { "8.8.8.8:0", "0.0.0.0/0", true },
+    { "192.169.0.23", "192.168.0.0/16", false },
+  };
+  for (const auto& c : kCases) {
+    ASSERT_OK(addr.ParseString(c.address, 0));
+    ASSERT_OK(network.ParseCIDRString(c.cidr));
+    if (c.within) {
+      EXPECT_TRUE(network.WithinNetwork(addr));
+    } else {
+      EXPECT_FALSE(network.WithinNetwork(addr));
+    }
+  }
+}
+
+// Ensure that we are able to do a reverse DNS lookup on various IP addresses.
+// The reverse lookups should never fail, but may return numeric strings.
+TEST_F(NetUtilTest, TestReverseLookup) {
+  string host;
+  Sockaddr addr;
+  HostPort hp;
+  // The wildcard address must be replaced by some non-empty host name, while
+  // the port is carried over unchanged.
+  ASSERT_OK(addr.ParseString("0.0.0.0:12345", 0));
+  EXPECT_EQ(12345, addr.port());
+  ASSERT_OK(HostPortFromSockaddrReplaceWildcard(addr, &hp));
+  EXPECT_NE("0.0.0.0", hp.host());
+  EXPECT_NE("", hp.host());
+  EXPECT_EQ(12345, hp.port());
+
+  // A non-wildcard address is passed through as its numeric string.
+  ASSERT_OK(addr.ParseString("127.0.0.1:12345", 0));
+  ASSERT_OK(HostPortFromSockaddrReplaceWildcard(addr, &hp));
+  EXPECT_EQ("127.0.0.1", hp.host());
+  EXPECT_EQ(12345, hp.port());
+}
+
+// Binds a listening socket and checks that TryRunLsof() mentions this test
+// binary in the output captured for the bound address.
+TEST_F(NetUtilTest, TestLsof) {
+  Socket s;
+  ASSERT_OK(s.Init(0));
+
+  Sockaddr addr; // wildcard
+  ASSERT_OK(s.BindAndListen(addr, 1));
+
+  // Read back the address the socket was actually bound to; its port must
+  // no longer be 0.
+  ASSERT_OK(s.GetSocketAddress(&addr));
+  ASSERT_NE(addr.port(), 0);
+  vector<string> lsof_lines;
+  TryRunLsof(addr, &lsof_lines);
+  SCOPED_TRACE(JoinStrings(lsof_lines, "\n"));
+
+  // TryRunLsof() logs a banner and the command line before the command's
+  // output, so the output is expected at index 2.
+  ASSERT_GE(lsof_lines.size(), 3);
+  ASSERT_STR_CONTAINS(lsof_lines[2], "net_util-test");
+}
+
+// Checks only that GetFQDN() succeeds; the resulting name is logged, not
+// verified.
+TEST_F(NetUtilTest, TestGetFQDN) {
+  string fqdn;
+  ASSERT_OK(GetFQDN(&fqdn));
+  LOG(INFO) << "fqdn is " << fqdn;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/net/net_util.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/net_util.cc b/be/src/kudu/util/net/net_util.cc
new file mode 100644
index 0000000..aca1ae2
--- /dev/null
+++ b/be/src/kudu/util/net/net_util.cc
@@ -0,0 +1,383 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arpa/inet.h>
+#include <ifaddrs.h>
+#include <netdb.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#include <algorithm>
+#include <boost/functional/hash.hpp>
+#include <gflags/gflags.h>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/strip.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/subprocess.h"
+#include "kudu/util/trace.h"
+
+// Mac OS 10.9 does not appear to define HOST_NAME_MAX in unistd.h
+#ifndef HOST_NAME_MAX
+#define HOST_NAME_MAX 64
+#endif
+
+// Test-only switch: when set, HostPort::ResolveAddresses() returns an injected
+// NetworkError.
+DEFINE_bool(fail_dns_resolution, false, "Whether to fail all dns resolution, 
for tests.");
+TAG_FLAG(fail_dns_resolution, hidden);
+
+using std::unordered_set;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+namespace {
+// Deleter functor that releases an addrinfo list with freeaddrinfo(), for use
+// with scoped pointers holding getaddrinfo() results.
+struct AddrinfoDeleter {
+  void operator()(struct addrinfo* info) {
+    freeaddrinfo(info);
+  }
+};
+}
+
+// Constructs an empty HostPort: empty host, port 0.
+HostPort::HostPort()
+  : host_(""),
+    port_(0) {
+}
+
+// Constructs a HostPort from an explicit host and port.
+HostPort::HostPort(std::string host, uint16_t port)
+    : host_(std::move(host)), port_(port) {}
+
+// Constructs a HostPort from the host and port reported by 'addr'.
+HostPort::HostPort(const Sockaddr& addr)
+  : host_(addr.host()),
+    port_(addr.port()) {
+}
+
+// Two HostPorts are equal when both their ports and their hosts match.
+bool operator==(const HostPort& hp1, const HostPort& hp2) {
+  if (hp1.port() != hp2.port()) {
+    return false;
+  }
+  return hp1.host() == hp2.host();
+}
+
+// Returns a hash combining the host and then the port, starting from seed 0.
+size_t HostPort::HashCode() const {
+  size_t seed = 0;
+  boost::hash_combine(seed, host_);
+  boost::hash_combine(seed, port_);
+  return seed;
+}
+
+// Parses "host[:port]" into this object. When 'str' contains no ':' at all,
+// 'default_port' is used. Returns InvalidArgument if the port text is not a
+// number or exceeds 65535; in that case this object is left unmodified.
+Status HostPort::ParseString(const string& str, uint16_t default_port) {
+  // Split on the first ':' only: the host comes before it, the port text after.
+  std::pair<string, string> host_and_port =
+      strings::Split(str, strings::delimiter::Limit(":", 1));
+  string& host_text = host_and_port.first;
+  const string& port_text = host_and_port.second;
+
+  // Strip any whitespace from the host.
+  StripWhiteSpace(&host_text);
+
+  // Parse the port.
+  uint32_t parsed_port;
+  const bool port_omitted = port_text.empty() && strcount(str, ':') == 0;
+  if (port_omitted) {
+    parsed_port = default_port;
+  } else {
+    if (!SimpleAtoi(port_text, &parsed_port) || parsed_port > 65535) {
+      return Status::InvalidArgument("Invalid port", str);
+    }
+  }
+
+  host_.swap(host_text);
+  port_ = parsed_port;
+  return Status::OK();
+}
+
+// Resolves host_ with getaddrinfo(), restricted to AF_INET / SOCK_STREAM, and
+// appends one Sockaddr per result (with the port set to port_) to 'addresses'
+// when it is non-null. Returns NetworkError if the lookup fails or if
+// --fail_dns_resolution is set.
+Status HostPort::ResolveAddresses(vector<Sockaddr>* addresses) const {
+  TRACE_EVENT1("net", "HostPort::ResolveAddresses",
+               "host", host_);
+  TRACE_COUNTER_SCOPE_LATENCY_US("dns_us");
+  struct addrinfo hints;
+  memset(&hints, 0, sizeof(hints));
+  hints.ai_family = AF_INET;
+  hints.ai_socktype = SOCK_STREAM;
+  struct addrinfo* res = nullptr;
+  int rc;
+  LOG_SLOW_EXECUTION(WARNING, 200,
+                     Substitute("resolving address for $0", host_)) {
+    rc = getaddrinfo(host_.c_str(), nullptr, &hints, &res);
+  }
+  if (rc != 0) {
+    return Status::NetworkError(
+      StringPrintf("Unable to resolve address '%s'", host_.c_str()),
+      gai_strerror(rc));
+  }
+  // scoped_res keeps the head of the list for cleanup; 'res' itself is
+  // advanced as the loop cursor below.
+  gscoped_ptr<addrinfo, AddrinfoDeleter> scoped_res(res);
+  for (; res != nullptr; res = res->ai_next) {
+    CHECK_EQ(res->ai_family, AF_INET);
+    struct sockaddr_in* addr = reinterpret_cast<struct 
sockaddr_in*>(res->ai_addr);
+    // The lookup was made without a service name, so fill in the port here.
+    addr->sin_port = htons(port_);
+    Sockaddr sockaddr(*addr);
+    if (addresses) {
+      addresses->push_back(sockaddr);
+    }
+    VLOG(2) << "Resolved address " << sockaddr.ToString()
+            << " for host/port " << ToString();
+  }
+  // NOTE: the injected failure is checked only after a successful real
+  // lookup, so 'addresses' may already have been appended to at this point.
+  if (PREDICT_FALSE(FLAGS_fail_dns_resolution)) {
+    return Status::NetworkError("injected DNS resolution failure");
+  }
+  return Status::OK();
+}
+
+// Parses each non-empty, comma-separated entry of 'comma_sep_addrs' as a
+// "host:port" pair and appends it to 'res'. Stops at the first entry that
+// fails to parse and returns its error; entries parsed before it remain in
+// 'res'.
+Status HostPort::ParseStrings(const string& comma_sep_addrs,
+                              uint16_t default_port,
+                              vector<HostPort>* res) {
+  const vector<string> entries =
+      strings::Split(comma_sep_addrs, ",", strings::SkipEmpty());
+  for (const string& entry : entries) {
+    HostPort parsed;
+    const Status s = parsed.ParseString(entry, default_port);
+    if (!s.ok()) {
+      return s;
+    }
+    res->push_back(parsed);
+  }
+  return Status::OK();
+}
+
+// Formats this object as "host:port".
+string HostPort::ToString() const {
+  return Substitute("$0:$1", host_, port_);
+}
+
+// Renders every entry of 'hostports' with ToString() and joins the results
+// with commas.
+string HostPort::ToCommaSeparatedString(const vector<HostPort>& hostports) {
+  vector<string> rendered(hostports.size());
+  std::transform(hostports.begin(), hostports.end(), rendered.begin(),
+                 [](const HostPort& hp) { return hp.ToString(); });
+  return JoinStrings(rendered, ",");
+}
+
+// Constructs a Network whose address and netmask are both 0.
+Network::Network()
+  : addr_(0),
+    netmask_(0) {
+}
+
+// Constructs a Network from an explicit address and netmask.
+Network::Network(uint32_t addr, uint32_t netmask)
+  : addr_(addr), netmask_(netmask) {}
+
+// Returns true if 'addr' and this network agree on every bit selected by the
+// netmask.
+bool Network::WithinNetwork(const Sockaddr& addr) const {
+  const uint32_t candidate_prefix = addr.addr().sin_addr.s_addr & netmask_;
+  const uint32_t network_prefix = addr_ & netmask_;
+  return candidate_prefix == network_prefix;
+}
+
+// Parses "a.b.c.d/bits" (CIDR notation) into this object.
+//
+// Returns NetworkError if the address part is not a valid IPv4 address, or if
+// the prefix length is missing, not a number, or greater than 32. On error
+// this object is left unmodified.
+Status Network::ParseCIDRString(const string& addr) {
+  std::pair<string, string> p =
+      strings::Split(addr, strings::delimiter::Limit("/", 1));
+
+  kudu::Sockaddr sockaddr;
+  Status s = sockaddr.ParseString(p.first, 0);
+
+  uint32_t bits;
+  bool success = SimpleAtoi(p.second, &bits);
+
+  if (!s.ok() || !success || bits > 32) {
+    return Status::NetworkError("Unable to parse CIDR address", addr);
+  }
+
+  // Build the mask in host byte order with the top 'bits' bits set. Shifting a
+  // 32-bit value by 32 is undefined behavior, so the shift count is kept in
+  // [0, 31] by special-casing /0 and shifting left by (32 - bits) otherwise.
+  // This makes /32 yield an all-ones mask.
+  const uint32_t host_order_mask =
+      (bits == 0) ? 0 : (0xffffffffU << (32 - bits));
+
+  // Netmask in network byte order
+  uint32_t netmask = NetworkByteOrder::FromHost32(host_order_mask);
+  addr_ = sockaddr.addr().sin_addr.s_addr;
+  netmask_ = netmask;
+  return Status::OK();
+}
+
+// Parses each non-empty, comma-separated entry of 'comma_sep_addrs' as a CIDR
+// string and appends it to 'res'. Stops at the first entry that fails to parse
+// and returns its error; entries parsed before it remain in 'res'.
+Status Network::ParseCIDRStrings(const string& comma_sep_addrs,
+                                 vector<Network>* res) {
+  const vector<string> entries =
+      strings::Split(comma_sep_addrs, ",", strings::SkipEmpty());
+  for (const string& entry : entries) {
+    Network parsed;
+    const Status s = parsed.ParseCIDRString(entry);
+    if (!s.ok()) {
+      return s;
+    }
+    res->push_back(parsed);
+  }
+  return Status::OK();
+}
+
+// Returns true if binding to 'port' is likely to require root privileges,
+// i.e. the port lies in the range 1-1023. Port 0 is excluded, and 1024 is the
+// first unprivileged port.
+bool IsPrivilegedPort(uint16_t port) {
+  return port != 0 && port < 1024;
+}
+
+// Parses the comma-separated 'addr_list', resolves every entry, and appends
+// each distinct resulting address to 'addresses' in resolution order.
+// Returns InvalidArgument if the list contains no entries, or the first parse
+// or resolution error encountered.
+Status ParseAddressList(const std::string& addr_list,
+                        uint16_t default_port,
+                        std::vector<Sockaddr>* addresses) {
+  vector<HostPort> parsed;
+  RETURN_NOT_OK(HostPort::ParseStrings(addr_list, default_port, &parsed));
+  if (parsed.empty()) {
+    return Status::InvalidArgument("No address specified");
+  }
+
+  // Addresses already emitted; the user may have specified some IP addresses
+  // in multiple ways, and only the first occurrence is kept.
+  unordered_set<Sockaddr> seen;
+  for (const HostPort& hp : parsed) {
+    vector<Sockaddr> resolved;
+    RETURN_NOT_OK(hp.ResolveAddresses(&resolved));
+
+    for (const Sockaddr& sa : resolved) {
+      if (!InsertIfNotPresent(&seen, sa)) {
+        LOG(INFO) << "Address " << sa.ToString() << " for " << hp.ToString()
+                  << " duplicates an earlier resolved entry.";
+        continue;
+      }
+      addresses->push_back(sa);
+    }
+  }
+  return Status::OK();
+}
+
+// Stores the local machine's hostname, as reported by gethostname(), in
+// '*hostname'. Returns NetworkError carrying the errno if the call fails.
+Status GetHostname(string* hostname) {
+  TRACE_EVENT0("net", "GetHostname");
+  // HOST_NAME_MAX does not count the terminating NUL, and POSIX leaves it
+  // unspecified whether a truncated name is NUL-terminated. Reserve one extra
+  // byte and terminate the buffer explicitly so the conversion to a string
+  // below can never read past the end of 'name'.
+  char name[HOST_NAME_MAX + 1];
+  int ret = gethostname(name, sizeof(name));
+  if (ret != 0) {
+    return Status::NetworkError("Unable to determine local hostname",
+                                ErrnoToString(errno),
+                                errno);
+  }
+  name[sizeof(name) - 1] = '\0';
+  *hostname = name;
+  return Status::OK();
+}
+
+// Replaces the contents of 'net' with one Network (address plus netmask) per
+// IPv4 entry reported by getifaddrs(). Returns NetworkError carrying the errno
+// if the interface list cannot be obtained; 'net' is untouched in that case.
+Status GetLocalNetworks(std::vector<Network>* net) {
+  struct ifaddrs* if_list = nullptr;
+  const int rc = getifaddrs(&if_list);
+
+  // Releases the interface list on every exit path.
+  auto release_list = MakeScopedCleanup([&]() {
+    if (if_list) freeifaddrs(if_list);
+  });
+
+  if (rc != 0) {
+    return Status::NetworkError("Unable to determine local network addresses",
+                                ErrnoToString(errno),
+                                errno);
+  }
+
+  net->clear();
+  for (const struct ifaddrs* entry = if_list; entry != nullptr;
+       entry = entry->ifa_next) {
+    // Skip entries lacking an address or a netmask, and anything not IPv4.
+    if (entry->ifa_addr == nullptr || entry->ifa_netmask == nullptr) continue;
+    if (entry->ifa_addr->sa_family != AF_INET) continue;
+
+    const Sockaddr if_addr(
+        *reinterpret_cast<struct sockaddr_in*>(entry->ifa_addr));
+    const Sockaddr if_mask(
+        *reinterpret_cast<struct sockaddr_in*>(entry->ifa_netmask));
+    net->push_back(Network(if_addr.addr().sin_addr.s_addr,
+                           if_mask.addr().sin_addr.s_addr));
+  }
+
+  return Status::OK();
+}
+
+// Stores the local machine's fully-qualified domain name in '*hostname'.
+//
+// The unqualified name from GetHostname() is looked up with getaddrinfo() and
+// the AI_CANONNAME flag, and replaced by the canonical name returned. Returns
+// NetworkError if either step fails.
+Status GetFQDN(string* hostname) {
+  TRACE_EVENT0("net", "GetFQDN");
+  // Start with the non-qualified hostname
+  RETURN_NOT_OK(GetHostname(hostname));
+
+  struct addrinfo hints;
+  memset(&hints, 0, sizeof(hints));
+  hints.ai_socktype = SOCK_DGRAM;
+  hints.ai_flags = AI_CANONNAME;
+
+  struct addrinfo* result = nullptr;
+  LOG_SLOW_EXECUTION(WARNING, 200,
+                     Substitute("looking up canonical hostname for localhost "
+                                "(eventual result was $0)", *hostname)) {
+    TRACE_EVENT0("net", "getaddrinfo");
+    const int rc = getaddrinfo(hostname->c_str(), nullptr, &hints, &result);
+    if (rc == EAI_SYSTEM) {
+      // Only EAI_SYSTEM means that errno describes the failure.
+      return Status::NetworkError("Unable to lookup FQDN",
+                                  ErrnoToString(errno), errno);
+    }
+    if (rc != 0) {
+      // Every other failure is an EAI_* code, which gai_strerror() decodes;
+      // errno is not meaningful here.
+      return Status::NetworkError("Unable to lookup FQDN", gai_strerror(rc));
+    }
+  }
+
+  // Frees the result list on every path out of this function.
+  gscoped_ptr<addrinfo, AddrinfoDeleter> scoped_result(result);
+
+  // Be defensive about a missing canonical name: assigning a null pointer to a
+  // string is undefined, so in that case keep the name from GetHostname().
+  if (result != nullptr && result->ai_canonname != nullptr) {
+    *hostname = result->ai_canonname;
+  }
+  return Status::OK();
+}
+
+// Resolves 'host_port' and stores the first resulting address in '*addr'.
+// Returns the resolution error, or NetworkError if nothing was resolved. When
+// several addresses come back, the choice is logged at verbose level 1.
+Status SockaddrFromHostPort(const HostPort& host_port, Sockaddr* addr) {
+  vector<Sockaddr> resolved;
+  RETURN_NOT_OK(host_port.ResolveAddresses(&resolved));
+  if (resolved.empty()) {
+    return Status::NetworkError("Unable to resolve address",
+                                host_port.ToString());
+  }
+
+  *addr = resolved.front();
+  const bool ambiguous = resolved.size() > 1;
+  if (ambiguous) {
+    VLOG(1) << "Hostname " << host_port.host()
+            << " resolved to more than one address. "
+            << "Using address: " << addr->ToString();
+  }
+  return Status::OK();
+}
+
+// Fills '*hp' from 'addr'. The port is copied as-is; the host is the local
+// FQDN when 'addr' is the wildcard address and addr.host() otherwise.
+// Returns the GetFQDN() error, leaving '*hp' untouched, if that lookup fails.
+Status HostPortFromSockaddrReplaceWildcard(const Sockaddr& addr, HostPort* hp) {
+  string resolved_host;
+  if (!addr.IsWildcard()) {
+    resolved_host = addr.host();
+  } else {
+    RETURN_NOT_OK(GetFQDN(&resolved_host));
+  }
+  hp->set_host(resolved_host);
+  hp->set_port(addr.port());
+  return Status::OK();
+}
+
+// Runs a small bash script built around 'lsof' for the TCP port of 'addr' and
+// reports what it prints. A banner, the command line, any failure Status from
+// running the subprocess, and the captured output are all emitted through
+// LOG_STRING with 'log' as the destination.
+void TryRunLsof(const Sockaddr& addr, vector<string>* log) {
+#if defined(__APPLE__)
+  // "$0" is filled in with the port by Substitute().
+  // NOTE(review): "$$" presumably yields a literal '$' for the shell --
+  // confirm against Substitute()'s escaping rules.
+  string cmd = strings::Substitute(
+      "lsof -n -i 'TCP:$0' -sTCP:LISTEN ; "
+      "for pid in $$(lsof -F p -n -i 'TCP:$0' -sTCP:LISTEN | cut -f 2 -dp) ; 
do"
+      "  pstree $$pid || ps h -p $$pid;"
+      "done",
+      addr.port());
+#else
+  // Little inline bash script prints the full ancestry of any pid listening
+  // on the same port as 'addr'. We could use 'pstree -s', but that option
+  // doesn't exist on el6.
+  string cmd = strings::Substitute(
+      "export PATH=$$PATH:/usr/sbin ; "
+      "lsof -n -i 'TCP:$0' -sTCP:LISTEN ; "
+      "for pid in $$(lsof -F p -n -i 'TCP:$0' -sTCP:LISTEN | grep p | cut -f 2 
-dp) ; do"
+      "  while [ $$pid -gt 1 ] ; do"
+      "    ps h -fp $$pid ;"
+      "    stat=($$(</proc/$$pid/stat)) ;"
+      "    pid=$${stat[3]} ;"
+      "  done ; "
+      "done",
+      addr.port());
+#endif // defined(__APPLE__)
+  LOG_STRING(WARNING, log)
+      << "Trying to use lsof to find any processes listening on "
+      << addr.ToString();
+  LOG_STRING(INFO, log) << "$ " << cmd;
+  vector<string> argv = { "bash", "-c", cmd };
+  string results;
+  Status s = Subprocess::Call(argv, "", &results);
+  // A failure to run the script is only reported, never propagated.
+  if (PREDICT_FALSE(!s.ok())) {
+    LOG_STRING(WARNING, log) << s.ToString();
+  }
+  LOG_STRING(WARNING, log) << results;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/net/net_util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/net_util.h b/be/src/kudu/util/net/net_util.h
new file mode 100644
index 0000000..b246c5e
--- /dev/null
+++ b/be/src/kudu/util/net/net_util.h
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_NET_NET_UTIL_H
+#define KUDU_UTIL_NET_NET_UTIL_H
+
+#include <string>
+#include <vector>
+
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Sockaddr;
+
+// A container for a host:port pair.
+class HostPort {
+ public:
+  // Constructs an empty (uninitialized) pair: empty host, port 0.
+  HostPort();
+  HostPort(std::string host, uint16_t port);
+  // Constructs the pair from the host and port of 'addr'.
+  explicit HostPort(const Sockaddr& addr);
+
+  // Returns true if a non-empty host has been set.
+  bool Initialized() const {
+    return !host_.empty();
+  }
+
+  // Parse a "host:port" pair into this object.
+  // If there is no port specified in the string, then 'default_port' is used.
+  Status ParseString(const std::string& str, uint16_t default_port);
+
+  // Resolve any addresses corresponding to this host:port pair.
+  // Note that a host may resolve to more than one IP address.
+  //
+  // 'addresses' may be NULL, in which case this function simply checks that
+  // the host/port pair can be resolved, without returning anything.
+  Status ResolveAddresses(std::vector<Sockaddr>* addresses) const;
+
+  // Returns this pair formatted as "host:port".
+  std::string ToString() const;
+
+  const std::string& host() const { return host_; }
+  void set_host(const std::string& host) { host_ = host; }
+
+  uint16_t port() const { return port_; }
+  void set_port(uint16_t port) { port_ = port; }
+
+  // Returns a hash of the host and port.
+  size_t HashCode() const;
+
+  // Parse a comma separated list of "host:port" pairs into a vector of
+  // HostPort objects. If no port is specified for an entry in the
+  // comma separated list, 'default_port' is used for that entry's
+  // pair.
+  static Status ParseStrings(
+      const std::string& comma_sep_addrs, uint16_t default_port, 
std::vector<HostPort>* res);
+
+  // Takes a vector of HostPort objects and returns a comma separated
+  // string consisting of "host:port" pairs. This method is the
+  // "inverse" of ParseStrings().
+  static std::string ToCommaSeparatedString(const std::vector<HostPort>& 
host_ports);
+
+ private:
+  std::string host_;
+  uint16_t port_;
+};
+
+bool operator==(const HostPort& hp1, const HostPort& hp2);
+
+// Hasher of HostPort objects for UnorderedAssociativeContainers.
+// Delegates to HostPort::HashCode().
+struct HostPortHasher {
+  size_t operator()(const HostPort& hp) const {
+    return hp.HashCode();
+  }
+};
+
+// Equality BinaryPredicate of HostPort objects for
+// UnorderedAssociativeContainers. Delegates to operator==.
+struct HostPortEqualityPredicate {
+  bool operator()(const HostPort& hp1, const HostPort& hp2) const {
+    return hp1 == hp2;
+  }
+};
+
+// A container for an addr:mask pair.
+// Both addr and netmask are in big-endian byte order
+// (same as network byte order).
+class Network {
+ public:
+  // Constructs a Network with address 0 and netmask 0.
+  Network();
+  Network(uint32_t addr, uint32_t netmask);
+
+  uint32_t addr() const { return addr_; }
+
+  uint32_t netmask() const { return netmask_; }
+
+  // Returns true if the address is within network, i.e. it matches this
+  // network's address on every bit covered by the netmask.
+  bool WithinNetwork(const Sockaddr& addr) const;
+
+  // Parses an "addr/netmask" (CIDR notation) pair into this object.
+  Status ParseCIDRString(const std::string& addr);
+
+  // Parses a comma separated list of "addr/netmask" (CIDR notation)
+  // pairs into a vector of Network objects.
+  static Status ParseCIDRStrings(
+      const std::string& comma_sep_addrs, std::vector<Network>* res);
+ private:
+  uint32_t addr_;
+  uint32_t netmask_;
+};
+
+// Parse and resolve the given comma-separated list of addresses.
+//
+// The resulting addresses will be resolved, made unique, and appended to
+// the 'addresses' vector.
+//
+// Any elements which do not include a port will be assigned 'default_port'.
+// Returns InvalidArgument if the list contains no addresses.
+Status ParseAddressList(const std::string& addr_list,
+                        uint16_t default_port,
+                        std::vector<Sockaddr>* addresses);
+
+// Return true if the given port is likely to need root privileges to bind to.
+bool IsPrivilegedPort(uint16_t port);
+
+// Return the local machine's hostname.
+Status GetHostname(std::string* hostname);
+
+// Returns local subnets of all local network interfaces.
+// Only IPv4 interface addresses are reported; any previous contents of 'net'
+// are discarded on success.
+Status GetLocalNetworks(std::vector<Network>* net);
+
+// Return the local machine's FQDN.
+Status GetFQDN(std::string* hostname);
+
+// Returns a single socket address from a HostPort.
+// If the hostname resolves to multiple addresses, returns the first in the
+// list and logs a message in verbose mode.
+Status SockaddrFromHostPort(const HostPort& host_port, Sockaddr* addr);
+
+// Converts the given Sockaddr into a HostPort, substituting the FQDN
+// in the case that the provided address is the wildcard.
+//
+// In the case of other addresses, the returned HostPort will contain just the
+// stringified form of the IP.
+Status HostPortFromSockaddrReplaceWildcard(const Sockaddr& addr, HostPort* hp);
+
+// Try to run 'lsof' to determine which process is preventing binding to
+// the given 'addr'. If pids can be determined, outputs full 'ps' and 'pstree'
+// output for that process.
+//
+// Output is issued to the log at WARNING level, or appended to 'log' if it
+// is non-NULL (mostly useful for testing).
+void TryRunLsof(const Sockaddr& addr, std::vector<std::string>* log = NULL);
+
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/net/sockaddr.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/sockaddr.cc b/be/src/kudu/util/net/sockaddr.cc
new file mode 100644
index 0000000..ed249c7
--- /dev/null
+++ b/be/src/kudu/util/net/sockaddr.cc
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/net/sockaddr.h"
+
+#include <arpa/inet.h>
+#include <errno.h>
+#include <netdb.h>
+#include <stdio.h>
+#include <string.h>
+#include <string>
+
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/hash/builtin_type_hash.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/stopwatch.h"
+
+namespace kudu {
+
+using strings::Substitute;
+
+///
+/// Sockaddr
+///
+// Builds the default endpoint: the whole underlying struct is zeroed first (so
+// the port starts out as 0), then the family is set to AF_INET and the address
+// to INADDR_ANY.
+Sockaddr::Sockaddr() {
+  ::memset(&addr_, 0, sizeof(struct sockaddr_in));
+  addr_.sin_addr.s_addr = INADDR_ANY;
+  addr_.sin_family = AF_INET;
+}
+
+// Wraps an existing IPv4 socket address by copying it byte for byte.
+Sockaddr::Sockaddr(const struct sockaddr_in& addr) {
+  ::memcpy(&addr_, &addr, sizeof(addr_));
+}
+
+// Parses 's' into this object. Splitting the text into host and port is
+// delegated to HostPort::ParseString(), which is also handed 'default_port';
+// the host part must then be a dotted-quad IPv4 literal, since it is passed
+// straight to inet_pton() and no name resolution is attempted.
+//
+// Returns whatever HostPort::ParseString() reports on failure, or
+// InvalidArgument when inet_pton() returns anything other than 1.
+Status Sockaddr::ParseString(const std::string& s, uint16_t default_port) {
+  HostPort parsed;
+  RETURN_NOT_OK(parsed.ParseString(s, default_port));
+
+  const int rc = inet_pton(AF_INET, parsed.host().c_str(), &addr_.sin_addr);
+  if (rc != 1) {
+    return Status::InvalidArgument("Invalid IP address", parsed.host());
+  }
+  set_port(parsed.port());
+  return Status::OK();
+}
+
+// Replaces the stored address with a byte-for-byte copy of 'addr'.
+Sockaddr& Sockaddr::operator=(const struct sockaddr_in &addr) {
+  ::memcpy(&addr_, &addr, sizeof(addr_));
+  return *this;
+}
+
+// Two Sockaddrs are equal when their underlying sockaddr_in structs are
+// bytewise identical (every field of the struct takes part in the comparison).
+bool Sockaddr::operator==(const Sockaddr& other) const {
+  const int diff = ::memcmp(&other.addr_, &addr_, sizeof(struct sockaddr_in));
+  return diff == 0;
+}
+
+// Orders by the raw stored s_addr value only; the port takes no part in the
+// comparison, so two endpoints on the same IP are equivalent under this order.
+bool Sockaddr::operator<(const Sockaddr &rhs) const {
+  const in_addr_t mine = addr_.sin_addr.s_addr;
+  const in_addr_t theirs = rhs.addr_.sin_addr.s_addr;
+  return mine < theirs;
+}
+
+// Hashes the address with seed 0, then feeds that result in as the seed for
+// hashing the port. Both values are used exactly as stored in addr_.
+uint32_t Sockaddr::HashCode() const {
+  const uint32_t addr_hash = Hash32NumWithSeed(addr_.sin_addr.s_addr, 0);
+  return Hash32NumWithSeed(addr_.sin_port, addr_hash);
+}
+
+// Stores 'port' after converting it with htons().
+void Sockaddr::set_port(int port) {
+  const uint16_t wire_port = htons(port);
+  addr_.sin_port = wire_port;
+}
+
+// Returns the stored port converted back with ntohs().
+int Sockaddr::port() const {
+  const uint16_t host_port = ntohs(addr_.sin_port);
+  return host_port;
+}
+
+// Returns the address in textual form as produced by inet_ntop(), without the
+// port.
+std::string Sockaddr::host() const {
+  char text[INET_ADDRSTRLEN];
+  ::inet_ntop(AF_INET, &addr_.sin_addr, text, sizeof(text));
+  return std::string(text);
+}
+
+// Read-only access to the wrapped sockaddr_in. The reference points into this
+// object.
+const struct sockaddr_in& Sockaddr::addr() const {
+  return this->addr_;
+}
+
+// Formats the endpoint as "<address>:<port>", with the address rendered by
+// inet_ntop() and the port in host byte order.
+std::string Sockaddr::ToString() const {
+  char text[INET_ADDRSTRLEN];
+  ::inet_ntop(AF_INET, &addr_.sin_addr, text, sizeof(text));
+  const int host_port = port();
+  return StringPrintf("%s:%d", text, host_port);
+}
+
+// True when the stored address is all zeroes (0.0.0.0). The port is not
+// considered.
+bool Sockaddr::IsWildcard() const {
+  return 0 == addr_.sin_addr.s_addr;
+}
+
+// True when the first octet of the address is 127, i.e. for 127.*.*.*.
+// The stored value is byte-swapped through NetworkByteOrder so that the first
+// octet ends up in the top eight bits before it is shifted down.
+bool Sockaddr::IsAnyLocalAddress() const {
+  const uint32_t swapped = NetworkByteOrder::FromHost32(addr_.sin_addr.s_addr);
+  const uint32_t first_octet = swapped >> 24;
+  return first_octet == 127;
+}
+
+// Performs a reverse DNS lookup of this address with getnameinfo() and stores
+// the resulting name in '*hostname'. The lookup runs inside a
+// LOG_SLOW_EXECUTION(WARNING, 200, ...) scope labelled with this address.
+//
+// Returns NetworkError when getnameinfo() fails. For EAI_SYSTEM the Status
+// carries the errno text and value; for every other failure it carries the
+// gai_strerror() text and the getnameinfo() return code. '*hostname' is left
+// untouched on failure.
+Status Sockaddr::LookupHostname(string* hostname) const {
+  char host[NI_MAXHOST];
+  int flags = 0;
+
+  int rc;
+  int errno_saved = 0;
+  LOG_SLOW_EXECUTION(WARNING, 200,
+                     Substitute("DNS reverse-lookup for $0", ToString())) {
+    rc = getnameinfo(reinterpret_cast<const struct sockaddr*>(&addr_), sizeof(sockaddr_in),
+                     host, NI_MAXHOST,
+                     nullptr, 0, flags);
+    // Capture errno right away: whatever the timing scope does when it ends
+    // (presumably logging) could otherwise overwrite it before the EAI_SYSTEM
+    // branch below reads it.
+    errno_saved = errno;
+  }
+  if (PREDICT_FALSE(rc != 0)) {
+    if (rc == EAI_SYSTEM) {
+      return Status::NetworkError(Substitute("getnameinfo: $0", gai_strerror(rc)),
+                                  strerror(errno_saved), errno_saved);
+    }
+    return Status::NetworkError("getnameinfo", gai_strerror(rc), rc);
+  }
+  *hostname = host;
+  return Status::OK();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/net/sockaddr.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/sockaddr.h b/be/src/kudu/util/net/sockaddr.h
new file mode 100644
index 0000000..09777f3
--- /dev/null
+++ b/be/src/kudu/util/net/sockaddr.h
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_NET_SOCKADDR_H
+#define KUDU_UTIL_NET_SOCKADDR_H
+
+#include <netinet/in.h>
+#include <iosfwd>
+#include <string>
+
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+///
+/// Value type wrapping a socket address.
+///
+/// Only IPv4 (a struct sockaddr_in) is supported for now. Once IPv6 and UNIX
+/// domain sockets are needed, this is expected to become an abstract base
+/// class with one implementation per address family.
+///
+class Sockaddr {
+ public:
+  Sockaddr();
+  explicit Sockaddr(const struct sockaddr_in& addr);
+
+  // Fills in this object from a string of the form "A.B.C.D:port". When the
+  // ':port' part is absent, 'default_port' is used instead. Hostnames are not
+  // resolved by this function.
+  //
+  // A bad Status is returned for malformed input.
+  Status ParseString(const std::string& s, uint16_t default_port);
+
+  Sockaddr& operator=(const struct sockaddr_in& addr);
+
+  bool operator==(const Sockaddr& other) const;
+
+  // Orders two sockaddrs by their address alone; the port number does not
+  // take part in the comparison.
+  bool operator<(const Sockaddr& rhs) const;
+
+  uint32_t HashCode() const;
+
+  std::string host() const;
+
+  void set_port(int port);
+  int port() const;
+  const struct sockaddr_in& addr() const;
+  std::string ToString() const;
+
+  // True for the address 0.0.0.0.
+  bool IsWildcard() const;
+
+  // True for any address of the form 127.*.*.*
+  bool IsAnyLocalAddress() const;
+
+  // Reverse-resolves the address through DNS and writes the name to
+  // 'hostname'.
+  Status LookupHostname(std::string* hostname) const;
+
+  // The compiler-generated copy constructor is intentionally relied upon.
+ private:
+  struct sockaddr_in addr_;
+};
+
+} // namespace kudu
+
+// Specialize std::hash for Sockaddr so it can be used as a key in the
+// unordered containers. The hash value is Sockaddr::HashCode().
+namespace std {
+template<>
+struct hash<kudu::Sockaddr> {
+  // The standard Hash requirements call for a size_t result. HashCode()
+  // yields a uint32_t, which widens to size_t without loss, whereas going
+  // through 'int' turned values above INT_MAX negative first.
+  size_t operator()(const kudu::Sockaddr& addr) const {
+    return addr.HashCode();
+  }
+};
+} // namespace std
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/net/socket.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/socket.cc b/be/src/kudu/util/net/socket.cc
new file mode 100644
index 0000000..e0bea14
--- /dev/null
+++ b/be/src/kudu/util/net/socket.cc
@@ -0,0 +1,582 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/net/socket.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <limits>
+#include <string>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
+#include "kudu/util/subprocess.h"
+
+// Optional local address for outbound connections. Socket::Connect() binds to
+// it before connecting whenever the flag is non-empty.
+DEFINE_string(local_ip_for_outbound_sockets, "",
+              "IP to bind to when making outgoing socket connections. "
+              "This must be an IP address of the form A.B.C.D, not a hostname. "
+              "Advanced parameter, subject to change.");
+TAG_FLAG(local_ip_for_outbound_sockets, experimental);
+
+// Test hook consulted by Socket::Recv(): when set, the requested length is
+// randomly shortened before recv() is called.
+DEFINE_bool(socket_inject_short_recvs, false,
+            "Inject short recv() responses which return less data than requested");
+TAG_FLAG(socket_inject_short_recvs, hidden);
+TAG_FLAG(socket_inject_short_recvs, unsafe);
+
+namespace kudu {
+
+// Creates a Socket that manages no descriptor yet (fd_ is -1).
+Socket::Socket() : fd_(-1) {}
+
+// Starts managing the already-open descriptor 'fd'.
+Socket::Socket(int fd) : fd_(fd) {}
+
+void Socket::Reset(int fd) {
+  ignore_result(Close());
+  fd_ = fd;
+}
+
+// Gives up ownership: returns the managed descriptor without closing it and
+// leaves this Socket managing nothing (fd_ becomes -1).
+int Socket::Release() {
+  const int released = fd_;
+  fd_ = -1;
+  return released;
+}
+
+// Closes the managed descriptor, if any. A failure to close is ignored.
+Socket::~Socket() {
+  const Status close_status = Close();
+  ignore_result(close_status);
+}
+
+Status Socket::Close() {
+  if (fd_ < 0) {
+    return Status::OK();
+  }
+  int fd = fd_;
+  if (::close(fd) < 0) {
+    int err = errno;
+    return Status::NetworkError(std::string("close error: ") +
+                                ErrnoToString(err), Slice(), err);
+  }
+  fd_ = -1;
+  return Status::OK();
+}
+
+// Calls shutdown(2) on the socket for the requested direction(s):
+//   shut_read && shut_write -> SHUT_RDWR
+//   shut_read only          -> SHUT_RD
+//   shut_write only         -> SHUT_WR
+// When neither direction is requested nothing is done and OK is returned.
+//
+// Returns NetworkError carrying errno if shutdown(2) fails.
+Status Socket::Shutdown(bool shut_read, bool shut_write) {
+  DCHECK_GE(fd_, 0);
+  if (!shut_read && !shut_write) {
+    // Nothing to shut down. This must not fall through with a zero 'how':
+    // SHUT_RD is defined as 0 on Linux and the BSDs, so that would silently
+    // shut down the read side.
+    return Status::OK();
+  }
+  int how;
+  if (shut_read && shut_write) {
+    how = SHUT_RDWR;
+  } else if (shut_read) {
+    how = SHUT_RD;
+  } else {
+    how = SHUT_WR;
+  }
+  if (::shutdown(fd_, how) < 0) {
+    int err = errno;
+    return Status::NetworkError(std::string("shutdown error: ") +
+                                ErrnoToString(err), Slice(), err);
+  }
+  return Status::OK();
+}
+
+// Returns the managed descriptor as stored; the constructors and Release()
+// use -1 to mean "none".
+int Socket::GetFd() const {
+  return this->fd_;
+}
+
+// True for the errno values treated as transient here: EINTR, EAGAIN and
+// EWOULDBLOCK.
+bool Socket::IsTemporarySocketError(int err) {
+  if (err == EAGAIN) return true;
+  if (err == EWOULDBLOCK) return true;
+  return err == EINTR;
+}
+
+#if defined(__linux__)
+
+// Linux variant: opens a new AF_INET stream socket with close-on-exec set
+// atomically at creation, and non-blocking as well when 'flags' contains
+// FLAG_NONBLOCKING. Any descriptor managed before the call is closed.
+//
+// Returns NetworkError carrying errno if socket(2) fails.
+Status Socket::Init(int flags) {
+  int nonblocking_flag = (flags & FLAG_NONBLOCKING) ? SOCK_NONBLOCK : 0;
+  const int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC | nonblocking_flag, 0);
+  // Read errno before Reset(): Reset() closes the previously managed
+  // descriptor, and a failing close() would replace the value socket() set.
+  const int err = errno;
+  Reset(fd);
+  if (fd_ < 0) {
+    return Status::NetworkError(std::string("error opening socket: ") +
+                                ErrnoToString(err), Slice(), err);
+  }
+
+  return Status::OK();
+}
+
+#else
+
+// Non-Linux variant: opens a new AF_INET stream socket and then applies, in
+// separate steps, the non-blocking mode requested through FLAG_NONBLOCKING,
+// close-on-exec, and SO_NOSIGPIPE. Any descriptor managed before the call is
+// closed.
+//
+// Returns NetworkError carrying errno if socket(2) or setsockopt(2) fails, or
+// the Status of the failing helper.
+Status Socket::Init(int flags) {
+  const int fd = ::socket(AF_INET, SOCK_STREAM, 0);
+  // Read errno before Reset(): Reset() closes the previously managed
+  // descriptor, and a failing close() would replace the value socket() set.
+  const int open_err = errno;
+  Reset(fd);
+  if (fd_ < 0) {
+    return Status::NetworkError(std::string("error opening socket: ") +
+                                ErrnoToString(open_err), Slice(), open_err);
+  }
+  RETURN_NOT_OK(SetNonBlocking(flags & FLAG_NONBLOCKING));
+  RETURN_NOT_OK(SetCloseOnExec());
+
+  // Disable SIGPIPE.
+  int set = 1;
+  if (setsockopt(fd_, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof(set)) == -1) {
+    int err = errno;
+    return Status::NetworkError(std::string("failed to set SO_NOSIGPIPE: ") +
+                                ErrnoToString(err), Slice(), err);
+  }
+
+  return Status::OK();
+}
+
+#endif // defined(__linux__)
+
+// Turns the TCP_NODELAY option on or off. Returns NetworkError carrying errno
+// if setsockopt(2) fails.
+Status Socket::SetNoDelay(bool enabled) {
+  const int optval = enabled ? 1 : 0;
+  if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval)) != -1) {
+    return Status::OK();
+  }
+  const int err = errno;
+  return Status::NetworkError(std::string("failed to set TCP_NODELAY: ") +
+                              ErrnoToString(err), Slice(), err);
+}
+
+// Turns the TCP_CORK option on or off. Only Linux builds touch the socket; on
+// every other platform this returns OK without doing anything.
+//
+// Returns NetworkError carrying errno if setsockopt(2) fails.
+Status Socket::SetTcpCork(bool enabled) {
+#if defined(__linux__)
+  const int optval = enabled ? 1 : 0;
+  const int rc = setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval));
+  if (rc == -1) {
+    const int err = errno;
+    return Status::NetworkError(std::string("failed to set TCP_CORK: ") +
+                                ErrnoToString(err), Slice(), err);
+  }
+#endif // defined(__linux__)
+  // TODO: Use TCP_NOPUSH for OSX if perf becomes an issue.
+  return Status::OK();
+}
+
+// Sets or clears O_NONBLOCK on the descriptor, leaving the other file status
+// flags as they were (the current flags are read first and only that bit is
+// changed).
+//
+// Returns NetworkError carrying errno if either fcntl(2) call fails; the
+// message says whether the flag was being set or cleared.
+Status Socket::SetNonBlocking(bool enabled) {
+  int curflags = ::fcntl(fd_, F_GETFL, 0);
+  if (curflags == -1) {
+    int err = errno;
+    return Status::NetworkError(
+        StringPrintf("Failed to get file status flags on fd %d", fd_),
+        ErrnoToString(err), err);
+  }
+  int newflags = (enabled) ? (curflags | O_NONBLOCK) : (curflags & ~O_NONBLOCK);
+  if (::fcntl(fd_, F_SETFL, newflags) == -1) {
+    int err = errno;
+    // Same wording as before for both directions; only the verb differs.
+    return Status::NetworkError(
+        StringPrintf("Failed to %s O_NONBLOCK on fd %d", enabled ? "set" : "clear", fd_),
+        ErrnoToString(err), err);
+  }
+  return Status::OK();
+}
+
+// Reads the file status flags and reports through '*is_nonblock' whether
+// O_NONBLOCK is set. '*is_nonblock' is only written on success.
+//
+// Returns NetworkError carrying errno if fcntl(2) fails.
+Status Socket::IsNonBlocking(bool* is_nonblock) const {
+  const int status_flags = ::fcntl(fd_, F_GETFL, 0);
+  if (status_flags != -1) {
+    *is_nonblock = ((status_flags & O_NONBLOCK) != 0);
+    return Status::OK();
+  }
+  const int err = errno;
+  return Status::NetworkError(
+      StringPrintf("Failed to get file status flags on fd %d", fd_),
+      ErrnoToString(err), err);
+}
+
+// Adds FD_CLOEXEC to the descriptor flags. If either fcntl(2) call fails, the
+// socket is closed via Reset(-1) and a NetworkError carrying errno is
+// returned; errno is captured before Reset() runs.
+Status Socket::SetCloseOnExec() {
+  const int fd_flags = fcntl(fd_, F_GETFD, 0);
+  if (fd_flags == -1) {
+    const int get_err = errno;
+    Reset(-1);
+    return Status::NetworkError(std::string("fcntl(F_GETFD) error: ") +
+                                ErrnoToString(get_err), Slice(), get_err);
+  }
+  const int rc = fcntl(fd_, F_SETFD, fd_flags | FD_CLOEXEC);
+  if (rc == -1) {
+    const int set_err = errno;
+    Reset(-1);
+    return Status::NetworkError(std::string("fcntl(F_SETFD) error: ") +
+                                ErrnoToString(set_err), Slice(), set_err);
+  }
+  return Status::OK();
+}
+
+// Applies 'timeout' as the socket's SO_SNDTIMEO; see SetTimeout() for the
+// error cases.
+Status Socket::SetSendTimeout(const MonoDelta& timeout) {
+  static const char* const kOptName = "SO_SNDTIMEO";
+  return SetTimeout(SO_SNDTIMEO, kOptName, timeout);
+}
+
+// Applies 'timeout' as the socket's SO_RCVTIMEO; see SetTimeout() for the
+// error cases.
+Status Socket::SetRecvTimeout(const MonoDelta& timeout) {
+  static const char* const kOptName = "SO_RCVTIMEO";
+  return SetTimeout(SO_RCVTIMEO, kOptName, timeout);
+}
+
+// Sets SO_REUSEADDR to 'flag'. Returns NetworkError carrying errno if
+// setsockopt(2) fails.
+Status Socket::SetReuseAddr(bool flag) {
+  int int_flag = flag ? 1 : 0;
+  if (setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &int_flag, sizeof(int_flag)) == -1) {
+    int err = errno;
+    return Status::NetworkError(std::string("failed to set SO_REUSEADDR: ") +
+                                ErrnoToString(err), Slice(), err);
+  }
+  return Status::OK();
+}
+
+// Convenience wrapper for the usual server-side sequence: enable
+// SO_REUSEADDR, bind to 'sockaddr', then listen with the given backlog. Stops
+// at, and returns, the first step that fails.
+Status Socket::BindAndListen(const Sockaddr &sockaddr,
+                             int listen_queue_size) {
+  const bool kReuse = true;
+  RETURN_NOT_OK(SetReuseAddr(kReuse));
+  RETURN_NOT_OK(Bind(sockaddr));
+  RETURN_NOT_OK(Listen(listen_queue_size));
+  return Status::OK();
+}
+
+// Puts the socket into the listening state with the given backlog size.
+//
+// Returns NetworkError carrying errno if listen(2) fails. The errno value is
+// attached as the Status's posix code, like every other syscall wrapper in
+// this file, so callers can inspect it through posix_code().
+Status Socket::Listen(int listen_queue_size) {
+  if (listen(fd_, listen_queue_size)) {
+    int err = errno;
+    return Status::NetworkError("listen() error", ErrnoToString(err), err);
+  }
+  return Status::OK();
+}
+
+// Looks up the local address of this socket with getsockname(2) and stores it
+// in '*cur_addr'. '*cur_addr' is only written on success.
+//
+// Returns NetworkError carrying errno if getsockname(2) fails.
+Status Socket::GetSocketAddress(Sockaddr *cur_addr) const {
+  DCHECK_GE(fd_, 0);
+  struct sockaddr_in local;
+  socklen_t local_len = sizeof(local);
+  const int rc = ::getsockname(fd_, (struct sockaddr *)&local, &local_len);
+  if (rc == -1) {
+    const int err = errno;
+    return Status::NetworkError(string("getsockname error: ") +
+                                ErrnoToString(err), Slice(), err);
+  }
+  *cur_addr = local;
+  return Status::OK();
+}
+
+// Looks up the address of the connected peer with getpeername(2) and stores
+// it in '*cur_addr'. '*cur_addr' is only written on success.
+//
+// Returns NetworkError carrying errno if getpeername(2) fails.
+Status Socket::GetPeerAddress(Sockaddr *cur_addr) const {
+  DCHECK_GE(fd_, 0);
+  struct sockaddr_in peer;
+  socklen_t peer_len = sizeof(peer);
+  const int rc = ::getpeername(fd_, (struct sockaddr *)&peer, &peer_len);
+  if (rc == -1) {
+    const int err = errno;
+    return Status::NetworkError(string("getpeername error: ") +
+                                ErrnoToString(err), Slice(), err);
+  }
+  *cur_addr = peer;
+  return Status::OK();
+}
+
+// Reports whether the local and remote ends of this connection have the same
+// address once their ports are zeroed out. Returns false if either address
+// cannot be obtained.
+bool Socket::IsLoopbackConnection() const {
+  Sockaddr local;
+  Sockaddr remote;
+  if (!GetSocketAddress(&local).ok() || !GetPeerAddress(&remote).ok()) {
+    return false;
+  }
+
+  // Zero both ports so that operator== effectively compares addresses only.
+  remote.set_port(0);
+  local.set_port(0);
+  return local == remote;
+}
+
+// Binds the socket to 'bind_addr'.
+//
+// Returns NetworkError carrying errno if bind(2) fails; the message includes
+// the address that could not be bound. When the failure is EADDRINUSE and a
+// specific (non-zero) port was requested, TryRunLsof() is invoked first to
+// help identify the process that holds the port.
+Status Socket::Bind(const Sockaddr& bind_addr) {
+  struct sockaddr_in addr = bind_addr.addr();
+
+  DCHECK_GE(fd_, 0);
+  if (PREDICT_FALSE(::bind(fd_, (struct sockaddr*) &addr, sizeof(addr)))) {
+    int err = errno;
+    Status s = Status::NetworkError(
+        strings::Substitute("error binding socket to $0: $1",
+                            bind_addr.ToString(), ErrnoToString(err)),
+        Slice(), err);
+
+    if (s.IsNetworkError() && s.posix_code() == EADDRINUSE && bind_addr.port() != 0) {
+      TryRunLsof(bind_addr);
+    }
+    return s;
+  }
+
+  return Status::OK();
+}
+
+// Accepts one pending connection on this (listening) socket.
+//
+// On success 'new_conn' manages the accepted descriptor and '*remote' holds
+// the peer's address. The new socket is close-on-exec, and non-blocking when
+// 'flags' contains FLAG_NONBLOCKING; Linux does this atomically through
+// accept4(), other platforms apply the settings after accept().
+//
+// Whatever descriptor 'new_conn' managed before the call is closed, even when
+// accepting fails. Returns NetworkError carrying errno on failure.
+Status Socket::Accept(Socket *new_conn, Sockaddr *remote, int flags) {
+  TRACE_EVENT0("net", "Socket::Accept");
+  struct sockaddr_in addr;
+  socklen_t olen = sizeof(addr);
+  DCHECK_GE(fd_, 0);
+  // errno is read immediately after the accept call in both variants below:
+  // Reset() closes the descriptor 'new_conn' held before, and a failing
+  // close() would replace the value the accept call set.
+#if defined(__linux__)
+  int accept_flags = SOCK_CLOEXEC;
+  if (flags & FLAG_NONBLOCKING) {
+    accept_flags |= SOCK_NONBLOCK;
+  }
+  const int accepted_fd = ::accept4(fd_, (struct sockaddr*)&addr,
+                                    &olen, accept_flags);
+  const int accept_err = errno;
+  new_conn->Reset(accepted_fd);
+  if (new_conn->GetFd() < 0) {
+    return Status::NetworkError(std::string("accept4(2) error: ") +
+                                ErrnoToString(accept_err), Slice(), accept_err);
+  }
+#else
+  const int accepted_fd = ::accept(fd_, (struct sockaddr*)&addr, &olen);
+  const int accept_err = errno;
+  new_conn->Reset(accepted_fd);
+  if (new_conn->GetFd() < 0) {
+    return Status::NetworkError(std::string("accept(2) error: ") +
+                                ErrnoToString(accept_err), Slice(), accept_err);
+  }
+  RETURN_NOT_OK(new_conn->SetNonBlocking(flags & FLAG_NONBLOCKING));
+  RETURN_NOT_OK(new_conn->SetCloseOnExec());
+#endif // defined(__linux__)
+
+  *remote = addr;
+  TRACE_EVENT_INSTANT1("net", "Accepted", TRACE_EVENT_SCOPE_THREAD,
+                       "remote", remote->ToString());
+  return Status::OK();
+}
+
+// Binds the socket to the address named by FLAGS_local_ip_for_outbound_sockets
+// (port 0). A flag value that does not parse, or that carries a port, is
+// treated as a fatal configuration error via CHECK. Otherwise returns the
+// Status of Bind().
+Status Socket::BindForOutgoingConnection() {
+  Sockaddr local_addr;
+  Status parse_status = local_addr.ParseString(FLAGS_local_ip_for_outbound_sockets, 0);
+  CHECK(parse_status.ok() && local_addr.port() == 0)
+    << "Invalid local IP set for 'local_ip_for_outbound_sockets': '"
+    << FLAGS_local_ip_for_outbound_sockets << "': " << parse_status.ToString();
+
+  RETURN_NOT_OK(Bind(local_addr));
+  return Status::OK();
+}
+
+// Connects the socket to 'remote'. When FLAGS_local_ip_for_outbound_sockets is
+// non-empty, the socket is first bound to that local address.
+//
+// Returns the bind failure, or NetworkError carrying errno if connect(2)
+// reports an error.
+Status Socket::Connect(const Sockaddr &remote) {
+  TRACE_EVENT1("net", "Socket::Connect",
+               "remote", remote.ToString());
+  if (PREDICT_FALSE(!FLAGS_local_ip_for_outbound_sockets.empty())) {
+    RETURN_NOT_OK(BindForOutgoingConnection());
+  }
+
+  struct sockaddr_in dest;
+  memcpy(&dest, &remote.addr(), sizeof(dest));
+  DCHECK_GE(fd_, 0);
+  const int rc = ::connect(fd_, (const struct sockaddr*)&dest, sizeof(dest));
+  if (rc >= 0) {
+    return Status::OK();
+  }
+  const int err = errno;
+  return Status::NetworkError(std::string("connect(2) error: ") +
+                              ErrnoToString(err), Slice(), err);
+}
+
+Status Socket::GetSockError() const {
+  int val = 0, ret;
+  socklen_t val_len = sizeof(val);
+  DCHECK_GE(fd_, 0);
+  ret = ::getsockopt(fd_, SOL_SOCKET, SO_ERROR, &val, &val_len);
+  if (ret) {
+    int err = errno;
+    return Status::NetworkError(std::string("getsockopt(SO_ERROR) failed: ") +
+                                ErrnoToString(err), Slice(), err);
+  }
+  if (val != 0) {
+    return Status::NetworkError(ErrnoToString(val), Slice(), val);
+  }
+  return Status::OK();
+}
+
+// Sends up to 'amt' bytes from 'buf' with a single send(2) call (using
+// MSG_NOSIGNAL) and stores the number of bytes accepted in '*nwritten'.
+// '*nwritten' is only written on success.
+//
+// Returns NetworkError with EINVAL when 'amt' is not positive, or
+// NetworkError carrying errno if send(2) fails.
+Status Socket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) {
+  if (amt <= 0) {
+    return Status::NetworkError(
+        StringPrintf("invalid send of %" PRId32 " bytes", amt),
+        Slice(), EINVAL);
+  }
+  DCHECK_GE(fd_, 0);
+  const int sent = ::send(fd_, buf, amt, MSG_NOSIGNAL);
+  if (sent >= 0) {
+    *nwritten = sent;
+    return Status::OK();
+  }
+  const int err = errno;
+  return Status::NetworkError(std::string("write error: ") +
+                              ErrnoToString(err), Slice(), err);
+}
+
+// Gathers the 'iov_len' buffers described by 'iov' into a single sendmsg(2)
+// call (using MSG_NOSIGNAL) and stores the number of bytes accepted in
+// '*nwritten'. '*nwritten' is only written on success.
+//
+// Returns NetworkError with EINVAL when 'iov_len' is not positive, or
+// NetworkError carrying errno if sendmsg(2) fails.
+Status Socket::Writev(const struct ::iovec *iov, int iov_len,
+                      int32_t *nwritten) {
+  if (PREDICT_FALSE(iov_len <= 0)) {
+    return Status::NetworkError(
+        StringPrintf("writev: invalid io vector length of %d", iov_len),
+        Slice(), EINVAL);
+  }
+  DCHECK_GE(fd_, 0);
+
+  // Only the iovec fields are populated; everything else stays zeroed.
+  struct msghdr hdr;
+  memset(&hdr, 0, sizeof(hdr));
+  hdr.msg_iovlen = iov_len;
+  hdr.msg_iov = const_cast<iovec *>(iov);
+  const int sent = ::sendmsg(fd_, &hdr, MSG_NOSIGNAL);
+  if (PREDICT_FALSE(sent < 0)) {
+    const int err = errno;
+    return Status::NetworkError(std::string("sendmsg error: ") +
+                                ErrnoToString(err), Slice(), err);
+  }
+
+  *nwritten = sent;
+  return Status::OK();
+}
+
+// Mostly follows writen() from Stevens (2004) or Kerrisk (2010).
+//
+// Keeps calling Write() until all 'buflen' bytes of 'buf' have been sent or
+// 'deadline' passes. Before each Write() the time left until the deadline is
+// installed as the socket's send timeout.
+//
+// '*nwritten' always holds the number of bytes sent so far when this
+// returns, including on every error path and when 'buflen' is 0.
+//
+// Returns:
+//   OK        - all 'buflen' bytes were sent.
+//   TimedOut  - the deadline passed, or Write() failed with EAGAIN.
+//   IOError   - a Write() succeeded but reported 0 bytes.
+//   otherwise - the Write()/SetSendTimeout() failure. Write() errors are
+//               prefixed with "BlockingWrite error"; EINTR is retried.
+Status Socket::BlockingWrite(const uint8_t *buf, size_t buflen, size_t *nwritten,
+    const MonoTime& deadline) {
+  DCHECK_LE(buflen, std::numeric_limits<int32_t>::max()) << "Writes > INT32_MAX not supported";
+  DCHECK(nwritten);
+
+  size_t tot_written = 0;
+  // Publish the count up front so that the paths which leave before the first
+  // Write() (empty buffer, deadline already passed, SetSendTimeout() failure)
+  // do not hand back an unassigned value.
+  *nwritten = 0;
+  while (tot_written < buflen) {
+    int32_t inc_num_written = 0;
+    int32_t num_to_write = buflen - tot_written;
+    MonoDelta timeout = deadline - MonoTime::Now();
+    if (PREDICT_FALSE(timeout.ToNanoseconds() <= 0)) {
+      return Status::TimedOut("BlockingWrite timed out");
+    }
+    RETURN_NOT_OK(SetSendTimeout(timeout));
+    Status s = Write(buf, num_to_write, &inc_num_written);
+    // inc_num_written stays 0 when Write() fails, so these are no-ops then.
+    tot_written += inc_num_written;
+    buf += inc_num_written;
+    *nwritten = tot_written;
+
+    if (PREDICT_FALSE(!s.ok())) {
+      // Continue silently when the syscall is interrupted.
+      if (s.posix_code() == EINTR) {
+        continue;
+      }
+      if (s.posix_code() == EAGAIN) {
+        return Status::TimedOut("");
+      }
+      return s.CloneAndPrepend("BlockingWrite error");
+    }
+    if (PREDICT_FALSE(inc_num_written == 0)) {
+      // Shouldn't happen on Linux with a blocking socket. Maybe other Unices.
+      break;
+    }
+  }
+
+  if (tot_written < buflen) {
+    return Status::IOError("Wrote zero bytes on a BlockingWrite() call",
+        StringPrintf("Transferred %zu of %zu bytes", tot_written, buflen));
+  }
+  return Status::OK();
+}
+
+// Reads up to 'amt' bytes into 'buf' with a single recv(2) call and stores
+// the number of bytes read in '*nread'. '*nread' is only written on success.
+//
+// Returns:
+//   NetworkError(EINVAL)    - 'amt' is not positive.
+//   NetworkError(ESHUTDOWN) - recv(2) returned 0, i.e. the peer closed.
+//   NetworkError(errno)     - recv(2) failed.
+Status Socket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) {
+  if (amt <= 0) {
+    return Status::NetworkError(
+          StringPrintf("invalid recv of %d bytes", amt), Slice(), EINVAL);
+  }
+
+  // The recv() call can return fewer than the requested number of bytes.
+  // Especially when 'amt' is small, this is very unlikely to happen in
+  // the context of unit tests. So, we provide an injection hook which
+  // simulates the same behavior.
+  if (PREDICT_FALSE(FLAGS_socket_inject_short_recvs && amt > 1)) {
+    Random r(GetRandomSeed32());
+    amt = 1 + r.Uniform(amt - 1);
+  }
+
+  DCHECK_GE(fd_, 0);
+  int res = ::recv(fd_, buf, amt, 0);
+  if (res <= 0) {
+    if (res == 0) {
+      return Status::NetworkError("Recv() got EOF from remote", Slice(), ESHUTDOWN);
+    }
+    int err = errno;
+    return Status::NetworkError(std::string("recv error: ") +
+                                ErrnoToString(err), Slice(), err);
+  }
+  *nread = res;
+  return Status::OK();
+}
+
+// Mostly follows readn() from Stevens (2004) or Kerrisk (2010).
+// One place where we deviate: we consider EOF a failure if < amt bytes are read.
+//
+// Keeps calling Recv() until 'amt' bytes have been stored in 'buf' or
+// 'deadline' passes. Before each Recv() the time left until the deadline is
+// installed as the socket's receive timeout.
+//
+// '*nread' always holds the number of bytes read so far when this returns,
+// including on every error path and when 'amt' is 0.
+//
+// Returns:
+//   OK        - all 'amt' bytes were read.
+//   TimedOut  - the deadline passed, or Recv() failed with EAGAIN.
+//   IOError   - a Recv() succeeded but reported 0 bytes.
+//   otherwise - the Recv()/SetRecvTimeout() failure. Recv() errors are
+//               prefixed with "BlockingRecv error"; EINTR is retried.
+Status Socket::BlockingRecv(uint8_t *buf, size_t amt, size_t *nread, const MonoTime& deadline) {
+  DCHECK_LE(amt, std::numeric_limits<int32_t>::max()) << "Reads > INT32_MAX not supported";
+  DCHECK(nread);
+  size_t tot_read = 0;
+  // Publish the count up front so that the paths which leave before the first
+  // Recv() (nothing requested, deadline already passed, SetRecvTimeout()
+  // failure) do not hand back an unassigned value.
+  *nread = 0;
+  while (tot_read < amt) {
+    int32_t inc_num_read = 0;
+    int32_t num_to_read = amt - tot_read;
+    MonoDelta timeout = deadline - MonoTime::Now();
+    if (PREDICT_FALSE(timeout.ToNanoseconds() <= 0)) {
+      return Status::TimedOut("");
+    }
+    RETURN_NOT_OK(SetRecvTimeout(timeout));
+    Status s = Recv(buf, num_to_read, &inc_num_read);
+    // inc_num_read stays 0 when Recv() fails, so these are no-ops then.
+    tot_read += inc_num_read;
+    buf += inc_num_read;
+    *nread = tot_read;
+
+    if (PREDICT_FALSE(!s.ok())) {
+      // Continue silently when the syscall is interrupted.
+      if (s.posix_code() == EINTR) {
+        continue;
+      }
+      if (s.posix_code() == EAGAIN) {
+        return Status::TimedOut("");
+      }
+      return s.CloneAndPrepend("BlockingRecv error");
+    }
+    if (PREDICT_FALSE(inc_num_read == 0)) {
+      // EOF.
+      break;
+    }
+  }
+
+  if (PREDICT_FALSE(tot_read < amt)) {
+    return Status::IOError("Read zero bytes on a blocking Recv() call",
+        StringPrintf("Transferred %zu of %zu bytes", tot_read, amt));
+  }
+  return Status::OK();
+}
+
+// Applies 'timeout' to the SOL_SOCKET option 'opt'; the callers in this file
+// pass SO_SNDTIMEO and SO_RCVTIMEO. 'optname' is only used to label the error
+// message.
+//
+// Returns InvalidArgument for a negative timeout, or NetworkError carrying
+// errno if setsockopt(2) fails.
+Status Socket::SetTimeout(int opt, std::string optname, const MonoDelta& timeout) {
+  if (PREDICT_FALSE(timeout.ToNanoseconds() < 0)) {
+    return Status::InvalidArgument("Timeout specified as negative to SetTimeout",
+                                   timeout.ToString());
+  }
+  struct timeval tv;
+  timeout.ToTimeVal(&tv);
+  socklen_t optlen = sizeof(tv);
+  if (::setsockopt(fd_, SOL_SOCKET, opt, &tv, optlen) == -1) {
+    int err = errno;
+    return Status::NetworkError(
+        StringPrintf("Failed to set %s to %s", optname.c_str(), timeout.ToString().c_str()),
+        ErrnoToString(err), err);
+  }
+  return Status::OK();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/net/socket.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/socket.h b/be/src/kudu/util/net/socket.h
new file mode 100644
index 0000000..ce5b7bb
--- /dev/null
+++ b/be/src/kudu/util/net/socket.h
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_NET_SOCKET_H
+#define KUDU_UTIL_NET_SOCKET_H
+
+#include <sys/uio.h>
+#include <string>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class MonoDelta;
+class MonoTime;
+class Sockaddr;
+
+// Owns a socket file descriptor and wraps the system calls used on it.
+// The descriptor is closed when the object is destroyed unless Release() has
+// been called. Copying is disallowed.
+class Socket {
+ public:
+  // Flag accepted by Init() and Accept(): make the resulting socket
+  // non-blocking.
+  static const int FLAG_NONBLOCKING = 0x1;
+
+  // Create a new invalid Socket object.
+  Socket();
+
+  // Start managing a socket.
+  explicit Socket(int fd);
+
+  // Close the socket.  Errors will be ignored.
+  virtual ~Socket();
+
+  // Close the Socket, checking for errors.
+  virtual Status Close();
+
+  // call shutdown() on the socket
+  Status Shutdown(bool shut_read, bool shut_write);
+
+  // Start managing a socket. Any socket managed so far is closed first.
+  void Reset(int fd);
+
+  // Stop managing the socket and return it.
+  int Release();
+
+  // Get the raw file descriptor, or -1 if there is no file descriptor being
+  // managed.
+  int GetFd() const;
+
+  // Returns true if the error is temporary and will go away if we retry on
+  // the socket.
+  static bool IsTemporarySocketError(int err);
+
+  // Open a new IPv4 stream socket, replacing any socket managed so far.
+  Status Init(int flags); // See FLAG_NONBLOCKING
+
+  // Set or clear TCP_NODELAY
+  Status SetNoDelay(bool enabled);
+
+  // Set or clear TCP_CORK
+  Status SetTcpCork(bool enabled);
+
+  // Set or clear O_NONBLOCK
+  Status SetNonBlocking(bool enabled);
+  Status IsNonBlocking(bool* is_nonblock) const;
+
+  // Set SO_SNDTIMEO to the specified value. Should only be used for blocking sockets.
+  Status SetSendTimeout(const MonoDelta& timeout);
+
+  // Set SO_RCVTIMEO to the specified value. Should only be used for blocking sockets.
+  Status SetRecvTimeout(const MonoDelta& timeout);
+
+  // Sets SO_REUSEADDR to 'flag'. Should be used prior to Bind().
+  Status SetReuseAddr(bool flag);
+
+  // Convenience method to invoke the common sequence:
+  // 1) SetReuseAddr(true)
+  // 2) Bind()
+  // 3) Listen()
+  Status BindAndListen(const Sockaddr &sockaddr, int listen_queue_size);
+
+  // Start listening for new connections, with the given backlog size.
+  // Requires that the socket has already been bound using Bind().
+  Status Listen(int listen_queue_size);
+
+  // Call getsockname to get the address of this socket.
+  Status GetSocketAddress(Sockaddr *cur_addr) const;
+
+  // Call getpeername to get the address of the connected peer.
+  // It is virtual so that tests can override.
+  virtual Status GetPeerAddress(Sockaddr *cur_addr) const;
+
+  // Return true if this socket is determined to be a loopback connection
+  // (i.e. the local and remote peer share an IP address).
+  //
+  // If any error occurs while determining this, returns false.
+  bool IsLoopbackConnection() const;
+
+  // Call bind() to bind the socket to a given address.
+  // If bind() fails and indicates that the requested port is already in use,
+  // generates an informative log message by calling 'lsof' if available.
+  Status Bind(const Sockaddr& bind_addr);
+
+  // Call accept(2) to get a new connection.
+  Status Accept(Socket *new_conn, Sockaddr *remote, int flags);
+
+  // start connecting this socket to a remote address.
+  Status Connect(const Sockaddr &remote);
+
+  // get the error status using getsockopt(2)
+  Status GetSockError() const;
+
+  // Send up to 'amt' bytes with a single call; 'nwritten' receives the number
+  // of bytes actually sent.
+  virtual Status Write(const uint8_t *buf, int32_t amt, int32_t *nwritten);
+
+  // Like Write(), but gathers the data from 'iov_len' buffers.
+  virtual Status Writev(const struct ::iovec *iov, int iov_len, int32_t *nwritten);
+
+  // Blocking Write call, returns IOError unless full buffer is sent.
+  // Underlying Socket expected to be in blocking mode. Fails if any Write() sends 0 bytes.
+  // Returns OK if buflen bytes were sent, otherwise IOError.
+  // Upon return, nwritten will contain the number of bytes actually written.
+  // See also writen() from Stevens (2004) or Kerrisk (2010)
+  Status BlockingWrite(const uint8_t *buf, size_t buflen, size_t *nwritten,
+      const MonoTime& deadline);
+
+  // Read up to 'amt' bytes with a single call; 'nread' receives the number of
+  // bytes actually read.
+  virtual Status Recv(uint8_t *buf, int32_t amt, int32_t *nread);
+
+  // Blocking Recv call, returns IOError unless requested amt bytes are read.
+  // Underlying Socket expected to be in blocking mode. Fails if any Recv() reads 0 bytes.
+  // Returns OK if amt bytes were read, otherwise IOError.
+  // Upon return, nread will contain the number of bytes actually read.
+  // See also readn() from Stevens (2004) or Kerrisk (2010)
+  Status BlockingRecv(uint8_t *buf, size_t amt, size_t *nread, const MonoTime& deadline);
+
+ private:
+  // Called internally from SetSend/RecvTimeout().
+  Status SetTimeout(int opt, std::string optname, const MonoDelta& timeout);
+
+  // Called internally during socket setup.
+  Status SetCloseOnExec();
+
+  // Bind the socket to a local address before making an outbound connection,
+  // based on the value of FLAGS_local_ip_for_outbound_sockets.
+  Status BindForOutgoingConnection();
+
+  // The managed descriptor, or -1 when none is managed.
+  int fd_;
+
+  DISALLOW_COPY_AND_ASSIGN(Socket);
+};
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/nvm_cache.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/nvm_cache.cc b/be/src/kudu/util/nvm_cache.cc
new file mode 100644
index 0000000..678aa87
--- /dev/null
+++ b/be/src/kudu/util/nvm_cache.cc
@@ -0,0 +1,578 @@
+// This file is derived from cache.cc in the LevelDB project:
+//
+//   Some portions copyright (c) 2011 The LevelDB Authors. All rights reserved.
+//   Use of this source code is governed by a BSD-style license that can be
+//   found in the LICENSE file.
+//
+// ------------------------------------------------------------
+// This file implements a cache based on the NVML library (http://pmem.io),
+// specifically its "libvmem" component. This library makes it easy to program
+// against persistent memory hardware by exposing an API which parallels
+// malloc/free, but allocates from persistent memory instead of DRAM.
+//
+// We use this API to implement a cache which treats persistent memory or
+// non-volatile memory as if it were a larger cheaper bank of volatile memory.
+// We currently make no use of its persistence properties.
+//
+// Currently, we only store key/value in NVM. All other data structures such
+// as the ShardedLRUCache instances, hash table, etc are in DRAM. The
+// assumption is that the ratio of data stored vs overhead is quite high.
+
+#include "kudu/util/nvm_cache.h"
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <iostream>
+#include <libvmem.h>
+#include <memory>
+#include <mutex>
+#include <stdlib.h>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/atomic_refcount.h"
+#include "kudu/gutil/hash/city.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/cache_metrics.h"
+#include "kudu/util/flags.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"
+
+// Path handed to vmem_create() when the NVM cache pool is created.
+DEFINE_string(nvm_cache_path, "/vmem",
+              "The path at which the NVM cache will try to allocate its "
+              "memory. This can be a tmpfs or ramfs for testing purposes.");
+TAG_FLAG(nvm_cache_path, experimental);
+
+// Upper bound on the evict-and-retry attempts made by
+// NvmLRUCache::AllocateAndRetry() after a failed allocation.
+DEFINE_int32(nvm_cache_allocation_retry_count, 10,
+             "The number of times that the NVM cache will retry attempts to "
+             "allocate memory for new entries. In between attempts, a cache "
+             "entry will be evicted.");
+TAG_FLAG(nvm_cache_allocation_retry_count, advanced);
+TAG_FLAG(nvm_cache_allocation_retry_count, experimental);
+
+// When set, NvmLRUCache::VmemMalloc() returns NULL without calling
+// vmem_malloc().
+DEFINE_bool(nvm_cache_simulate_allocation_failure, false,
+            "If true, the NVM cache will inject failures in calls to "
+            "vmem_malloc for testing.");
+TAG_FLAG(nvm_cache_simulate_allocation_failure, unsafe);
+
+
+namespace kudu {
+
+class MetricEntity;
+
+namespace {
+
+using std::shared_ptr;
+using std::vector;
+
+typedef simple_spinlock MutexType;
+
+// LRU cache implementation
+
+// A single cache entry. Entries are variable length, heap-allocated
+// structures which are linked into a circular doubly linked list that is
+// kept ordered by access time.
+struct LRUHandle {
+  Cache::EvictionCallback* eviction_callback;  // May be NULL.
+  LRUHandle* next_hash;  // Next entry in the same hash bucket chain.
+  LRUHandle* next;       // LRU list successor.
+  LRUHandle* prev;       // LRU list predecessor.
+  size_t charge;      // TODO(opt): Only allow uint32_t?
+  uint32_t key_length;
+  uint32_t val_length;
+  Atomic32 refs;
+  uint32_t hash;      // Hash of key(); used for fast sharding and comparisons
+  uint8_t* kv_data;   // key_length key bytes followed by the value bytes.
+
+  // The key: the first key_length bytes of kv_data.
+  Slice key() const {
+    const uint8_t* key_begin = kv_data;
+    return Slice(key_begin, key_length);
+  }
+
+  // The value: val_length bytes starting right after the key.
+  Slice value() const {
+    const uint8_t* value_begin = kv_data + key_length;
+    return Slice(value_begin, val_length);
+  }
+
+  // Writable pointer to the first value byte.
+  uint8_t* val_ptr() {
+    return kv_data + key_length;
+  }
+};
+
+// Hash table mapping (key, hash) to the LRUHandle holding that key. Each
+// bucket is a singly linked chain threaded through LRUHandle::next_hash.
+// The table owns only its bucket array, never the handles stored in it.
+//
+// We provide our own simple hash table since it removes a whole bunch
+// of porting hacks and is also faster than some of the built-in hash
+// table implementations in some of the compiler/runtime combinations
+// we have tested.  E.g., readrandom speeds up by ~5% over the g++
+// 4.4.3's builtin hashtable.
+class HandleTable {
+ public:
+  HandleTable() : length_(0), elems_(0), list_(NULL) { Resize(); }
+  ~HandleTable() { delete[] list_; }
+
+  // The bucket array is owned through a raw pointer, so a compiler-generated
+  // copy would leave two tables calling delete[] on the same array.
+  HandleTable(const HandleTable&) = delete;
+  HandleTable& operator=(const HandleTable&) = delete;
+
+  // Returns the entry matching key/hash, or NULL if there is none.
+  LRUHandle* Lookup(const Slice& key, uint32_t hash) {
+    return *FindPointer(key, hash);
+  }
+
+  // Links 'h' into the table, taking the place of any entry that has the
+  // same key and hash. Returns the entry that was replaced (NULL if there
+  // was none); the replaced entry is unlinked but not freed.
+  LRUHandle* Insert(LRUHandle* h) {
+    LRUHandle** ptr = FindPointer(h->key(), h->hash);
+    LRUHandle* old = *ptr;
+    // 'h' inherits the tail of the chain that followed the old entry.
+    h->next_hash = (old == NULL ? NULL : old->next_hash);
+    *ptr = h;
+    if (old == NULL) {
+      ++elems_;
+      if (elems_ > length_) {
+        // Since each cache entry is fairly large, we aim for a small
+        // average linked list length (<= 1).
+        Resize();
+      }
+    }
+    return old;
+  }
+
+  // Unlinks and returns the entry matching key/hash, or returns NULL if
+  // there is no such entry. The entry itself is not freed.
+  LRUHandle* Remove(const Slice& key, uint32_t hash) {
+    LRUHandle** ptr = FindPointer(key, hash);
+    LRUHandle* result = *ptr;
+    if (result != NULL) {
+      *ptr = result->next_hash;
+      --elems_;
+    }
+    return result;
+  }
+
+ private:
+  // The table consists of an array of buckets where each bucket is
+  // a linked list of cache entries that hash into the bucket.
+  uint32_t length_;   // Number of buckets; Resize() keeps it a power of two.
+  uint32_t elems_;    // Number of entries currently linked into the table.
+  LRUHandle** list_;  // Bucket array of length_ chain heads.
+
+  // Return a pointer to slot that points to a cache entry that
+  // matches key/hash.  If there is no such cache entry, return a
+  // pointer to the trailing slot in the corresponding linked list.
+  LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
+    // length_ is a power of two, so the mask selects the bucket index.
+    LRUHandle** ptr = &list_[hash & (length_ - 1)];
+    while (*ptr != NULL &&
+           ((*ptr)->hash != hash || key != (*ptr)->key())) {
+      ptr = &(*ptr)->next_hash;
+    }
+    return ptr;
+  }
+
+  // Allocates a new bucket array whose length is a power of two, at least
+  // 16 and at least 1.5 times elems_, then relinks every entry into it and
+  // releases the old array.
+  void Resize() {
+    uint32_t new_length = 16;
+    while (new_length < elems_ * 1.5) {
+      new_length *= 2;
+    }
+    LRUHandle** new_list = new LRUHandle*[new_length];
+    memset(new_list, 0, sizeof(new_list[0]) * new_length);
+    uint32_t count = 0;
+    for (uint32_t i = 0; i < length_; i++) {
+      LRUHandle* h = list_[i];
+      while (h != NULL) {
+        // Save the successor: next_hash is overwritten when 'h' is pushed
+        // onto the front of its new chain.
+        LRUHandle* next = h->next_hash;
+        uint32_t hash = h->hash;
+        LRUHandle** ptr = &new_list[hash & (new_length - 1)];
+        h->next_hash = *ptr;
+        *ptr = h;
+        h = next;
+        count++;
+      }
+    }
+    DCHECK_EQ(elems_, count);
+    delete[] list_;
+    list_ = new_list;
+    length_ = new_length;
+  }
+};
+
+// One shard of the sharded cache: a hash table for lookups combined with an
+// LRU list for eviction. Memory for entries comes from the VMEM pool passed
+// to the constructor.
+class NvmLRUCache {
+ public:
+  explicit NvmLRUCache(VMEM* vmp);
+  ~NvmLRUCache();
+
+  // Kept apart from the constructor so that a caller can easily make an
+  // array of LRUCache.
+  void SetCapacity(size_t capacity) {
+    capacity_ = capacity;
+  }
+
+  void SetMetrics(CacheMetrics* metrics) {
+    metrics_ = metrics;
+  }
+
+  Cache::Handle* Insert(LRUHandle* h,
+                        Cache::EvictionCallback* eviction_callback);
+
+  // Like Cache::Lookup, but with an extra "hash" parameter.
+  Cache::Handle* Lookup(const Slice& key, uint32_t hash, bool caching);
+  void Release(Cache::Handle* handle);
+  void Erase(const Slice& key, uint32_t hash);
+  void* AllocateAndRetry(size_t size);
+
+ private:
+  // Unlink 'e' from / link 'e' into the LRU list, adjusting usage_.
+  void NvmLRU_Remove(LRUHandle* e);
+  void NvmLRU_Append(LRUHandle* e);
+
+  // Drops exactly one reference from 'e'.
+  // Returns true when that was the last reference.
+  bool Unref(LRUHandle* e);
+  void FreeEntry(LRUHandle* e);
+
+  // Evicts the least recently used item and, if nothing else references it,
+  // pushes it onto the linked list whose head is '*to_remove_head'.
+  void EvictOldestUnlocked(LRUHandle** to_remove_head);
+
+  // Frees every entry of the linked list headed by 'to_free_head'.
+  void FreeLRUEntries(LRUHandle* to_free_head);
+
+  // vmem_malloc wrapper which can inject failures, controlled by a flag.
+  void* VmemMalloc(size_t size);
+
+  // Initialized before use.
+  size_t capacity_;
+
+  // mutex_ protects the following state.
+  MutexType mutex_;
+  size_t usage_;
+
+  // Dummy head of LRU list.
+  // lru.prev is newest entry, lru.next is oldest entry.
+  LRUHandle lru_;
+
+  HandleTable table_;
+
+  VMEM* vmp_;
+
+  CacheMetrics* metrics_;
+};
+
+// Builds an empty shard that allocates from the pool 'vmp'.
+//
+// capacity_ starts at zero instead of being left indeterminate, so the
+// "usage_ > capacity_" test in Insert() never reads an uninitialized value
+// if SetCapacity() has not been called yet.
+NvmLRUCache::NvmLRUCache(VMEM* vmp)
+  : capacity_(0),
+    usage_(0),
+    vmp_(vmp),
+    metrics_(NULL) {
+  // Make empty circular linked list
+  lru_.next = &lru_;
+  lru_.prev = &lru_;
+}
+
+// Drops the cache's reference on every entry still linked into the LRU list
+// and frees each entry whose reference count reaches zero.
+NvmLRUCache::~NvmLRUCache() {
+  LRUHandle* cur = lru_.next;
+  while (cur != &lru_) {
+    // Fetch the successor first, since 'cur' may be freed below.
+    LRUHandle* following = cur->next;
+    DCHECK_EQ(cur->refs, 1);  // Error if caller has an unreleased handle
+    const bool was_last_ref = Unref(cur);
+    if (was_last_ref) {
+      FreeEntry(cur);
+    }
+    cur = following;
+  }
+}
+
+// Allocates 'size' bytes from the pool, or returns NULL without touching the
+// pool when FLAGS_nvm_cache_simulate_allocation_failure is set.
+void* NvmLRUCache::VmemMalloc(size_t size) {
+  if (PREDICT_TRUE(!FLAGS_nvm_cache_simulate_allocation_failure)) {
+    return vmem_malloc(vmp_, size);
+  }
+  return NULL;
+}
+
+// Decrements the reference count of 'e'. Returns true when the count has
+// dropped to zero, i.e. the caller released the last reference.
+bool NvmLRUCache::Unref(LRUHandle* e) {
+  DCHECK_GT(ANNOTATE_UNPROTECTED_READ(e->refs), 0);
+  const bool still_referenced = base::RefCountDec(&e->refs);
+  return !still_referenced;
+}
+
+// Releases an entry that no longer has any references: notifies its
+// eviction callback (if one was registered), updates the metrics (if any
+// are attached) and returns the memory to the pool.
+void NvmLRUCache::FreeEntry(LRUHandle* e) {
+  DCHECK_EQ(ANNOTATE_UNPROTECTED_READ(e->refs), 0);
+
+  Cache::EvictionCallback* const on_evict = e->eviction_callback;
+  if (on_evict != NULL) {
+    on_evict->EvictedEntry(e->key(), e->value());
+  }
+
+  if (PREDICT_TRUE(metrics_)) {
+    metrics_->cache_usage->DecrementBy(e->charge);
+    metrics_->evictions->Increment();
+  }
+
+  vmem_free(vmp_, e);
+}
+
+// Allocate nvm memory. Try until successful or
+// FLAGS_nvm_cache_allocation_retry_count has been exceeded.
+//
+// There may be times that an allocation fails. With NVM we have
+// a fixed size to allocate from. If we cannot allocate the size
+// that was asked for, we will remove entries from the cache and
+// retry up to the configured number of retries. If this fails, we
+// return NULL, which will cause the caller to not insert anything
+// into the cache.
+void *NvmLRUCache::AllocateAndRetry(size_t size) {
+  void *tmp = VmemMalloc(size);
+  if (tmp != NULL) {
+    // Common case: nothing had to be evicted.
+    return tmp;
+  }
+
+  LRUHandle *to_remove_head = NULL;
+  std::unique_lock<MutexType> l(mutex_);
+
+  int retries_remaining = FLAGS_nvm_cache_allocation_retry_count;
+  while (tmp == NULL && retries_remaining-- > 0 && lru_.next != &lru_) {
+    EvictOldestUnlocked(&to_remove_head);
+
+    // Unlock while freeing and allocating memory. The evicted entry has to
+    // be handed back to the pool *before* the retry: eviction only unlinks
+    // it, so an allocation attempted while the victim is still allocated
+    // cannot benefit from the eviction.
+    l.unlock();
+    FreeLRUEntries(to_remove_head);
+    to_remove_head = NULL;
+    tmp = VmemMalloc(size);
+    l.lock();
+  }
+
+  // Every evicted entry was already freed inside the loop, outside of the
+  // mutex; 'l' releases the mutex on return.
+  return tmp;
+}
+
+// Unlinks 'e' from the LRU list and subtracts its charge from usage_.
+void NvmLRUCache::NvmLRU_Remove(LRUHandle* e) {
+  LRUHandle* const before = e->prev;
+  LRUHandle* const after = e->next;
+  after->prev = before;
+  before->next = after;
+  usage_ -= e->charge;
+}
+
+// Links 'e' in as the newest entry (directly before the dummy head lru_)
+// and adds its charge to usage_.
+void NvmLRUCache::NvmLRU_Append(LRUHandle* e) {
+  LRUHandle* const newest = lru_.prev;
+  e->next = &lru_;
+  e->prev = newest;
+  newest->next = e;
+  lru_.prev = e;
+  usage_ += e->charge;
+}
+
+// Looks up key/hash in this shard. A hit takes an extra reference on the
+// entry and makes it the newest entry of the LRU list. Returns NULL on a
+// miss. 'caching' only selects which hit/miss counters are incremented.
+Cache::Handle* NvmLRUCache::Lookup(const Slice& key, uint32_t hash,
+                                   bool caching) {
+  LRUHandle* found = NULL;
+  {
+    std::lock_guard<MutexType> guard(mutex_);
+    found = table_.Lookup(key, hash);
+    if (found != NULL) {
+      // Take a reference for the caller, then move the entry to the end of
+      // the linked list by unlinking and re-appending it.
+      base::RefCountInc(&found->refs);
+      NvmLRU_Remove(found);
+      NvmLRU_Append(found);
+    }
+  }
+
+  // The metrics are updated once the lock has been released.
+  if (metrics_) {
+    metrics_->lookups->Increment();
+    const bool hit = (found != NULL);
+    if (hit && caching) {
+      metrics_->cache_hits_caching->Increment();
+    } else if (hit) {
+      metrics_->cache_hits->Increment();
+    } else if (caching) {
+      metrics_->cache_misses_caching->Increment();
+    } else {
+      metrics_->cache_misses->Increment();
+    }
+  }
+
+  return reinterpret_cast<Cache::Handle*>(found);
+}
+
+// Gives back one reference on 'handle'; the entry is freed when that was
+// the last one.
+void NvmLRUCache::Release(Cache::Handle* handle) {
+  LRUHandle* entry = reinterpret_cast<LRUHandle*>(handle);
+  if (Unref(entry)) {
+    FreeEntry(entry);
+  }
+}
+
+// Removes the oldest entry (lru_.next) from the LRU list and the hash table
+// and drops the cache's reference on it. If that was the last reference the
+// entry is pushed onto the front of the list at '*to_remove_head' so that it
+// can be freed later. This function does not take mutex_ itself; the callers
+// in this file invoke it while holding mutex_.
+void NvmLRUCache::EvictOldestUnlocked(LRUHandle** to_remove_head) {
+  LRUHandle* victim = lru_.next;
+  NvmLRU_Remove(victim);
+  table_.Remove(victim->key(), victim->hash);
+
+  const bool unreferenced = Unref(victim);
+  if (!unreferenced) {
+    return;
+  }
+  victim->next = *to_remove_head;
+  *to_remove_head = victim;
+}
+
+// Frees each entry of the singly linked (via 'next') list starting at
+// 'to_free_head'. A NULL head is a no-op.
+void NvmLRUCache::FreeLRUEntries(LRUHandle* to_free_head) {
+  for (LRUHandle* cur = to_free_head; cur != NULL; ) {
+    // Read the link before the entry is released.
+    LRUHandle* rest = cur->next;
+    FreeEntry(cur);
+    cur = rest;
+  }
+}
+
+// Inserts the fully populated entry 'e' into this shard and returns it as a
+// handle. An existing entry with the same key and hash is replaced, and the
+// oldest entries are evicted while usage_ exceeds capacity_.
+Cache::Handle* NvmLRUCache::Insert(
+    LRUHandle* e, Cache::EvictionCallback* eviction_callback) {
+  DCHECK(e);
+
+  e->eviction_callback = eviction_callback;
+  e->refs = 2;  // One from LRUCache, one for the returned handle
+  if (PREDICT_TRUE(metrics_)) {
+    metrics_->cache_usage->IncrementBy(e->charge);
+    metrics_->inserts->Increment();
+  }
+
+  // Entries that became unreferenced while the lock was held.
+  LRUHandle* pending_free = NULL;
+  {
+    std::lock_guard<MutexType> guard(mutex_);
+
+    NvmLRU_Append(e);
+
+    LRUHandle* displaced = table_.Insert(e);
+    if (displaced != NULL) {
+      NvmLRU_Remove(displaced);
+      if (Unref(displaced)) {
+        displaced->next = pending_free;
+        pending_free = displaced;
+      }
+    }
+
+    while (usage_ > capacity_ && lru_.next != &lru_) {
+      EvictOldestUnlocked(&pending_free);
+    }
+  }
+
+  // The entries are freed here, outside of the mutex, for performance
+  // reasons.
+  FreeLRUEntries(pending_free);
+
+  return reinterpret_cast<Cache::Handle*>(e);
+}
+
+// Removes the entry matching key/hash from this shard, if there is one. The
+// entry is freed right away only when the cache held the last reference.
+void NvmLRUCache::Erase(const Slice& key, uint32_t hash) {
+  LRUHandle* removed = NULL;
+  bool free_now = false;
+  {
+    std::lock_guard<MutexType> guard(mutex_);
+    removed = table_.Remove(key, hash);
+    if (removed != NULL) {
+      NvmLRU_Remove(removed);
+      free_now = Unref(removed);
+    }
+  }
+  // The mutex is no longer held here. free_now can only be true when an
+  // entry was actually removed.
+  if (!free_now) {
+    return;
+  }
+  FreeEntry(removed);
+}
+static const int kNumShardBits = 4;  // High-order hash bits that pick a shard.
+static const int kNumShards = 1 << kNumShardBits;  // Number of shards (16).
+
+// Cache implementation which spreads its entries over kNumShards
+// NvmLRUCache shards. All shards allocate from one shared VMEM pool.
+class ShardedLRUCache : public Cache {
+ private:
+  gscoped_ptr<CacheMetrics> metrics_;
+  vector<NvmLRUCache*> shards_;
+  MutexType id_mutex_;
+  uint64_t last_id_;
+  VMEM* vmp_;
+
+  // Hashes the bytes of 's' with CityHash64; the 64-bit result is narrowed
+  // to the 32-bit return type.
+  static inline uint32_t HashSlice(const Slice& s) {
+    const char* bytes = reinterpret_cast<const char *>(s.data());
+    return util_hash::CityHash64(bytes, s.size());
+  }
+
+  // Picks the shard index from the top kNumShardBits bits of 'hash'.
+  static uint32_t Shard(uint32_t hash) {
+    const int discarded_bits = 32 - kNumShardBits;
+    return hash >> discarded_bits;
+  }
+
+ public:
+  // Creates kNumShards shards on top of the pool 'vmp'. 'id' is not used by
+  // this constructor.
+  explicit ShardedLRUCache(size_t capacity, const string& id, VMEM* vmp)
+        : last_id_(0),
+          vmp_(vmp) {
+    // Round up so that the shards together cover at least 'capacity'.
+    const size_t per_shard = (capacity + (kNumShards - 1)) / kNumShards;
+    for (int i = 0; i != kNumShards; ++i) {
+      gscoped_ptr<NvmLRUCache> shard(new NvmLRUCache(vmp_));
+      shard->SetCapacity(per_shard);
+      shards_.push_back(shard.release());
+    }
+  }
+
+  virtual ~ShardedLRUCache() {
+    STLDeleteElements(&shards_);
+    // Per the note at the top of this file, our cache is entirely volatile.
+    // Hence, when the cache is destructed, we delete the underlying
+    // VMEM pool.
+    vmem_delete(vmp_);
+  }
+
+  // Hands a handle obtained from Allocate() to the shard selected by the
+  // hash stored in it.
+  virtual Handle* Insert(PendingHandle* handle,
+                         Cache::EvictionCallback* eviction_callback) OVERRIDE {
+    LRUHandle* h = reinterpret_cast<LRUHandle*>(DCHECK_NOTNULL(handle));
+    NvmLRUCache* shard = shards_[Shard(h->hash)];
+    return shard->Insert(h, eviction_callback);
+  }
+
+  virtual Handle* Lookup(const Slice& key, CacheBehavior caching) OVERRIDE {
+    const uint32_t hash = HashSlice(key);
+    const bool expect_in_cache = (caching == EXPECT_IN_CACHE);
+    return shards_[Shard(hash)]->Lookup(key, hash, expect_in_cache);
+  }
+
+  virtual void Release(Handle* handle) OVERRIDE {
+    const uint32_t hash = reinterpret_cast<LRUHandle*>(handle)->hash;
+    shards_[Shard(hash)]->Release(handle);
+  }
+
+  virtual void Erase(const Slice& key) OVERRIDE {
+    const uint32_t hash = HashSlice(key);
+    NvmLRUCache* shard = shards_[Shard(hash)];
+    shard->Erase(key, hash);
+  }
+
+  virtual Slice Value(Handle* handle) OVERRIDE {
+    LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
+    return h->value();
+  }
+
+  virtual uint8_t* MutableValue(PendingHandle* handle) OVERRIDE {
+    LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
+    return h->val_ptr();
+  }
+
+  // Returns the next id of a counter protected by id_mutex_; the first
+  // value handed out is 1.
+  virtual uint64_t NewId() OVERRIDE {
+    std::lock_guard<MutexType> guard(id_mutex_);
+    last_id_ += 1;
+    return last_id_;
+  }
+
+  // Creates the metrics for 'entity' and attaches them to every shard.
+  virtual void SetMetrics(const scoped_refptr<MetricEntity>& entity) OVERRIDE {
+    metrics_.reset(new CacheMetrics(entity));
+    CacheMetrics* shared_metrics = metrics_.get();
+    for (size_t i = 0; i < shards_.size(); ++i) {
+      shards_[i]->SetMetrics(shared_metrics);
+    }
+  }
+
+  // Allocates one block holding an LRUHandle followed by room for the key
+  // and 'val_len' value bytes, copies the key into it and fills in the
+  // lengths, charge and hash. Returns nullptr when no shard could provide
+  // the memory.
+  virtual PendingHandle* Allocate(Slice key, int val_len,
+                                  int charge) OVERRIDE {
+    int key_len = key.size();
+    DCHECK_GE(key_len, 0);
+    DCHECK_GE(val_len, 0);
+    const size_t alloc_size = sizeof(LRUHandle) + key_len + val_len;
+
+    // Try allocating from each of the shards -- if vmem is tight,
+    // this can cause eviction, so we might have better luck in different
+    // shards.
+    for (NvmLRUCache* shard : shards_) {
+      uint8_t* buf = static_cast<uint8_t*>(shard->AllocateAndRetry(alloc_size));
+      if (buf == NULL) {
+        continue;
+      }
+      LRUHandle* handle = reinterpret_cast<LRUHandle*>(buf);
+      // The key/value bytes live directly behind the handle itself.
+      handle->kv_data = buf + sizeof(LRUHandle);
+      handle->key_length = key_len;
+      handle->val_length = val_len;
+      handle->charge = charge + key.size();
+      handle->hash = HashSlice(key);
+      memcpy(handle->kv_data, key.data(), key.size());
+      return reinterpret_cast<PendingHandle*>(handle);
+    }
+    // TODO: increment a metric here on allocation failure.
+    return nullptr;
+  }
+
+  // Returns the memory of a pending handle to the pool.
+  virtual void Free(PendingHandle* ph) OVERRIDE {
+    vmem_free(vmp_, ph);
+  }
+};
+
+} // end anonymous namespace
+
+// Creates a VMEM pool of 'capacity' bytes at FLAGS_nvm_cache_path and
+// returns a newly allocated cache built on top of it. The process is
+// terminated if 'capacity' is below VMEM_MIN_POOL or if the pool cannot be
+// created.
+Cache* NewLRUNvmCache(size_t capacity, const std::string& id) {
+  // vmem_create() will fail if the capacity is too small, but with
+  // an inscrutable error. So, we'll check ourselves.
+  CHECK_GE(capacity, VMEM_MIN_POOL)
+    << "configured capacity " << capacity << " bytes is less than "
+    << "the minimum capacity for an NVM cache: " << VMEM_MIN_POOL;
+
+  VMEM* vmp = vmem_create(FLAGS_nvm_cache_path.c_str(), capacity);
+  // If we cannot create the cache pool we should not retry.
+  PLOG_IF(FATAL, vmp == NULL)
+    << "Could not initialize NVM cache library in path "
+    << FLAGS_nvm_cache_path.c_str();
+
+  return new ShardedLRUCache(capacity, id, vmp);
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/nvm_cache.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/nvm_cache.h b/be/src/kudu/util/nvm_cache.h
new file mode 100644
index 0000000..38962f2
--- /dev/null
+++ b/be/src/kudu/util/nvm_cache.h
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_NVM_CACHE_H_
+#define KUDU_UTIL_NVM_CACHE_H_
+
+#include <string>
+
+namespace kudu {
+class Cache;
+
+// Create a cache in persistent memory with the given capacity, in bytes (at least VMEM_MIN_POOL).
+Cache* NewLRUNvmCache(size_t capacity, const std::string& id);
+
+}  // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/object_pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/object_pool-test.cc 
b/be/src/kudu/util/object_pool-test.cc
new file mode 100644
index 0000000..d6b34ae
--- /dev/null
+++ b/be/src/kudu/util/object_pool-test.cc
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include "kudu/util/object_pool.h"
+
+namespace kudu {
+
+// Test helper whose only job is to track, in a static counter, how many
+// instances are alive at the moment.
+class MyClass {
+ public:
+  // Each construction bumps the live-instance counter.
+  MyClass() {
+    instance_count_ += 1;
+  }
+
+  // Each destruction lowers the live-instance counter.
+  ~MyClass() {
+    instance_count_ -= 1;
+  }
+
+  // Current value of the live-instance counter.
+  static int instance_count() {
+    return instance_count_;
+  }
+
+  // Sets the counter back to zero, regardless of live instances.
+  static void ResetCount() {
+    instance_count_ = 0;
+  }
+
+ private:
+  static int instance_count_;
+};
+int MyClass::instance_count_ = 0;
+
+TEST(TestObjectPool, TestPooling) {  // Checks live-instance accounting and reuse of a destroyed slot.
+  MyClass::ResetCount();
+  {
+    ObjectPool<MyClass> pool;
+    ASSERT_EQ(0, MyClass::instance_count());  // A fresh pool has constructed nothing.
+    MyClass *a = pool.Construct();
+    ASSERT_EQ(1, MyClass::instance_count());
+    MyClass *b = pool.Construct();
+    ASSERT_EQ(2, MyClass::instance_count());
+    ASSERT_TRUE(a != b);  // Two live objects must be distinct.
+    pool.Destroy(b);  // Expected to run b's destructor: the count drops to 1.
+    ASSERT_EQ(1, MyClass::instance_count());
+    MyClass *c = pool.Construct();
+    ASSERT_EQ(2, MyClass::instance_count());
+    ASSERT_TRUE(c == b) << "should reuse instance";
+    pool.Destroy(c);
+
+    ASSERT_EQ(1, MyClass::instance_count());  // Only 'a' is still alive here.
+  }
+
+  ASSERT_EQ(0, MyClass::instance_count())
+    << "destructing pool should have cleared instances";
+}
+
+TEST(TestObjectPool, TestScopedPtr) {  // Checks that the pool's scoped_ptr destroys its object at scope exit.
+  MyClass::ResetCount();
+  ASSERT_EQ(0, MyClass::instance_count());
+  ObjectPool<MyClass> pool;
+  {
+    ObjectPool<MyClass>::scoped_ptr sptr(
+      pool.make_scoped_ptr(pool.Construct()));
+    ASSERT_EQ(1, MyClass::instance_count());  // The object is alive while 'sptr' is in scope.
+  }
+  ASSERT_EQ(0, MyClass::instance_count());  // Leaving the scope destroyed it; 'pool' itself is still alive.
+}
+
+} // namespace kudu

Reply via email to