Repository: hadoop
Updated Branches:
  refs/heads/HDFS-8707 ba6f2fe33 -> f1b591d69


HDFS-10441: libhdfs++: HA namenode support.  Contributed by James Clampffer.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f1b591d6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f1b591d6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f1b591d6

Branch: refs/heads/HDFS-8707
Commit: f1b591d69989d11a735fddd576694aa51cfe47e9
Parents: ba6f2fe
Author: James <j...@apache.org>
Authored: Fri Jul 29 20:10:53 2016 -0400
Committer: James <j...@apache.org>
Committed: Fri Jul 29 20:10:53 2016 -0400

----------------------------------------------------------------------
 .../main/native/libhdfspp/examples/cat/c/cat.c  |   1 +
 .../native/libhdfspp/include/hdfspp/events.h    |   9 +-
 .../native/libhdfspp/include/hdfspp/options.h   |  42 +++-
 .../native/libhdfspp/include/hdfspp/status.h    |  24 +-
 .../native/libhdfspp/lib/common/CMakeLists.txt  |   2 +-
 .../libhdfspp/lib/common/hdfs_configuration.cc  | 123 ++++++++++
 .../libhdfspp/lib/common/hdfs_configuration.h   |   5 +
 .../main/native/libhdfspp/lib/common/logging.cc |   2 +-
 .../main/native/libhdfspp/lib/common/logging.h  |   9 +-
 .../libhdfspp/lib/common/namenode_info.cc       | 132 ++++++++++
 .../native/libhdfspp/lib/common/namenode_info.h |  49 ++++
 .../main/native/libhdfspp/lib/common/options.cc |  21 +-
 .../native/libhdfspp/lib/common/retry_policy.cc |  43 +++-
 .../native/libhdfspp/lib/common/retry_policy.h  |  83 ++++++-
 .../main/native/libhdfspp/lib/common/status.cc  |  73 ++++--
 .../src/main/native/libhdfspp/lib/common/uri.cc |  48 +++-
 .../src/main/native/libhdfspp/lib/common/uri.h  |   6 +-
 .../main/native/libhdfspp/lib/fs/filesystem.cc  |  39 ++-
 .../libhdfspp/lib/fs/namenode_operations.cc     |  18 +-
 .../libhdfspp/lib/fs/namenode_operations.h      |   7 +-
 .../native/libhdfspp/lib/rpc/rpc_connection.cc  |  56 ++++-
 .../native/libhdfspp/lib/rpc/rpc_connection.h   |  13 +-
 .../main/native/libhdfspp/lib/rpc/rpc_engine.cc | 241 ++++++++++++++++---
 .../main/native/libhdfspp/lib/rpc/rpc_engine.h  |  75 +++++-
 .../native/libhdfspp/tests/rpc_engine_test.cc   |  24 +-
 25 files changed, 1016 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cat/c/cat.c
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cat/c/cat.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cat/c/cat.c
index 5839308..dec8758 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cat/c/cat.c
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cat/c/cat.c
@@ -86,6 +86,7 @@ void parse_uri(const char * uri_string, struct Uri * uri) {
 };
 
 int main(int argc, char** argv) {
+
   char error_text[1024];
   if (argc != 2) {
    fprintf(stderr, "usage: cat [hdfs://[<hostname>:<port>]]/<path-to-file>\n");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h
index 43187a5..80c3712 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h
@@ -29,13 +29,20 @@ namespace hdfs {
  * Supported event names.  These names will stay consistent in libhdfs callbacks.
  *
  * Other events not listed here may be seen, but they are not stable and
- * should not be counted on.
+ * should not be counted on.  May need to be broken up into more components
+ * as more events are added.
  */
 
 static constexpr const char * FS_NN_CONNECT_EVENT = "NN::connect";
 static constexpr const char * FS_NN_READ_EVENT = "NN::read";
 static constexpr const char * FS_NN_WRITE_EVENT = "NN::write";
 
+// NN failover event due to issues with the current NN; might be standby, might be dead.
+// Invokes the fs_event_callback using the nameservice name in the cluster string.
+// The uint64_t value argument holds an address that can be reinterpreted as a const char *
+// and provides the full URI of the node the failover will attempt to connect to next.
+static constexpr const char * FS_NN_FAILOVER_EVENT = "NN::failover";
+
 static constexpr const char * FILE_DN_CONNECT_EVENT = "DN::connect";
 static constexpr const char * FILE_DN_READ_EVENT = "DN::read";
 static constexpr const char * FILE_DN_WRITE_EVENT = "DN::write";
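
A minimal sketch of a consumer of the FS_NN_FAILOVER_EVENT added above. The callback shape below (event name, cluster string, numeric value) follows the comment in this hunk; the exact fs_event_callback typedef in hdfspp/events.h may differ:

  #include <cstdint>
  #include <cstring>
  #include <iostream>

  // Hypothetical monitoring hook: decode the failover event's value field
  // back into the URI of the namenode that will be tried next.
  static void on_fs_event(const char *event, const char *cluster, std::uint64_t value) {
    if (std::strcmp(event, "NN::failover") == 0) {
      const char *next_nn_uri = reinterpret_cast<const char *>(value);
      std::cerr << "nameservice " << cluster << " failing over to " << next_nn_uri << "\n";
    }
  }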

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h
index 8562f6d..1acfe1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h
@@ -20,8 +20,29 @@
 
 #include "common/uri.h"
 
+#include <string>
+#include <vector>
+#include <map>
+
 namespace hdfs {
 
+
+struct NamenodeInfo {
+  NamenodeInfo(const std::string &nameservice, const std::string &nodename, const URI &uri) :
+                nameservice(nameservice), name(nodename), uri(uri) {}
+  NamenodeInfo(){};
+  //nameservice this belongs to
+  std::string nameservice;
+  //node name
+  std::string name;
+  //host:port
+  URI uri;
+
+  //get server hostname and port (aka service)
+  std::string get_host() const;
+  std::string get_port() const;
+};
+
 /**
  * Options to control the behavior of the libhdfspp library.
  **/
@@ -44,7 +65,7 @@ struct Options {
    * Maximum number of retries for RPC operations
    **/
   int max_rpc_retries;
-  static const int kNoRetry = -1;
+  static const int kNoRetry = 0;
   static const int kDefaultMaxRpcRetries = kNoRetry;
 
   /**
@@ -66,6 +87,25 @@ struct Options {
   URI defaultFS;
 
   /**
+   * Namenodes used to provide HA for this cluster if applicable
+   **/
+  std::map<std::string, std::vector<NamenodeInfo>> services;
+
+
+  /**
+   * Client failover attempts before failover gives up
+   **/
+  int failover_max_retries;
+  static const unsigned int kDefaultFailoverMaxRetries = 15;
+
+  /**
+   * Client failover attempts before failover gives up if server
+   * connection is timing out.
+   **/
+  int failover_connection_max_retries;
+  static const unsigned int kDefaultFailoverConnectionMaxRetries = 0;
+
+  /*
    * Which form of authentication to use with the server
    * Default: simple
    */
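
A sketch of hand-populating the new HA fields; HdfsConfiguration::GetOptions() normally fills them from config, and the nameservice and host names here are invented:

  #include "hdfspp/options.h"
  #include "common/uri.h"

  hdfs::Options MakeHaOptions() {
    hdfs::Options opts;
    // Two namenodes registered under the invented nameservice "mycluster".
    hdfs::URI nn1 = hdfs::URI::parse_from_string("hdfs://nn1.example.com:8020").value();
    hdfs::URI nn2 = hdfs::URI::parse_from_string("hdfs://nn2.example.com:8020").value();
    opts.services["mycluster"] = { hdfs::NamenodeInfo("mycluster", "nn1", nn1),
                                   hdfs::NamenodeInfo("mycluster", "nn2", nn2) };
    opts.failover_max_retries = 4;             // total failovers before the operation fails
    opts.failover_connection_max_retries = 0;  // fail over immediately when a connect times out
    return opts;
  }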

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
index 1628d8c..f217cad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
@@ -27,15 +27,20 @@ class Status {
  public:
   // Create a success status.
   Status() : code_(0) {};
+
+  // Note: Avoid calling the Status constructors directly; call the factory methods instead
+
+  // Used for common status types
   Status(int code, const char *msg);
-  Status(int code, const char *msg1, const char *msg2);
+  // Used for server side exceptions reported through RpcResponseProto and similar
+  Status(int code, const char *exception_class, const char *exception_details);
 
   // Factory methods
   static Status OK();
   static Status InvalidArgument(const char *msg);
   static Status ResourceUnavailable(const char *msg);
   static Status Unimplemented();
-  static Status Exception(const char *expception_class_name, const char *error_message);
+  static Status Exception(const char *exception_class_name, const char *exception_details);
   static Status Error(const char *error_message);
   static Status AuthenticationFailed();
   static Status Canceled();
@@ -61,13 +66,28 @@ class Status {
     kNotADirectory = static_cast<unsigned>(std::errc::not_a_directory),
     kFileAlreadyExists = static_cast<unsigned>(std::errc::file_exists),
    kPathIsNotEmptyDirectory = static_cast<unsigned>(std::errc::directory_not_empty),
+
+    // non-errc codes start at 256
     kException = 256,
     kAuthenticationFailed = 257,
+    kAccessControlException = 258,
+    kStandbyException = 259,
+    kSnapshotProtocolException = 260,
   };
 
+  std::string get_exception_class_str() const {
+    return exception_class_;
+  }
+
+  int get_server_exception_type() const {
+    return code_;
+  }
+
  private:
   int code_;
   std::string msg_;
+
+  std::string exception_class_;
 };
 
 }
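
With the new accessors, callers can branch on the server-side exception type directly; a small sketch (stat stands in for any RPC result):

  #include "hdfspp/status.h"
  #include <string>

  void InspectStatus(const hdfs::Status &stat) {
    if (stat.ok())
      return;
    if (stat.get_server_exception_type() == hdfs::Status::kStandbyException) {
      // Worth retrying against another namenode in the same nameservice.
    } else if (stat.get_server_exception_type() == hdfs::Status::kAccessControlException) {
      // Permission problem; failing over won't help.
    }
    std::string cls = stat.get_exception_class_str();  // e.g. "org.apache.hadoop.ipc.StandbyException"
  }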

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
index 484d1a4..501d3b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
@@ -19,6 +19,6 @@ if(NEED_LINK_DL)
    set(LIB_DL dl)
 endif()
 
-add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc auth_info.cc)
+add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc auth_info.cc namenode_info.cc)
 add_library(common $<TARGET_OBJECTS:common_obj> $<TARGET_OBJECTS:uriparser2_obj>)
 target_link_libraries(common ${LIB_DL})

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc
index ef67af9..70775b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc
@@ -17,6 +17,13 @@
  */
 
 #include "common/hdfs_configuration.h"
+#include "common/logging.h"
+
+#include <exception>
+
+#ifndef DEFAULT_SCHEME
+  #define DEFAULT_SCHEME "hdfs://"
+#endif
 
 namespace hdfs {
 
@@ -40,6 +47,94 @@ void OptionalSet(T& target, optional<U> value) {
     target = *value;
 }
 
+std::vector<std::string> SplitOnComma(const std::string &s, bool include_empty_strings) {
+  std::vector<std::string> res;
+  std::string buf;
+
+  for(unsigned int i=0;i<s.size();i++) {
+    char c = s[i];
+    if(c != ',') {
+      buf += c;
+    } else {
+      if(!include_empty_strings && buf.empty()) {
+        // Skip adding empty strings if needed
+        continue;
+      }
+      res.push_back(buf);
+      buf.clear();
+    }
+  }
+
+  if(buf.size() > 0)
+    res.push_back(buf);
+
+  return res;
+}
+
+// Prepend hdfs:// to string if there isn't already a scheme
+// Converts unset optional into empty string
+std::string PrependHdfsScheme(optional<std::string> str) {
+  if(!str)
+    return "";
+
+  if(str.value().find("://") == std::string::npos)
+    return DEFAULT_SCHEME + str.value();
+  return str.value();
+}
+
+// It's either use this, goto, or a lot of returns w/ status checks
+struct ha_parse_error : public std::exception {
+  std::string desc;
+  ha_parse_error(const std::string &val) : desc(val) {};
+  const char *what() const noexcept override  {
+    return desc.c_str();
+  };
+};
+
+std::vector<NamenodeInfo> HdfsConfiguration::LookupNameService(const std::string &nameservice) {
+  std::vector<NamenodeInfo> namenodes;
+  try {
+    // Find namenodes that belong to nameservice
+    std::vector<std::string> namenode_ids;
+    {
+      std::string service_nodes = std::string("dfs.ha.namenodes.") + nameservice;
+      optional<std::string> namenode_list = Get(service_nodes);
+      if(namenode_list)
+        namenode_ids = SplitOnComma(namenode_list.value(), false);
+      else
+        throw ha_parse_error("unable to find " + service_nodes);
+
+      for(auto it=namenode_ids.begin(); it != namenode_ids.end(); it++)
+        LOG_INFO(kRPC, << "Namenode: " << *it);
+    }
+
+    // should this error if we only find 1 NN?
+    if(namenode_ids.empty())
+      throw ha_parse_error("No namenodes found for nameservice " + nameservice);
+
+    // Get URI for each HA namenode
+    for(auto node_id=namenode_ids.begin(); node_id != namenode_ids.end(); node_id++) {
+      // find URI
+      std::string dom_node_name = std::string("dfs.namenode.rpc-address.") + nameservice + "." + *node_id;
+      optional<URI> node_uri = URI::parse_from_string(PrependHdfsScheme(Get(dom_node_name)));
+
+      if(!node_uri) {
+        throw ha_parse_error("unable to find " + dom_node_name);
+      }
+
+      URI uri = node_uri.value();
+      LOG_INFO(kRPC, << "Read the following HA Namenode URI from config" << uri.GetDebugString());
+
+      NamenodeInfo node(nameservice, *node_id, uri);
+      namenodes.push_back(node);
+    }
+  } catch (ha_parse_error e) {
+    LOG_ERROR(kRPC, << "HA cluster detected but failed because: " << e.what());
+    namenodes.clear(); // Don't return inconsistent view
+  }
+  return namenodes;
+}
+
 // Interprets the resources to build an Options object
 Options HdfsConfiguration::GetOptions() {
   Options result;
@@ -50,7 +145,35 @@ Options HdfsConfiguration::GetOptions() {
   OptionalSet(result.rpc_retry_delay_ms, GetInt(kIpcClientConnectRetryIntervalKey));
   OptionalSet(result.defaultFS, GetUri(kFsDefaultFsKey));
 
+
+  OptionalSet(result.failover_max_retries, GetInt(kDfsClientFailoverMaxAttempts));
+  OptionalSet(result.failover_connection_max_retries, GetInt(kDfsClientFailoverConnectionRetriesOnTimeouts));
+
+  // Load all nameservices if HA is configured
+  optional<std::string> dfs_nameservices = Get("dfs.nameservices");
+  if(dfs_nameservices) {
+    std::string nameservice = dfs_nameservices.value();
+
+    std::vector<std::string> all_services = SplitOnComma(nameservice, false);
+
+    // Look up nodes for each nameservice so that the FileSystem object can support
+    // multiple nameservices by ID.
+    for(const std::string &service : all_services) {
+      if(service.empty())
+        continue;
+
+      LOG_DEBUG(kFileSystem, << "Parsing info for nameservice: " << service);
+      std::vector<NamenodeInfo> nodes = LookupNameService(service);
+      if(nodes.empty()) {
+        LOG_WARN(kFileSystem, << "Nameservice \"" << service << "\" declared in config but nodes aren't");
+      } else {
+        result.services[service] = nodes;
+      }
+    }
+  }
+
   optional<std::string> authentication_value = Get(kHadoopSecurityAuthenticationKey);
+
   if (authentication_value ) {
       std::string fixed_case_value = fixCase(authentication_value.value());
       if (fixed_case_value == fixCase(kHadoopSecurityAuthentication_kerberos))
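
For reference, LookupNameService() reads the standard HA keys. A minimal hdfs-site.xml sketch that would yield one entry in Options::services (the nameservice and hosts are invented):

  <configuration>
    <property><name>dfs.nameservices</name><value>mycluster</value></property>
    <property><name>dfs.ha.namenodes.mycluster</name><value>nn1,nn2</value></property>
    <property><name>dfs.namenode.rpc-address.mycluster.nn1</name><value>nn1.example.com:8020</value></property>
    <property><name>dfs.namenode.rpc-address.mycluster.nn2</name><value>nn2.example.com:8020</value></property>
  </configuration>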

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h
index c6ead66..459364f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h
@@ -46,6 +46,10 @@ class HdfsConfiguration : public Configuration {
     static constexpr const char * kHadoopSecurityAuthentication_simple = "simple";
     static constexpr const char * kHadoopSecurityAuthentication_kerberos = "kerberos";
 
+    static constexpr const char * kDfsClientFailoverMaxAttempts = "dfs.client.failover.max.attempts";
+    static constexpr const char * kDfsClientFailoverConnectionRetriesOnTimeouts = "dfs.client.failover.connection.retries.on.timeouts";
+
+
 private:
     friend class ConfigurationLoader;
 
@@ -57,6 +61,7 @@ private:
     HdfsConfiguration(const ConfigMap &src_map);
 
     static std::vector<std::string> GetDefaultFilenames();
+    std::vector<NamenodeInfo> LookupNameService(const std::string &nameservice);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc
index 39ed944..ac1c336 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc
@@ -177,12 +177,12 @@ LogMessage& LogMessage::operator<<(uint64_t val) {
   return *this;
 }
 
-
 LogMessage& LogMessage::operator<<(void *ptr) {
   msg_buffer_ << ptr;
   return *this;
 }
 
+
 std::string LogMessage::MsgString() const {
   return msg_buffer_.str();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h
index 9dc0c5f..f807fc4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h
@@ -19,6 +19,8 @@
 #ifndef LIB_COMMON_LOGGING_H_
 #define LIB_COMMON_LOGGING_H_
 
+#include <asio/ip/tcp.hpp>
+
 #include "hdfspp/log.h"
 
 #include <iostream>
@@ -179,11 +181,10 @@ class LogMessage {
   LogMessage& operator<<(const std::string*);
   LogMessage& operator<<(const std::string&);
 
-  LogMessage& operator<<(const ::asio::ip::tcp::endpoint& endpoint);
-
   //convert to a string "true"/"false"
   LogMessage& operator<<(bool);
 
+  //integral types
   LogMessage& operator<<(int32_t);
   LogMessage& operator<<(uint32_t);
   LogMessage& operator<<(int64_t);
@@ -192,6 +193,10 @@ class LogMessage {
   //print address as hex
   LogMessage& operator<<(void *);
 
+  //asio types
+  LogMessage& operator<<(const ::asio::ip::tcp::endpoint& endpoint);
+
+
   std::string MsgString() const;
 
  private:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc
new file mode 100644
index 0000000..bc38be7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "namenode_info.h"
+
+#include "common/continuation/asio.h"
+#include "common/logging.h"
+
+#include <sstream>
+#include <utility>
+#include <future>
+#include <memory>
+
+namespace hdfs {
+
+ResolvedNamenodeInfo& ResolvedNamenodeInfo::operator=(const NamenodeInfo &info) {
+  nameservice = info.nameservice;
+  name = info.name;
+  uri = info.uri;
+  return *this;
+}
+
+
+
+std::string ResolvedNamenodeInfo::str() const {
+  std::stringstream ss;
+  ss << "ResolvedNamenodeInfo {nameservice: " << nameservice << ", name: " << name << ", uri: " << uri.str();
+  ss << ", host: " << uri.get_host();
+  auto port = uri.get_port();
+  if(port)
+    ss << ", port: " << port.value();
+  else
+    ss << ", port: unable to parse";
+
+  ss << ", scheme: " << uri.get_scheme();
+
+  ss << " [";
+  for(unsigned int i=0;i<endpoints.size();i++)
+    ss << endpoints[i] << " ";
+  ss << "] }";
+
+  return ss.str();
+}
+
+
+bool ResolveInPlace(::asio::io_service *ioservice, ResolvedNamenodeInfo &info) {
+  // this isn't very memory friendly, but if it needs to be called often there are bigger issues at hand
+  info.endpoints.clear();
+  std::vector<ResolvedNamenodeInfo> resolved = BulkResolve(ioservice, {info});
+  if(resolved.size() != 1)
+    return false;
+
+  info.endpoints = resolved[0].endpoints;
+  if(info.endpoints.size() == 0)
+    return false;
+  return true;
+}
+
+
+std::vector<ResolvedNamenodeInfo> BulkResolve(::asio::io_service *ioservice, const std::vector<NamenodeInfo> &nodes) {
+  using namespace asio_continuation;
+
+  typedef std::vector<asio::ip::tcp::endpoint> endpoint_vector;
+  typedef Pipeline<endpoint_vector> resolve_pipeline_t;
+
+
+  std::vector<std::pair<resolve_pipeline_t*, std::shared_ptr<std::promise<Status>>>> pipelines;
+  pipelines.reserve(nodes.size());
+
+  std::vector<ResolvedNamenodeInfo> resolved_info;
+  // This must never reallocate once async ops begin
+  resolved_info.reserve(nodes.size());
+
+  for(unsigned int i=0; i<nodes.size(); i++) {
+    std::string host = nodes[i].get_host();
+    std::string port = nodes[i].get_port();
+
+    ResolvedNamenodeInfo resolved;
+    resolved = nodes[i];
+    resolved_info.push_back(resolved);
+
+    // build the pipeline
+    resolve_pipeline_t *pipeline = resolve_pipeline_t::Create();
+    auto resolve_step = Resolve(ioservice, host, port, std::back_inserter(pipeline->state()));
+    pipeline->Push(resolve_step);
+
+    // make a status associated with current pipeline
+    std::shared_ptr<std::promise<Status>> active_stat = std::make_shared<std::promise<Status>>();
+    pipelines.push_back(std::make_pair(pipeline, active_stat));
+
+    pipeline->Run([i,active_stat, &resolved_info](const Status &s, const endpoint_vector &ends){
+      resolved_info[i].endpoints = ends;
+      active_stat->set_value(s);
+    });
+
+  }
+
+  // Join all async operations
+  std::vector<ResolvedNamenodeInfo> return_set;
+  for(unsigned int i=0; i<pipelines.size();i++) {
+    std::shared_ptr<std::promise<Status>> promise = pipelines[i].second;
+
+    std::future<Status> future = promise->get_future();
+    Status stat = future.get();
+
+    // Clear endpoints if we hit an error
+    if(!stat.ok()) {
+      LOG_WARN(kRPC, << "Unable to resolve endpoints for " << nodes[i].uri.str());
+      resolved_info[i].endpoints.clear();
+    }
+  }
+
+  return resolved_info;
+}
+
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h
new file mode 100644
index 0000000..fdee8d7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef COMMON_HDFS_NAMENODE_INFO_H_
+#define COMMON_HDFS_NAMENODE_INFO_H_
+
+#include <asio.hpp>
+#include <hdfspp/options.h>
+
+#include <string>
+#include <vector>
+
+namespace hdfs {
+
+// Internal representation of namenode info that keeps track
+// of its endpoints.
+struct ResolvedNamenodeInfo : public NamenodeInfo {
+  ResolvedNamenodeInfo& operator=(const NamenodeInfo &info);
+  std::string str() const;
+
+  std::vector<::asio::ip::tcp::endpoint> endpoints;
+};
+
+// Clear endpoints if set and resolve all of them in parallel.
+// Only successful lookups will be placed in the result set.
+std::vector<ResolvedNamenodeInfo> BulkResolve(::asio::io_service *ioservice, const std::vector<NamenodeInfo> &nodes);
+
+// Clear endpoints, if any, and resolve them again
+// Return true if endpoints were resolved
+bool ResolveInPlace(::asio::io_service *ioservice, ResolvedNamenodeInfo &info);
+
+}
+
+#endif
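
A sketch of using BulkResolve() to pre-resolve configured namenodes and keep only the usable ones. Note that BulkResolve() blocks on promises fulfilled by the io_service, so it must not be called from the io_service's own worker thread:

  #include "common/namenode_info.h"
  #include <vector>

  std::vector<hdfs::ResolvedNamenodeInfo> UsableNamenodes(
      ::asio::io_service *io, const std::vector<hdfs::NamenodeInfo> &configured) {
    // One entry comes back per input; failed lookups have empty endpoint
    // lists, so filter those out.
    std::vector<hdfs::ResolvedNamenodeInfo> resolved = hdfs::BulkResolve(io, configured);
    std::vector<hdfs::ResolvedNamenodeInfo> usable;
    for (const auto &info : resolved) {
      if (!info.endpoints.empty())
        usable.push_back(info);
    }
    return usable;
  }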

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
index 305ea1a..29b45b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
@@ -26,12 +26,27 @@ const int Options::kNoRetry;
 const int Options::kDefaultMaxRpcRetries;
 const int Options::kDefaultRpcRetryDelayMs;
 const unsigned int Options::kDefaultHostExclusionDuration;
+const unsigned int Options::kDefaultFailoverMaxRetries;
+const unsigned int Options::kDefaultFailoverConnectionMaxRetries;
 
-Options::Options() : rpc_timeout(kDefaultRpcTimeout), rpc_connect_timeout(kDefaultRpcConnectTimeout),
+Options::Options() : rpc_timeout(kDefaultRpcTimeout),
+                     rpc_connect_timeout(kDefaultRpcConnectTimeout),
                      max_rpc_retries(kDefaultMaxRpcRetries),
                      rpc_retry_delay_ms(kDefaultRpcRetryDelayMs),
                      host_exclusion_duration(kDefaultHostExclusionDuration),
                      defaultFS(),
-                     authentication(kDefaultAuthentication)
-{}
+                     failover_max_retries(kDefaultFailoverMaxRetries),
+                     failover_connection_max_retries(kDefaultFailoverConnectionMaxRetries),
+                     authentication(kDefaultAuthentication) {}
+
+std::string NamenodeInfo::get_host() const {
+  return uri.get_host();
+}
+std::string NamenodeInfo::get_port() const {
+  optional<uint16_t> p = uri.get_port();
+  if(!p)
+    return std::to_string(-1);
+  return std::to_string(p.value());
+}
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.cc
index 1857e20..a885d53 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.cc
@@ -17,31 +17,64 @@
  */
 
 #include "common/retry_policy.h"
+#include "common/logging.h"
+
+#include <sstream>
 
 namespace hdfs {
 
 RetryAction FixedDelayRetryPolicy::ShouldRetry(
     const Status &s, uint64_t retries, uint64_t failovers,
     bool isIdempotentOrAtMostOnce) const {
-  (void)s;
+  LOG_TRACE(kRPC, << "FixedDelayRetryPolicy::ShouldRetry(retries=" << retries << ", failovers=" << failovers << ")");
   (void)isIdempotentOrAtMostOnce;
   if (retries + failovers >= max_retries_) {
     return RetryAction::fail(
-        "Failovers (" + std::to_string(retries + failovers) +
-        ") exceeded maximum retries (" + std::to_string(max_retries_) + ")");
+        "Failovers and retries (" + std::to_string(retries + failovers) +
+        ") exceeded maximum retries (" + std::to_string(max_retries_) + "), Status: " +
+        s.ToString());
   } else {
     return RetryAction::retry(delay_);
   }
 }
 
+
 RetryAction NoRetryPolicy::ShouldRetry(
     const Status &s, uint64_t retries, uint64_t failovers,
     bool isIdempotentOrAtMostOnce) const {
-  (void)s;
+  LOG_TRACE(kRPC, << "NoRetryPolicy::ShouldRetry(retries=" << retries << ", failovers=" << failovers << ")");
   (void)retries;
   (void)failovers;
   (void)isIdempotentOrAtMostOnce;
-  return RetryAction::fail("No retry");
+  return RetryAction::fail("No retry, Status: " + s.ToString());
+}
+
+
+RetryAction FixedDelayWithFailover::ShouldRetry(const Status &s, uint64_t retries,
+    uint64_t failovers,
+    bool isIdempotentOrAtMostOnce) const {
+  (void)isIdempotentOrAtMostOnce;
+  LOG_TRACE(kRPC, << "FixedDelayWithFailover::ShouldRetry(retries=" << retries << ", failovers=" << failovers << ")");
+
+  if(s.code() == ::asio::error::timed_out && failovers < max_failover_retries_) {
+    // Try connecting to another NN in case this one keeps timing out
+    // Can add the backoff wait specified by dfs.client.failover.sleep.base.millis here
+    return RetryAction::failover(delay_);
+  }
+
+  if(retries < max_retries_) {
+    LOG_TRACE(kRPC, << "FixedDelayWithFailover::ShouldRetry: retries < max_retries_");
+    return RetryAction::retry(delay_);
+  } else if (retries >= max_retries_ && failovers < max_failover_retries_) {
+    LOG_TRACE(kRPC, << "FixedDelayWithFailover::ShouldRetry: retries >= max_retries_ && failovers < max_failover_retries_");
+    return RetryAction::failover(delay_);
+  } else if (retries >= max_retries_ && failovers == max_failover_retries_) {
+    LOG_TRACE(kRPC, << "FixedDelayWithFailover::ShouldRetry: retries >= max_retries_ && failovers == max_failover_retries_");
+    // 1 last retry on new connection
+    return RetryAction::retry(delay_);
+  }
+
+  return RetryAction::fail("Retry and failover didn't work, Status: " + s.ToString());
 }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.h
index afa4f7d..0b5bd80 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/retry_policy.h
@@ -43,13 +43,31 @@ class RetryAction {
   static RetryAction retry(uint64_t delay) {
     return RetryAction(RETRY, delay, "");
   }
-  static RetryAction failover() {
-    return RetryAction(FAILOVER_AND_RETRY, 0, "");
+  static RetryAction failover(uint64_t delay) {
+    return RetryAction(FAILOVER_AND_RETRY, delay, "");
   }
+
+  std::string decision_str() const {
+    switch(action) {
+      case FAIL: return "FAIL";
+      case RETRY: return "RETRY";
+      case FAILOVER_AND_RETRY: return "FAILOVER_AND_RETRY";
+      default: return "UNDEFINED ACTION";
+    }
+  };
 };
 
 class RetryPolicy {
+ protected:
+  uint64_t delay_;
+  uint64_t max_retries_;
+  RetryPolicy(uint64_t delay, uint64_t max_retries) :
+              delay_(delay), max_retries_(max_retries) {}
+
  public:
+  RetryPolicy() {};
+
+  virtual ~RetryPolicy() {}
   /*
    * If there was an error in communications, responds with the configured
    * action to take.
@@ -58,23 +76,71 @@ class RetryPolicy {
                                             uint64_t failovers,
                                             bool isIdempotentOrAtMostOnce) const = 0;
 
-  virtual ~RetryPolicy() {}
+  virtual std::string str() const { return "Base RetryPolicy"; }
 };
 
+
+/*
+ * Overview of how the failover retry policy works:
+ *
+ * 1) Acts the same as FixedDelayRetryPolicy in terms of connection retries against a single NN
+ *    with two differences:
+ *      a) If we have retried more than the maximum number of retries we will failover to the
+ *         other node and reset the retry counter rather than error out.  It will begin the same
+ *         routine on the other node.
+ *      b) If an attempted connection times out and max_failover_conn_retries_ is less than the
+ *         normal number of retries it will failover sooner.  The connection timeout retry limit
+ *         defaults to zero; the idea being that if a node is unresponsive it's better to just
+ *         try the secondary rather than incur the timeout cost multiple times.
+ *
+ * 2) Keeps track of the failover count in the same way that the retry count is tracked.  If failover
+ *    is triggered more than a set number (dfs.client.failover.max.attempts) of times then the operation
+ *    will error out in the same way that a non-HA operation would error if it ran out of retries.
+ *
+ * 3) Failover between namenodes isn't instantaneous so the RPC retry delay is reused to add a small
+ *    delay between failover attempts.  This helps prevent the client from quickly using up all of
+ *    its failover attempts while thrashing between namenodes that are both temporarily marked standby.
+ *    Note: The java client implements exponential backoff here with a base other than the rpc delay.
+ *    This policy doesn't do any exponential backoff yet; it can be renamed to
+ *    ExponentialDelayWithFailover once backoff is implemented.
+ */
+class FixedDelayWithFailover : public RetryPolicy {
+ public:
+  FixedDelayWithFailover(uint64_t delay, uint64_t max_retries,
+                         uint64_t max_failover_retries,
+                         uint64_t max_failover_conn_retries)
+      : RetryPolicy(delay, max_retries), max_failover_retries_(max_failover_retries),
+        max_failover_conn_retries_(max_failover_conn_retries) {}
+
+  RetryAction ShouldRetry(const Status &s, uint64_t retries,
+                          uint64_t failovers,
+                          bool isIdempotentOrAtMostOnce) const override;
+
+  std::string str() const override { return "FixedDelayWithFailover"; }
+
+ private:
+  // Attempts to fail over
+  uint64_t max_failover_retries_;
+  // Attempts to fail over if connection times out rather than
+  // trying to connect and wait for the timeout delay failover_retries_
+  // times.
+  uint64_t max_failover_conn_retries_;
+};
+
+
 /*
  * Returns a fixed delay up to a certain number of retries
  */
 class FixedDelayRetryPolicy : public RetryPolicy {
  public:
   FixedDelayRetryPolicy(uint64_t delay, uint64_t max_retries)
-      : delay_(delay), max_retries_(max_retries) {}
+      : RetryPolicy(delay, max_retries) {}
 
   RetryAction ShouldRetry(const Status &s, uint64_t retries,
                           uint64_t failovers,
                           bool isIdempotentOrAtMostOnce) const override;
- private:
-  uint64_t delay_;
-  uint64_t max_retries_;
+
+  std::string str() const override { return "FixedDelayRetryPolicy"; }
 };
 
 /*
@@ -82,9 +148,12 @@ class FixedDelayRetryPolicy : public RetryPolicy {
  */
 class NoRetryPolicy : public RetryPolicy {
  public:
+  NoRetryPolicy() {};
   RetryAction ShouldRetry(const Status &s, uint64_t retries,
                           uint64_t failovers,
                           bool isIdempotentOrAtMostOnce) const override;
+
+  std::string str() const override { return "NoRetryPolicy"; }
 };
 }
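
A sketch of the decision sequence described in the overview comment above, with invented counter values:

  #include "common/retry_policy.h"

  void DemoPolicy() {
    // 1000ms delay, 2 retries per namenode, 4 failovers total, and fail over
    // immediately (0 extra tries) when a connection attempt times out.
    hdfs::FixedDelayWithFailover policy(1000, 2, 4, 0);
    hdfs::Status err = hdfs::Status::Error("connection reset");

    hdfs::RetryAction a1 = policy.ShouldRetry(err, /*retries=*/0, /*failovers=*/0, true);
    // a1.decision_str() == "RETRY": still under max_retries_ on this namenode.

    hdfs::RetryAction a2 = policy.ShouldRetry(err, /*retries=*/2, /*failovers=*/0, true);
    // a2.decision_str() == "FAILOVER_AND_RETRY": out of retries, failovers remain.
  }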
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
index d6c9875..796b1a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
@@ -21,33 +21,55 @@
 #include <cassert>
 #include <sstream>
 #include <cstring>
+#include <map>
 
 namespace hdfs {
 
-const char * kStatusAccessControlException = "org.apache.hadoop.security.AccessControlException";
-const char * kStatusSaslException = "javax.security.sasl.SaslException";
-const char * kPathNotFoundException = "org.apache.hadoop.fs.InvalidPathException";
-const char * kPathNotFoundException2 = "java.io.FileNotFoundException";
-const char * kPathIsNotDirectoryException = "org.apache.hadoop.fs.PathIsNotDirectoryException";
-const char * kSnapshotException = "org.apache.hadoop.hdfs.protocol.SnapshotException";
-const char * kFileAlreadyExistsException = "org.apache.hadoop.fs.FileAlreadyExistsException";
+//  Server side exceptions that we capture from the RpcResponseHeaderProto
+const char * kStatusAccessControlException     = "org.apache.hadoop.security.AccessControlException";
+const char * kPathIsNotDirectoryException      = "org.apache.hadoop.fs.PathIsNotDirectoryException";
+const char * kSnapshotException                = "org.apache.hadoop.hdfs.protocol.SnapshotException";
+const char * kStatusStandbyException           = "org.apache.hadoop.ipc.StandbyException";
+const char * kStatusSaslException              = "javax.security.sasl.SaslException";
+const char * kPathNotFoundException            = "org.apache.hadoop.fs.InvalidPathException";
+const char * kPathNotFoundException2           = "java.io.FileNotFoundException";
+const char * kFileAlreadyExistsException       = "org.apache.hadoop.fs.FileAlreadyExistsException";
 const char * kPathIsNotEmptyDirectoryException = "org.apache.hadoop.fs.PathIsNotEmptyDirectoryException";
 
-Status::Status(int code, const char *msg1) : code_(code) {
+
+const static std::map<std::string, int> kKnownServerExceptionClasses = {
+                                            {kStatusAccessControlException, Status::kAccessControlException},
+                                            {kPathIsNotDirectoryException, Status::kNotADirectory},
+                                            {kSnapshotException, Status::kSnapshotProtocolException},
+                                            {kStatusStandbyException, Status::kStandbyException},
+                                            {kStatusSaslException, Status::kAuthenticationFailed},
+                                            {kPathNotFoundException, Status::kPathNotFound},
+                                            {kPathNotFoundException2, Status::kPathNotFound},
+                                            {kFileAlreadyExistsException, Status::kFileAlreadyExists},
+                                            {kPathIsNotEmptyDirectoryException, Status::kPathIsNotEmptyDirectory}
+                                        };
+
+
+Status::Status(int code, const char *msg1)
+               : code_(code) {
   if(msg1) {
     msg_ = msg1;
   }
 }
 
-Status::Status(int code, const char *msg1, const char *msg2) : code_(code) {
-  std::stringstream ss;
-  if(msg1) {
-    ss << msg1;
-    if(msg2) {
-      ss << ":" << msg2;
-    }
+Status::Status(int code, const char *exception_class_name, const char *exception_details)
+               : code_(code) {
+  // If we can assure this never gets nullptr args this can be
+  // in the initializer list.
+  if(exception_class_name)
+    exception_class_ = exception_class_name;
+  if(exception_details)
+    msg_ = exception_details;
+
+  std::map<std::string, int>::const_iterator it = kKnownServerExceptionClasses.find(exception_class_);
+  if(it != kKnownServerExceptionClasses.end()) {
+    code_ = it->second;
   }
-  msg_ = ss.str();
 }
 
 
@@ -72,6 +94,7 @@ Status Status::Unimplemented() {
 }
 
 Status Status::Exception(const char *exception_class_name, const char *error_message) {
+  // Server side exception but can be represented by std::errc codes
   if (exception_class_name && (strcmp(exception_class_name, kStatusAccessControlException) == 0) )
     return Status(kPermissionDenied, error_message);
   else if (exception_class_name && (strcmp(exception_class_name, kStatusSaslException) == 0))
@@ -81,13 +104,13 @@ Status Status::Exception(const char *exception_class_name, const char *error_mes
   else if (exception_class_name && (strcmp(exception_class_name, kPathNotFoundException2) == 0))
     return Status(kPathNotFound, error_message);
   else if (exception_class_name && (strcmp(exception_class_name, kPathIsNotDirectoryException) == 0))
-      return Status(kNotADirectory, error_message);
+    return Status(kNotADirectory, error_message);
   else if (exception_class_name && (strcmp(exception_class_name, kSnapshotException) == 0))
-        return Status(kInvalidArgument, error_message);
+    return Status(kInvalidArgument, error_message);
   else if (exception_class_name && (strcmp(exception_class_name, kFileAlreadyExistsException) == 0))
-          return Status(kFileAlreadyExists, error_message);
+    return Status(kFileAlreadyExists, error_message);
   else if (exception_class_name && (strcmp(exception_class_name, kPathIsNotEmptyDirectoryException) == 0))
-          return Status(kPathIsNotEmptyDirectory, error_message);
+    return Status(kPathIsNotEmptyDirectory, error_message);
   else
     return Status(kException, exception_class_name, error_message);
 }
@@ -101,15 +124,19 @@ Status Status::AuthenticationFailed() {
 }
 
 Status Status::Canceled() {
-  return Status(kOperationCanceled,"Operation canceled");
+  return Status(kOperationCanceled, "Operation canceled");
 }
 
-
 std::string Status::ToString() const {
   if (code_ == kOk) {
     return "OK";
   }
-  return msg_;
+  std::stringstream ss;
+  if(!exception_class_.empty()) {
+    ss << exception_class_ << ":";
+  }
+  ss << msg_;
+  return ss.str();
 }
 
 }
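
With the lookup table above, a Status built from a server exception keeps the class string and gets a specific code; a small illustration (the detail message is invented):

  hdfs::Status s(hdfs::Status::kException,
                 "org.apache.hadoop.ipc.StandbyException",
                 "Operation category READ is not supported in state standby");
  // The table rewrites code_ to kStandbyException, so
  // s.get_server_exception_type() == hdfs::Status::kStandbyException, and
  // s.ToString() yields "org.apache.hadoop.ipc.StandbyException:<details>".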

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc
index 616cf3f..a54be8a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc
@@ -133,11 +133,11 @@ std::vector<std::string> split(const std::string input, char separator)
 
 
 std::string copy_range(const UriTextRangeA *r) {
-       const int size = r->afterLast - r->first;
-       if (size) {
+  const int size = r->afterLast - r->first;
+  if (size) {
       return std::string(r->first, size);
-       }
-       return "";
+  }
+  return "";
 }
 
 bool parse_int(const UriTextRangeA *r, optional<uint16_t> * result) {
@@ -161,14 +161,14 @@ bool parse_int(const UriTextRangeA *r, optional<uint16_t> * result) {
 
 std::vector<std::string> copy_path(const UriPathSegmentA *ps) {
     std::vector<std::string> result;
-       if (nullptr == ps)
+  if (nullptr == ps)
       return result;
 
-       for (; ps != 0; ps = ps->next) {
-               result.push_back(copy_range(&ps->text));
-       }
+  for (; ps != 0; ps = ps->next) {
+    result.push_back(copy_range(&ps->text));
+  }
 
-       return result;
+  return result;
 }
 
 void parse_user_info(const UriTextRangeA *r, std::string * user, std::string * pass) {
@@ -204,7 +204,7 @@ std::vector<std::pair<std::string, std::string > > parse_query(const char *first
       uriFreeQueryListA(query);
     }
 
-       return result;
+  return result;
 }
 
 
@@ -368,4 +368,32 @@ void URI::remove_queries(const std::string &q_name, bool encoded_input)
   }
 }
 
+std::string URI::GetDebugString() const {
+  std::stringstream ss;
+  ss << std::endl;
+  ss << "\t" << "uri.str() = \"" << str() << "\"" << std::endl;
+  ss << "\t" << "uri.get_scheme() = \"" << get_scheme() << "\"" << std::endl;
+  ss << "\t" << "uri.get_host() = \"" << get_host() << "\"" << std::endl;
+
+  if(!port)
+    ss << "\t" << "uri.get_port() = unset optional<uint16_t>" << std::endl;
+  else
+    ss << "\t" << "uri.get_port() = \"" << port.value() << "\"" << std::endl;
+
+  ss << "\t" << "uri.get_path() = \"" << get_path() << "\"" << std::endl;
+  ss << "\t" << "uri.get_fragment() = \"" << get_fragment() << "\"" << std::endl;
+
+
+  std::vector<std::pair<std::string, std::string> > elems = get_query_elements();
+
+  if(elems.size() > 0)
+    ss << "\t" << "Query elements:" << std::endl;
+
+  for(auto qry = elems.begin(); qry != elems.end(); qry++) {
+    ss << "\t\t" << qry->first << " -> " << qry->second << std::endl;
+  }
+
+  return ss.str();
+}
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.h
index e6fbd78..87f6919 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.h
@@ -118,11 +118,13 @@ public:
     { fragment = to_encoded(encoded_input,f); }
 
     std::string str(bool encoded_output=true) const;
+
+    // Get a string with each URI field printed on a separate line
+    std::string GetDebugString() const;
 };
 
 inline std::ostream& operator<<(std::ostream &out, const URI &uri)
 { return out << uri.str(); }
 
 }
-
-#endif
\ No newline at end of file
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
index f3c32c2..ae55e0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
@@ -18,6 +18,8 @@
 
 #include "filesystem.h"
 
+#include "common/namenode_info.h"
+
 #include <functional>
 #include <limits>
 #include <future>
@@ -126,9 +128,35 @@ void FileSystemImpl::Connect(const std::string &server,
     handler (Status::Error("Null IoService"), this);
   }
 
-  cluster_name_ = server + ":" + service;
+  // DNS lookup here for namenode(s)
+  std::vector<ResolvedNamenodeInfo> resolved_namenodes;
+
+  auto name_service = options_.services.find(server);
+  if(name_service != options_.services.end()) {
+    cluster_name_ = name_service->first;
+    resolved_namenodes = BulkResolve(&io_service_->io_service(), name_service->second);
+  } else {
+    cluster_name_ = server + ":" + service;
+
+    // tmp namenode info just to get this in the right format for BulkResolve
+    NamenodeInfo tmp_info;
+    optional<URI> uri = URI::parse_from_string("hdfs://" + cluster_name_);
+    if(!uri) {
+      LOG_ERROR(kFileSystem, << "Unable to use URI for cluster " << cluster_name_);
+      handler(Status::Error(("Invalid namenode " + cluster_name_ + " in config").c_str()), this);
+    }
+    tmp_info.uri = uri.value();
+
+    resolved_namenodes = BulkResolve(&io_service_->io_service(), {tmp_info});
+  }
+
+  for(unsigned int i=0;i<resolved_namenodes.size();i++) {
+    LOG_DEBUG(kFileSystem, << "Resolved Namenode");
+    LOG_DEBUG(kFileSystem, << resolved_namenodes[i].str());
+  }
+
 
-  nn_.Connect(cluster_name_, server, service, [this, handler](const Status & s) {
+  nn_.Connect(cluster_name_, /*server, service*/ resolved_namenodes, [this, handler](const Status & s) {
     handler(s, this);
   });
 }
@@ -216,6 +244,13 @@ void FileSystemImpl::Open(
                                  << path << ") called");
 
   nn_.GetBlockLocations(path, [this, path, handler](const Status &stat, std::shared_ptr<const struct FileInfo> file_info) {
+    if(!stat.ok()) {
+      LOG_INFO(kFileSystem, << "FileSystemImpl::Open failed to get block locations. status=" << stat.ToString());
+      if(stat.get_server_exception_type() == Status::kStandbyException) {
+        LOG_INFO(kFileSystem, << "Operation not allowed on standby namenode");
+      }
+    }
+
     handler(stat, stat.ok() ? new FileHandleImpl(cluster_name_, path, &io_service_->io_service(), client_name_, file_info, bad_node_tracker_, event_handlers_)
                             : nullptr);
   });
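
A sketch of the two Connect() paths above, assuming fs points at a FileSystemImpl built with HA options (the names are invented and the handler shape follows the calls in this hunk):

  // HA path: "mycluster" matches a key in options_.services, so the whole
  // nameservice is bulk-resolved and handed to the RPC engine.
  fs->Connect("mycluster", "", [](const hdfs::Status &s, hdfs::FileSystem *) {
    // s only reports an error once retries and failovers are exhausted.
  });

  // Non-HA path: no nameservice match, so server:service is treated as one host.
  fs->Connect("nn1.example.com", "8020", [](const hdfs::Status &s, hdfs::FileSystem *) {});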

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc
index ee566ea..8b4c126 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc
@@ -27,6 +27,7 @@
 #include <tuple>
 #include <iostream>
 #include <pwd.h>
+#include <utility>
 
 #define FMT_THIS_ADDR "this=" << (void*)this
 
@@ -51,20 +52,9 @@ Status NameNodeOperations::CheckValidPermissionMask(short permissions) {
 }
 
 void NameNodeOperations::Connect(const std::string &cluster_name,
-                                 const std::string &server,
-                             const std::string &service,
-                             std::function<void(const Status &)> &&handler) {
-  using namespace asio_continuation;
-  typedef std::vector<tcp::endpoint> State;
-  auto m = Pipeline<State>::Create();
-  m->Push(Resolve(io_service_, server, service,
-                  std::back_inserter(m->state())))
-      .Push(Bind([this, m, cluster_name](const Continuation::Next &next) {
-        engine_.Connect(cluster_name, m->state(), next);
-      }));
-  m->Run([this, handler](const Status &status, const State &) {
-    handler(status);
-  });
+                                 const std::vector<ResolvedNamenodeInfo> &servers,
+                                 std::function<void(const Status &)> &&handler) {
+  engine_.Connect(cluster_name, servers, handler);
 }
 
 void NameNodeOperations::GetBlockLocations(const std::string & path,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h
index 4f4d63e..9651570 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h
@@ -21,6 +21,7 @@
 #include "rpc/rpc_engine.h"
 #include "hdfspp/statinfo.h"
 #include "hdfspp/fsinfo.h"
+#include "common/namenode_info.h"
 #include "ClientNamenodeProtocol.pb.h"
 #include "ClientNamenodeProtocol.hrpc.inl"
 
@@ -45,13 +46,12 @@ public:
             const char *protocol_name, int protocol_version) :
   io_service_(io_service),
   engine_(io_service, options, client_name, user_name, protocol_name, protocol_version),
-  namenode_(& engine_) {}
+  namenode_(& engine_), options_(options) {}
 
   static Status CheckValidPermissionMask(short permissions);
 
   void Connect(const std::string &cluster_name,
-               const std::string &server,
-               const std::string &service,
+               const std::vector<ResolvedNamenodeInfo> &servers,
                std::function<void(const Status &)> &&handler);
 
   void GetBlockLocations(const std::string & path,
@@ -104,6 +104,7 @@ private:
   ::asio::io_service * io_service_;
   RpcEngine engine_;
   ClientNamenodeProtocol namenode_;
+  const Options options_;
 };
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
index be6d7bd..03f83f4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
@@ -118,7 +118,8 @@ Request::Request(LockFreeRpcEngine *engine, const std::string &method_name, int
       call_id_(call_id),
       timer_(engine->io_service()),
       handler_(std::move(handler)),
-      retry_count_(engine->retry_policy() ? 0 : kNoRetry) {
+      retry_count_(engine->retry_policy() ? 0 : kNoRetry),
+      failover_count_(0) {
   ConstructPayload(&payload_, &request);
 }
 
@@ -129,7 +130,8 @@ Request::Request(LockFreeRpcEngine *engine, const std::string &method_name, int
       call_id_(call_id),
       timer_(engine->io_service()),
       handler_(std::move(handler)),
-      retry_count_(engine->retry_policy() ? 0 : kNoRetry) {
+      retry_count_(engine->retry_policy() ? 0 : kNoRetry),
+      failover_count_(0) {
   ConstructPayload(&payload_, request);
 }
 
@@ -138,10 +140,13 @@ Request::Request(LockFreeRpcEngine *engine, Handler &&handler)
       call_id_(-1),
       timer_(engine->io_service()),
       handler_(std::move(handler)),
-      retry_count_(engine->retry_policy() ? 0 : kNoRetry) {
+      retry_count_(engine->retry_policy() ? 0 : kNoRetry),
+      failover_count_(0) {
 }
 
 void Request::GetPacket(std::string *res) const {
+  LOG_TRACE(kRPC, << "Request::GetPacket called");
+
   if (payload_.empty())
     return;
 
@@ -159,9 +164,27 @@ void Request::GetPacket(std::string *res) const {
 
 void Request::OnResponseArrived(pbio::CodedInputStream *is,
                                 const Status &status) {
+  LOG_TRACE(kRPC, << "Request::OnResponseArrived called");
   handler_(is, status);
 }
 
+std::string Request::GetDebugString() const {
+  // Basic description of this object, aimed at debugging
+  std::stringstream ss;
+  ss << "\nRequest Object:\n";
+  ss << "\tMethod name    = \"" << method_name_ << "\"\n";
+  ss << "\tCall id        = " << call_id_ << "\n";
+  ss << "\tRetry Count    = " << retry_count_ << "\n";
+  ss << "\tFailover count = " << failover_count_ << "\n";
+  return ss.str();
+}
+
+int Request::IncrementFailoverCount() {
+  // reset retry count when failing over
+  retry_count_ = 0;
+  return failover_count_++;
+}
+
 RpcConnection::RpcConnection(LockFreeRpcEngine *engine)
     : engine_(engine),
       connected_(kNotYetConnected) {}
@@ -258,7 +281,7 @@ void RpcConnection::AsyncFlushPendingRequests() {
   });
 }
 
-void RpcConnection::HandleRpcResponse(std::shared_ptr<Response> response) {
+Status RpcConnection::HandleRpcResponse(std::shared_ptr<Response> response) {
   assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
 
   response->ar.reset(new pbio::ArrayInputStream(&response->data_[0], response->data_.size()));
@@ -270,7 +293,7 @@ void RpcConnection::HandleRpcResponse(std::shared_ptr<Response> response) {
   auto req = RemoveFromRunningQueue(h.callid());
   if (!req) {
     LOG_WARN(kRPC, << "RPC response with Unknown call id " << h.callid());
-    return;
+    return Status::Error("Rpc response with unknown call id");
   }
 
   Status status;
@@ -288,9 +311,22 @@ void RpcConnection::HandleRpcResponse(std::shared_ptr<Response> response) {
       Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str());
   }
 
+  if(status.get_server_exception_type() == Status::kStandbyException) {
+    LOG_WARN(kRPC, << "Tried to connect to standby. status = " << 
status.ToString());
+
+    // We got the request back, but it needs to be resent to the other NN
+    std::vector<std::shared_ptr<Request>> reqs_to_redirect = {req};
+    PrependRequests_locked(reqs_to_redirect);
+
+    CommsError(status);
+    return status;
+  }
+
   io_service().post([req, response, status]() {
    req->OnResponseArrived(response->in.get(), status);  // Never call back while holding a lock
   });
+
+  return Status::OK();
 }
 
 void RpcConnection::HandleRpcTimeout(std::shared_ptr<Request> req,
@@ -437,6 +473,15 @@ void RpcConnection::PreEnqueueRequests(
   // Don't start sending yet; will flush when connected
 }
 
+// Only call when already holding conn state lock
+void RpcConnection::PrependRequests_locked(std::vector<std::shared_ptr<Request>> requests) {
+  LOG_DEBUG(kRPC, << "RpcConnection::PrependRequests called");
+
+  pending_requests_.insert(pending_requests_.begin(), requests.begin(),
+                           requests.end());
+  // Don't start sending yet; will flush when connected
+}
+
 void RpcConnection::SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers) {
   std::lock_guard<std::mutex> state_lock(connection_state_lock_);
   event_handlers_ = event_handlers;
@@ -452,6 +497,7 @@ void RpcConnection::SetClusterName(std::string cluster_name) {
 
 void RpcConnection::CommsError(const Status &status) {
   assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
+  LOG_DEBUG(kRPC, << "RpcConnection::CommsError called");
 
   Disconnect();
 

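Two pieces work together in this file: HandleRpcResponse() now reports a Status instead of void, and when a response carries a StandbyException the bounced request is re-inserted at the front of the pending queue before the connection is torn down, so it is the first thing resent after failover. A standalone sketch of that requeue step, with a toy Request type standing in for the real one:

    #include <iostream>
    #include <memory>
    #include <string>
    #include <vector>

    // Toy stand-in for the real Request object.
    struct Request { std::string method; };

    // Pending queue as in RpcConnection; requests bounced off a standby NN
    // are inserted at the *front* so they go out first after failover.
    std::vector<std::shared_ptr<Request>> pending_requests;

    void PrependRequests(std::vector<std::shared_ptr<Request>> requests) {
      pending_requests.insert(pending_requests.begin(), requests.begin(),
                              requests.end());
    }

    int main() {
      pending_requests.push_back(std::make_shared<Request>(Request{"mkdirs"}));
      auto bounced = std::make_shared<Request>(Request{"getFileInfo"});
      PrependRequests({bounced});  // server said "standby"; resend this first
      for (const auto &r : pending_requests) std::cout << r->method << "\n";
      // prints: getFileInfo, then mkdirs
    }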
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
index 330f9b1..869be40 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
@@ -112,6 +112,8 @@ void RpcConnectionImpl<NextLayer>::Connect(
 template <class NextLayer>
 void RpcConnectionImpl<NextLayer>::ConnectAndFlush(
     const std::vector<::asio::ip::tcp::endpoint> &server) {
+
+  LOG_INFO(kRPC, << "ConnectAndFlush called");
   std::lock_guard<std::mutex> state_lock(connection_state_lock_);
 
   if (server.empty()) {
@@ -408,7 +410,16 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ori
           OnRecvCompleted(ec, size);
         });
   } else if (current_response_state_->state_ == Response::kParseResponse) {
-    HandleRpcResponse(current_response_state_);
+    // Check return status from the RPC response.  We may have received a msg
+    // indicating a server side error.
+
+    Status stat = HandleRpcResponse(current_response_state_);
+
+    if(stat.get_server_exception_type() == Status::kStandbyException) {
+      // May need to bail out, connect to new NN, and restart loop
+      LOG_INFO(kRPC, << "Communicating with standby NN, attempting to 
reconnect");
+    }
+
     current_response_state_ = nullptr;
     StartReading();
   }

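The changed branch in OnRecvCompleted() is small but it is where the failover decision surfaces in the read loop: the parse step now yields a Status, and a standby indication is the cue to reconnect rather than keep reading from the same namenode. A toy sketch of the pattern, with stand-in types rather than the library's real state machine:

    #include <iostream>

    enum class ParseState { kReadLength, kReadContent, kParseResponse };
    struct Status {
      bool standby = false;
      static Status OK() { return Status{}; }
      static Status Standby() { return Status{true}; }
    };

    // Stand-in for HandleRpcResponse(), which now reports what it saw.
    Status HandleRpcResponse(bool server_is_standby) {
      return server_is_standby ? Status::Standby() : Status::OK();
    }

    int main() {
      ParseState state = ParseState::kParseResponse;
      if (state == ParseState::kParseResponse) {
        Status s = HandleRpcResponse(/*server_is_standby=*/true);
        if (s.standby)
          std::cout << "standby NN; reconnect to the other namenode\n";
      }
    }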
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
index a8438b1..be69d95 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
@@ -19,15 +19,126 @@
 #include "rpc_connection.h"
 #include "common/util.h"
 #include "common/logging.h"
+#include "common/namenode_info.h"
 #include "optional.hpp"
 
 #include <future>
+#include <algorithm>
 
 namespace hdfs {
 
 template <class T>
 using optional = std::experimental::optional<T>;
 
+HANamenodeTracker::HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers,
+                                     ::asio::io_service *ioservice,
+                                     std::shared_ptr<LibhdfsEvents> event_handlers)
+                  : enabled_(false), resolved_(false),
+                    ioservice_(ioservice), event_handlers_(event_handlers)
+{
+  LOG_TRACE(kRPC, << "HANamenodeTracker got the following nodes");
+  for(unsigned int i=0;i<servers.size();i++)
+    LOG_TRACE(kRPC, << servers[i].str());
+
+  if(servers.size() >= 2) {
+    LOG_TRACE(kRPC, << "Creating HA namenode tracker");
+    if(servers.size() > 2) {
+      LOG_WARN(kRPC, << "Nameservice declares more than two nodes.  Some won't 
be used.");
+    }
+
+    active_info_ = servers[0];
+    standby_info_ = servers[1];
+    LOG_INFO(kRPC, << "Active namenode url  = " << active_info_.uri.str());
+    LOG_INFO(kRPC, << "Standby namenode url = " << standby_info_.uri.str());
+
+    enabled_ = true;
+    if(!active_info_.endpoints.empty() || !standby_info_.endpoints.empty()) {
+      resolved_ = true;
+    }
+  }
+}
+
+
+HANamenodeTracker::~HANamenodeTracker() { }
+
+
+static std::string format_endpoints(const std::vector<::asio::ip::tcp::endpoint> &pts) {
+  std::stringstream ss;
+  for(unsigned int i=0; i<pts.size(); i++)
+    if(i == pts.size() - 1)
+      ss << pts[i];
+    else
+      ss << pts[i] << ", ";
+  return ss.str();
+}
+
+//  Pass in the endpoint from the current connection; this will do a reverse lookup
+//  and return the info for the standby node, swapping internal state as a side effect.
+ResolvedNamenodeInfo HANamenodeTracker::GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint) {
+  LOG_TRACE(kRPC, << "Swapping from endpoint " << current_endpoint);
+  mutex_guard swap_lock(swap_lock_);
+
+  ResolvedNamenodeInfo failover_node;
+
+  // Current endpoint was the active NN; promote the standby to active
+  if(IsCurrentActive_locked(current_endpoint)) {
+    std::swap(active_info_, standby_info_);
+    if(event_handlers_)
+      event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(),
+                            reinterpret_cast<int64_t>(active_info_.uri.str().c_str()));
+    failover_node = active_info_;
+  } else if(IsCurrentStandby_locked(current_endpoint)) {
+    // Connected to the standby; the tracked active is the one to use
+    if(event_handlers_)
+      event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(),
+                            reinterpret_cast<int64_t>(active_info_.uri.str().c_str()));
+    failover_node = active_info_;
+  } else {
+    // Invalid state, throw for testing
+    std::string ep1 = format_endpoints(active_info_.endpoints);
+    std::string ep2 = format_endpoints(standby_info_.endpoints);
+
+    std::stringstream msg;
+    msg << "Looked for " << current_endpoint << " in\n";
+    msg << ep1 << " and\n";
+    msg << ep2 << std::endl;
+
+    LOG_ERROR(kRPC, << "Unable to find RPC connection in config " << msg.str() 
<< ". Bailing out.");
+    throw std::runtime_error(msg.str());
+  }
+
+  if(failover_node.endpoints.empty()) {
+    LOG_WARN(kRPC, << "No endpoints for node " << failover_node.uri.str() << " 
attempting to resolve again");
+    if(!ResolveInPlace(ioservice_, failover_node)) {
+      LOG_ERROR(kRPC, << "Fallback endpoint resolution for node " << 
failover_node.uri.str()
+                      << "failed.  Please make sure your configuration is up 
to date.");
+    }
+  }
+  return failover_node;
+}
+
+bool HANamenodeTracker::IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const {
+  for(unsigned int i=0;i<active_info_.endpoints.size();i++) {
+    if(ep.address() == active_info_.endpoints[i].address()) {
+      if(ep.port() != active_info_.endpoints[i].port())
+        LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << active_info_.endpoints[i] << " trying anyway..");
+      return true;
+    }
+  }
+  return false;
+}
+
+bool HANamenodeTracker::IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const {
+  for(unsigned int i=0;i<standby_info_.endpoints.size();i++) {
+    if(ep.address() == standby_info_.endpoints[i].address()) {
+      if(ep.port() != standby_info_.endpoints[i].port())
+        LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << standby_info_.endpoints[i] << " trying anyway..");
+      return true;
+    }
+  }
+  return false;
+}
+
 RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options,
                     const std::string &client_name, const std::string &user_name,
                     const char *protocol_name, int protocol_version)
@@ -36,27 +147,35 @@ RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options,
       client_name_(client_name),
       protocol_name_(protocol_name),
       protocol_version_(protocol_version),
-      retry_policy_(std::move(MakeRetryPolicy(options))),
       call_id_(0),
       retry_timer(*io_service),
-      event_handlers_(std::make_shared<LibhdfsEvents>()) {
-
-    auth_info_.setUser(user_name);
-    if (options.authentication == Options::kKerberos) {
-        auth_info_.setMethod(AuthInfo::kKerberos);
-    }
+      event_handlers_(std::make_shared<LibhdfsEvents>())
+{
+  LOG_DEBUG(kRPC, << "RpcEngine::RpcEngine called");
 
-    LOG_DEBUG(kRPC, << "RpcEngine::RpcEngine called");
+  auth_info_.setUser(user_name);
+  if (options.authentication == Options::kKerberos) {
+    auth_info_.setMethod(AuthInfo::kKerberos);
+  }
 }
 
 void RpcEngine::Connect(const std::string &cluster_name,
-                        const std::vector<::asio::ip::tcp::endpoint> &server,
+                        const std::vector<ResolvedNamenodeInfo> servers,
                         RpcCallback &handler) {
   std::lock_guard<std::mutex> state_lock(engine_state_lock_);
   LOG_DEBUG(kRPC, << "RpcEngine::Connect called");
 
-  last_endpoints_ = server;
+  last_endpoints_ = servers[0].endpoints;
   cluster_name_ = cluster_name;
+  LOG_TRACE(kRPC, << "Got cluster name \"" << cluster_name << "\" in 
RpcEngine::Connect")
+
+  ha_persisted_info_.reset(new HANamenodeTracker(servers, io_service_, 
event_handlers_));
+  if(!ha_persisted_info_->is_enabled()) {
+    ha_persisted_info_.reset();
+  }
+
+  // Construct retry policy after we determine if config is HA
+  retry_policy_ = std::move(MakeRetryPolicy(options_));
 
   conn_ = InitializeConnection();
   conn_->Connect(last_endpoints_, auth_info_, handler);
@@ -72,8 +191,16 @@ void RpcEngine::Shutdown() {
 
 std::unique_ptr<const RetryPolicy> RpcEngine::MakeRetryPolicy(const Options &options) {
   LOG_DEBUG(kRPC, << "RpcEngine::MakeRetryPolicy called");
-  if (options.max_rpc_retries > 0) {
-    return std::unique_ptr<RetryPolicy>(new FixedDelayRetryPolicy(options.rpc_retry_delay_ms, options.max_rpc_retries));
+
+  if(ha_persisted_info_) {
+    LOG_INFO(kRPC, << "Cluster is HA configured so policy will default to HA until a knob is implemented");
+    return std::unique_ptr<RetryPolicy>(new FixedDelayWithFailover(options.rpc_retry_delay_ms,
+                                                                   options.max_rpc_retries,
+                                                                   options.failover_max_retries,
+                                                                   options.failover_connection_max_retries));
+  } else if (options.max_rpc_retries > 0) {
+    return std::unique_ptr<RetryPolicy>(new FixedDelayRetryPolicy(options.rpc_retry_delay_ms,
+                                                                  options.max_rpc_retries));
   } else {
     return nullptr;
   }
@@ -81,6 +208,15 @@ std::unique_ptr<const RetryPolicy> RpcEngine::MakeRetryPolicy(const Options &opt
 
 void RpcEngine::TEST_SetRpcConnection(std::shared_ptr<RpcConnection> conn) {
   conn_ = conn;
+  retry_policy_ = std::move(MakeRetryPolicy(options_));
+}
+
+void RpcEngine::TEST_SetRetryPolicy(std::unique_ptr<const RetryPolicy> policy) {
+  retry_policy_ = std::move(policy);
+}
+
+std::unique_ptr<const RetryPolicy> RpcEngine::TEST_GenerateRetryPolicyUsingOptions() {
+  return MakeRetryPolicy(options_);
 }
 
 void RpcEngine::AsyncRpc(
@@ -131,7 +267,7 @@ void RpcEngine::AsyncRpcCommsError(
     const Status &status,
     std::shared_ptr<RpcConnection> failedConnection,
     std::vector<std::shared_ptr<Request>> pendingRequests) {
-  LOG_ERROR(kRPC, << "RpcEngine::AsyncRpcCommsError called; conn=" << 
failedConnection.get() << " reqs=" << pendingRequests.size());
+  LOG_ERROR(kRPC, << "RpcEngine::AsyncRpcCommsError called; status=\"" << 
status.ToString() << "\" conn=" << failedConnection.get() << " reqs=" << 
pendingRequests.size());
 
   io_service().post([this, status, failedConnection, pendingRequests]() {
     RpcCommsError(status, failedConnection, pendingRequests);
@@ -142,44 +278,52 @@ void RpcEngine::RpcCommsError(
     const Status &status,
     std::shared_ptr<RpcConnection> failedConnection,
     std::vector<std::shared_ptr<Request>> pendingRequests) {
-  (void)status;
-
-  LOG_ERROR(kRPC, << "RpcEngine::RpcCommsError called; conn=" << 
failedConnection.get() << " reqs=" << pendingRequests.size());
+  LOG_WARN(kRPC, << "RpcEngine::RpcCommsError called; status=\"" << 
status.ToString() << "\" conn=" << failedConnection.get() << " reqs=" << 
pendingRequests.size());
 
   std::lock_guard<std::mutex> state_lock(engine_state_lock_);
 
   // If the failed connection is the current one, shut it down
   //    It will be reconnected when there is work to do
   if (failedConnection == conn_) {
-        conn_.reset();
+    LOG_INFO(kRPC, << "Disconnecting from failed RpcConnection");
+    conn_.reset();
   }
 
-  auto head_action = optional<RetryAction>();
+  optional<RetryAction> head_action = optional<RetryAction>();
 
-  // Filter out anything with too many retries already
-  for (auto it = pendingRequests.begin(); it < pendingRequests.end();) {
-    auto req = *it;
+  // We are talking to the standby NN, so talk to the active one instead.
+  if(ha_persisted_info_ && status.get_server_exception_type() == Status::kStandbyException) {
+    LOG_INFO(kRPC, << "Received StandbyException.  Failing over.");
+    head_action = RetryAction::failover(std::max(0,options_.rpc_retry_delay_ms));
+  } else {
+    // Filter out anything with too many retries already
+    for (auto it = pendingRequests.begin(); it < pendingRequests.end();) {
+      auto req = *it;
 
-    RetryAction retry = RetryAction::fail(""); // Default to fail
-    if (retry_policy()) {
-      retry = retry_policy()->ShouldRetry(status, req->IncrementRetryCount(), 0, true);
-    }
+      LOG_DEBUG(kRPC, << req->GetDebugString());
 
-    if (retry.action == RetryAction::FAIL) {
-      // If we've exceeded the maximum retry, take the latest error and pass it
-      //    on.  There might be a good argument for caching the first error
-      //    rather than the last one, that gets messy
+      RetryAction retry = RetryAction::fail(""); // Default to fail
 
-      io_service().post([req, status]() {
-        req->OnResponseArrived(nullptr, status);  // Never call back while 
holding a lock
-      });
-      it = pendingRequests.erase(it);
-    } else {
-      if (!head_action) {
-        head_action = retry;
+      if (retry_policy()) {
+        retry = retry_policy()->ShouldRetry(status, req->IncrementRetryCount(), req->get_failover_count(), true);
       }
 
-      ++it;
+      if (retry.action == RetryAction::FAIL) {
+        // If we've exceeded the maximum retry, take the latest error and pass it
+        //    on.  There might be a good argument for caching the first error
+        //    rather than the last one, that gets messy
+
+        io_service().post([req, status]() {
+          req->OnResponseArrived(nullptr, status);  // Never call back while holding a lock
+        });
+        it = pendingRequests.erase(it);
+      } else {
+        if (!head_action) {
+          head_action = retry;
+        }
+
+        ++it;
+      }
     }
   }
 
@@ -189,8 +333,31 @@ void RpcEngine::RpcCommsError(
           head_action && head_action->action != RetryAction::FAIL;
 
   if (haveRequests) {
+    LOG_TRACE(kRPC, << "Have " << pendingRequests.size() << " requests to 
resend");
     bool needNewConnection = !conn_;
     if (needNewConnection) {
+      LOG_DEBUG(kRPC, << "Creating a new NN conection");
+
+
+      // If HA is enabled and we have valid HA info then fail over to the standby (hopefully now active)
+      if(head_action->action == RetryAction::FAILOVER_AND_RETRY && ha_persisted_info_) {
+
+        for(unsigned int i=0; i<pendingRequests.size();i++)
+          pendingRequests[i]->IncrementFailoverCount();
+
+        ResolvedNamenodeInfo new_active_nn_info =
+            ha_persisted_info_->GetFailoverAndUpdate(last_endpoints_[0]/*reverse lookup*/);
+
+        LOG_INFO(kRPC, << "Going to try connecting to alternate NameNode: " << new_active_nn_info.uri.str());
+
+        if(ha_persisted_info_->is_resolved()) {
+          last_endpoints_ = new_active_nn_info.endpoints;
+        } else {
+          LOG_WARN(kRPC, << "It looks like HA is turned on, but unable to fail over. has info="
+                         << ha_persisted_info_->is_enabled() << " resolved=" << ha_persisted_info_->is_resolved());
+        }
+      }
+
       conn_ = InitializeConnection();
       conn_->PreEnqueueRequests(pendingRequests);
 

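GetFailoverAndUpdate() is the core of the patch: given the endpoint that just failed, it swaps active and standby under a lock and returns the node to try next, throwing if the endpoint matches neither configured namenode. A self-contained miniature of that swap logic follows; endpoints are plain strings here, whereas the real code compares asio endpoints by address.

    #include <algorithm>
    #include <iostream>
    #include <mutex>
    #include <stdexcept>
    #include <string>
    #include <vector>

    struct NodeInfo {
      std::string uri;
      std::vector<std::string> endpoints;
    };

    class MiniHATracker {
     public:
      MiniHATracker(NodeInfo active, NodeInfo standby)
          : active_(std::move(active)), standby_(std::move(standby)) {}

      NodeInfo GetFailoverAndUpdate(const std::string &failed_endpoint) {
        std::lock_guard<std::mutex> lock(swap_lock_);
        if (Contains(active_.endpoints, failed_endpoint)) {
          std::swap(active_, standby_);   // old active becomes standby
        } else if (!Contains(standby_.endpoints, failed_endpoint)) {
          throw std::runtime_error("endpoint not in either namenode's config");
        }
        return active_;                   // node to try next
      }

     private:
      static bool Contains(const std::vector<std::string> &eps,
                           const std::string &ep) {
        return std::find(eps.begin(), eps.end(), ep) != eps.end();
      }
      NodeInfo active_, standby_;
      std::mutex swap_lock_;
    };

    int main() {
      MiniHATracker t({"hdfs://nn1:8020", {"10.0.0.1:8020"}},
                      {"hdfs://nn2:8020", {"10.0.0.2:8020"}});
      NodeInfo next = t.GetFailoverAndUpdate("10.0.0.1:8020");
      std::cout << "fail over to " << next.uri << "\n";  // hdfs://nn2:8020
    }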
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
index 5de7d53..47618a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
@@ -24,7 +24,11 @@
 #include "common/auth_info.h"
 #include "common/retry_policy.h"
 #include "common/libhdfs_events_impl.h"
+#include "common/util.h"
+#include "common/continuation/asio.h"
+#include "common/logging.h"
 #include "common/new_delete.h"
+#include "common/namenode_info.h"
 
 #include <google/protobuf/message_lite.h>
 #include <google/protobuf/io/coded_stream.h>
@@ -38,6 +42,7 @@
 #include <unordered_map>
 #include <vector>
 #include <mutex>
+#include <future>
 
 namespace hdfs {
 
@@ -84,10 +89,15 @@ class Request {
   std::string  method_name() const { return method_name_; }
   ::asio::deadline_timer &timer() { return timer_; }
   int IncrementRetryCount() { return retry_count_++; }
+  int IncrementFailoverCount();
   void GetPacket(std::string *res) const;
   void OnResponseArrived(::google::protobuf::io::CodedInputStream *is,
                          const Status &status);
 
+  int get_failover_count() {return failover_count_;}
+
+  std::string GetDebugString() const;
+
  private:
   LockFreeRpcEngine *const engine_;
   const std::string method_name_;
@@ -98,6 +108,7 @@ class Request {
   const Handler handler_;
 
   int retry_count_;
+  int failover_count_;
 };
 
 /*
@@ -137,6 +148,9 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
   //   on connect
   void PreEnqueueRequests(std::vector<std::shared_ptr<Request>> requests);
 
+  // Put requests at the front of the current request queue
+  void PrependRequests_locked(std::vector<std::shared_ptr<Request>> requests);
+
   void SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers);
   void SetClusterName(std::string cluster_name);
 
@@ -190,7 +204,7 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
   static std::string SerializeRpcRequest(
       const std::string &method_name,
       const ::google::protobuf::MessageLite *req);
-  void HandleRpcResponse(std::shared_ptr<Response> response);
+  Status HandleRpcResponse(std::shared_ptr<Response> response);
   void HandleRpcTimeout(std::shared_ptr<Request> req,
                         const ::asio::error_code &ec);
   void CommsError(const Status &status);
@@ -261,6 +275,55 @@ public:
   virtual const Options &options() const = 0;
 };
 
+
+/*
+ *  Tracker gives the RpcEngine a quick way to use an endpoint that just
+ *  failed in order to look up a set of endpoints for a failover node.
+ *
+ *  Note: For now this only deals with 2 NameNodes, but that's the default
+ *  anyway.
+ */
+class HANamenodeTracker {
+ public:
+  HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers,
+                    ::asio::io_service *ioservice,
+                    std::shared_ptr<LibhdfsEvents> event_handlers_);
+
+  virtual ~HANamenodeTracker();
+
+  bool is_enabled() const { return enabled_; }
+  bool is_resolved() const { return resolved_; }
+
+  // Get node opposite of the current one if possible (swaps active/standby)
+  // Note: This will always mutate internal state.  Use IsCurrentActive/Standby to
+  // get info without changing state
+  ResolvedNamenodeInfo GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint);
+
+  bool IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const;
+  bool IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const;
+
+ private:
+  // If HA should be enabled, according to our options and runtime info like # nodes provided
+  bool enabled_;
+  // If we were able to resolve at least 1 HA namenode
+  bool resolved_;
+
+  // Keep service in case a second round of DNS lookup is required
+  ::asio::io_service *ioservice_;
+
+  // Event handlers, for now this is the simplest place to catch all failover events
+  // and push info out to client application.  Possibly move into RPCEngine.
+  std::shared_ptr<LibhdfsEvents> event_handlers_;
+
+  // Only support 1 active and 1 standby for now.
+  ResolvedNamenodeInfo active_info_;
+  ResolvedNamenodeInfo standby_info_;
+
+  // Acquire when swapping active and standby
+  std::mutex swap_lock_;
+};
+
+
 /*
  * An engine for reliable communication with a NameNode.  Handles connection,
  * retry, and (someday) failover of the requested messages.
@@ -285,7 +348,7 @@ class RpcEngine : public LockFreeRpcEngine {
             const char *protocol_name, int protocol_version);
 
   void Connect(const std::string & cluster_name,
-               const std::vector<::asio::ip::tcp::endpoint> &server,
+               const std::vector<ResolvedNamenodeInfo> servers,
                RpcCallback &handler);
 
   void AsyncRpc(const std::string &method_name,
@@ -297,7 +360,6 @@ class RpcEngine : public LockFreeRpcEngine {
              const ::google::protobuf::MessageLite *req,
              const std::shared_ptr<::google::protobuf::MessageLite> &resp);
 
-  void Start();
   void Shutdown();
 
   /* Enqueues a CommsError without acquiring a lock*/
@@ -313,6 +375,8 @@ class RpcEngine : public LockFreeRpcEngine {
   int NextCallId() override { return ++call_id_; }
 
   void TEST_SetRpcConnection(std::shared_ptr<RpcConnection> conn);
+  void TEST_SetRetryPolicy(std::unique_ptr<const RetryPolicy> policy);
+  std::unique_ptr<const RetryPolicy> TEST_GenerateRetryPolicyUsingOptions();
 
   const std::string &client_name() const override { return client_name_; }
   const std::string &user_name() const override { return auth_info_.getUser(); }
@@ -338,7 +402,7 @@ private:
   const std::string client_name_;
   const std::string protocol_name_;
   const int protocol_version_;
-  const std::unique_ptr<const RetryPolicy> retry_policy_; //null --> no retry
+  std::unique_ptr<const RetryPolicy> retry_policy_; //null --> no retry
   AuthInfo auth_info_;
   std::string cluster_name_;
   std::atomic_int call_id_;
@@ -348,6 +412,9 @@ private:
 
   std::mutex engine_state_lock_;
 
+  // Keep endpoint info for all HA connections, a non-null ptr indicates
+  // that HA info was found in the configuration.
+  std::unique_ptr<HANamenodeTracker> ha_persisted_info_;
 };
 }
 

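Request now carries two counters with different lifetimes: IncrementFailoverCount() zeroes the retry count so each namenode gets a fresh retry budget, while the failover count keeps growing so a policy like FixedDelayWithFailover can still cap total failovers. A tiny sketch of that interplay, using a stand-in struct rather than the real Request:

    #include <iostream>

    struct MiniRequest {
      int retry_count = 0;
      int failover_count = 0;
      int IncrementRetryCount() { return retry_count++; }
      int IncrementFailoverCount() {
        retry_count = 0;          // fresh retries against the new namenode
        return failover_count++;
      }
    };

    int main() {
      MiniRequest req;
      req.IncrementRetryCount();
      req.IncrementRetryCount();    // two retries against the first NN
      req.IncrementFailoverCount(); // fail over: retry budget starts over
      std::cout << "retries=" << req.retry_count
                << " failovers=" << req.failover_count << "\n";
      // prints: retries=0 failovers=1
    }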
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b591d6/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
index defe95d..3e8c93f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
@@ -20,6 +20,7 @@
 #include "test.pb.h"
 #include "RpcHeader.pb.h"
 #include "rpc/rpc_connection.h"
+#include "common/namenode_info.h"
 
 #include <google/protobuf/io/coded_stream.h>
 
@@ -43,10 +44,10 @@ namespace pbio = ::google::protobuf::io;
 
 namespace hdfs {
 
-std::vector<asio::ip::basic_endpoint<asio::ip::tcp>> make_endpoint() {
-  std::vector<asio::ip::basic_endpoint<asio::ip::tcp>> result;
-  result.push_back(asio::ip::basic_endpoint<asio::ip::tcp>());
-  return result;
+std::vector<ResolvedNamenodeInfo> make_endpoint() {
+  ResolvedNamenodeInfo result;
+  result.endpoints.push_back(asio::ip::basic_endpoint<asio::ip::tcp>());
+  return std::vector<ResolvedNamenodeInfo>({result});
 }
 
 class MockRPCConnection : public MockConnectionBase {
@@ -68,7 +69,7 @@ class SharedConnectionEngine : public RpcEngine {
 protected:
   std::shared_ptr<RpcConnection> NewConnection() override {
     // Stuff in some dummy endpoints so we don't error out
-    last_endpoints_ = make_endpoint();
+    last_endpoints_ = make_endpoint()[0].endpoints;
 
     return std::make_shared<RpcConnectionImpl<SharedMockRPCConnection>>(this);
   }
@@ -180,6 +181,11 @@ TEST(RpcEngineTest, TestConnectionResetAndRecover) {
   options.rpc_retry_delay_ms = 0;
   SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
 
+  // Normally determined during RpcEngine::Connect, but in this case options
+  // provides enough info to determine policy here.
+  engine.TEST_SetRetryPolicy(engine.TEST_GenerateRetryPolicyUsingOptions());
+
+
   EchoResponseProto server_resp;
   server_resp.set_message("foo");
 
@@ -215,6 +221,10 @@ TEST(RpcEngineTest, TestConnectionResetAndRecoverWithDelay) {
   options.rpc_retry_delay_ms = 1;
   SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
 
+  // Normally determined during RpcEngine::Connect, but in this case options
+  // provides enough info to determine policy here.
+  engine.TEST_SetRetryPolicy(engine.TEST_GenerateRetryPolicyUsingOptions());
+
   EchoResponseProto server_resp;
   server_resp.set_message("foo");
 
@@ -339,6 +349,10 @@ TEST(RpcEngineTest, TestEventCallbacks)
   options.rpc_retry_delay_ms = 0;
   SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
 
+  // Normally determined during RpcEngine::Connect, but in this case options
+  // provides enough info to determine policy here.
+  engine.TEST_SetRetryPolicy(engine.TEST_GenerateRetryPolicyUsingOptions());
+
   // Set up event callbacks
   int calls = 0;
   std::vector<std::string> callbacks;

