Repository: hadoop
Updated Branches:
  refs/heads/HDFS-8707 3117e2a87 -> 411a9f829


HDFS-12134: libhdfs++: Add a synchronization interface for the GSSAPI.  
Contributed by James Clampffer.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/411a9f82
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/411a9f82
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/411a9f82

Branch: refs/heads/HDFS-8707
Commit: 411a9f829fe3399fa7fc952bb4bd597a4f0e5861
Parents: 3117e2a
Author: James Clampffer <james.clampf...@hp.com>
Authored: Mon Aug 7 13:04:50 2017 -0400
Committer: James Clampffer <james.clampf...@hp.com>
Committed: Mon Aug 7 13:04:50 2017 -0400

----------------------------------------------------------------------
 .../native/libhdfspp/include/hdfspp/hdfspp.h    |   1 +
 .../native/libhdfspp/include/hdfspp/locks.h     | 110 +++++++++
 .../native/libhdfspp/include/hdfspp/status.h    |   2 +
 .../native/libhdfspp/lib/common/CMakeLists.txt  |   2 +-
 .../main/native/libhdfspp/lib/common/locks.cc   | 100 +++++++++
 .../main/native/libhdfspp/lib/common/status.cc  |  11 +-
 .../src/main/native/libhdfspp/lib/common/uri.cc |   1 +
 .../libhdfspp/lib/rpc/cyrus_sasl_engine.cc      | 100 +++++++--
 .../native/libhdfspp/lib/rpc/gsasl_engine.cc    | 110 ++++++---
 .../native/libhdfspp/lib/rpc/sasl_protocol.cc   |   2 -
 .../main/native/libhdfspp/tests/CMakeLists.txt  |   3 +
 .../src/main/native/libhdfspp/tests/uri_test.cc |   1 -
 .../native/libhdfspp/tests/user_lock_test.cc    | 225 +++++++++++++++++++
 13 files changed, 610 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
index 673455e..611da21 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
@@ -26,6 +26,7 @@
 #include "hdfspp/fsinfo.h"
 #include "hdfspp/content_summary.h"
 #include "hdfspp/uri.h"
+#include "hdfspp/locks.h"
 
 #include <functional>
 #include <memory>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/locks.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/locks.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/locks.h
new file mode 100644
index 0000000..3dfeab4
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/locks.h
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef COMMON_HDFS_LOCKS_H_
+#define COMMON_HDFS_LOCKS_H_
+
+#include <stdexcept>
+#include <string>
+#include <atomic>
+#include <mutex>
+#include <memory>
+
+namespace hdfs
+{
+
+//
+//  Thrown by LockGuard to indicate that it was unable to acquire a mutex
+//  what_str should contain info about what caused the failure
+//
+class LockFailure : public std::runtime_error {
+ public:
+  // Both overloads hand what_str straight to std::runtime_error, so it is
+  // the text callers get back from what().
+  LockFailure(const std::string& what_str)
+      : std::runtime_error(what_str) {}
+  LockFailure(const char *what_str)
+      : std::runtime_error(what_str) {}
+};
+
+//
+//  A pluggable mutex type to allow client code to share mutexes it may
+//  already use to protect certain system resources.  Certain shared
+//  libraries have some procedures that aren't always implemented in a thread
+//  safe manner. If libhdfs++ and the code linking it depend on the same
+//  library this provides a mechanism to coordinate safe access.
+//
+//  Interface provided is intended to be similar to std::mutex.  If the lock
+//  can't be acquired it may throw LockFailure from the lock method. If lock
+//  does fail libhdfs++ is expected to fail as cleanly as possible e.g.
+//  FileSystem::Mkdirs might return a MutexError but a subsequent call may be
+//  successful.
+//
+class Mutex {
+ public:
+  // Virtual so an implementation can be destroyed through a Mutex pointer.
+  virtual ~Mutex() = default;
+
+  // Acquire the mutex.
+  virtual void lock() = 0;
+
+  // Release the mutex.
+  virtual void unlock() = 0;
+
+  // Text describing the mutex.
+  virtual std::string str() = 0;
+};
+
+//
+//  LockGuard works in a similar manner to std::lock_guard: it locks the mutex
+//  in the constructor and unlocks it in the destructor.
+//  Failure to acquire the mutex in the constructor will result in throwing a
+//  LockFailure exception.
+//
+class LockGuard {
+ public:
+  // Locks m.  Throws LockFailure if m is null; anything thrown by m->lock()
+  // propagates to the caller.
+  LockGuard(Mutex *m);
+  // Unlocks the mutex that the constructor locked.
+  ~LockGuard();
+
+  // Not copyable, like std::lock_guard: a copy would unlock the same mutex
+  // a second time when it is destroyed.
+  LockGuard(const LockGuard&) = delete;
+  LockGuard& operator=(const LockGuard&) = delete;
+ private:
+  Mutex *_mtx;
+};
+
+//
+//  Manage instances of hdfs::Mutex that are intended to be global to the
+//  process.
+//
+//  LockManager's InitLocks method provides a mechanism for the calling
+//  application to share its own implementations of hdfs::Mutex.  It must be
+//  called prior to instantiating any FileSystem objects and can only be
+//  called once.  If a lock is not provided a default mutex type wrapping
+//  std::mutex is used as a default.
+//
+
+class LockManager {
+ public:
+  // Installs gssapi as the process-wide mutex returned by getGssapiMutex().
+  // Only the first call takes effect and returns true; later calls leave the
+  // installed mutex alone and return false (until TEST_reset_manager runs).
+  static bool InitLocks(Mutex *gssapi);
+  // Returns the mutex currently used to serialize GSSAPI/SASL calls.
+  static Mutex *getGssapiMutex();
+
+  // Tests only, implementation may no-op on release builds.
+  // Reset _finalized to false and set all Mutex* members to default values.
+  static void TEST_reset_manager();
+  static Mutex *TEST_get_default_mutex();
+ private:
+  // Used only in tests.
+  static Mutex *TEST_default_mutex;
+  // Use to synchronize calls into GSSAPI/Kerberos libs
+  static Mutex *gssapiMtx;
+
+  // Prevent InitLocks from being called more than once
+  // Allows all locks to be set a single time atomically
+  static std::mutex _state_lock;
+  static bool _finalized;
+};
+
+} // end namespace hdfs
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
index d0922ae..718e530 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
@@ -50,6 +50,7 @@ class Status {
   static Status PathNotFound(const char *msg);
   static Status InvalidOffset(const char *msg);
   static Status PathIsNotDirectory(const char *msg);
+  static Status MutexError(const char *msg);
 
   // success
   bool ok() const { return code_ == 0; }
@@ -79,6 +80,7 @@ class Status {
     kNotADirectory = static_cast<unsigned>(std::errc::not_a_directory),
     kFileAlreadyExists = static_cast<unsigned>(std::errc::file_exists),
     kPathIsNotEmptyDirectory = 
static_cast<unsigned>(std::errc::directory_not_empty),
+    kBusy = static_cast<unsigned>(std::errc::device_or_resource_busy),
 
     // non-errc codes start at 256
     kException = 256,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
index b0b721a..15e65c1 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
@@ -19,6 +19,6 @@ if(NEED_LINK_DL)
    set(LIB_DL dl)
 endif()
 
-add_library(common_obj OBJECT status.cc sasl_digest_md5.cc hdfs_ioservice.cc 
options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc 
uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc 
libhdfs_events_impl.cc auth_info.cc namenode_info.cc statinfo.cc fsinfo.cc 
content_summary.cc)
+add_library(common_obj OBJECT status.cc sasl_digest_md5.cc hdfs_ioservice.cc 
options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc 
uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc 
libhdfs_events_impl.cc auth_info.cc namenode_info.cc statinfo.cc fsinfo.cc 
content_summary.cc locks.cc)
 add_library(common $<TARGET_OBJECTS:common_obj> 
$<TARGET_OBJECTS:uriparser2_obj>)
 target_link_libraries(common ${LIB_DL})

http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/locks.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/locks.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/locks.cc
new file mode 100644
index 0000000..30dcb44
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/locks.cc
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "hdfspp/locks.h"
+
+#include <mutex>
+
+
+namespace hdfs {
+
+// Remember the mutex and lock it right away.  A null pointer is rejected
+// with LockFailure; anything thrown by lock() propagates to the caller.
+LockGuard::LockGuard(Mutex *m) : _mtx(m) {
+  if(_mtx == nullptr) {
+    throw LockFailure("LockGuard passed invalid (null) Mutex pointer");
+  }
+  m->lock();
+}
+
+// Release the mutex taken by the constructor.
+LockGuard::~LockGuard() {
+  if(_mtx == nullptr) {
+    return;
+  }
+  _mtx->unlock();
+}
+
+
+// Default Mutex implementation: forwards lock/unlock to a C++11 std::mutex.
+class DefaultMutex : public Mutex {
+ public:
+  DefaultMutex() {}
+
+  // Could throw in here if the implementation couldn't lock for some reason.
+  void lock() override { _stdMutex.lock(); }
+
+  void unlock() override { _stdMutex.unlock(); }
+
+  // Name reported for this implementation.
+  std::string str() override { return std::string("DefaultMutex"); }
+
+ private:
+  std::mutex _stdMutex;
+};
+
+// Default mutex instances.  Kept in an unnamed namespace: they are only
+// meant to be reached through LockManager, so they should not add
+// externally visible symbols to namespace hdfs.
+namespace {
+DefaultMutex defaultTestMutex;
+DefaultMutex defaultGssapiMutex;
+} // end unnamed namespace
+
+// LockManager static var instantiation
+Mutex *LockManager::TEST_default_mutex = &defaultTestMutex;
+Mutex *LockManager::gssapiMtx = &defaultGssapiMutex;
+std::mutex LockManager::_state_lock;
+bool LockManager::_finalized = false;
+
+// Install the caller's mutex for GSSAPI calls.  Only the first call takes
+// effect; once the manager is finalized this returns false and changes
+// nothing.  A null gssapi keeps the default std::mutex-backed mutex, which is
+// what the LockManager comment in locks.h promises when no lock is provided.
+bool LockManager::InitLocks(Mutex *gssapi) {
+  std::lock_guard<std::mutex> guard(_state_lock);
+
+  // You get one shot to set this - swapping the locks
+  // out while in use gets risky.  It can still be done by
+  // using the Mutex as a proxy object if one understands
+  // the implied risk of doing so.
+  if(_finalized)
+    return false;
+
+  // Never store null: LockGuard throws LockFailure when handed a null
+  // mutex, so every guarded SASL call would fail from then on.
+  if(gssapi)
+    gssapiMtx = gssapi;
+  _finalized = true;
+  return true;
+}
+
+// Read the current GSSAPI mutex pointer while holding _state_lock.
+Mutex *LockManager::getGssapiMutex() {
+  std::lock_guard<std::mutex> stateGuard(_state_lock);
+  Mutex *current = gssapiMtx;
+  return current;
+}
+
+// Test hook: returns TEST_default_mutex, which is initialized to point at a
+// DefaultMutex instance.
+Mutex *LockManager::TEST_get_default_mutex() {
+  return TEST_default_mutex;
+}
+
+// Test hook: allow InitLocks to succeed again and point the GSSAPI mutex
+// back at the default instance.
+void LockManager::TEST_reset_manager() {
+  // Take the same lock InitLocks and getGssapiMutex hold so the reset
+  // cannot race with either of them.
+  std::lock_guard<std::mutex> guard(_state_lock);
+  _finalized = false;
+  // user still responsible for cleanup
+  gssapiMtx = &defaultGssapiMutex;
+}
+
+} // end namespace hdfs

http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
index 5903553..4c5c7be 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
@@ -177,7 +177,16 @@ std::string Status::ToString() const {
 }
 
 bool Status::notWorthRetry() const {
-    return noRetryExceptions.find(code_) != noRetryExceptions.end();
+  return noRetryExceptions.find(code_) != noRetryExceptions.end();
+}
+
+// Build a kBusy status whose message is "MutexError", followed by
+// ": <msg>" when msg is non-null.
+Status Status::MutexError(const char *msg) {
+  std::string formatted = "MutexError";
+  if(msg) {
+    formatted += ": ";
+    formatted += msg;
+  }
+  // Pass the formatted text; passing msg discarded the prefix built above.
+  // NOTE(review): relies on the Status constructor copying the message
+  // rather than keeping the pointer - confirm.
+  return Status(kBusy/*try_lock failure errno*/, formatted.c_str());
 }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc
index 2213f8b..9e9319b 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/uri.cc
@@ -244,6 +244,7 @@ URI URI::parse_from_string(const std::string &str)
 ///////////////////////////////////////////////////////////////////////////////
 
 URI::URI() : _port(-1) {}
+
 URI::Query::Query(const std::string& k, const std::string& v) : key(k), 
value(v) {}
 
 std::string URI::str(bool encoded_output) const

http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc
index 69b2267..5c96ede 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/cyrus_sasl_engine.cc
@@ -16,6 +16,8 @@
  * limitations under the License.
  */
 
+#include "hdfspp/locks.h"
+
 #include <sys/types.h>
 #include "sasl/sasl.h"
 #include "sasl/saslutil.h"
@@ -31,6 +33,9 @@
 
 namespace hdfs {
 
+// Mutex used by this file to guard calls into Cyrus SASL; it is the GSSAPI
+// mutex held by LockManager.
+static Mutex *getSaslMutex() {
+  return LockManager::getGssapiMutex();
+}
 
 // Forward decls of sasl callback functions
 typedef int (*sasl_callback_ft)(void);
@@ -111,23 +116,30 @@ Status CySaslEngine::SaslError( int rc) {
 *                     Cyrus SASL ENGINE
 */
 
-  CySaslEngine::CySaslEngine() : SaslEngine(), conn_(nullptr)
-  {
-    // Create an array of callbacks that embed a pointer to this
-    //   so we can call methods of the engine
-    per_connection_callbacks_ = {
-      { SASL_CB_USER,     (sasl_callback_ft) & get_name, this}, // userid for 
authZ
-      { SASL_CB_AUTHNAME, (sasl_callback_ft) & get_name, this}, // authid for 
authT
-      { SASL_CB_GETREALM, (sasl_callback_ft) & getrealm, this}, // krb/gssapi 
realm
-      //  { SASL_CB_PASS,        (sasl_callback_ft)&getsecret,  this
-      { SASL_CB_LIST_END, (sasl_callback_ft) NULL, NULL}
-    };
-  }
+CySaslEngine::CySaslEngine() : SaslEngine(), conn_(nullptr)
+{
+  // Create an array of callbacks that embed a pointer to this
+  //   so we can call methods of the engine
+  per_connection_callbacks_ = {
+    { SASL_CB_USER,     (sasl_callback_ft) & get_name, this}, // userid for 
authZ
+    { SASL_CB_AUTHNAME, (sasl_callback_ft) & get_name, this}, // authid for 
authT
+    { SASL_CB_GETREALM, (sasl_callback_ft) & getrealm, this}, // krb/gssapi 
realm
+    //  { SASL_CB_PASS,        (sasl_callback_ft)&getsecret,  this
+    { SASL_CB_LIST_END, (sasl_callback_ft) NULL, NULL}
+  };
+}
 
+// Cleanup of last resort.  Call Finish to allow a safer check on disposal
 CySaslEngine::~CySaslEngine()
 {
+
   if (conn_) {
+    try {
+      LockGuard saslGuard(getSaslMutex());
       sasl_dispose( &conn_); // undo sasl_client_new()
+    } catch (const LockFailure& e) {
+      LOG_ERROR(kRPC, << "Unable to dispose of SASL context due to " << 
e.what());
+    }
   }
 } // destructor
 
@@ -146,8 +158,15 @@ Status CySaslEngine::InitCyrusSasl()
   const char * fqdn = chosen_mech_.serverid.c_str();
   const char * proto = chosen_mech_.protocol.c_str();
 
-  rc = sasl_client_new(proto, fqdn, NULL, NULL, &per_connection_callbacks_[0], 
0, &conn_);
-  if (rc != SASL_OK) return SaslError(rc);
+  try {
+    LockGuard saslGuard(getSaslMutex());
+    rc = sasl_client_new(proto, fqdn, NULL, NULL, 
&per_connection_callbacks_[0], 0, &conn_);
+    if (rc != SASL_OK) {
+      return SaslError(rc);
+    }
+  } catch (const LockFailure& e) {
+    return Status::MutexError("mutex that guards sasl_client_new unable to 
lock");
+  }
 
   return Status::OK();
 } // cysasl_new()
@@ -176,8 +195,15 @@ CySaslEngine::Start()
   const char      * chosen_mech;
   std::string       token;
 
-  rc = sasl_client_start(conn_, chosen_mech_.mechanism.c_str(), 
&client_interact,
-            (const char **) &buf, &buflen, &chosen_mech);
+  // Only the sasl_client_start call runs under the SASL/GSSAPI lock; the
+  // guard is released when the try block exits.
+  try {
+    LockGuard saslGuard(getSaslMutex());
+    rc = sasl_client_start(conn_, chosen_mech_.mechanism.c_str(), 
&client_interact,
+              (const char **) &buf, &buflen, &chosen_mech);
+  } catch (const LockFailure& e) {
+    // Could not take the lock: fail this engine and name the call that was
+    // actually being guarded.
+    state_ = kFailure;
+    return std::make_pair( Status::MutexError("mutex that guards 
sasl_client_start unable to lock"), "" );
+  }
+
 
   switch (rc) {
   case SASL_OK:        state_ = kSuccess;
@@ -192,6 +218,7 @@ CySaslEngine::Start()
   // Cyrus will free this buffer when the connection is shut down
   token = std::string( buf, buflen);
   return std::make_pair( Status::OK(), token);
+
 } // start() method
 
 std::pair<Status, std::string> CySaslEngine::Step(const std::string data)
@@ -203,9 +230,15 @@ std::pair<Status, std::string> CySaslEngine::Step(const 
std::string data)
   if (state_ != kWaitingForData)
     LOG_WARN(kRPC, << "CySaslEngine::step when state is " << state_);
 
-  int rc = sasl_client_step(conn_, data.c_str(), data.size(), &client_interact,
-                        (const char **) &output, &outlen);
-
+  int rc = 0;
+  // Only the sasl_client_step call runs under the SASL/GSSAPI lock; the
+  // guard is released when the try block exits.
+  try {
+    LockGuard saslGuard(getSaslMutex());
+    rc = sasl_client_step(conn_, data.c_str(), data.size(), &client_interact,
+                     (const char **) &output, &outlen);
+  } catch (const LockFailure& e) {
+    // Could not take the lock: fail this engine and name the call that was
+    // actually being guarded.
+    state_ = kFailure;
+    return std::make_pair( Status::MutexError("mutex that guards 
sasl_client_step unable to lock"), "" );
+  }
   // right now, state_ == kWaitingForData,
   // so update  state_, to reflect _step()'s result:
   switch (rc) {
@@ -224,8 +257,13 @@ Status CySaslEngine::Finish()
     LOG_WARN(kRPC, << "CySaslEngine::finish when state is " << state_);
 
   if (conn_ != nullptr) {
+    try {
+      LockGuard saslGuard(getSaslMutex());
       sasl_dispose( &conn_);
       conn_ = NULL;
+    } catch (const LockFailure& e) {
+      return Status::MutexError("mutex that guards sasl_dispose unable to 
lock");
+    }
   }
 
   return Status::OK();
@@ -234,6 +272,8 @@ Status CySaslEngine::Finish()
 //////////////////////////////////////////////////
 // Internal callbacks, for sasl_init_client().  //
 // Mostly lifted from cyrus' sample_client.c .  //
+// Implicitly called in a context that already  //
+// holds the SASL/GSSAPI lock.                  //
 //////////////////////////////////////////////////
 
 static int
@@ -388,14 +428,26 @@ const sasl_callback_t per_process_callbacks[] = {
 
 CyrusPerProcessData::CyrusPerProcessData()
 {
-  int init_rc = sasl_client_init(per_process_callbacks);
-  init_status_ = make_status(init_rc);
+  try {
+    LockGuard saslGuard(getSaslMutex());
+    int init_rc = sasl_client_init(per_process_callbacks);
+    init_status_ = make_status(init_rc);
+  } catch (const LockFailure& e) {
+    init_status_ = Status::MutexError("mutex protecting process-wide 
sasl_client_init unable to lock");
+  }
 }
 
 CyrusPerProcessData::~CyrusPerProcessData()
 {
   // Undo sasl_client_init())
-  sasl_done();
+  try {
+    LockGuard saslGuard(getSaslMutex());
+    sasl_done();
+  } catch (const LockFailure& e) {
+    // Nothing can be done at this point, but the process is most likely
+    // shutting down anyway.
+    LOG_ERROR(kRPC, << "mutex protecting process-wide sasl_done unable to 
lock");
+  }
+
 }
 
 Status CyrusPerProcessData::Init()
@@ -405,6 +457,10 @@ Status CyrusPerProcessData::Init()
 
 CyrusPerProcessData & CyrusPerProcessData::GetInstance()
 {
+  // Meyers singleton, thread safe and lazily initialized in C++11
+  //
+  // Must be lazily initialized to allow client code to plug in a GSSAPI mutex
+  // implementation.
   static CyrusPerProcessData per_process_data;
   return per_process_data;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc
index 8286bac..7705c81 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/gsasl_engine.cc
@@ -16,18 +16,26 @@
  * limitations under the License.
  */
 
+#include "hdfspp/locks.h"
+
 #include <sstream>
 #include <gsasl.h>
 #include  "sasl_engine.h"
 #include "gsasl_engine.h"
 #include "common/logging.h"
 
+
 namespace hdfs {
 
+
 /*****************************************************************************
  *               GSASL UTILITY FUNCTIONS
  */
 
+// Mutex used by this file to guard calls into GNU SASL; it is the GSSAPI
+// mutex held by LockManager.
+static Mutex *getSaslMutex() {
+  return LockManager::getGssapiMutex();
+}
+
 static Status rc_to_status(int rc)
 {
   if (rc == GSASL_OK) {
@@ -70,32 +78,45 @@ std::pair<Status, std::string> base64_encode(const 
std::string & in) {
 
 GSaslEngine::~GSaslEngine()
 {
-  if (session_ != nullptr) {
+  // These should already be called in this->Finish
+  try {
+    LockGuard saslGuard(getSaslMutex());
+    if (session_ != nullptr) {
       gsasl_finish(session_);
-  }
+    }
 
-  if (ctx_ != nullptr) {
+    if (ctx_ != nullptr) {
       gsasl_done(ctx_);
+    }
+  } catch (const LockFailure& e) {
+    if(session_ || ctx_) {
+      LOG_ERROR(kRPC, << "GSaslEngine::~GSaslEngine@" << this << " unable to 
dispose of gsasl state: " << e.what());
+    }
   }
 }
 
 Status GSaslEngine::gsasl_new() {
-   int status = GSASL_OK;
-
-   if (ctx_) return Status::OK();
-
-   status = gsasl_init( & ctx_);
-
-   switch ( status) {
-   case GSASL_OK:
-      return Status::OK();
-   case GSASL_MALLOC_ERROR:
-      LOG_WARN(kRPC, <<   "GSaslEngine: Out of memory.");
-      return Status::Error("SaslEngine: Out of memory.");
-   default:
-      LOG_WARN(kRPC, <<   "GSaslEngine: Unexpected error." << status);
-      return Status::Error("SaslEngine: Unexpected error.");
-   }
+  int status = GSASL_OK;
+
+  if (ctx_) return Status::OK();
+
+  try {
+    LockGuard saslGuard(getSaslMutex());
+    status = gsasl_init( & ctx_);
+  } catch (const LockFailure& e) {
+    return Status::MutexError("Mutex that guards gsasl_init unable to lock");
+  }
+
+  switch ( status) {
+  case GSASL_OK:
+    return Status::OK();
+  case GSASL_MALLOC_ERROR:
+    LOG_WARN(kRPC, <<   "GSaslEngine: Out of memory.");
+    return Status::Error("SaslEngine: Out of memory.");
+  default:
+    LOG_WARN(kRPC, <<   "GSaslEngine: Unexpected error." << status);
+    return Status::Error("SaslEngine: Unexpected error.");
+  }
 } // gsasl_new()
 
 std::pair<Status, std::string>
@@ -107,12 +128,22 @@ GSaslEngine::Start()
   this->gsasl_new();
 
   /* Create new authentication session. */
-  rc = gsasl_client_start(ctx_, chosen_mech_.mechanism.c_str(), &session_);
+  try {
+    LockGuard saslGuard(getSaslMutex());
+    rc = gsasl_client_start(ctx_, chosen_mech_.mechanism.c_str(), &session_);
+  } catch (const LockFailure& e) {
+    state_ = kErrorState;
+    return std::make_pair(Status::MutexError("Mutex that guards 
gsasl_client_start unable to lock"), "");
+  }
   if (rc != GSASL_OK) {
     state_ = kErrorState;
     return std::make_pair( rc_to_status( rc), std::string(""));
   }
-  init_kerberos();
+  Status init_status = init_kerberos();
+  if(!init_status.ok()) {
+    state_ = kErrorState;
+    return std::make_pair(init_status, "");
+  }
 
   state_ = kWaitingForData;
 
@@ -124,12 +155,17 @@ GSaslEngine::Start()
 Status GSaslEngine::init_kerberos() {
 
   //TODO: check that we have a principal
-
-  gsasl_property_set(session_, GSASL_AUTHID, principal_.value().c_str());
-  gsasl_property_set(session_, GSASL_HOSTNAME,   
chosen_mech_.serverid.c_str());
-  gsasl_property_set(session_, GSASL_SERVICE,    
chosen_mech_.protocol.c_str());
-  return Status::OK();
+  try {
+    LockGuard saslGuard(getSaslMutex());
+    // these don't return anything that indicates failure
+    gsasl_property_set(session_, GSASL_AUTHID, principal_.value().c_str());
+    gsasl_property_set(session_, GSASL_HOSTNAME,   
chosen_mech_.serverid.c_str());
+    gsasl_property_set(session_, GSASL_SERVICE,    
chosen_mech_.protocol.c_str());
+  } catch (const LockFailure& e) {
+    return Status::MutexError("Mutex that guards gsasl_property_set in 
GSaslEngine::init_kerberos unable to lock");
   }
+  return Status::OK();
+}
 
 std::pair<Status, std::string> GSaslEngine::Step(const std::string data) {
   if (state_ != kWaitingForData)
@@ -137,8 +173,16 @@ std::pair<Status, std::string> GSaslEngine::Step(const 
std::string data) {
 
   char * output = NULL;
   size_t outputSize;
-  int rc = gsasl_step(session_, data.c_str(), data.size(), &output,
+
+  int rc = 0;
+  // Only the gsasl_step call runs under the SASL/GSSAPI lock; the guard is
+  // released when the try block exits.
+  try {
+    LockGuard saslGuard(getSaslMutex());
+    rc = gsasl_step(session_, data.c_str(), data.size(), &output,
           &outputSize);
+  } catch (const LockFailure& e) {
+    // Could not take the lock: fail this engine and name the call that was
+    // actually being guarded.
+    state_ = kFailure;
+    return std::make_pair(Status::MutexError("Mutex that guards 
gsasl_step unable to lock"), "");
+  }
 
   if (rc == GSASL_NEEDS_MORE || rc == GSASL_OK) {
     std::string retval(output, output ? outputSize : 0);
@@ -166,16 +210,20 @@ Status GSaslEngine::Finish()
   if (state_ != kSuccess && state_ != kFailure && state_ != kErrorState )
     LOG_WARN(kRPC, << "GSaslEngine::finish when state is " << state_);
 
-  if (session_ != nullptr) {
+  try {
+    LockGuard saslGuard(getSaslMutex());
+    if (session_ != nullptr) {
       gsasl_finish(session_);
       session_ = NULL;
-  }
+    }
 
-  if (ctx_ != nullptr) {
+    if (ctx_ != nullptr) {
       gsasl_done(ctx_);
       ctx_ = nullptr;
+    }
+  } catch (const LockFailure& e) {
+    return Status::MutexError("Mutex that guards sasl state cleanup in 
GSaslEngine::Finish unable to lock");
   }
-
   return Status::OK();
 } // finish() method
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc
index ad8191b..0957ea3 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc
@@ -38,8 +38,6 @@ namespace hdfs {
 
 using namespace hadoop::common;
 using namespace google::protobuf;
-template <class T>
-using optional = std::experimental::optional<T>;
 
 /*****
  * Threading model: all entry points need to acquire the sasl_lock before 
accessing

http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
index 395fad5..0b4581e 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
@@ -116,6 +116,9 @@ add_executable(hdfs_ioservice_test hdfs_ioservice_test.cc)
 target_link_libraries(hdfs_ioservice_test fs gmock_main common 
${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} 
${CMAKE_THREAD_LIBS_INIT})
 add_memcheck_test(hdfs_ioservice hdfs_ioservice_test)
 
+add_executable(user_lock_test user_lock_test.cc)
+target_link_libraries(user_lock_test fs gmock_main common 
${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} 
${CMAKE_THREAD_LIBS_INIT})
+add_memcheck_test(user_lock user_lock_test)
 
 #
 #

http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/uri_test.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/uri_test.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/uri_test.cc
index 78f1a58..97f0afd 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/uri_test.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/uri_test.cc
@@ -23,7 +23,6 @@ using ::testing::_;
 
 using namespace hdfs;
 
-
 URI expect_uri_throw(const char *uri) {
   bool threw = false;
   std::string what_msg;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/411a9f82/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/user_lock_test.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/user_lock_test.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/user_lock_test.cc
new file mode 100644
index 0000000..6df47b2
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/user_lock_test.cc
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hdfspp/locks.h>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <iostream>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+using namespace hdfs;
+
+// Mutex that can never be acquired: lock() always throws LockFailure, and
+// unlock() always throws too because it can never be locked.
+class CantLockMutex : public Mutex {
+ public:
+  void lock() override {
+    throw LockFailure("This mutex cannot be locked");
+  }
+  void unlock() override {
+    throw LockFailure("Unlock");
+  }
+  std::string str() override {
+    return "CantLockMutex";
+  }
+};
+
+TEST(UserLockTest, DefaultMutexBasics) {
+  Mutex *mtx = LockManager::TEST_get_default_mutex();
+  // The default mutex must lock without throwing, including after an unlock;
+  // lock and unlock twice to make sure unlock works
+  bool locked = false;
+  try {
+    mtx->lock();
+    locked = true;
+  } catch (...) {}  // a throw from lock() leaves locked == false
+  EXPECT_TRUE(locked);
+  mtx->unlock();
+
+  locked = false;
+  try {
+    mtx->lock();
+    locked = true;
+  } catch (...) {}  // a throw from lock() leaves locked == false
+  EXPECT_TRUE(locked);
+  mtx->unlock();
+
+  EXPECT_EQ(mtx->str(), "DefaultMutex");
+}
+
+
+// Make sure lock manager can only be initialized once unless test reset called
+TEST(UserLockTest, LockManager) {
+  std::unique_ptr<CantLockMutex> mtx(new CantLockMutex());
+  EXPECT_TRUE(mtx != nullptr);
+
+  // Check the default lock
+  Mutex *defaultGssapiMtx = LockManager::getGssapiMutex();
+  EXPECT_TRUE(defaultGssapiMtx != nullptr);
+
+  // First init with a user-supplied mutex.  Should work
+  bool res = LockManager::InitLocks(mtx.get());
+  EXPECT_TRUE(res);
+
+  // The manager must now hand out the user-supplied mutex
+  EXPECT_EQ(LockManager::getGssapiMutex(), mtx.get());
+
+  res = LockManager::InitLocks(mtx.get());  // double init, should not work
+  EXPECT_FALSE(res);
+
+  // Make sure test reset allows a new init and restores the default mutex
+  LockManager::TEST_reset_manager();
+  res = LockManager::InitLocks(mtx.get());
+  EXPECT_TRUE(res);
+  LockManager::TEST_reset_manager();
+  EXPECT_EQ(LockManager::getGssapiMutex(), defaultGssapiMtx);
+}
+
+TEST(UserLockTest, CheckCantLockMutex) {
+  std::unique_ptr<CantLockMutex> mtx(new CantLockMutex());
+  EXPECT_TRUE(mtx != nullptr);
+  // lock() and unlock() must both throw; neither may appear to succeed
+  bool locked = false;
+  try {
+    mtx->lock();
+    locked = true;  // only reached if lock() failed to throw
+  } catch (...) {}
+  EXPECT_FALSE(locked);
+
+  bool threw_on_unlock = false;
+  try {
+    mtx->unlock();
+  } catch (const LockFailure& e) {
+    threw_on_unlock = true;
+  }
+  EXPECT_TRUE(threw_on_unlock);
+  EXPECT_EQ("CantLockMutex", mtx->str());
+}
+
+TEST(UserLockTest, LockGuardBasics) {
+  Mutex *goodMtx = LockManager::TEST_get_default_mutex();
+  CantLockMutex badMtx;
+  // Guarding the default mutex must never raise LockFailure;
+  // lock/unlock a few times to increase chances of UB if lock is misused
+  for(int i=0;i<10;i++) {
+    bool caught_exception = false;
+    try {
+      LockGuard guard(goodMtx);
+      // now have a scoped lock
+    } catch (const LockFailure& e) {
+      caught_exception = true;
+    }
+    EXPECT_FALSE(caught_exception);
+  }
+
+  // still do a few times, but expect it to blow up each time
+  for(int i=0;i<10;i++) {
+    bool caught_exception = false;
+    try {
+      LockGuard guard(&badMtx);
+      // expected to be unreachable: guarding badMtx should throw LockFailure
+    } catch (const LockFailure& e) {
+      caught_exception = true;
+    }
+    EXPECT_TRUE(caught_exception);
+  }
+
+}
+
+struct Incrementer {  // callable that adds 1 to a shared counter _iters times
+  int64_t& _val;  // counter being modified; referenced, not owned
+  int64_t _iters;  // how many increments operator() performs
+  Mutex *_mtx;  // locked around every single update of _val
+  Incrementer(int64_t &val, int64_t iters, Mutex *m)
+        : _val(val), _iters(iters), _mtx(m) {}
+  void operator()(){
+    for(int64_t i=0; i<_iters; i++) {
+      LockGuard valguard(_mtx);  // guard is scoped to this one iteration
+      _val += 1;
+    }
+  }
+};
+
+struct Decrementer {  // callable that subtracts 1 from a counter _iters times
+  int64_t& _val;  // counter being modified; referenced, not owned
+  int64_t _iters;  // how many decrements operator() performs
+  Mutex *_mtx;  // locked around every single update of _val
+  Decrementer(int64_t &val, int64_t iters, Mutex *m)
+        : _val(val), _iters(iters), _mtx(m) {}
+  void operator()(){
+    for(int64_t i=0; i<_iters; i++) {
+      LockGuard valguard(_mtx);  // guard is scoped to this one iteration
+      _val -= 1;
+    }
+  }
+};
+
+TEST(UserLockTest, LockGuardConcurrency) {
+  Mutex *mtx = LockManager::TEST_get_default_mutex();
+  // First run the functors on this thread, then race them across workers.
+  // Prove that these actually mutate the value
+  int64_t test_value = 0;
+  Incrementer inc(test_value, 1000, mtx);
+  inc();
+  EXPECT_EQ(test_value, 1000);
+
+  Decrementer dec(test_value, 1000, mtx);
+  dec();
+  EXPECT_EQ(test_value, 0);
+
+  std::vector<std::thread> workers;
+  std::vector<Incrementer> incrementers;
+  std::vector<Decrementer> decrementors;
+
+  const int delta = 1024 * 1024;  // updates performed by each worker
+  const int threads = 2 * 6;  // even indices increment, odd ones decrement
+  EXPECT_EQ(threads % 2, 0);  // an odd count could not balance out
+
+  // a bunch of threads race to increment and decrement the value
+  // if all goes well the operations balance out and the value is unchanged
+  for(int i=0; i < threads; i++) {
+    if(i%2 == 0) {
+      incrementers.emplace_back(test_value, delta, mtx);
+      workers.emplace_back(incrementers.back());  // thread copies the functor
+    } else {
+      decrementors.emplace_back(test_value, delta, mtx);
+      workers.emplace_back(decrementors.back());  // thread copies the functor
+    }
+  }
+
+  // join, everything should balance to 0
+  for(std::thread& thread : workers) {
+    thread.join();
+  }
+  EXPECT_EQ(test_value, 0);
+}
+
+
+int main(int argc, char *argv[]) {
+  // Test-binary entry point; the exit code is the RUN_ALL_TESTS() result.
+  // The following line must be executed to initialize Google Mock
+  // (and Google Test) before running the tests.
+  ::testing::InitGoogleMock(&argc, argv);
+  int res = RUN_ALL_TESTS();
+  return res;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to