Repository: hadoop
Updated Branches:
  refs/heads/HDFS-8707 71023fd27 -> ff138e891


HDFS-9616.  libhdfs++: Add runtime hooks to allow a client application to add low-level monitoring and tests.  Contributed by Bob Hansen


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ff138e89
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ff138e89
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ff138e89

Branch: refs/heads/HDFS-8707
Commit: ff138e8917d46e0101e4b5f96e766248244c4368
Parents: 71023fd
Author: James <[email protected]>
Authored: Thu Mar 24 00:18:59 2016 -0400
Committer: James <[email protected]>
Committed: Thu Mar 24 00:18:59 2016 -0400

----------------------------------------------------------------------
 .../native/libhdfspp/include/hdfspp/events.h    | 110 +++++++++++++++++++
 .../native/libhdfspp/include/hdfspp/hdfs_ext.h  |  55 ++++++++++
 .../native/libhdfspp/include/hdfspp/hdfspp.h    |  24 ++++
 .../native/libhdfspp/lib/bindings/c/hdfs.cc     |  76 +++++++++++++
 .../native/libhdfspp/lib/common/CMakeLists.txt  |   2 +-
 .../libhdfspp/lib/common/libhdfs_events_impl.cc |  51 +++++++++
 .../libhdfspp/lib/common/libhdfs_events_impl.h  |  73 ++++++++++++
 .../lib/connection/datanodeconnection.cc        |   3 +-
 .../lib/connection/datanodeconnection.h         |  10 +-
 .../main/native/libhdfspp/lib/fs/filehandle.cc  |  59 ++++++++--
 .../main/native/libhdfspp/lib/fs/filehandle.h   |  26 ++++-
 .../main/native/libhdfspp/lib/fs/filesystem.cc  |  36 ++++--
 .../main/native/libhdfspp/lib/fs/filesystem.h   |  17 ++-
 .../native/libhdfspp/lib/rpc/rpc_connection.cc  |  10 ++
 .../native/libhdfspp/lib/rpc/rpc_connection.h   |  22 +++-
 .../main/native/libhdfspp/lib/rpc/rpc_engine.cc |  31 ++++--
 .../main/native/libhdfspp/lib/rpc/rpc_engine.h  |  21 +++-
 .../native/libhdfspp/tests/bad_datanode_test.cc |  73 +++++++++++-
 .../native/libhdfspp/tests/rpc_engine_test.cc   |  74 ++++++++++++-
 19 files changed, 731 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff138e89/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h
new file mode 100644
index 0000000..82109fd
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef HDFSPP_EVENTS
+#define HDFSPP_EVENTS
+
+#include "hdfspp/status.h"
+
+#include <functional>
+
+namespace hdfs {
+
+/*
+ * Supported event names.  These names will stay consistent in libhdfs callbacks.
+ *
+ * Other events not listed here may be seen, but they are not stable and
+ * should not be counted on.
+ */
+
+static constexpr const char * FS_NN_CONNECT_EVENT = "NN::connect";
+static constexpr const char * FS_NN_READ_EVENT = "NN::read";
+static constexpr const char * FS_NN_WRITE_EVENT = "NN::write";
+
+static constexpr const char * FILE_DN_CONNECT_EVENT = "DN::connect";
+static constexpr const char * FILE_DN_READ_EVENT = "DN::read";
+static constexpr const char * FILE_DN_WRITE_EVENT = "DN::write";
+
+
+
+class event_response {
+public:
+// Create a response
+enum event_response_type {
+  kOk = 0,
+
+#ifndef NDEBUG
+  // Responses to be used in testing only
+  kTest_Error = 100
+#endif
+};
+
+
+  // The default ok response; libhdfspp should continue normally
+  static event_response ok() { return event_response(); }
+  event_response_type response() { return response_; }
+
+private:
+  event_response() : response_(event_response_type::kOk) {};
+
+  event_response_type response_;
+
+
+
+///////////////////////////////////////////////
+//
+//   Testing support
+//
+// If running a debug build, the consumer can simulate errors
+// within libhdfspp by returning a Status from the callback.
+///////////////////////////////////////////////
+#ifndef NDEBUG
+public:
+  static event_response test_err(const Status &status) {
+    return event_response(status);
+  }
+
+  Status status() { return error_status_; }
+
+private:
+  event_response(const Status & status) :
+    response_(event_response_type::kTest_Error), error_status_(status) {}
+
+  Status error_status_; // To be used with kTest_Error
+#endif
+};
+
+
+
+/* callback signature */
+typedef std::function<
+  event_response (const char * event,
+                  const char * cluster,
+                  int64_t value)>
+  fs_event_callback;
+
+typedef std::function<
+  event_response (const char * event,
+                  const char * cluster,
+                  const char * file,
+                  int64_t value)>
+  file_event_callback;
+
+
+}
+#endif
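
For context, a consumer-side handler matching the file_event_callback signature above can be as small as the sketch below (not part of this commit; make_read_counter and bytes_reported are illustrative names). Returning event_response::ok() for every unrecognized name is what keeps the private events mentioned above harmless.

  #include "hdfspp/events.h"

  #include <atomic>
  #include <cstring>

  // Tallies bytes reported by DataNode read events; ignores everything else.
  static std::atomic<int64_t> bytes_reported{0};

  hdfs::file_event_callback make_read_counter() {
    return [](const char *event, const char *cluster, const char *file,
              int64_t value) -> hdfs::event_response {
      (void)cluster; (void)file;
      if (std::strcmp(event, hdfs::FILE_DN_READ_EVENT) == 0) {
        bytes_reported += value;  // value carries the transfer size for reads
      }
      // Unknown (including private) events are passed through untouched.
      return hdfs::event_response::ok();
    };
  }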

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff138e89/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h
index 1e73bc5..1e1e6aa 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h
@@ -162,6 +162,61 @@ int hdfsDisableLoggingForComponent(int component);
 LIBHDFS_EXTERNAL
 int hdfsSetLoggingLevel(int component);
 
+/*
+ * Supported event names.  These names will stay consistent in libhdfs callbacks.
+ *
+ * Other events not listed here may be seen, but they are not stable and
+ * should not be counted on.
+ */
+extern const char * FS_NN_CONNECT_EVENT;
+extern const char * FS_NN_READ_EVENT;
+extern const char * FS_NN_WRITE_EVENT;
+
+extern const char * FILE_DN_CONNECT_EVENT;
+extern const char * FILE_DN_READ_EVENT;
+extern const char * FILE_DN_WRITE_EVENT;
+
+
+#define LIBHDFSPP_EVENT_OK (0)
+#ifndef NDEBUG
+  #define DEBUG_SIMULATE_ERROR (-1)
+#endif
+
+typedef int (*libhdfspp_fs_event_callback)(const char * event, const char * cluster,
+                                           int64_t value, int64_t cookie);
+typedef int (*libhdfspp_file_event_callback)(const char * event,
+                                             const char * cluster,
+                                             const char * file,
+                                             int64_t value, int64_t cookie);
+
+/**
+ * Registers a callback for the next filesystem connect operation the current
+ * thread executes.
+ *
+ *  @param handler A function pointer.  Taken as a void* and internally
+ *                 cast into the appropriate type.
+ *  @param cookie  An opaque value that will be passed into the handler; can
+ *                 be used to correlate the handler with some object in the
+ *                 consumer's space.
+ **/
+LIBHDFS_EXTERNAL
+int hdfsPreAttachFSMonitor(libhdfspp_fs_event_callback handler, int64_t cookie);
+
+
+/**
+ * Registers a callback for the next file open operation the current thread
+ * executes.
+ *
+ *  @param handler A function pointer.  Taken as a void* and internally
+ *                 cast into the appropriate type.
+ *  @param cookie  An opaque value that will be passed into the handler; can
+ *                 be used to correlate the handler with some object in the
+ *                 consumer's space.
+ **/
+LIBHDFS_EXTERNAL
+int hdfsPreAttachFileMonitor(libhdfspp_file_event_callback handler, int64_t cookie);
+
 
 #ifdef __cplusplus
 } /* end extern "C" */
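
A minimal sketch of how a C-API client might use these hooks, assuming hdfs_ext.h pulls in the standard hdfs.h declarations (hdfsFS, hdfsFile, hdfsOpenFile); file_monitor, open_with_monitor, and the cookie value are illustrative:

  #include "hdfspp/hdfs_ext.h"

  #include <fcntl.h>
  #include <cstdio>

  // Logs every event for the monitored file and never alters libhdfs++ behavior.
  static int file_monitor(const char * event, const char * cluster,
                          const char * file, int64_t value, int64_t cookie) {
    std::fprintf(stderr, "[monitor %lld] %s cluster=%s file=%s value=%lld\n",
                 static_cast<long long>(cookie), event, cluster, file,
                 static_cast<long long>(value));
    return LIBHDFSPP_EVENT_OK;
  }

  hdfsFile open_with_monitor(hdfsFS fs, const char * path) {
    // The registration applies only to the next open performed by this thread.
    hdfsPreAttachFileMonitor(file_monitor, /*cookie=*/42);
    return hdfsOpenFile(fs, path, O_RDONLY, 0, 0, 0);
  }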

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff138e89/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
index b52e832..674dc4a 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
@@ -20,6 +20,7 @@
 
 #include "hdfspp/options.h"
 #include "hdfspp/status.h"
+#include "hdfspp/events.h"
 
 #include <functional>
 #include <memory>
@@ -108,6 +109,18 @@ public:
    **/
   static bool ShouldExclude(const Status &status);
 
+
+  /**
+   * Sets an event callback for file-level event notifications (such as connecting
+   * to the DataNode, communications errors, etc.)
+   *
+   * Many events are defined in hdfspp/events.h; the consumer should also expect
+   * to be called with many private events, which can be ignored.
+   *
+   * @param callback The function to call when a reporting event occurs.
+   */
+  virtual void SetFileEventCallback(file_event_callback callback) = 0;
+
   virtual ~FileHandle();
 };
 
@@ -161,6 +174,17 @@ class FileSystem {
    */
   virtual ~FileSystem() {};
 
+
+  /**
+   * Sets an event callback for fs-level event notifications (such as connecting
+   * to the NameNode, communications errors with the NN, etc.)
+   *
+   * Many events are defined in hdfspp/events.h; the consumer should also expect
+   * to be called with many private events, which can be ignored.
+   *
+   * @param callback The function to call when a reporting event occurs.
+   */
+  virtual void SetFsEventCallback(fs_event_callback callback) = 0;
 };
 }
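
Taken together with events.h, a caller holding a connected FileSystem could attach an fs-level monitor roughly as in the sketch below (attach_nn_monitor is illustrative; obtaining the FileSystem pointer is elided):

  #include "hdfspp/hdfspp.h"

  #include <cstring>

  // Counts NameNode connect events and leaves everything else untouched.
  void attach_nn_monitor(hdfs::FileSystem *fs, int *connect_count) {
    fs->SetFsEventCallback([connect_count](const char *event, const char *cluster,
                                           int64_t value) -> hdfs::event_response {
      (void)cluster; (void)value;
      if (std::strcmp(event, hdfs::FS_NN_CONNECT_EVENT) == 0) {
        ++*connect_count;
      }
      return hdfs::event_response::ok();  // never interfere with normal operation
    });
  }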
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff138e89/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
index 9ce5c86..cc0d964 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
@@ -30,9 +30,11 @@
 #include <cstring>
 #include <iostream>
 #include <algorithm>
+#include <functional>
 
 using namespace hdfs;
 using std::experimental::nullopt;
+using namespace std::placeholders;
 
 static constexpr tPort kDefaultPort = 8020;
 
@@ -81,6 +83,10 @@ void hdfsGetLastError(char *buf, int len) {
   buf[copylen] = 0;
 }
 
+/* Event callbacks for next open calls */
+thread_local std::experimental::optional<fs_event_callback> fsEventCallback;
+thread_local std::experimental::optional<file_event_callback> fileEventCallback;
+
 struct hdfsBuilder {
   hdfsBuilder();
   hdfsBuilder(const char * directory);
@@ -197,6 +203,10 @@ hdfsFS doHdfsConnect(optional<std::string> nn, optional<tPort> port, optional<st
       return nullptr;
     }
 
+    if (fsEventCallback) {
+      fs->SetFsEventCallback(fsEventCallback.value());
+    }
+
     Status status;
     if (nn || port) {
       if (!port) {
@@ -399,6 +409,72 @@ int hdfsCancel(hdfsFS fs, hdfsFile file) {
   }
 }
 
+
+/*******************************************************************
+ *                EVENT CALLBACKS
+ *******************************************************************/
+
+const char * FS_NN_CONNECT_EVENT = hdfs::FS_NN_CONNECT_EVENT;
+const char * FS_NN_READ_EVENT = hdfs::FS_NN_READ_EVENT;
+const char * FS_NN_WRITE_EVENT = hdfs::FS_NN_WRITE_EVENT;
+
+const char * FILE_DN_CONNECT_EVENT = hdfs::FILE_DN_CONNECT_EVENT;
+const char * FILE_DN_READ_EVENT = hdfs::FILE_DN_READ_EVENT;
+const char * FILE_DN_WRITE_EVENT = hdfs::FILE_DN_WRITE_EVENT;
+
+
+event_response fs_callback_glue(libhdfspp_fs_event_callback handler,
+                      int64_t cookie,
+                      const char * event,
+                      const char * cluster,
+                      int64_t value) {
+  int result = handler(event, cluster, value, cookie);
+  if (result == LIBHDFSPP_EVENT_OK) {
+    return event_response::ok();
+  }
+#ifndef NDEBUG
+  if (result == DEBUG_SIMULATE_ERROR) {
+    return event_response::test_err(Status::Error("Simulated error"));
+  }
+#endif
+
+  return event_response::ok();
+}
+
+event_response file_callback_glue(libhdfspp_file_event_callback handler,
+                      int64_t cookie,
+                      const char * event,
+                      const char * cluster,
+                      const char * file,
+                      int64_t value) {
+  int result = handler(event, cluster, file, value, cookie);
+  if (result == LIBHDFSPP_EVENT_OK) {
+    return event_response::ok();
+  }
+#ifndef NDEBUG
+  if (result == DEBUG_SIMULATE_ERROR) {
+    return event_response::test_err(Status::Error("Simulated error"));
+  }
+#endif
+
+  return event_response::ok();
+}
+
+int hdfsPreAttachFSMonitor(libhdfspp_fs_event_callback handler, int64_t cookie)
+{
+  fs_event_callback callback = std::bind(fs_callback_glue, handler, cookie, _1, _2, _3);
+  fsEventCallback = callback;
+  return 0;
+}
+
+
+int hdfsPreAttachFileMonitor(libhdfspp_file_event_callback handler, int64_t cookie)
+{
+  file_event_callback callback = std::bind(file_callback_glue, handler, cookie, _1, _2, _3, _4);
+  fileEventCallback = callback;
+  return 0;
+}
+
 /*******************************************************************
  *                BUILDER INTERFACE
  *******************************************************************/
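
The glue above is also what the DEBUG_SIMULATE_ERROR value in hdfs_ext.h exists for: in a non-NDEBUG build a callback can hand libhdfs++ a simulated failure for the event it is reporting. A sketch (fail_first_connect is illustrative):

  #include "hdfspp/hdfs_ext.h"

  #include <cstring>

  // Fails the first NameNode connect seen by this callback, then behaves normally.
  static int fail_first_connect(const char * event, const char * cluster,
                                int64_t value, int64_t cookie) {
    (void)cluster; (void)value; (void)cookie;
  #ifndef NDEBUG
    static bool failed_once = false;
    if (!failed_once && std::strcmp(event, FS_NN_CONNECT_EVENT) == 0) {
      failed_once = true;
      return DEBUG_SIMULATE_ERROR;  // fs_callback_glue turns this into a test error
    }
  #endif
    return LIBHDFSPP_EVENT_OK;
  }

  // Register before connecting; applies to the next connect on this thread:
  //   hdfsPreAttachFSMonitor(fail_first_connect, /*cookie=*/0);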

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff138e89/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
index 77860b0..ea2f952 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
@@ -19,6 +19,6 @@ if(NEED_LINK_DL)
    set(LIB_DL dl)
 endif()
 
-add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc)
+add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc)
 add_library(common $<TARGET_OBJECTS:common_obj> $<TARGET_OBJECTS:uriparser2_obj>)
 target_link_libraries(common ${LIB_DL})

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff138e89/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.cc
new file mode 100644
index 0000000..bcf9ccc
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.cc
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "libhdfs_events_impl.h"
+
+namespace hdfs {
+
+/**
+ * Default no-op callback implementations
+ **/
+
+LibhdfsEvents::LibhdfsEvents() : fs_callback(std::experimental::nullopt),
+                                 file_callback(std::experimental::nullopt)
+{}
+
+LibhdfsEvents::~LibhdfsEvents() {}
+
+void LibhdfsEvents::set_fs_callback(const fs_event_callback & callback) {
+  fs_callback = callback;
+}
+
+void LibhdfsEvents::set_file_callback(const file_event_callback & callback) {
+  file_callback = callback;
+}
+
+void LibhdfsEvents::clear_fs_callback() {
+  fs_callback = std::experimental::nullopt;
+}
+
+void LibhdfsEvents::clear_file_callback() {
+  file_callback = std::experimental::nullopt;
+}
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff138e89/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.h
new file mode 100644
index 0000000..122f7b0
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/libhdfs_events_impl.h
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBHDFSPP_COMMON_LIBHDFSEVENTS_IMPL
+#define LIBHDFSPP_COMMON_LIBHDFSEVENTS_IMPL
+
+#include "hdfspp/events.h"
+
+
+#include <optional.hpp>
+#include <functional>
+
+namespace hdfs {
+
+/**
+ * Users can specify event handlers.  Default is a no-op handler.
+ **/
+class LibhdfsEvents {
+public:
+  LibhdfsEvents();
+  virtual ~LibhdfsEvents();
+
+  void set_fs_callback(const fs_event_callback & callback);
+  void set_file_callback(const file_event_callback & callback);
+  void clear_fs_callback();
+  void clear_file_callback();
+
+  event_response call(const char * event,
+                                const char * cluster,
+                                int64_t value) {
+      if (fs_callback) {
+          return fs_callback->operator ()(event, cluster, value);
+      } else {
+          return event_response::ok();
+      }
+  }
+
+  event_response call(const char * event,
+                                const char * cluster,
+                                const char * file,
+                                int64_t value) {
+      if (file_callback) {
+          return file_callback->operator ()(event, cluster, file, value);
+      } else {
+          return event_response::ok();
+      }
+  }
+
+private:
+  // Called when fs events occur
+  std::experimental::optional<fs_event_callback> fs_callback;
+
+  // Called when file events occur
+  std::experimental::optional<file_event_callback> file_callback;
+};
+
+}
+#endif
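
For reference, internal call sites are expected to consult this registry roughly as in the sketch below (mirroring what the FileHandle and RpcConnection changes later in this patch do; report_read is illustrative):

  #include "common/libhdfs_events_impl.h"

  #include <string>

  // Dispatch a DataNode read event; in debug builds a kTest_Error response from
  // the consumer's callback replaces the real status with a simulated failure.
  hdfs::Status report_read(hdfs::LibhdfsEvents &events, const std::string &cluster,
                           const std::string &path, int64_t transferred,
                           const hdfs::Status &real_status) {
    auto event_resp = events.call(hdfs::FILE_DN_READ_EVENT, cluster.c_str(),
                                  path.c_str(), transferred);
  #ifndef NDEBUG
    if (event_resp.response() == hdfs::event_response::kTest_Error) {
      return event_resp.status();  // propagate the injected error
    }
  #endif
    return real_status;            // normal path: the callback cannot change behavior
  }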

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff138e89/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
index 247c75e..19878ab 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
@@ -26,7 +26,8 @@ DataNodeConnectionImpl::~DataNodeConnectionImpl(){}
 
 DataNodeConnectionImpl::DataNodeConnectionImpl(asio::io_service * io_service,
                                                const ::hadoop::hdfs::DatanodeInfoProto &dn_proto,
-                                                const hadoop::common::TokenProto *token)
+                                                const hadoop::common::TokenProto *token,
+                                                LibhdfsEvents *event_handlers) : event_handlers_(event_handlers)
 {
   using namespace ::asio::ip;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff138e89/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
index 8f64110..6cb7f4a 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
@@ -21,6 +21,7 @@
 #include "common/hdfs_public_api.h"
 #include "common/async_stream.h"
 #include "ClientNamenodeProtocol.pb.h"
+#include "common/libhdfs_events_impl.h"
 
 #include "asio.hpp"
 
@@ -42,10 +43,12 @@ public:
   std::unique_ptr<asio::ip::tcp::socket> conn_;
   std::array<asio::ip::tcp::endpoint, 1> endpoints_;
   std::string uuid_;
+  LibhdfsEvents *event_handlers_;
 
   virtual ~DataNodeConnectionImpl();
   DataNodeConnectionImpl(asio::io_service * io_service, const ::hadoop::hdfs::DatanodeInfoProto &dn_proto,
-                          const hadoop::common::TokenProto *token);
+                          const hadoop::common::TokenProto *token,
+                          LibhdfsEvents *event_handlers);
 
   void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override;
 
@@ -54,12 +57,17 @@ public:
   void async_read_some(const MutableBuffers &buf,
         std::function<void (const asio::error_code & error,
                               std::size_t bytes_transferred) > handler) override {
+    event_handlers_->call("DN_read_req", "", "", buf.end() - buf.begin());
+
     conn_->async_read_some(buf, handler);
   };
 
   void async_write_some(const ConstBuffers &buf,
             std::function<void (const asio::error_code & error,
                                  std::size_t bytes_transferred) > handler) override {
+
+    event_handlers_->call("DN_write_req", "", "", buf.end() - buf.begin());
+
     conn_->async_write_some(buf, handler);
   }
 };

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff138e89/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
index b3954e1..471281a 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
@@ -21,6 +21,7 @@
 #include "common/logging.h"
 #include "connection/datanodeconnection.h"
 #include "reader/block_reader.h"
+#include "hdfspp/events.h"
 
 #include <future>
 #include <tuple>
@@ -33,13 +34,17 @@ using ::hadoop::hdfs::LocatedBlocksProto;
 
 FileHandle::~FileHandle() {}
 
-FileHandleImpl::FileHandleImpl(::asio::io_service *io_service, const std::string &client_name,
+FileHandleImpl::FileHandleImpl(const std::string & cluster_name,
+                               const std::string & path,
+                               ::asio::io_service *io_service, const std::string &client_name,
                                  const std::shared_ptr<const struct FileInfo> file_info,
-                                 std::shared_ptr<BadDataNodeTracker> bad_data_nodes)
-    : io_service_(io_service), client_name_(client_name), file_info_(file_info),
-      bad_node_tracker_(bad_data_nodes), offset_(0), cancel_state_(CancelTracker::New()) {
+                                 std::shared_ptr<BadDataNodeTracker> bad_data_nodes,
+                                 std::shared_ptr<LibhdfsEvents> event_handlers)
+    : cluster_name_(cluster_name), path_(path), io_service_(io_service), client_name_(client_name), file_info_(file_info),
+      bad_node_tracker_(bad_data_nodes), offset_(0), cancel_state_(CancelTracker::New()), event_handlers_(event_handlers) {
   LOG_TRACE(kFileHandle, << "FileHandleImpl::FileHandleImpl("
                          << FMT_THIS_ADDR << ", ...) called");
+
 }
 
 void FileHandleImpl::PositionRead(
@@ -228,14 +233,34 @@ void FileHandleImpl::AsyncPreadSome(
   std::shared_ptr<BlockReader> reader;
   reader = CreateBlockReader(BlockReaderOptions(), dn);
 
+  // Lambdas cannot capture copies of member variables, so make explicit
+  //    copies of them here
+  auto event_handlers = event_handlers_;
+  auto path = path_;
+  auto cluster_name = cluster_name_;
+
+  auto read_handler = [reader, event_handlers, cluster_name, path, dn_id, handler](const Status & status, size_t transferred) {
+    auto event_resp = event_handlers->call(FILE_DN_READ_EVENT, cluster_name.c_str(), path.c_str(), transferred);
+#ifndef NDEBUG
+    if (event_resp.response() == event_response::kTest_Error) {
+      handler(event_resp.status(), dn_id, transferred);
+      return;
+    }
+#endif
 
-  auto read_handler = [reader, dn_id, handler](const Status & status, size_t transferred) {
     handler(status, dn_id, transferred);
   };
 
-  dn->Connect([handler,read_handler,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name]
+  auto connect_handler = [handler,event_handlers,cluster_name,path,read_handler,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name]
           (Status status, std::shared_ptr<DataNodeConnection> dn) {
     (void)dn;
+    auto event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0);
+#ifndef NDEBUG
+    if (event_resp.response() == event_response::kTest_Error) {
+      status = event_resp.status();
+    }
+#endif
+
     if (status.ok()) {
       reader->AsyncReadBlock(
           client_name, *block, offset_within_block,
@@ -243,7 +268,9 @@ void FileHandleImpl::AsyncPreadSome(
     } else {
       handler(status, dn_id, 0);
     }
-  });
+  };
+
+  dn->Connect(connect_handler);
 
   return;
 }
@@ -267,7 +294,11 @@ std::shared_ptr<DataNodeConnection> FileHandleImpl::CreateDataNodeConnection(
     const hadoop::common::TokenProto * token) {
   LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateDataNodeConnection("
                          << FMT_THIS_ADDR << ", ...) called");
-  return std::make_shared<DataNodeConnectionImpl>(io_service, dn, token);
+  return std::make_shared<DataNodeConnectionImpl>(io_service, dn, token, event_handlers_.get());
+}
+
+std::shared_ptr<LibhdfsEvents> FileHandleImpl::get_event_handlers() {
+  return event_handlers_;
 }
 
 void FileHandleImpl::CancelOperations() {
@@ -283,6 +314,18 @@ void FileHandleImpl::CancelOperations() {
   }
 }
 
+void FileHandleImpl::SetFileEventCallback(file_event_callback callback) {
+  std::shared_ptr<LibhdfsEvents> new_event_handlers;
+  if (event_handlers_) {
+    new_event_handlers = std::make_shared<LibhdfsEvents>(*event_handlers_);
+  } else {
+    new_event_handlers = std::make_shared<LibhdfsEvents>();
+  }
+  new_event_handlers->set_file_callback(callback);
+  event_handlers_ = new_event_handlers;
+}
+
+
 
 bool FileHandle::ShouldExclude(const Status &s) {
   if (s.ok()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff138e89/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
index 8c03b37..0fb014b 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
@@ -21,6 +21,7 @@
 #include "common/hdfs_public_api.h"
 #include "common/async_stream.h"
 #include "common/cancel_tracker.h"
+#include "common/libhdfs_events_impl.h"
 #include "reader/fileinfo.h"
 #include "reader/readergroup.h"
 
@@ -48,9 +49,12 @@ class DataNodeConnection;
  */
 class FileHandleImpl : public FileHandle {
 public:
-  FileHandleImpl(::asio::io_service *io_service, const std::string &client_name,
+  FileHandleImpl(const std::string & cluster_name,
+                 const std::string & path,
+                 ::asio::io_service *io_service, const std::string &client_name,
                   const std::shared_ptr<const struct FileInfo> file_info,
-                  std::shared_ptr<BadDataNodeTracker> bad_data_nodes);
+                  std::shared_ptr<BadDataNodeTracker> bad_data_nodes,
+                  std::shared_ptr<LibhdfsEvents> event_handlers);
 
   /*
    * [Some day reliably] Reads a particular offset into the data file.
@@ -58,9 +62,9 @@ public:
    * success, bytes_read will equal nbyte
    */
   void PositionRead(
-               void *buf,
-               size_t nbyte,
-               uint64_t offset,
+    void *buf,
+    size_t nbyte,
+    uint64_t offset,
     const std::function<void(const Status &status, size_t bytes_read)> &handler
     ) override;
 
@@ -96,7 +100,6 @@ public:
                       const std::function<void(const Status &status,
                       const std::string &dn_id, size_t bytes_read)> handler);
 
-
   /**
    *  Cancels all operations instantiated from this FileHandle.
   *  Will set a flag to abort continuation pipelines when they try to move to the next step.
@@ -104,6 +107,14 @@ public:
    **/
   virtual void CancelOperations(void) override;
 
+  virtual void SetFileEventCallback(file_event_callback callback) override;
+
+  /**
+   * Ephemeral objects created by the filehandle will need to get the event
+   * handler registry owned by the FileSystem.
+   **/
+  std::shared_ptr<LibhdfsEvents> get_event_handlers();
+
 protected:
   virtual std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options,
                                                 std::shared_ptr<DataNodeConnection> dn);
@@ -112,6 +123,8 @@ protected:
       const ::hadoop::hdfs::DatanodeInfoProto & dn,
       const hadoop::common::TokenProto * token);
 private:
+  const std::string cluster_name_;
+  const std::string path_;
   ::asio::io_service * const io_service_;
   const std::string client_name_;
   const std::shared_ptr<const struct FileInfo> file_info_;
@@ -120,6 +133,7 @@ private:
   off_t offset_;
   CancelHandle cancel_state_;
   ReaderGroup readers_;
+  std::shared_ptr<LibhdfsEvents> event_handlers_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff138e89/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
index 8f386ed..569b479 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
@@ -47,7 +47,8 @@ static constexpr uint16_t kDefaultPort = 8020;
  *                    NAMENODE OPERATIONS
  ****************************************************************************/
 
-void NameNodeOperations::Connect(const std::string &server,
+void NameNodeOperations::Connect(const std::string &cluster_name,
+                                 const std::string &server,
                              const std::string &service,
                              std::function<void(const Status &)> &&handler) {
   using namespace asio_continuation;
@@ -55,8 +56,8 @@ void NameNodeOperations::Connect(const std::string &server,
   auto m = Pipeline<State>::Create();
   m->Push(Resolve(io_service_, server, service,
                   std::back_inserter(m->state())))
-      .Push(Bind([this, m](const Continuation::Next &next) {
-        engine_.Connect(m->state(), next);
+      .Push(Bind([this, m, cluster_name](const Continuation::Next &next) {
+        engine_.Connect(cluster_name, m->state(), next);
       }));
   m->Run([this, handler](const Status &status, const State &) {
     handler(status);
@@ -113,6 +114,10 @@ void NameNodeOperations::GetBlockLocations(const std::string & path,
 }
 
 
+void NameNodeOperations::SetFsEventCallback(fs_event_callback callback) {
+  engine_.SetFsEventCallback(callback);
+}
+
 /*****************************************************************************
  *                    FILESYSTEM BASE CLASS
  ****************************************************************************/
@@ -162,7 +167,8 @@ FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_n
       nn_(&io_service_->io_service(), options,
       GetRandomClientName(), get_effective_user_name(user_name), kNamenodeProtocol,
       kNamenodeProtocolVersion), client_name_(GetRandomClientName()),
-      bad_node_tracker_(std::make_shared<BadDataNodeTracker>())
+      bad_node_tracker_(std::make_shared<BadDataNodeTracker>()),
+      event_handlers_(std::make_shared<LibhdfsEvents>())
 {
   LOG_TRACE(kFileSystem, << "FileSystemImpl::FileSystemImpl("
                          << FMT_THIS_ADDR << ") called");
@@ -201,7 +207,9 @@ void FileSystemImpl::Connect(const std::string &server,
     handler (Status::Error("Null IoService"), this);
   }
 
-  nn_.Connect(server, service, [this, handler](const Status & s) {
+  cluster_name_ = server + ":" + service;
+
+  nn_.Connect(cluster_name_, server, service, [this, handler](const Status & s) {
     handler(s, this);
   });
 }
@@ -288,8 +296,8 @@ void FileSystemImpl::Open(
                                  << FMT_THIS_ADDR << ", path="
                                  << path << ") called");
 
-  nn_.GetBlockLocations(path, [this, handler](const Status &stat, std::shared_ptr<const struct FileInfo> file_info) {
-    handler(stat, stat.ok() ? new FileHandleImpl(&io_service_->io_service(), client_name_, file_info, bad_node_tracker_)
+  nn_.GetBlockLocations(path, [this, path, handler](const Status &stat, std::shared_ptr<const struct FileInfo> file_info) {
+    handler(stat, stat.ok() ? new FileHandleImpl(cluster_name_, path, &io_service_->io_service(), client_name_, file_info, bad_node_tracker_, event_handlers_)
                             : nullptr);
   });
 }
@@ -340,4 +348,18 @@ void FileSystemImpl::WorkerDeleter::operator()(std::thread *t) {
   delete t;
 }
 
+
+void FileSystemImpl::SetFsEventCallback(fs_event_callback callback) {
+  if (event_handlers_) {
+    event_handlers_->set_fs_callback(callback);
+    nn_.SetFsEventCallback(callback);
+  }
+}
+
+
+
+std::shared_ptr<LibhdfsEvents> FileSystemImpl::get_event_handlers() {
+  return event_handlers_;
+}
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff138e89/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
index b208a6c..24854c0 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
@@ -19,6 +19,7 @@
 #define LIBHDFSPP_LIB_FS_FILESYSTEM_H_
 
 #include "filehandle.h"
+#include "common/libhdfs_events_impl.h"
 #include "common/hdfs_public_api.h"
 #include "common/async_stream.h"
 #include "hdfspp/hdfspp.h"
@@ -53,13 +54,15 @@ public:
   engine_(io_service, options, client_name, user_name, protocol_name, protocol_version),
   namenode_(& engine_) {}
 
-  void Connect(const std::string &server,
+  void Connect(const std::string &cluster_name,
+               const std::string &server,
                const std::string &service,
                std::function<void(const Status &)> &&handler);
 
   void GetBlockLocations(const std::string & path,
     std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler);
 
+  void SetFsEventCallback(fs_event_callback callback);
 private:
   ::asio::io_service * io_service_;
   RpcEngine engine_;
@@ -100,6 +103,8 @@ public:
   Status Open(const std::string &path, FileHandle **handle) override;
 
 
+  void SetFsEventCallback(fs_event_callback callback) override;
+
   /* add a new thread to handle asio requests, return number of threads in pool
    */
   int AddWorkerThread();
@@ -107,9 +112,13 @@ public:
   /* how many worker threads are servicing asio requests */
   int WorkerThreadCount() { return worker_threads_.size(); }
 
+  /* all monitored events will need to lookup handlers */
+  std::shared_ptr<LibhdfsEvents> get_event_handlers();
 
 private:
   const Options options_;
+
+  std::string cluster_name_;
   /**
    *  The IoService must be the first member variable to ensure that it gets
    *  destroyed last.  This allows other members to dequeue things from the
@@ -126,6 +135,12 @@ private:
   typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr;
   std::vector<WorkerPtr> worker_threads_;
 
+  /**
+   * Runtime event monitoring handlers.
+   * Note:  This is really handy to have for advanced usage but
+   * exposes implementation details that may change at any time.
+   **/
+  std::shared_ptr<LibhdfsEvents> event_handlers_;
 };
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff138e89/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
index c65c063..bed3347 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
@@ -321,6 +321,16 @@ void RpcConnection::PreEnqueueRequests(
   // Don't start sending yet; will flush when connected
 }
 
+void RpcConnection::SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers) {
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+  event_handlers_ = event_handlers;
+}
+
+void RpcConnection::SetClusterName(std::string cluster_name) {
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+  cluster_name_ = cluster_name;
+}
+
 void RpcConnection::CommsError(const Status &status) {
   assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff138e89/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
index 3413438..cab14fa 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
@@ -22,6 +22,7 @@
 
 #include "common/logging.h"
 #include "common/util.h"
+#include "common/libhdfs_events_impl.h"
 
 #include <asio/connect.hpp>
 #include <asio/read.hpp>
@@ -111,6 +112,15 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec)
   LOG_TRACE(kRPC, << "RpcConnectionImpl::ConnectComplete called");
 
   Status status = ToStatus(ec);
+  if(event_handlers_) {
+    auto event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0);
+#ifndef NDEBUG
+    if (event_resp.response() == event_response::kTest_Error) {
+      status = event_resp.status();
+    }
+#endif
+  }
+
   if (status.ok()) {
     StartReading();
     Handshake([shared_this, this](const Status & s) {
@@ -241,7 +251,7 @@ void RpcConnectionImpl<NextLayer>::FlushPendingRequests() {
 
 
 template <class NextLayer>
-void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ec,
+void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &asio_ec,
                                                    size_t) {
   using std::placeholders::_1;
   using std::placeholders::_2;
@@ -251,6 +261,16 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ec,
 
   std::shared_ptr<RpcConnection> shared_this = shared_from_this();
 
+  ::asio::error_code ec = asio_ec;
+  if(event_handlers_) {
+    auto event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0);
+#ifndef NDEBUG
+    if (event_resp.response() == event_response::kTest_Error) {
+        ec = std::make_error_code(std::errc::network_down);
+    }
+#endif
+  }
+
   switch (ec.value()) {
     case 0:
       // No errors

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff138e89/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
index b598d0f..70b50cf 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
@@ -39,18 +39,21 @@ RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options,
       protocol_version_(protocol_version),
       retry_policy_(std::move(MakeRetryPolicy(options))),
       call_id_(0),
-      retry_timer(*io_service) {
+      retry_timer(*io_service),
+      event_handlers_(std::make_shared<LibhdfsEvents>()) {
     LOG_DEBUG(kRPC, << "RpcEngine::RpcEngine called");
-  }
+}
 
-void RpcEngine::Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
+void RpcEngine::Connect(const std::string &cluster_name,
+                        const std::vector<::asio::ip::tcp::endpoint> &server,
                         RpcCallback &handler) {
   std::lock_guard<std::mutex> state_lock(engine_state_lock_);
   LOG_DEBUG(kRPC, << "RpcEngine::Connect called");
 
   last_endpoints_ = server;
+  cluster_name_ = cluster_name;
 
-  conn_ = NewConnection();
+  conn_ = InitializeConnection();
   conn_->Connect(last_endpoints_, handler);
 }
 
@@ -85,7 +88,7 @@ void RpcEngine::AsyncRpc(
   LOG_TRACE(kRPC, << "RpcEngine::AsyncRpc called");
 
   if (!conn_) {
-    conn_ = NewConnection();
+    conn_ = InitializeConnection();
     conn_->ConnectAndFlush(last_endpoints_);
   }
   conn_->AsyncRpc(method_name, req, resp, handler);
@@ -111,6 +114,14 @@ std::shared_ptr<RpcConnection> RpcEngine::NewConnection()
   return std::make_shared<RpcConnectionImpl<::asio::ip::tcp::socket>>(this);
 }
 
+std::shared_ptr<RpcConnection> RpcEngine::InitializeConnection()
+{
+  std::shared_ptr<RpcConnection> result = NewConnection();
+  result->SetEventHandlers(event_handlers_);
+  result->SetClusterName(cluster_name_);
+  return result;
+}
+
 
 Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req,
                          std::shared_ptr<std::string> resp) {
@@ -120,7 +131,7 @@ Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req,
   {
     std::lock_guard<std::mutex> state_lock(engine_state_lock_);
     if (!conn_) {
-        conn_ = NewConnection();
+        conn_ = InitializeConnection();
         conn_->ConnectAndFlush(last_endpoints_);
       }
     conn = conn_;
@@ -185,7 +196,7 @@ void RpcEngine::RpcCommsError(
   //    the NN
   if (!pendingRequests.empty() &&
           head_action && head_action->action != RetryAction::FAIL) {
-    conn_ = NewConnection();
+    conn_ = InitializeConnection();
 
     conn_->PreEnqueueRequests(pendingRequests);
     if (head_action->delayMillis > 0) {
@@ -203,4 +214,10 @@ void RpcEngine::RpcCommsError(
   }
 }
 
+
+void RpcEngine::SetFsEventCallback(fs_event_callback callback) {
+  event_handlers_->set_fs_callback(callback);
+}
+
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff138e89/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
index 75d4e67..7b66ac0 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
@@ -22,6 +22,7 @@
 #include "hdfspp/status.h"
 
 #include "common/retry_policy.h"
+#include "common/libhdfs_events_impl.h"
 
 #include <google/protobuf/message_lite.h>
 #include <google/protobuf/io/coded_stream.h>
@@ -131,6 +132,9 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
   //   on connect
   void PreEnqueueRequests(std::vector<std::shared_ptr<Request>> requests);
 
+  void SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers);
+  void SetClusterName(std::string cluster_name);
+
   LockFreeRpcEngine *engine() { return engine_; }
   ::asio::io_service &io_service();
 
@@ -186,6 +190,10 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
   // Requests that are waiting for responses
   typedef std::unordered_map<int, std::shared_ptr<Request>> RequestOnFlyMap;
   RequestOnFlyMap requests_on_fly_;
+  std::shared_ptr<LibhdfsEvents> event_handlers_;
+  std::string cluster_name_;
+
+
   // Lock for mutable parts of this class that need to be thread safe
   std::mutex connection_state_lock_;
 };
@@ -234,7 +242,9 @@ class RpcEngine : public LockFreeRpcEngine {
             const std::string &client_name, const std::string &user_name,
             const char *protocol_name, int protocol_version);
 
-  void Connect(const std::vector<::asio::ip::tcp::endpoint> &server, RpcCallback &handler);
+  void Connect(const std::string & cluster_name,
+               const std::vector<::asio::ip::tcp::endpoint> &server,
+               RpcCallback &handler);
 
   void AsyncRpc(const std::string &method_name,
                 const ::google::protobuf::MessageLite *req,
@@ -272,13 +282,17 @@ class RpcEngine : public LockFreeRpcEngine {
   ::asio::io_service &io_service() override { return *io_service_; }
   const Options &options() const override { return options_; }
   static std::string GetRandomClientName();
- protected:
+
+  void SetFsEventCallback(fs_event_callback callback);
+protected:
   std::shared_ptr<RpcConnection> conn_;
+  std::shared_ptr<RpcConnection> InitializeConnection();
   virtual std::shared_ptr<RpcConnection> NewConnection();
   virtual std::unique_ptr<const RetryPolicy> MakeRetryPolicy(const Options &options);
 
   // Remember all of the last endpoints in case we need to reconnect and retry
   std::vector<::asio::ip::tcp::endpoint> last_endpoints_;
+
 private:
   ::asio::io_service * const io_service_;
   const Options options_;
@@ -287,9 +301,12 @@ private:
   const std::string protocol_name_;
   const int protocol_version_;
   const std::unique_ptr<const RetryPolicy> retry_policy_; //null --> no retry
+  std::string cluster_name_;
   std::atomic_int call_id_;
   ::asio::deadline_timer retry_timer;
 
+  std::shared_ptr<LibhdfsEvents> event_handlers_;
+
   std::mutex engine_state_lock_;
 
 };

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff138e89/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
index 4741817..01d723f 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
@@ -18,6 +18,7 @@
 
 #include "fs/filesystem.h"
 #include "fs/bad_datanode_tracker.h"
+#include "common/libhdfs_events_impl.h"
 
 #include "common/util.h"
 
@@ -129,9 +130,10 @@ TEST(BadDataNodeTest, TestNoNodes) {
   };
   IoServiceImpl io_service;
   auto bad_node_tracker = std::make_shared<BadDataNodeTracker>();
+  auto monitors = std::make_shared<LibhdfsEvents>();
   bad_node_tracker->AddBadNode("foo");
 
-  PartialMockFileHandle is(&io_service.io_service(), GetRandomClientName(), file_info, bad_node_tracker);
+  PartialMockFileHandle is("cluster", "file", &io_service.io_service(), GetRandomClientName(), file_info, bad_node_tracker, monitors);
   Status stat;
   size_t read = 0;
 
@@ -147,6 +149,69 @@ TEST(BadDataNodeTest, TestNoNodes) {
   ASSERT_EQ(0UL, read);
 }
 
+TEST(BadDataNodeTest, NNEventCallback) {
+  auto file_info = std::make_shared<struct FileInfo>();
+  file_info->blocks_.push_back(LocatedBlockProto());
+  LocatedBlockProto & block = file_info->blocks_[0];
+  ExtendedBlockProto *b = block.mutable_b();
+  b->set_poolid("");
+  b->set_blockid(1);
+  b->set_generationstamp(1);
+  b->set_numbytes(4096);
+
+  // Set up the one block to have one datanodes holding it
+  DatanodeInfoProto *di = block.add_locs();
+  DatanodeIDProto *dnid = di->mutable_id();
+  dnid->set_datanodeuuid("dn1");
+
+  char buf[4096] = {
+      0,
+  };
+  IoServiceImpl io_service;
+  auto tracker = std::make_shared<BadDataNodeTracker>();
+
+
+  // Set up event callbacks
+  int calls = 0;
+  std::vector<std::string> callbacks;
+  auto monitors = std::make_shared<LibhdfsEvents>();
+  monitors->set_file_callback([&calls, &callbacks] (const char * event,
+                    const char * cluster,
+                    const char * file,
+                    int64_t value) {
+    (void)cluster; (void) file; (void)value;
+    callbacks.push_back(event);
+
+    // Allow connect call to succeed but fail on read
+    if (calls++ == 1)
+      return event_response::test_err(Status::Error("Test"));
+
+    return event_response::ok();
+  });
+  PartialMockFileHandle is("cluster", "file", &io_service.io_service(), 
GetRandomClientName(),  file_info, tracker, monitors);
+  Status stat;
+  size_t read = 0;
+
+  EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_))
+      // Will return OK, but our callback will subvert it
+      .WillOnce(InvokeArgument<4>(
+          Status::OK(), 0));
+
+  is.AsyncPreadSome(
+      0, asio::buffer(buf, sizeof(buf)), nullptr,
+      [&stat, &read](const Status &status, const std::string &,
+                     size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+
+  ASSERT_FALSE(stat.ok());
+  ASSERT_EQ(2, callbacks.size());
+  ASSERT_EQ(FILE_DN_CONNECT_EVENT, callbacks[0]);
+  ASSERT_EQ(FILE_DN_READ_EVENT, callbacks[1]);
+}
+
+
 TEST(BadDataNodeTest, RecoverableError) {
   auto file_info = std::make_shared<struct FileInfo>();
   file_info->blocks_.push_back(LocatedBlockProto());
@@ -167,7 +232,8 @@ TEST(BadDataNodeTest, RecoverableError) {
   };
   IoServiceImpl io_service;
   auto tracker = std::make_shared<BadDataNodeTracker>();
-  PartialMockFileHandle is(&io_service.io_service(), GetRandomClientName(),  file_info, tracker);
+  auto monitors = std::make_shared<LibhdfsEvents>();
+  PartialMockFileHandle is("cluster", "file", &io_service.io_service(), GetRandomClientName(),  file_info, tracker, monitors);
   Status stat;
   size_t read = 0;
   EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_))
@@ -216,7 +282,8 @@ TEST(BadDataNodeTest, InternalError) {
   };
   IoServiceImpl io_service;
   auto tracker = std::make_shared<BadDataNodeTracker>();
-  PartialMockFileHandle is(&io_service.io_service(), GetRandomClientName(),  file_info, tracker);
+  auto monitors = std::make_shared<LibhdfsEvents>();
+  PartialMockFileHandle is("cluster", "file", &io_service.io_service(), GetRandomClientName(),  file_info, tracker, monitors);
   Status stat;
   size_t read = 0;
   EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_))

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff138e89/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
index de9972e..b7d5d0b 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
@@ -266,7 +266,7 @@ TEST(RpcEngineTest, TestConnectionFailure)
   EXPECT_CALL(*producer, Produce())
       .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")));
 
-  engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat) {
+  engine.Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) {
     complete = true;
     io_service.stop();
     ASSERT_FALSE(stat.ok());
@@ -294,7 +294,7 @@ TEST(RpcEngineTest, TestConnectionFailureRetryAndFailure)
       .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
       .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")));
 
-  engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat) {
+  engine.Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) {
     complete = true;
     io_service.stop();
     ASSERT_FALSE(stat.ok());
@@ -322,7 +322,7 @@ TEST(RpcEngineTest, TestConnectionFailureAndRecover)
       .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
       .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
 
-  engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat) {
+  engine.Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) {
     complete = true;
     io_service.stop();
     ASSERT_TRUE(stat.ok());
@@ -331,6 +331,72 @@ TEST(RpcEngineTest, TestConnectionFailureAndRecover)
   ASSERT_TRUE(complete);
 }
 
+TEST(RpcEngineTest, TestEventCallbacks)
+{
+  ::asio::io_service io_service;
+  Options options;
+  options.max_rpc_retries = 99;
+  options.rpc_retry_delay_ms = 0;
+  SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
+
+  // Set up event callbacks
+  int calls = 0;
+  std::vector<std::string> callbacks;
+  engine.SetFsEventCallback([&calls, &callbacks] (const char * event,
+                    const char * cluster,
+                    int64_t value) {
+    (void)cluster; (void)value;
+    callbacks.push_back(event);
+
+    // Allow connect and fail first read
+    calls++;
+    if (calls == 1 || calls == 3) // First connect and first read
+      return event_response::test_err(Status::Error("Test"));
+
+    return event_response::ok();
+  });
+
+
+
+  EchoResponseProto server_resp;
+  server_resp.set_message("foo");
+
+  auto producer = std::make_shared<SharedConnectionData>();
+  producer->checkProducerForConnect = true;
+  RpcResponseHeaderProto h;
+  h.set_callid(1);
+  h.set_status(RpcResponseHeaderProto::SUCCESS);
+  EXPECT_CALL(*producer, Produce())
+      .WillOnce(Return(std::make_pair(::asio::error_code(), ""))) // subverted 
by callback
+      .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
+      .WillOnce(Return(RpcResponse(h, "b"))) // subverted by callback
+      .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString())));
+  SharedMockConnection::SetSharedConnectionData(producer);
+
+
+  EchoRequestProto req;
+  req.set_message("foo");
+  std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
+
+  bool complete = false;
+  engine.AsyncRpc("test", &req, resp, [&complete, &io_service](const Status 
&stat) {
+    complete = true;
+    io_service.stop();
+    ASSERT_TRUE(stat.ok());
+  });
+  io_service.run();
+  ASSERT_TRUE(complete);
+  ASSERT_EQ(7, callbacks.size());
+  ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[0]); // error
+  ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[1]); // reconnect
+  ASSERT_EQ(FS_NN_READ_EVENT, callbacks[2]); // makes an error
+  ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[3]); // reconnect
+  for (int i=4; i < 7; i++)
+    ASSERT_EQ(FS_NN_READ_EVENT, callbacks[i]);
+}
+
+
+
 TEST(RpcEngineTest, TestConnectionFailureAndAsyncRecover)
 {
   // Error and async recover
@@ -351,7 +417,7 @@ TEST(RpcEngineTest, TestConnectionFailureAndAsyncRecover)
       .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
       .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
 
-  engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat) {
+  engine.Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) {
     complete = true;
     io_service.stop();
     ASSERT_TRUE(stat.ok());
