Repository: arrow
Updated Branches:
  refs/heads/master 68e39c686 -> cfde4607d


ARROW-243: [C++] Add option to switch between libhdfs and libhdfs3 when creating HdfsClient

Closes #108

Some users will not have a full Java Hadoop distribution available and may 
wish instead to use libhdfs3, a native C++ HDFS client originally developed 
at Pivotal (https://github.com/Pivotal-Data-Attic/pivotalrd-libhdfs3) and now 
part of Apache HAWQ (incubating).

In C++, you can select the driver by setting:

```c++
HdfsConnectionConfig conf;
conf.driver = HdfsDriver::LIBHDFS3;
```
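
For reference, `HdfsClient::Connect` takes a pointer to this config struct. A
minimal connection sketch follows; the host, port, and user values are
illustrative placeholders, not part of this patch:

```c++
#include <memory>

#include "arrow/io/hdfs.h"
#include "arrow/status.h"

// Hedged sketch: connect through the libhdfs3 driver. The host, port, and
// user values below are placeholders for your cluster's actual settings.
arrow::Status ConnectWithLibHdfs3(std::shared_ptr<arrow::io::HdfsClient>* client) {
  arrow::io::HdfsConnectionConfig conf;
  conf.host = "localhost";                        // placeholder namenode host
  conf.port = 8020;                               // placeholder namenode port
  conf.user = "hdfs";                             // placeholder user name
  conf.driver = arrow::io::HdfsDriver::LIBHDFS3;  // or HdfsDriver::LIBHDFS
  return arrow::io::HdfsClient::Connect(&conf, client);
}
```

Callers can also probe driver availability up front with the `HaveLibHdfs()`
and `HaveLibHdfs3()` helpers this patch exports from `hdfs.h`.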

In Python, you can run:

```python
con = arrow.io.HdfsClient.connect(..., driver='libhdfs3')
```
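
A slightly fuller hedged sketch (the namenode host and port are placeholders,
and the `connect` arguments other than `driver=` are assumed here for
illustration):

```python
# Hypothetical namenode host/port; arguments other than driver= are
# assumptions for illustration only.
con = arrow.io.HdfsClient.connect('localhost', 8020, driver='libhdfs3')

# Passing driver='libhdfs' instead selects the JNI-based driver, which
# needs a JVM and a Hadoop distribution available at runtime.
```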

Author: Wes McKinney <wes.mckin...@twosigma.com>

Closes #244 from wesm/ARROW-243 and squashes the following commits:

7ae197a [Wes McKinney] Refactor HdfsClient code to support both libhdfs and 
libhdfs3 at runtime. Add driver option to Python interface


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/cfde4607
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/cfde4607
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/cfde4607

Branch: refs/heads/master
Commit: cfde4607df453e4b97560e64caff744fb3ba3d1f
Parents: 68e39c6
Author: Wes McKinney <wes.mckin...@twosigma.com>
Authored: Mon Dec 19 18:26:17 2016 -0500
Committer: Wes McKinney <wes.mckin...@twosigma.com>
Committed: Mon Dec 19 18:26:17 2016 -0500

----------------------------------------------------------------------
 cpp/src/arrow/io/CMakeLists.txt         |   2 +-
 cpp/src/arrow/io/hdfs-internal.cc       | 590 +++++++++++++++++++++++++++
 cpp/src/arrow/io/hdfs-internal.h        | 203 +++++++++
 cpp/src/arrow/io/hdfs.cc                | 102 +++--
 cpp/src/arrow/io/hdfs.h                 |   6 +-
 cpp/src/arrow/io/io-hdfs-test.cc        | 211 +++++-----
 cpp/src/arrow/io/libhdfs_shim.cc        | 582 --------------------------
 python/.gitignore                       |   1 +
 python/pyarrow/includes/libarrow_io.pxd |   8 +-
 python/pyarrow/io.pyx                   |  45 +-
 python/pyarrow/tests/test_hdfs.py       | 161 ++++----
 11 files changed, 1109 insertions(+), 802 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/cfde4607/cpp/src/arrow/io/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt
index f285180..e2b6496 100644
--- a/cpp/src/arrow/io/CMakeLists.txt
+++ b/cpp/src/arrow/io/CMakeLists.txt
@@ -75,7 +75,7 @@ if(ARROW_HDFS)
 
   set(ARROW_HDFS_SRCS
     hdfs.cc
-    libhdfs_shim.cc)
+    hdfs-internal.cc)
 
   set_property(SOURCE ${ARROW_HDFS_SRCS}
     APPEND_STRING PROPERTY

http://git-wip-us.apache.org/repos/asf/arrow/blob/cfde4607/cpp/src/arrow/io/hdfs-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs-internal.cc b/cpp/src/arrow/io/hdfs-internal.cc
new file mode 100644
index 0000000..7094785
--- /dev/null
+++ b/cpp/src/arrow/io/hdfs-internal.cc
@@ -0,0 +1,590 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This shim interface to libhdfs (for runtime shared library loading) has been
+// adapted from the SFrame project, released under the ASF-compatible 3-clause
+// BSD license
+//
+// Using this required having the $JAVA_HOME and $HADOOP_HOME environment
+// variables set, so that libjvm and libhdfs can be located easily
+
+// Copyright (C) 2015 Dato, Inc.
+// All rights reserved.
+//
+// This software may be modified and distributed under the terms
+// of the BSD license. See the LICENSE file for details.
+
+#ifdef HAS_HADOOP
+
+#ifndef _WIN32
+#include <dlfcn.h>
+#else
+#include <windows.h>
+#include <winsock2.h>
+
+// TODO(wesm): address when/if we add windows support
+// #include <util/syserr_reporting.hpp>
+#endif
+
+extern "C" {
+#include <hdfs.h>
+}
+
+#include <iostream>
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <type_traits>
+#include <vector>
+
+#include <boost/filesystem.hpp>  // NOLINT
+
+#include "arrow/io/hdfs-internal.h"
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+namespace fs = boost::filesystem;
+
+#ifndef _WIN32
+static void* libjvm_handle = NULL;
+#else
+static HINSTANCE libjvm_handle = NULL;
+#endif
+/*
+ * All the shim pointers
+ */
+
+// Helper functions for dlopens
+static std::vector<fs::path> get_potential_libjvm_paths();
+static std::vector<fs::path> get_potential_libhdfs_paths();
+static std::vector<fs::path> get_potential_libhdfs3_paths();
+static arrow::Status try_dlopen(std::vector<fs::path> potential_paths, const char* name,
+#ifndef _WIN32
+    void*& out_handle);
+#else
+    HINSTANCE& out_handle);
+#endif
+
+static std::vector<fs::path> get_potential_libhdfs_paths() {
+  std::vector<fs::path> libhdfs_potential_paths;
+  std::string file_name;
+
+// OS-specific file name
+#ifdef __WIN32
+  file_name = "hdfs.dll";
+#elif __APPLE__
+  file_name = "libhdfs.dylib";
+#else
+  file_name = "libhdfs.so";
+#endif
+
+  // Common paths
+  std::vector<fs::path> search_paths = {fs::path(""), fs::path(".")};
+
+  // Path from environment variable
+  const char* hadoop_home = std::getenv("HADOOP_HOME");
+  if (hadoop_home != nullptr) {
+    auto path = fs::path(hadoop_home) / "lib/native";
+    search_paths.push_back(path);
+  }
+
+  const char* libhdfs_dir = std::getenv("ARROW_LIBHDFS_DIR");
+  if (libhdfs_dir != nullptr) { search_paths.push_back(fs::path(libhdfs_dir)); }
+
+  // All paths with file name
+  for (auto& path : search_paths) {
+    libhdfs_potential_paths.push_back(path / file_name);
+  }
+
+  return libhdfs_potential_paths;
+}
+
+static std::vector<fs::path> get_potential_libhdfs3_paths() {
+  std::vector<fs::path> potential_paths;
+  std::string file_name;
+
+// OS-specific file name
+#ifdef __WIN32
+  file_name = "hdfs3.dll";
+#elif __APPLE__
+  file_name = "libhdfs3.dylib";
+#else
+  file_name = "libhdfs3.so";
+#endif
+
+  // Common paths
+  std::vector<fs::path> search_paths = {fs::path(""), fs::path(".")};
+
+  const char* libhdfs3_dir = std::getenv("ARROW_LIBHDFS3_DIR");
+  if (libhdfs3_dir != nullptr) { search_paths.push_back(fs::path(libhdfs3_dir)); }
+
+  // All paths with file name
+  for (auto& path : search_paths) {
+    potential_paths.push_back(path / file_name);
+  }
+
+  return potential_paths;
+}
+
+static std::vector<fs::path> get_potential_libjvm_paths() {
+  std::vector<fs::path> libjvm_potential_paths;
+
+  std::vector<fs::path> search_prefixes;
+  std::vector<fs::path> search_suffixes;
+  std::string file_name;
+
+// From heuristics
+#ifdef __WIN32
+  search_prefixes = {""};
+  search_suffixes = {"/jre/bin/server", "/bin/server"};
+  file_name = "jvm.dll";
+#elif __APPLE__
+  search_prefixes = {""};
+  search_suffixes = {"", "/jre/lib/server"};
+  file_name = "libjvm.dylib";
+
+// SFrame uses /usr/libexec/java_home to find JAVA_HOME; for now we are
+// expecting users to set an environment variable
+#else
+  search_prefixes = {
+      "/usr/lib/jvm/default-java",                // ubuntu / debian distros
+      "/usr/lib/jvm/java",                        // rhel6
+      "/usr/lib/jvm",                             // centos6
+      "/usr/lib64/jvm",                           // opensuse 13
+      "/usr/local/lib/jvm/default-java",          // alt ubuntu / debian 
distros
+      "/usr/local/lib/jvm/java",                  // alt rhel6
+      "/usr/local/lib/jvm",                       // alt centos6
+      "/usr/local/lib64/jvm",                     // alt opensuse 13
+      "/usr/local/lib/jvm/java-7-openjdk-amd64",  // alt ubuntu / debian 
distros
+      "/usr/lib/jvm/java-7-openjdk-amd64",        // alt ubuntu / debian 
distros
+      "/usr/local/lib/jvm/java-6-openjdk-amd64",  // alt ubuntu / debian 
distros
+      "/usr/lib/jvm/java-6-openjdk-amd64",        // alt ubuntu / debian 
distros
+      "/usr/lib/jvm/java-7-oracle",               // alt ubuntu
+      "/usr/lib/jvm/java-8-oracle",               // alt ubuntu
+      "/usr/lib/jvm/java-6-oracle",               // alt ubuntu
+      "/usr/local/lib/jvm/java-7-oracle",         // alt ubuntu
+      "/usr/local/lib/jvm/java-8-oracle",         // alt ubuntu
+      "/usr/local/lib/jvm/java-6-oracle",         // alt ubuntu
+      "/usr/lib/jvm/default",                     // alt centos
+      "/usr/java/latest",                         // alt centos
+  };
+  search_suffixes = {"/jre/lib/amd64/server"};
+  file_name = "libjvm.so";
+#endif
+  // From direct environment variable
+  char* env_value = NULL;
+  if ((env_value = getenv("JAVA_HOME")) != NULL) {
+    search_prefixes.insert(search_prefixes.begin(), env_value);
+  }
+
+  // Generate cross product between search_prefixes, search_suffixes, and file_name
+  for (auto& prefix : search_prefixes) {
+    for (auto& suffix : search_suffixes) {
+      auto path = (fs::path(prefix) / fs::path(suffix) / fs::path(file_name));
+      libjvm_potential_paths.push_back(path);
+    }
+  }
+
+  return libjvm_potential_paths;
+}
+
+#ifndef _WIN32
+static arrow::Status try_dlopen(
+    std::vector<fs::path> potential_paths, const char* name, void*& out_handle) {
+  std::vector<std::string> error_messages;
+
+  for (auto& i : potential_paths) {
+    i.make_preferred();
+    out_handle = dlopen(i.native().c_str(), RTLD_NOW | RTLD_LOCAL);
+
+    if (out_handle != NULL) {
+      // std::cout << "Loaded " << i << std::endl;
+      break;
+    } else {
+      const char* err_msg = dlerror();
+      if (err_msg != NULL) {
+        error_messages.push_back(std::string(err_msg));
+      } else {
+        error_messages.push_back(std::string(" returned NULL"));
+      }
+    }
+  }
+
+  if (out_handle == NULL) {
+    std::stringstream ss;
+    ss << "Unable to load " << name;
+    return arrow::Status::IOError(ss.str());
+  }
+
+  return arrow::Status::OK();
+}
+
+#else
+static arrow::Status try_dlopen(
+    std::vector<fs::path> potential_paths, const char* name, HINSTANCE& out_handle) {
+  std::vector<std::string> error_messages;
+
+  for (auto& i : potential_paths) {
+    i.make_preferred();
+    out_handle = LoadLibrary(i.string().c_str());
+
+    if (out_handle != NULL) {
+      break;
+    } else {
+      // error_messages.push_back(get_last_err_str(GetLastError()));
+    }
+  }
+
+  if (out_handle == NULL) {
+    std::stringstream ss;
+    ss << "Unable to load " << name;
+    return arrow::Status::IOError(ss.str());
+  }
+
+  return arrow::Status::OK();
+}
+#endif  // _WIN32
+
+static inline void* GetLibrarySymbol(void* handle, const char* symbol) {
+  if (handle == NULL) return NULL;
+#ifndef _WIN32
+  return dlsym(handle, symbol);
+#else
+
+  void* ret = reinterpret_cast<void*>(GetProcAddress(handle, symbol));
+  if (ret == NULL) {
+    // logstream(LOG_INFO) << "GetProcAddress error: "
+    //                     << get_last_err_str(GetLastError()) << std::endl;
+  }
+  return ret;
+#endif
+}
+
+#define GET_SYMBOL_REQUIRED(SHIM, SYMBOL_NAME)                         \
+  do {                                                                 \
+    if (!SHIM->SYMBOL_NAME) {                                          \
+      *reinterpret_cast<void**>(&SHIM->SYMBOL_NAME) =                  \
+          GetLibrarySymbol(SHIM->handle, "" #SYMBOL_NAME);             \
+    }                                                                  \
+    if (!SHIM->SYMBOL_NAME)                                            \
+      return Status::IOError("Getting symbol " #SYMBOL_NAME " failed"); \
+  } while (0)
+
+#define GET_SYMBOL(SHIM, SYMBOL_NAME)                    \
+  if (!SHIM->SYMBOL_NAME) {                              \
+    *reinterpret_cast<void**>(&SHIM->SYMBOL_NAME) =      \
+        GetLibrarySymbol(SHIM->handle, "" #SYMBOL_NAME); \
+  }
+
+namespace arrow {
+namespace io {
+
+static LibHdfsShim libhdfs_shim;
+static LibHdfsShim libhdfs3_shim;
+
+hdfsBuilder* LibHdfsShim::NewBuilder(void) {
+  return this->hdfsNewBuilder();
+}
+
+void LibHdfsShim::BuilderSetNameNode(hdfsBuilder* bld, const char* nn) {
+  this->hdfsBuilderSetNameNode(bld, nn);
+}
+
+void LibHdfsShim::BuilderSetNameNodePort(hdfsBuilder* bld, tPort port) {
+  this->hdfsBuilderSetNameNodePort(bld, port);
+}
+
+void LibHdfsShim::BuilderSetUserName(hdfsBuilder* bld, const char* userName) {
+  this->hdfsBuilderSetUserName(bld, userName);
+}
+
+void LibHdfsShim::BuilderSetKerbTicketCachePath(
+    hdfsBuilder* bld, const char* kerbTicketCachePath) {
+  this->hdfsBuilderSetKerbTicketCachePath(bld, kerbTicketCachePath);
+}
+
+hdfsFS LibHdfsShim::BuilderConnect(hdfsBuilder* bld) {
+  return this->hdfsBuilderConnect(bld);
+}
+
+int LibHdfsShim::Disconnect(hdfsFS fs) {
+  return this->hdfsDisconnect(fs);
+}
+
+hdfsFile LibHdfsShim::OpenFile(hdfsFS fs, const char* path, int flags, int bufferSize,
+    short replication, tSize blocksize) {  // NOLINT
+  return this->hdfsOpenFile(fs, path, flags, bufferSize, replication, blocksize);
+}
+
+int LibHdfsShim::CloseFile(hdfsFS fs, hdfsFile file) {
+  return this->hdfsCloseFile(fs, file);
+}
+
+int LibHdfsShim::Exists(hdfsFS fs, const char* path) {
+  return this->hdfsExists(fs, path);
+}
+
+int LibHdfsShim::Seek(hdfsFS fs, hdfsFile file, tOffset desiredPos) {
+  return this->hdfsSeek(fs, file, desiredPos);
+}
+
+tOffset LibHdfsShim::Tell(hdfsFS fs, hdfsFile file) {
+  return this->hdfsTell(fs, file);
+}
+
+tSize LibHdfsShim::Read(hdfsFS fs, hdfsFile file, void* buffer, tSize length) {
+  return this->hdfsRead(fs, file, buffer, length);
+}
+
+bool LibHdfsShim::HasPread() {
+  GET_SYMBOL(this, hdfsPread);
+  return this->hdfsPread != nullptr;
+}
+
+tSize LibHdfsShim::Pread(
+    hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length) {
+  GET_SYMBOL(this, hdfsPread);
+  return this->hdfsPread(fs, file, position, buffer, length);
+}
+
+tSize LibHdfsShim::Write(hdfsFS fs, hdfsFile file, const void* buffer, tSize length) {
+  return this->hdfsWrite(fs, file, buffer, length);
+}
+
+int LibHdfsShim::Flush(hdfsFS fs, hdfsFile file) {
+  return this->hdfsFlush(fs, file);
+}
+
+int LibHdfsShim::Available(hdfsFS fs, hdfsFile file) {
+  GET_SYMBOL(this, hdfsAvailable);
+  if (this->hdfsAvailable)
+    return this->hdfsAvailable(fs, file);
+  else
+    return 0;
+}
+
+int LibHdfsShim::Copy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) {
+  GET_SYMBOL(this, hdfsCopy);
+  if (this->hdfsCopy)
+    return this->hdfsCopy(srcFS, src, dstFS, dst);
+  else
+    return 0;
+}
+
+int LibHdfsShim::Move(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) {
+  GET_SYMBOL(this, hdfsMove);
+  if (this->hdfsMove)
+    return this->hdfsMove(srcFS, src, dstFS, dst);
+  else
+    return 0;
+}
+
+int LibHdfsShim::Delete(hdfsFS fs, const char* path, int recursive) {
+  return this->hdfsDelete(fs, path, recursive);
+}
+
+int LibHdfsShim::Rename(hdfsFS fs, const char* oldPath, const char* newPath) {
+  GET_SYMBOL(this, hdfsRename);
+  if (this->hdfsRename)
+    return this->hdfsRename(fs, oldPath, newPath);
+  else
+    return 0;
+}
+
+char* LibHdfsShim::GetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize) {
+  GET_SYMBOL(this, hdfsGetWorkingDirectory);
+  if (this->hdfsGetWorkingDirectory) {
+    return this->hdfsGetWorkingDirectory(fs, buffer, bufferSize);
+  } else {
+    return NULL;
+  }
+}
+
+int LibHdfsShim::SetWorkingDirectory(hdfsFS fs, const char* path) {
+  GET_SYMBOL(this, hdfsSetWorkingDirectory);
+  if (this->hdfsSetWorkingDirectory) {
+    return this->hdfsSetWorkingDirectory(fs, path);
+  } else {
+    return 0;
+  }
+}
+
+int LibHdfsShim::CreateDirectory(hdfsFS fs, const char* path) {
+  return this->hdfsCreateDirectory(fs, path);
+}
+
+int LibHdfsShim::SetReplication(hdfsFS fs, const char* path, int16_t replication) {
+  GET_SYMBOL(this, hdfsSetReplication);
+  if (this->hdfsSetReplication) {
+    return this->hdfsSetReplication(fs, path, replication);
+  } else {
+    return 0;
+  }
+}
+
+hdfsFileInfo* LibHdfsShim::ListDirectory(hdfsFS fs, const char* path, int* numEntries) {
+  return this->hdfsListDirectory(fs, path, numEntries);
+}
+
+hdfsFileInfo* LibHdfsShim::GetPathInfo(hdfsFS fs, const char* path) {
+  return this->hdfsGetPathInfo(fs, path);
+}
+
+void LibHdfsShim::FreeFileInfo(hdfsFileInfo* hdfsFileInfo, int numEntries) {
+  this->hdfsFreeFileInfo(hdfsFileInfo, numEntries);
+}
+
+char*** LibHdfsShim::GetHosts(
+    hdfsFS fs, const char* path, tOffset start, tOffset length) {
+  GET_SYMBOL(this, hdfsGetHosts);
+  if (this->hdfsGetHosts) {
+    return this->hdfsGetHosts(fs, path, start, length);
+  } else {
+    return NULL;
+  }
+}
+
+void LibHdfsShim::FreeHosts(char*** blockHosts) {
+  GET_SYMBOL(this, hdfsFreeHosts);
+  if (this->hdfsFreeHosts) { this->hdfsFreeHosts(blockHosts); }
+}
+
+tOffset LibHdfsShim::GetDefaultBlockSize(hdfsFS fs) {
+  GET_SYMBOL(this, hdfsGetDefaultBlockSize);
+  if (this->hdfsGetDefaultBlockSize) {
+    return this->hdfsGetDefaultBlockSize(fs);
+  } else {
+    return 0;
+  }
+}
+
+tOffset LibHdfsShim::GetCapacity(hdfsFS fs) {
+  return this->hdfsGetCapacity(fs);
+}
+
+tOffset LibHdfsShim::GetUsed(hdfsFS fs) {
+  return this->hdfsGetUsed(fs);
+}
+
+int LibHdfsShim::Chown(
+    hdfsFS fs, const char* path, const char* owner, const char* group) {
+  GET_SYMBOL(this, hdfsChown);
+  if (this->hdfsChown) {
+    return this->hdfsChown(fs, path, owner, group);
+  } else {
+    return 0;
+  }
+}
+
+int LibHdfsShim::Chmod(hdfsFS fs, const char* path, short mode) {  // NOLINT
+  GET_SYMBOL(this, hdfsChmod);
+  if (this->hdfsChmod) {
+    return this->hdfsChmod(fs, path, mode);
+  } else {
+    return 0;
+  }
+}
+
+int LibHdfsShim::Utime(hdfsFS fs, const char* path, tTime mtime, tTime atime) {
+  GET_SYMBOL(this, hdfsUtime);
+  if (this->hdfsUtime) {
+    return this->hdfsUtime(fs, path, mtime, atime);
+  } else {
+    return 0;
+  }
+}
+
+Status LibHdfsShim::GetRequiredSymbols() {
+  GET_SYMBOL_REQUIRED(this, hdfsNewBuilder);
+  GET_SYMBOL_REQUIRED(this, hdfsBuilderSetNameNode);
+  GET_SYMBOL_REQUIRED(this, hdfsBuilderSetNameNodePort);
+  GET_SYMBOL_REQUIRED(this, hdfsBuilderSetUserName);
+  GET_SYMBOL_REQUIRED(this, hdfsBuilderSetKerbTicketCachePath);
+  GET_SYMBOL_REQUIRED(this, hdfsBuilderConnect);
+  GET_SYMBOL_REQUIRED(this, hdfsCreateDirectory);
+  GET_SYMBOL_REQUIRED(this, hdfsDelete);
+  GET_SYMBOL_REQUIRED(this, hdfsDisconnect);
+  GET_SYMBOL_REQUIRED(this, hdfsExists);
+  GET_SYMBOL_REQUIRED(this, hdfsFreeFileInfo);
+  GET_SYMBOL_REQUIRED(this, hdfsGetCapacity);
+  GET_SYMBOL_REQUIRED(this, hdfsGetUsed);
+  GET_SYMBOL_REQUIRED(this, hdfsGetPathInfo);
+  GET_SYMBOL_REQUIRED(this, hdfsListDirectory);
+
+  // File methods
+  GET_SYMBOL_REQUIRED(this, hdfsCloseFile);
+  GET_SYMBOL_REQUIRED(this, hdfsFlush);
+  GET_SYMBOL_REQUIRED(this, hdfsOpenFile);
+  GET_SYMBOL_REQUIRED(this, hdfsRead);
+  GET_SYMBOL_REQUIRED(this, hdfsSeek);
+  GET_SYMBOL_REQUIRED(this, hdfsTell);
+  GET_SYMBOL_REQUIRED(this, hdfsWrite);
+
+  return Status::OK();
+}
+
+Status ARROW_EXPORT ConnectLibHdfs(LibHdfsShim** driver) {
+  static std::mutex lock;
+  std::lock_guard<std::mutex> guard(lock);
+
+  LibHdfsShim* shim = &libhdfs_shim;
+
+  static bool shim_attempted = false;
+  if (!shim_attempted) {
+    shim_attempted = true;
+
+    shim->Initialize();
+
+    std::vector<fs::path> libjvm_potential_paths = get_potential_libjvm_paths();
+    RETURN_NOT_OK(try_dlopen(libjvm_potential_paths, "libjvm", libjvm_handle));
+
+    std::vector<fs::path> libhdfs_potential_paths = get_potential_libhdfs_paths();
+    RETURN_NOT_OK(try_dlopen(libhdfs_potential_paths, "libhdfs", shim->handle));
+  } else if (shim->handle == nullptr) {
+    return Status::IOError("Prior attempt to load libhdfs failed");
+  }
+
+  *driver = shim;
+  return shim->GetRequiredSymbols();
+}
+
+Status ARROW_EXPORT ConnectLibHdfs3(LibHdfsShim** driver) {
+  static std::mutex lock;
+  std::lock_guard<std::mutex> guard(lock);
+
+  LibHdfsShim* shim = &libhdfs3_shim;
+
+  static bool shim_attempted = false;
+  if (!shim_attempted) {
+    shim_attempted = true;
+
+    shim->Initialize();
+
+    std::vector<fs::path> libhdfs3_potential_paths = get_potential_libhdfs3_paths();
+    RETURN_NOT_OK(try_dlopen(libhdfs3_potential_paths, "libhdfs3", shim->handle));
+  } else if (shim->handle == nullptr) {
+    return Status::IOError("Prior attempt to load libhdfs3 failed");
+  }
+
+  *driver = shim;
+  return shim->GetRequiredSymbols();
+}
+
+}  // namespace io
+}  // namespace arrow
+
+#endif  // HAS_HADOOP

http://git-wip-us.apache.org/repos/asf/arrow/blob/cfde4607/cpp/src/arrow/io/hdfs-internal.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs-internal.h b/cpp/src/arrow/io/hdfs-internal.h
new file mode 100644
index 0000000..0ff118a
--- /dev/null
+++ b/cpp/src/arrow/io/hdfs-internal.h
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_IO_HDFS_INTERNAL
+#define ARROW_IO_HDFS_INTERNAL
+
+#include <hdfs.h>
+
+namespace arrow {
+
+class Status;
+
+namespace io {
+
+// NOTE(wesm): cpplint does not like use of short and other imprecise C types
+struct LibHdfsShim {
+#ifndef _WIN32
+  void* handle;
+#else
+  HINSTANCE handle;
+#endif
+
+  hdfsBuilder* (*hdfsNewBuilder)(void);
+  void (*hdfsBuilderSetNameNode)(hdfsBuilder* bld, const char* nn);
+  void (*hdfsBuilderSetNameNodePort)(hdfsBuilder* bld, tPort port);
+  void (*hdfsBuilderSetUserName)(hdfsBuilder* bld, const char* userName);
+  void (*hdfsBuilderSetKerbTicketCachePath)(
+      hdfsBuilder* bld, const char* kerbTicketCachePath);
+  hdfsFS (*hdfsBuilderConnect)(hdfsBuilder* bld);
+
+  int (*hdfsDisconnect)(hdfsFS fs);
+
+  hdfsFile (*hdfsOpenFile)(hdfsFS fs, const char* path, int flags, int bufferSize,
+      short replication, tSize blocksize);  // NOLINT
+
+  int (*hdfsCloseFile)(hdfsFS fs, hdfsFile file);
+  int (*hdfsExists)(hdfsFS fs, const char* path);
+  int (*hdfsSeek)(hdfsFS fs, hdfsFile file, tOffset desiredPos);
+  tOffset (*hdfsTell)(hdfsFS fs, hdfsFile file);
+  tSize (*hdfsRead)(hdfsFS fs, hdfsFile file, void* buffer, tSize length);
+  tSize (*hdfsPread)(
+      hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length);
+  tSize (*hdfsWrite)(hdfsFS fs, hdfsFile file, const void* buffer, tSize length);
+  int (*hdfsFlush)(hdfsFS fs, hdfsFile file);
+  int (*hdfsAvailable)(hdfsFS fs, hdfsFile file);
+  int (*hdfsCopy)(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst);
+  int (*hdfsMove)(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst);
+  int (*hdfsDelete)(hdfsFS fs, const char* path, int recursive);
+  int (*hdfsRename)(hdfsFS fs, const char* oldPath, const char* newPath);
+  char* (*hdfsGetWorkingDirectory)(hdfsFS fs, char* buffer, size_t bufferSize);
+  int (*hdfsSetWorkingDirectory)(hdfsFS fs, const char* path);
+  int (*hdfsCreateDirectory)(hdfsFS fs, const char* path);
+  int (*hdfsSetReplication)(hdfsFS fs, const char* path, int16_t replication);
+  hdfsFileInfo* (*hdfsListDirectory)(hdfsFS fs, const char* path, int* numEntries);
+  hdfsFileInfo* (*hdfsGetPathInfo)(hdfsFS fs, const char* path);
+  void (*hdfsFreeFileInfo)(hdfsFileInfo* hdfsFileInfo, int numEntries);
+  char*** (*hdfsGetHosts)(hdfsFS fs, const char* path, tOffset start, tOffset length);
+  void (*hdfsFreeHosts)(char*** blockHosts);
+  tOffset (*hdfsGetDefaultBlockSize)(hdfsFS fs);
+  tOffset (*hdfsGetCapacity)(hdfsFS fs);
+  tOffset (*hdfsGetUsed)(hdfsFS fs);
+  int (*hdfsChown)(hdfsFS fs, const char* path, const char* owner, const char* group);
+  int (*hdfsChmod)(hdfsFS fs, const char* path, short mode);  // NOLINT
+  int (*hdfsUtime)(hdfsFS fs, const char* path, tTime mtime, tTime atime);
+
+  void Initialize() {
+    this->handle = nullptr;
+    this->hdfsNewBuilder = nullptr;
+    this->hdfsBuilderSetNameNode = nullptr;
+    this->hdfsBuilderSetNameNodePort = nullptr;
+    this->hdfsBuilderSetUserName = nullptr;
+    this->hdfsBuilderSetKerbTicketCachePath = nullptr;
+    this->hdfsBuilderConnect = nullptr;
+    this->hdfsDisconnect = nullptr;
+    this->hdfsOpenFile = nullptr;
+    this->hdfsCloseFile = nullptr;
+    this->hdfsExists = nullptr;
+    this->hdfsSeek = nullptr;
+    this->hdfsTell = nullptr;
+    this->hdfsRead = nullptr;
+    this->hdfsPread = nullptr;
+    this->hdfsWrite = nullptr;
+    this->hdfsFlush = nullptr;
+    this->hdfsAvailable = nullptr;
+    this->hdfsCopy = nullptr;
+    this->hdfsMove = nullptr;
+    this->hdfsDelete = nullptr;
+    this->hdfsRename = nullptr;
+    this->hdfsGetWorkingDirectory = nullptr;
+    this->hdfsSetWorkingDirectory = nullptr;
+    this->hdfsCreateDirectory = nullptr;
+    this->hdfsSetReplication = nullptr;
+    this->hdfsListDirectory = nullptr;
+    this->hdfsGetPathInfo = nullptr;
+    this->hdfsFreeFileInfo = nullptr;
+    this->hdfsGetHosts = nullptr;
+    this->hdfsFreeHosts = nullptr;
+    this->hdfsGetDefaultBlockSize = nullptr;
+    this->hdfsGetCapacity = nullptr;
+    this->hdfsGetUsed = nullptr;
+    this->hdfsChown = nullptr;
+    this->hdfsChmod = nullptr;
+    this->hdfsUtime = nullptr;
+  }
+
+  hdfsBuilder* NewBuilder(void);
+
+  void BuilderSetNameNode(hdfsBuilder* bld, const char* nn);
+
+  void BuilderSetNameNodePort(hdfsBuilder* bld, tPort port);
+
+  void BuilderSetUserName(hdfsBuilder* bld, const char* userName);
+
+  void BuilderSetKerbTicketCachePath(hdfsBuilder* bld, const char* kerbTicketCachePath);
+
+  hdfsFS BuilderConnect(hdfsBuilder* bld);
+
+  int Disconnect(hdfsFS fs);
+
+  hdfsFile OpenFile(hdfsFS fs, const char* path, int flags, int bufferSize,
+      short replication, tSize blocksize);  // NOLINT
+
+  int CloseFile(hdfsFS fs, hdfsFile file);
+
+  int Exists(hdfsFS fs, const char* path);
+
+  int Seek(hdfsFS fs, hdfsFile file, tOffset desiredPos);
+
+  tOffset Tell(hdfsFS fs, hdfsFile file);
+
+  tSize Read(hdfsFS fs, hdfsFile file, void* buffer, tSize length);
+
+  bool HasPread();
+
+  tSize Pread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length);
+
+  tSize Write(hdfsFS fs, hdfsFile file, const void* buffer, tSize length);
+
+  int Flush(hdfsFS fs, hdfsFile file);
+
+  int Available(hdfsFS fs, hdfsFile file);
+
+  int Copy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst);
+
+  int Move(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst);
+
+  int Delete(hdfsFS fs, const char* path, int recursive);
+
+  int Rename(hdfsFS fs, const char* oldPath, const char* newPath);
+
+  char* GetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize);
+
+  int SetWorkingDirectory(hdfsFS fs, const char* path);
+
+  int CreateDirectory(hdfsFS fs, const char* path);
+
+  int SetReplication(hdfsFS fs, const char* path, int16_t replication);
+
+  hdfsFileInfo* ListDirectory(hdfsFS fs, const char* path, int* numEntries);
+
+  hdfsFileInfo* GetPathInfo(hdfsFS fs, const char* path);
+
+  void FreeFileInfo(hdfsFileInfo* hdfsFileInfo, int numEntries);
+
+  char*** GetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length);
+
+  void FreeHosts(char*** blockHosts);
+
+  tOffset GetDefaultBlockSize(hdfsFS fs);
+  tOffset GetCapacity(hdfsFS fs);
+
+  tOffset GetUsed(hdfsFS fs);
+
+  int Chown(hdfsFS fs, const char* path, const char* owner, const char* group);
+
+  int Chmod(hdfsFS fs, const char* path, short mode);  // NOLINT
+
+  int Utime(hdfsFS fs, const char* path, tTime mtime, tTime atime);
+
+  Status GetRequiredSymbols();
+};
+
+Status ConnectLibHdfs(LibHdfsShim** driver);
+Status ConnectLibHdfs3(LibHdfsShim** driver);
+
+}  // namespace io
+}  // namespace arrow
+
+#endif  // ARROW_IO_HDFS_INTERNAL

http://git-wip-us.apache.org/repos/asf/arrow/blob/cfde4607/cpp/src/arrow/io/hdfs.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc
index b8e2120..44e503f 100644
--- a/cpp/src/arrow/io/hdfs.cc
+++ b/cpp/src/arrow/io/hdfs.cc
@@ -23,6 +23,7 @@
 #include <string>
 
 #include "arrow/buffer.h"
+#include "arrow/io/hdfs-internal.h"
 #include "arrow/io/hdfs.h"
 #include "arrow/memory_pool.h"
 #include "arrow/status.h"
@@ -59,21 +60,23 @@ static constexpr int kDefaultHdfsBufferSize = 1 << 16;
 
 class HdfsAnyFileImpl {
  public:
-  void set_members(const std::string& path, hdfsFS fs, hdfsFile handle) {
+  void set_members(
+      const std::string& path, LibHdfsShim* driver, hdfsFS fs, hdfsFile handle) {
     path_ = path;
+    driver_ = driver;
     fs_ = fs;
     file_ = handle;
     is_open_ = true;
   }
 
   Status Seek(int64_t position) {
-    int ret = hdfsSeek(fs_, file_, position);
+    int ret = driver_->Seek(fs_, file_, position);
     CHECK_FAILURE(ret, "seek");
     return Status::OK();
   }
 
   Status Tell(int64_t* offset) {
-    int64_t ret = hdfsTell(fs_, file_);
+    int64_t ret = driver_->Tell(fs_, file_);
     CHECK_FAILURE(ret, "tell");
     *offset = ret;
     return Status::OK();
@@ -84,6 +87,8 @@ class HdfsAnyFileImpl {
  protected:
   std::string path_;
 
+  LibHdfsShim* driver_;
+
   // These are pointers in libhdfs, so OK to copy
   hdfsFS fs_;
   hdfsFile file_;
@@ -98,7 +103,7 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
 
   Status Close() {
     if (is_open_) {
-      int ret = hdfsCloseFile(fs_, file_);
+      int ret = driver_->CloseFile(fs_, file_);
       CHECK_FAILURE(ret, "CloseFile");
       is_open_ = false;
     }
@@ -106,8 +111,14 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
   }
 
   Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
-    tSize ret = hdfsPread(fs_, file_, static_cast<tOffset>(position),
-        reinterpret_cast<void*>(buffer), nbytes);
+    tSize ret;
+    if (driver_->HasPread()) {
+      ret = driver_->Pread(fs_, file_, static_cast<tOffset>(position),
+          reinterpret_cast<void*>(buffer), nbytes);
+    } else {
+      RETURN_NOT_OK(Seek(position));
+      return Read(nbytes, bytes_read, buffer);
+    }
     RETURN_NOT_OK(CheckReadResult(ret));
     *bytes_read = ret;
     return Status::OK();
@@ -129,7 +140,7 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
   Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
     int64_t total_bytes = 0;
     while (total_bytes < nbytes) {
-      tSize ret = hdfsRead(fs_, file_, reinterpret_cast<void*>(buffer + total_bytes),
+      tSize ret = driver_->Read(fs_, file_, reinterpret_cast<void*>(buffer + total_bytes),
           std::min<int64_t>(buffer_size_, nbytes - total_bytes));
       RETURN_NOT_OK(CheckReadResult(ret));
       total_bytes += ret;
@@ -153,11 +164,11 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
   }
 
   Status GetSize(int64_t* size) {
-    hdfsFileInfo* entry = hdfsGetPathInfo(fs_, path_.c_str());
+    hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path_.c_str());
     if (entry == nullptr) { return Status::IOError("HDFS: GetPathInfo failed"); }
 
     *size = entry->mSize;
-    hdfsFreeFileInfo(entry, 1);
+    driver_->FreeFileInfo(entry, 1);
     return Status::OK();
   }
 
@@ -227,9 +238,9 @@ class HdfsOutputStream::HdfsOutputStreamImpl : public HdfsAnyFileImpl {
 
   Status Close() {
     if (is_open_) {
-      int ret = hdfsFlush(fs_, file_);
+      int ret = driver_->Flush(fs_, file_);
       CHECK_FAILURE(ret, "Flush");
-      ret = hdfsCloseFile(fs_, file_);
+      ret = driver_->CloseFile(fs_, file_);
       CHECK_FAILURE(ret, "CloseFile");
       is_open_ = false;
     }
@@ -237,7 +248,7 @@ class HdfsOutputStream::HdfsOutputStreamImpl : public HdfsAnyFileImpl {
   }
 
   Status Write(const uint8_t* buffer, int64_t nbytes, int64_t* bytes_written) {
-    tSize ret = hdfsWrite(fs_, file_, reinterpret_cast<const void*>(buffer), nbytes);
+    tSize ret = driver_->Write(fs_, file_, reinterpret_cast<const void*>(buffer), nbytes);
     CHECK_FAILURE(ret, "Write");
     *bytes_written = ret;
     return Status::OK();
@@ -297,17 +308,25 @@ class HdfsClient::HdfsClientImpl {
   HdfsClientImpl() {}
 
   Status Connect(const HdfsConnectionConfig* config) {
-    RETURN_NOT_OK(ConnectLibHdfs());
+    if (config->driver == HdfsDriver::LIBHDFS3) {
+      RETURN_NOT_OK(ConnectLibHdfs3(&driver_));
+    } else {
+      RETURN_NOT_OK(ConnectLibHdfs(&driver_));
+    }
 
     // connect to HDFS with the builder object
-    hdfsBuilder* builder = hdfsNewBuilder();
-    if (!config->host.empty()) { hdfsBuilderSetNameNode(builder, config->host.c_str()); }
-    hdfsBuilderSetNameNodePort(builder, config->port);
-    if (!config->user.empty()) { hdfsBuilderSetUserName(builder, config->user.c_str()); }
+    hdfsBuilder* builder = driver_->NewBuilder();
+    if (!config->host.empty()) {
+      driver_->BuilderSetNameNode(builder, config->host.c_str());
+    }
+    driver_->BuilderSetNameNodePort(builder, config->port);
+    if (!config->user.empty()) {
+      driver_->BuilderSetUserName(builder, config->user.c_str());
+    }
     if (!config->kerb_ticket.empty()) {
-      hdfsBuilderSetKerbTicketCachePath(builder, config->kerb_ticket.c_str());
+      driver_->BuilderSetKerbTicketCachePath(builder, config->kerb_ticket.c_str());
     }
-    fs_ = hdfsBuilderConnect(builder);
+    fs_ = driver_->BuilderConnect(builder);
 
     if (fs_ == nullptr) { return Status::IOError("HDFS connection failed"); }
     namenode_host_ = config->host;
@@ -319,19 +338,19 @@ class HdfsClient::HdfsClientImpl {
   }
 
   Status CreateDirectory(const std::string& path) {
-    int ret = hdfsCreateDirectory(fs_, path.c_str());
+    int ret = driver_->CreateDirectory(fs_, path.c_str());
     CHECK_FAILURE(ret, "create directory");
     return Status::OK();
   }
 
   Status Delete(const std::string& path, bool recursive) {
-    int ret = hdfsDelete(fs_, path.c_str(), static_cast<int>(recursive));
+    int ret = driver_->Delete(fs_, path.c_str(), static_cast<int>(recursive));
     CHECK_FAILURE(ret, "delete");
     return Status::OK();
   }
 
   Status Disconnect() {
-    int ret = hdfsDisconnect(fs_);
+    int ret = driver_->Disconnect(fs_);
     CHECK_FAILURE(ret, "hdfsFS::Disconnect");
     return Status::OK();
   }
@@ -339,38 +358,38 @@ class HdfsClient::HdfsClientImpl {
   bool Exists(const std::string& path) {
     // hdfsExists does not distinguish between RPC failure and the file not
     // existing
-    int ret = hdfsExists(fs_, path.c_str());
+    int ret = driver_->Exists(fs_, path.c_str());
     return ret == 0;
   }
 
   Status GetCapacity(int64_t* nbytes) {
-    tOffset ret = hdfsGetCapacity(fs_);
+    tOffset ret = driver_->GetCapacity(fs_);
     CHECK_FAILURE(ret, "GetCapacity");
     *nbytes = ret;
     return Status::OK();
   }
 
   Status GetUsed(int64_t* nbytes) {
-    tOffset ret = hdfsGetUsed(fs_);
+    tOffset ret = driver_->GetUsed(fs_);
     CHECK_FAILURE(ret, "GetUsed");
     *nbytes = ret;
     return Status::OK();
   }
 
   Status GetPathInfo(const std::string& path, HdfsPathInfo* info) {
-    hdfsFileInfo* entry = hdfsGetPathInfo(fs_, path.c_str());
+    hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path.c_str());
 
     if (entry == nullptr) { return Status::IOError("HDFS: GetPathInfo failed"); }
 
     SetPathInfo(entry, info);
-    hdfsFreeFileInfo(entry, 1);
+    driver_->FreeFileInfo(entry, 1);
 
     return Status::OK();
   }
 
   Status ListDirectory(const std::string& path, std::vector<HdfsPathInfo>* listing) {
     int num_entries = 0;
-    hdfsFileInfo* entries = hdfsListDirectory(fs_, path.c_str(), &num_entries);
+    hdfsFileInfo* entries = driver_->ListDirectory(fs_, path.c_str(), &num_entries);
 
     if (entries == nullptr) {
       // If the directory is empty, entries is NULL but errno is 0. Non-zero
@@ -391,14 +410,14 @@ class HdfsClient::HdfsClientImpl {
     }
 
     // Free libhdfs file info
-    hdfsFreeFileInfo(entries, num_entries);
+    driver_->FreeFileInfo(entries, num_entries);
 
     return Status::OK();
   }
 
   Status OpenReadable(const std::string& path, int32_t buffer_size,
       std::shared_ptr<HdfsReadableFile>* file) {
-    hdfsFile handle = hdfsOpenFile(fs_, path.c_str(), O_RDONLY, buffer_size, 0, 0);
+    hdfsFile handle = driver_->OpenFile(fs_, path.c_str(), O_RDONLY, buffer_size, 0, 0);
 
     if (handle == nullptr) {
       // TODO(wesm): determine cause of failure
@@ -409,7 +428,7 @@ class HdfsClient::HdfsClientImpl {
 
     // std::make_shared does not work with private ctors
     *file = std::shared_ptr<HdfsReadableFile>(new HdfsReadableFile());
-    (*file)->impl_->set_members(path, fs_, handle);
+    (*file)->impl_->set_members(path, driver_, fs_, handle);
     (*file)->impl_->set_buffer_size(buffer_size);
 
     return Status::OK();
@@ -421,7 +440,7 @@ class HdfsClient::HdfsClientImpl {
     int flags = O_WRONLY;
     if (append) flags |= O_APPEND;
 
-    hdfsFile handle = hdfsOpenFile(
+    hdfsFile handle = driver_->OpenFile(
         fs_, path.c_str(), flags, buffer_size, replication, default_block_size);
 
     if (handle == nullptr) {
@@ -433,18 +452,20 @@ class HdfsClient::HdfsClientImpl {
 
     // std::make_shared does not work with private ctors
     *file = std::shared_ptr<HdfsOutputStream>(new HdfsOutputStream());
-    (*file)->impl_->set_members(path, fs_, handle);
+    (*file)->impl_->set_members(path, driver_, fs_, handle);
 
     return Status::OK();
   }
 
   Status Rename(const std::string& src, const std::string& dst) {
-    int ret = hdfsRename(fs_, src.c_str(), dst.c_str());
+    int ret = driver_->Rename(fs_, src.c_str(), dst.c_str());
     CHECK_FAILURE(ret, "Rename");
     return Status::OK();
   }
 
  private:
+  LibHdfsShim* driver_;
+
   std::string namenode_host_;
   std::string user_;
   int port_;
@@ -530,5 +551,18 @@ Status HdfsClient::Rename(const std::string& src, const std::string& dst) {
   return impl_->Rename(src, dst);
 }
 
+// ----------------------------------------------------------------------
+// Allow public API users to check whether we are set up correctly
+
+Status HaveLibHdfs() {
+  LibHdfsShim* driver;
+  return ConnectLibHdfs(&driver);
+}
+
+Status HaveLibHdfs3() {
+  LibHdfsShim* driver;
+  return ConnectLibHdfs3(&driver);
+}
+
 }  // namespace io
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/cfde4607/cpp/src/arrow/io/hdfs.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h
index 1c76f15..5cc783e 100644
--- a/cpp/src/arrow/io/hdfs.h
+++ b/cpp/src/arrow/io/hdfs.h
@@ -56,11 +56,14 @@ struct HdfsPathInfo {
   int16_t permissions;
 };
 
+enum class HdfsDriver : char { LIBHDFS, LIBHDFS3 };
+
 struct HdfsConnectionConfig {
   std::string host;
   int port;
   std::string user;
   std::string kerb_ticket;
+  HdfsDriver driver;
 };
 
 class ARROW_EXPORT HdfsClient : public FileSystemClient {
@@ -218,7 +221,8 @@ class ARROW_EXPORT HdfsOutputStream : public OutputStream {
   DISALLOW_COPY_AND_ASSIGN(HdfsOutputStream);
 };
 
-Status ARROW_EXPORT ConnectLibHdfs();
+Status ARROW_EXPORT HaveLibHdfs();
+Status ARROW_EXPORT HaveLibHdfs3();
 
 }  // namespace io
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/cfde4607/cpp/src/arrow/io/io-hdfs-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-hdfs-test.cc b/cpp/src/arrow/io/io-hdfs-test.cc
index e07eaa3..4ef47b8 100644
--- a/cpp/src/arrow/io/io-hdfs-test.cc
+++ b/cpp/src/arrow/io/io-hdfs-test.cc
@@ -24,6 +24,7 @@
 
 #include <boost/filesystem.hpp>  // NOLINT
 
+#include "arrow/io/hdfs-internal.h"
 #include "arrow/io/hdfs.h"
 #include "arrow/status.h"
 #include "arrow/test-util.h"
@@ -37,6 +38,7 @@ std::vector<uint8_t> RandomData(int64_t size) {
   return buffer;
 }
 
+template <typename DRIVER>
 class TestHdfsClient : public ::testing::Test {
  public:
   Status MakeScratchDir() {
@@ -71,15 +73,34 @@ class TestHdfsClient : public ::testing::Test {
     return ss.str();
   }
 
- protected:
   // Set up shared state between unit tests
-  static void SetUpTestCase() {
-    if (!ConnectLibHdfs().ok()) {
-      std::cout << "Loading libhdfs failed, skipping tests gracefully" << 
std::endl;
-      return;
+  void SetUp() {
+    LibHdfsShim* driver_shim;
+
+    client_ = nullptr;
+    scratch_dir_ =
+        boost::filesystem::unique_path("/tmp/arrow-hdfs/scratch-%%%%").native();
+
+    loaded_driver_ = false;
+
+    Status msg;
+
+    if (DRIVER::type == HdfsDriver::LIBHDFS) {
+      msg = ConnectLibHdfs(&driver_shim);
+      if (!msg.ok()) {
+        std::cout << "Loading libhdfs failed, skipping tests gracefully" << 
std::endl;
+        return;
+      }
+    } else {
+      msg = ConnectLibHdfs3(&driver_shim);
+      if (!msg.ok()) {
+        std::cout << "Loading libhdfs3 failed, skipping tests gracefully. "
+                  << msg.ToString() << std::endl;
+        return;
+      }
     }
 
-    loaded_libhdfs_ = true;
+    loaded_driver_ = true;
 
     const char* host = std::getenv("ARROW_HDFS_TEST_HOST");
     const char* port = std::getenv("ARROW_HDFS_TEST_PORT");
@@ -94,151 +115,159 @@ class TestHdfsClient : public ::testing::Test {
     ASSERT_OK(HdfsClient::Connect(&conf_, &client_));
   }
 
-  static void TearDownTestCase() {
+  void TearDown() {
     if (client_) {
-      EXPECT_OK(client_->Delete(scratch_dir_, true));
+      if (client_->Exists(scratch_dir_)) {
+        EXPECT_OK(client_->Delete(scratch_dir_, true));
+      }
       EXPECT_OK(client_->Disconnect());
     }
   }
 
-  static bool loaded_libhdfs_;
+  HdfsConnectionConfig conf_;
+  bool loaded_driver_;
 
   // Resources shared amongst unit tests
-  static HdfsConnectionConfig conf_;
-  static std::string scratch_dir_;
-  static std::shared_ptr<HdfsClient> client_;
+  std::string scratch_dir_;
+  std::shared_ptr<HdfsClient> client_;
 };
 
-bool TestHdfsClient::loaded_libhdfs_ = false;
-HdfsConnectionConfig TestHdfsClient::conf_ = HdfsConnectionConfig();
+#define SKIP_IF_NO_DRIVER()                                  \
+  if (!this->loaded_driver_) {                               \
+    std::cout << "Driver not loaded, skipping" << std::endl; \
+    return;                                                  \
+  }
 
-std::string TestHdfsClient::scratch_dir_ =
-    boost::filesystem::unique_path("/tmp/arrow-hdfs/scratch-%%%%").native();
+struct JNIDriver {
+  static HdfsDriver type;
+};
 
-std::shared_ptr<HdfsClient> TestHdfsClient::client_ = nullptr;
+struct PivotalDriver {
+  static HdfsDriver type;
+};
 
-#define SKIP_IF_NO_LIBHDFS()                          \
-  if (!loaded_libhdfs_) {                             \
-    std::cout << "No libhdfs, skipping" << std::endl; \
-    return;                                           \
-  }
+HdfsDriver JNIDriver::type = HdfsDriver::LIBHDFS;
+HdfsDriver PivotalDriver::type = HdfsDriver::LIBHDFS3;
+
+typedef ::testing::Types<JNIDriver, PivotalDriver> DriverTypes;
+TYPED_TEST_CASE(TestHdfsClient, DriverTypes);
 
-TEST_F(TestHdfsClient, ConnectsAgain) {
-  SKIP_IF_NO_LIBHDFS();
+TYPED_TEST(TestHdfsClient, ConnectsAgain) {
+  SKIP_IF_NO_DRIVER();
 
   std::shared_ptr<HdfsClient> client;
-  ASSERT_OK(HdfsClient::Connect(&conf_, &client));
+  ASSERT_OK(HdfsClient::Connect(&this->conf_, &client));
   ASSERT_OK(client->Disconnect());
 }
 
-TEST_F(TestHdfsClient, CreateDirectory) {
-  SKIP_IF_NO_LIBHDFS();
+TYPED_TEST(TestHdfsClient, CreateDirectory) {
+  SKIP_IF_NO_DRIVER();
 
-  std::string path = ScratchPath("create-directory");
+  std::string path = this->ScratchPath("create-directory");
 
-  if (client_->Exists(path)) { ASSERT_OK(client_->Delete(path, true)); }
+  if (this->client_->Exists(path)) { ASSERT_OK(this->client_->Delete(path, true)); }
 
-  ASSERT_OK(client_->CreateDirectory(path));
-  ASSERT_TRUE(client_->Exists(path));
-  EXPECT_OK(client_->Delete(path, true));
-  ASSERT_FALSE(client_->Exists(path));
+  ASSERT_OK(this->client_->CreateDirectory(path));
+  ASSERT_TRUE(this->client_->Exists(path));
+  EXPECT_OK(this->client_->Delete(path, true));
+  ASSERT_FALSE(this->client_->Exists(path));
 }
 
-TEST_F(TestHdfsClient, GetCapacityUsed) {
-  SKIP_IF_NO_LIBHDFS();
+TYPED_TEST(TestHdfsClient, GetCapacityUsed) {
+  SKIP_IF_NO_DRIVER();
 
   // Who knows what is actually in your DFS cluster, but expect it to have
   // positive used bytes and capacity
   int64_t nbytes = 0;
-  ASSERT_OK(client_->GetCapacity(&nbytes));
+  ASSERT_OK(this->client_->GetCapacity(&nbytes));
   ASSERT_LT(0, nbytes);
 
-  ASSERT_OK(client_->GetUsed(&nbytes));
+  ASSERT_OK(this->client_->GetUsed(&nbytes));
   ASSERT_LT(0, nbytes);
 }
 
-TEST_F(TestHdfsClient, GetPathInfo) {
-  SKIP_IF_NO_LIBHDFS();
+TYPED_TEST(TestHdfsClient, GetPathInfo) {
+  SKIP_IF_NO_DRIVER();
 
   HdfsPathInfo info;
 
-  ASSERT_OK(MakeScratchDir());
+  ASSERT_OK(this->MakeScratchDir());
 
   // Directory info
-  ASSERT_OK(client_->GetPathInfo(scratch_dir_, &info));
+  ASSERT_OK(this->client_->GetPathInfo(this->scratch_dir_, &info));
   ASSERT_EQ(ObjectType::DIRECTORY, info.kind);
-  ASSERT_EQ(HdfsAbsPath(scratch_dir_), info.name);
-  ASSERT_EQ(conf_.user, info.owner);
+  ASSERT_EQ(this->HdfsAbsPath(this->scratch_dir_), info.name);
+  ASSERT_EQ(this->conf_.user, info.owner);
 
   // TODO(wesm): test group, other attrs
 
-  auto path = ScratchPath("test-file");
+  auto path = this->ScratchPath("test-file");
 
   const int size = 100;
 
   std::vector<uint8_t> buffer = RandomData(size);
 
-  ASSERT_OK(WriteDummyFile(path, buffer.data(), size));
-  ASSERT_OK(client_->GetPathInfo(path, &info));
+  ASSERT_OK(this->WriteDummyFile(path, buffer.data(), size));
+  ASSERT_OK(this->client_->GetPathInfo(path, &info));
 
   ASSERT_EQ(ObjectType::FILE, info.kind);
-  ASSERT_EQ(HdfsAbsPath(path), info.name);
-  ASSERT_EQ(conf_.user, info.owner);
+  ASSERT_EQ(this->HdfsAbsPath(path), info.name);
+  ASSERT_EQ(this->conf_.user, info.owner);
   ASSERT_EQ(size, info.size);
 }
 
-TEST_F(TestHdfsClient, AppendToFile) {
-  SKIP_IF_NO_LIBHDFS();
+TYPED_TEST(TestHdfsClient, AppendToFile) {
+  SKIP_IF_NO_DRIVER();
 
-  ASSERT_OK(MakeScratchDir());
+  ASSERT_OK(this->MakeScratchDir());
 
-  auto path = ScratchPath("test-file");
+  auto path = this->ScratchPath("test-file");
   const int size = 100;
 
   std::vector<uint8_t> buffer = RandomData(size);
-  ASSERT_OK(WriteDummyFile(path, buffer.data(), size));
+  ASSERT_OK(this->WriteDummyFile(path, buffer.data(), size));
 
   // now append
-  ASSERT_OK(WriteDummyFile(path, buffer.data(), size, true));
+  ASSERT_OK(this->WriteDummyFile(path, buffer.data(), size, true));
 
   HdfsPathInfo info;
-  ASSERT_OK(client_->GetPathInfo(path, &info));
+  ASSERT_OK(this->client_->GetPathInfo(path, &info));
   ASSERT_EQ(size * 2, info.size);
 }
 
-TEST_F(TestHdfsClient, ListDirectory) {
-  SKIP_IF_NO_LIBHDFS();
+TYPED_TEST(TestHdfsClient, ListDirectory) {
+  SKIP_IF_NO_DRIVER();
 
   const int size = 100;
   std::vector<uint8_t> data = RandomData(size);
 
-  auto p1 = ScratchPath("test-file-1");
-  auto p2 = ScratchPath("test-file-2");
-  auto d1 = ScratchPath("test-dir-1");
+  auto p1 = this->ScratchPath("test-file-1");
+  auto p2 = this->ScratchPath("test-file-2");
+  auto d1 = this->ScratchPath("test-dir-1");
 
-  ASSERT_OK(MakeScratchDir());
-  ASSERT_OK(WriteDummyFile(p1, data.data(), size));
-  ASSERT_OK(WriteDummyFile(p2, data.data(), size / 2));
-  ASSERT_OK(client_->CreateDirectory(d1));
+  ASSERT_OK(this->MakeScratchDir());
+  ASSERT_OK(this->WriteDummyFile(p1, data.data(), size));
+  ASSERT_OK(this->WriteDummyFile(p2, data.data(), size / 2));
+  ASSERT_OK(this->client_->CreateDirectory(d1));
 
   std::vector<HdfsPathInfo> listing;
-  ASSERT_OK(client_->ListDirectory(scratch_dir_, &listing));
+  ASSERT_OK(this->client_->ListDirectory(this->scratch_dir_, &listing));
 
   // Do it again, appends!
-  ASSERT_OK(client_->ListDirectory(scratch_dir_, &listing));
+  ASSERT_OK(this->client_->ListDirectory(this->scratch_dir_, &listing));
 
   ASSERT_EQ(6, static_cast<int>(listing.size()));
 
   // Argh, well, shouldn't expect the listing to be in any particular order
   for (size_t i = 0; i < listing.size(); ++i) {
     const HdfsPathInfo& info = listing[i];
-    if (info.name == HdfsAbsPath(p1)) {
+    if (info.name == this->HdfsAbsPath(p1)) {
       ASSERT_EQ(ObjectType::FILE, info.kind);
       ASSERT_EQ(size, info.size);
-    } else if (info.name == HdfsAbsPath(p2)) {
+    } else if (info.name == this->HdfsAbsPath(p2)) {
       ASSERT_EQ(ObjectType::FILE, info.kind);
       ASSERT_EQ(size / 2, info.size);
-    } else if (info.name == HdfsAbsPath(d1)) {
+    } else if (info.name == this->HdfsAbsPath(d1)) {
       ASSERT_EQ(ObjectType::DIRECTORY, info.kind);
     } else {
       FAIL() << "Unexpected path: " << info.name;
@@ -246,19 +275,19 @@ TEST_F(TestHdfsClient, ListDirectory) {
   }
 }
 
-TEST_F(TestHdfsClient, ReadableMethods) {
-  SKIP_IF_NO_LIBHDFS();
+TYPED_TEST(TestHdfsClient, ReadableMethods) {
+  SKIP_IF_NO_DRIVER();
 
-  ASSERT_OK(MakeScratchDir());
+  ASSERT_OK(this->MakeScratchDir());
 
-  auto path = ScratchPath("test-file");
+  auto path = this->ScratchPath("test-file");
   const int size = 100;
 
   std::vector<uint8_t> data = RandomData(size);
-  ASSERT_OK(WriteDummyFile(path, data.data(), size));
+  ASSERT_OK(this->WriteDummyFile(path, data.data(), size));
 
   std::shared_ptr<HdfsReadableFile> file;
-  ASSERT_OK(client_->OpenReadable(path, &file));
+  ASSERT_OK(this->client_->OpenReadable(path, &file));
 
   // Test GetSize -- move this into its own unit test if ever needed
   int64_t file_size;
@@ -293,19 +322,19 @@ TEST_F(TestHdfsClient, ReadableMethods) {
   ASSERT_EQ(60, position);
 }
 
-TEST_F(TestHdfsClient, LargeFile) {
-  SKIP_IF_NO_LIBHDFS();
+TYPED_TEST(TestHdfsClient, LargeFile) {
+  SKIP_IF_NO_DRIVER();
 
-  ASSERT_OK(MakeScratchDir());
+  ASSERT_OK(this->MakeScratchDir());
 
-  auto path = ScratchPath("test-large-file");
+  auto path = this->ScratchPath("test-large-file");
   const int size = 1000000;
 
   std::vector<uint8_t> data = RandomData(size);
-  ASSERT_OK(WriteDummyFile(path, data.data(), size));
+  ASSERT_OK(this->WriteDummyFile(path, data.data(), size));
 
   std::shared_ptr<HdfsReadableFile> file;
-  ASSERT_OK(client_->OpenReadable(path, &file));
+  ASSERT_OK(this->client_->OpenReadable(path, &file));
 
   auto buffer = std::make_shared<PoolBuffer>();
   ASSERT_OK(buffer->Resize(size));
@@ -317,7 +346,7 @@ TEST_F(TestHdfsClient, LargeFile) {
 
   // explicit buffer size
   std::shared_ptr<HdfsReadableFile> file2;
-  ASSERT_OK(client_->OpenReadable(path, 1 << 18, &file2));
+  ASSERT_OK(this->client_->OpenReadable(path, 1 << 18, &file2));
 
   auto buffer2 = std::make_shared<PoolBuffer>();
   ASSERT_OK(buffer2->Resize(size));
@@ -326,22 +355,22 @@ TEST_F(TestHdfsClient, LargeFile) {
   ASSERT_EQ(size, bytes_read);
 }
 
-TEST_F(TestHdfsClient, RenameFile) {
-  SKIP_IF_NO_LIBHDFS();
+TYPED_TEST(TestHdfsClient, RenameFile) {
+  SKIP_IF_NO_DRIVER();
 
-  ASSERT_OK(MakeScratchDir());
+  ASSERT_OK(this->MakeScratchDir());
 
-  auto src_path = ScratchPath("src-file");
-  auto dst_path = ScratchPath("dst-file");
+  auto src_path = this->ScratchPath("src-file");
+  auto dst_path = this->ScratchPath("dst-file");
   const int size = 100;
 
   std::vector<uint8_t> data = RandomData(size);
-  ASSERT_OK(WriteDummyFile(src_path, data.data(), size));
+  ASSERT_OK(this->WriteDummyFile(src_path, data.data(), size));
 
-  ASSERT_OK(client_->Rename(src_path, dst_path));
+  ASSERT_OK(this->client_->Rename(src_path, dst_path));
 
-  ASSERT_FALSE(client_->Exists(src_path));
-  ASSERT_TRUE(client_->Exists(dst_path));
+  ASSERT_FALSE(this->client_->Exists(src_path));
+  ASSERT_TRUE(this->client_->Exists(dst_path));
 }
 
 }  // namespace io

http://git-wip-us.apache.org/repos/asf/arrow/blob/cfde4607/cpp/src/arrow/io/libhdfs_shim.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/libhdfs_shim.cc b/cpp/src/arrow/io/libhdfs_shim.cc
deleted file mode 100644
index 3715376..0000000
--- a/cpp/src/arrow/io/libhdfs_shim.cc
+++ /dev/null
@@ -1,582 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// This shim interface to libhdfs (for runtime shared library loading) has been
-// adapted from the SFrame project, released under the ASF-compatible 3-clause
-// BSD license
-//
-// Using this required having the $JAVA_HOME and $HADOOP_HOME environment
-// variables set, so that libjvm and libhdfs can be located easily
-
-// Copyright (C) 2015 Dato, Inc.
-// All rights reserved.
-//
-// This software may be modified and distributed under the terms
-// of the BSD license. See the LICENSE file for details.
-
-#ifdef HAS_HADOOP
-
-#ifndef _WIN32
-#include <dlfcn.h>
-#else
-#include <windows.h>
-#include <winsock2.h>
-
-// TODO(wesm): address when/if we add windows support
-// #include <util/syserr_reporting.hpp>
-#endif
-
-extern "C" {
-#include <hdfs.h>
-}
-
-#include <iostream>
-#include <mutex>
-#include <sstream>
-#include <string>
-#include <type_traits>
-#include <vector>
-
-#include <boost/filesystem.hpp>  // NOLINT
-
-#include "arrow/status.h"
-#include "arrow/util/visibility.h"
-
-namespace fs = boost::filesystem;
-
-extern "C" {
-
-#ifndef _WIN32
-static void* libhdfs_handle = NULL;
-static void* libjvm_handle = NULL;
-#else
-static HINSTANCE libhdfs_handle = NULL;
-static HINSTANCE libjvm_handle = NULL;
-#endif
-/*
- * All the shim pointers
- */
-
-// NOTE(wesm): cpplint does not like use of short and other imprecise C types
-
-static hdfsBuilder* (*ptr_hdfsNewBuilder)(void) = NULL;
-static void (*ptr_hdfsBuilderSetNameNode)(hdfsBuilder* bld, const char* nn) = NULL;
-static void (*ptr_hdfsBuilderSetNameNodePort)(hdfsBuilder* bld, tPort port) = NULL;
-static void (*ptr_hdfsBuilderSetUserName)(hdfsBuilder* bld, const char* userName) = NULL;
-static void (*ptr_hdfsBuilderSetKerbTicketCachePath)(
-    hdfsBuilder* bld, const char* kerbTicketCachePath) = NULL;
-static hdfsFS (*ptr_hdfsBuilderConnect)(hdfsBuilder* bld) = NULL;
-
-static int (*ptr_hdfsDisconnect)(hdfsFS fs) = NULL;
-
-static hdfsFile (*ptr_hdfsOpenFile)(hdfsFS fs, const char* path, int flags,
-    int bufferSize, short replication, tSize blocksize) = NULL;  // NOLINT
-
-static int (*ptr_hdfsCloseFile)(hdfsFS fs, hdfsFile file) = NULL;
-static int (*ptr_hdfsExists)(hdfsFS fs, const char* path) = NULL;
-static int (*ptr_hdfsSeek)(hdfsFS fs, hdfsFile file, tOffset desiredPos) = NULL;
-static tOffset (*ptr_hdfsTell)(hdfsFS fs, hdfsFile file) = NULL;
-static tSize (*ptr_hdfsRead)(hdfsFS fs, hdfsFile file, void* buffer, tSize length) = NULL;
-static tSize (*ptr_hdfsPread)(
-    hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length) = NULL;
-static tSize (*ptr_hdfsWrite)(
-    hdfsFS fs, hdfsFile file, const void* buffer, tSize length) = NULL;
-static int (*ptr_hdfsFlush)(hdfsFS fs, hdfsFile file) = NULL;
-static int (*ptr_hdfsAvailable)(hdfsFS fs, hdfsFile file) = NULL;
-static int (*ptr_hdfsCopy)(
-    hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) = NULL;
-static int (*ptr_hdfsMove)(
-    hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) = NULL;
-static int (*ptr_hdfsDelete)(hdfsFS fs, const char* path, int recursive) = NULL;
-static int (*ptr_hdfsRename)(hdfsFS fs, const char* oldPath, const char* newPath) = NULL;
-static char* (*ptr_hdfsGetWorkingDirectory)(
-    hdfsFS fs, char* buffer, size_t bufferSize) = NULL;
-static int (*ptr_hdfsSetWorkingDirectory)(hdfsFS fs, const char* path) = NULL;
-static int (*ptr_hdfsCreateDirectory)(hdfsFS fs, const char* path) = NULL;
-static int (*ptr_hdfsSetReplication)(
-    hdfsFS fs, const char* path, int16_t replication) = NULL;
-static hdfsFileInfo* (*ptr_hdfsListDirectory)(
-    hdfsFS fs, const char* path, int* numEntries) = NULL;
-static hdfsFileInfo* (*ptr_hdfsGetPathInfo)(hdfsFS fs, const char* path) = NULL;
-static void (*ptr_hdfsFreeFileInfo)(hdfsFileInfo* hdfsFileInfo, int numEntries) = NULL;
-static char*** (*ptr_hdfsGetHosts)(
-    hdfsFS fs, const char* path, tOffset start, tOffset length) = NULL;
-static void (*ptr_hdfsFreeHosts)(char*** blockHosts) = NULL;
-static tOffset (*ptr_hdfsGetDefaultBlockSize)(hdfsFS fs) = NULL;
-static tOffset (*ptr_hdfsGetCapacity)(hdfsFS fs) = NULL;
-static tOffset (*ptr_hdfsGetUsed)(hdfsFS fs) = NULL;
-static int (*ptr_hdfsChown)(
-    hdfsFS fs, const char* path, const char* owner, const char* group) = NULL;
-static int (*ptr_hdfsChmod)(hdfsFS fs, const char* path, short mode) = NULL;  // NOLINT
-static int (*ptr_hdfsUtime)(hdfsFS fs, const char* path, tTime mtime, tTime atime) = NULL;
-
-// Helper functions for dlopens
-static std::vector<fs::path> get_potential_libjvm_paths();
-static std::vector<fs::path> get_potential_libhdfs_paths();
-static arrow::Status try_dlopen(std::vector<fs::path> potential_paths, const char* name,
-#ifndef _WIN32
-    void*& out_handle);
-#else
-    HINSTANCE& out_handle);
-#endif
-
-#define GET_SYMBOL(SYMBOL_NAME)                                                  \
-  if (!ptr_##SYMBOL_NAME) {                                                      \
-    *reinterpret_cast<void**>(&ptr_##SYMBOL_NAME) = get_symbol("" #SYMBOL_NAME); \
-  }
-
-static void* get_symbol(const char* symbol) {
-  if (libhdfs_handle == NULL) return NULL;
-#ifndef _WIN32
-  return dlsym(libhdfs_handle, symbol);
-#else
-
-  void* ret = reinterpret_cast<void*>(GetProcAddress(libhdfs_handle, symbol));
-  if (ret == NULL) {
-    // logstream(LOG_INFO) << "GetProcAddress error: "
-    //                     << get_last_err_str(GetLastError()) << std::endl;
-  }
-  return ret;
-#endif
-}
-
-hdfsBuilder* hdfsNewBuilder(void) {
-  return ptr_hdfsNewBuilder();
-}
-
-void hdfsBuilderSetNameNode(hdfsBuilder* bld, const char* nn) {
-  ptr_hdfsBuilderSetNameNode(bld, nn);
-}
-
-void hdfsBuilderSetNameNodePort(hdfsBuilder* bld, tPort port) {
-  ptr_hdfsBuilderSetNameNodePort(bld, port);
-}
-
-void hdfsBuilderSetUserName(hdfsBuilder* bld, const char* userName) {
-  ptr_hdfsBuilderSetUserName(bld, userName);
-}
-
-void hdfsBuilderSetKerbTicketCachePath(
-    hdfsBuilder* bld, const char* kerbTicketCachePath) {
-  ptr_hdfsBuilderSetKerbTicketCachePath(bld, kerbTicketCachePath);
-}
-
-hdfsFS hdfsBuilderConnect(hdfsBuilder* bld) {
-  return ptr_hdfsBuilderConnect(bld);
-}
-
-int hdfsDisconnect(hdfsFS fs) {
-  return ptr_hdfsDisconnect(fs);
-}
-
-hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags, int bufferSize,
-    short replication, tSize blocksize) {  // NOLINT
-  return ptr_hdfsOpenFile(fs, path, flags, bufferSize, replication, blocksize);
-}
-
-int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
-  return ptr_hdfsCloseFile(fs, file);
-}
-
-int hdfsExists(hdfsFS fs, const char* path) {
-  return ptr_hdfsExists(fs, path);
-}
-
-int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) {
-  return ptr_hdfsSeek(fs, file, desiredPos);
-}
-
-tOffset hdfsTell(hdfsFS fs, hdfsFile file) {
-  return ptr_hdfsTell(fs, file);
-}
-
-tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length) {
-  return ptr_hdfsRead(fs, file, buffer, length);
-}
-
-tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length) {
-  return ptr_hdfsPread(fs, file, position, buffer, length);
-}
-
-tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer, tSize length) {
-  return ptr_hdfsWrite(fs, file, buffer, length);
-}
-
-int hdfsFlush(hdfsFS fs, hdfsFile file) {
-  return ptr_hdfsFlush(fs, file);
-}
-
-int hdfsAvailable(hdfsFS fs, hdfsFile file) {
-  GET_SYMBOL(hdfsAvailable);
-  if (ptr_hdfsAvailable)
-    return ptr_hdfsAvailable(fs, file);
-  else
-    return 0;
-}
-
-int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) {
-  GET_SYMBOL(hdfsCopy);
-  if (ptr_hdfsCopy)
-    return ptr_hdfsCopy(srcFS, src, dstFS, dst);
-  else
-    return 0;
-}
-
-int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) {
-  GET_SYMBOL(hdfsMove);
-  if (ptr_hdfsMove)
-    return ptr_hdfsMove(srcFS, src, dstFS, dst);
-  else
-    return 0;
-}
-
-int hdfsDelete(hdfsFS fs, const char* path, int recursive) {
-  return ptr_hdfsDelete(fs, path, recursive);
-}
-
-int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) {
-  GET_SYMBOL(hdfsRename);
-  if (ptr_hdfsRename)
-    return ptr_hdfsRename(fs, oldPath, newPath);
-  else
-    return 0;
-}
-
-char* hdfsGetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize) {
-  GET_SYMBOL(hdfsGetWorkingDirectory);
-  if (ptr_hdfsGetWorkingDirectory) {
-    return ptr_hdfsGetWorkingDirectory(fs, buffer, bufferSize);
-  } else {
-    return NULL;
-  }
-}
-
-int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) {
-  GET_SYMBOL(hdfsSetWorkingDirectory);
-  if (ptr_hdfsSetWorkingDirectory) {
-    return ptr_hdfsSetWorkingDirectory(fs, path);
-  } else {
-    return 0;
-  }
-}
-
-int hdfsCreateDirectory(hdfsFS fs, const char* path) {
-  return ptr_hdfsCreateDirectory(fs, path);
-}
-
-int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) {
-  GET_SYMBOL(hdfsSetReplication);
-  if (ptr_hdfsSetReplication) {
-    return ptr_hdfsSetReplication(fs, path, replication);
-  } else {
-    return 0;
-  }
-}
-
-hdfsFileInfo* hdfsListDirectory(hdfsFS fs, const char* path, int* numEntries) {
-  return ptr_hdfsListDirectory(fs, path, numEntries);
-}
-
-hdfsFileInfo* hdfsGetPathInfo(hdfsFS fs, const char* path) {
-  return ptr_hdfsGetPathInfo(fs, path);
-}
-
-void hdfsFreeFileInfo(hdfsFileInfo* hdfsFileInfo, int numEntries) {
-  ptr_hdfsFreeFileInfo(hdfsFileInfo, numEntries);
-}
-
-char*** hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length) {
-  GET_SYMBOL(hdfsGetHosts);
-  if (ptr_hdfsGetHosts) {
-    return ptr_hdfsGetHosts(fs, path, start, length);
-  } else {
-    return NULL;
-  }
-}
-
-void hdfsFreeHosts(char*** blockHosts) {
-  GET_SYMBOL(hdfsFreeHosts);
-  if (ptr_hdfsFreeHosts) { ptr_hdfsFreeHosts(blockHosts); }
-}
-
-tOffset hdfsGetDefaultBlockSize(hdfsFS fs) {
-  GET_SYMBOL(hdfsGetDefaultBlockSize);
-  if (ptr_hdfsGetDefaultBlockSize) {
-    return ptr_hdfsGetDefaultBlockSize(fs);
-  } else {
-    return 0;
-  }
-}
-
-tOffset hdfsGetCapacity(hdfsFS fs) {
-  return ptr_hdfsGetCapacity(fs);
-}
-
-tOffset hdfsGetUsed(hdfsFS fs) {
-  return ptr_hdfsGetUsed(fs);
-}
-
-int hdfsChown(hdfsFS fs, const char* path, const char* owner, const char* group) {
-  GET_SYMBOL(hdfsChown);
-  if (ptr_hdfsChown) {
-    return ptr_hdfsChown(fs, path, owner, group);
-  } else {
-    return 0;
-  }
-}
-
-int hdfsChmod(hdfsFS fs, const char* path, short mode) {  // NOLINT
-  GET_SYMBOL(hdfsChmod);
-  if (ptr_hdfsChmod) {
-    return ptr_hdfsChmod(fs, path, mode);
-  } else {
-    return 0;
-  }
-}
-
-int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) {
-  GET_SYMBOL(hdfsUtime);
-  if (ptr_hdfsUtime) {
-    return ptr_hdfsUtime(fs, path, mtime, atime);
-  } else {
-    return 0;
-  }
-}
-
-static std::vector<fs::path> get_potential_libhdfs_paths() {
-  std::vector<fs::path> libhdfs_potential_paths;
-  std::string file_name;
-
-// OS-specific file name
-#ifdef __WIN32
-  file_name = "hdfs.dll";
-#elif __APPLE__
-  file_name = "libhdfs.dylib";
-#else
-  file_name = "libhdfs.so";
-#endif
-
-  // Common paths
-  std::vector<fs::path> search_paths = {fs::path(""), fs::path(".")};
-
-  // Path from environment variable
-  const char* hadoop_home = std::getenv("HADOOP_HOME");
-  if (hadoop_home != nullptr) {
-    auto path = fs::path(hadoop_home) / "lib/native";
-    search_paths.push_back(path);
-  }
-
-  const char* libhdfs_dir = std::getenv("ARROW_LIBHDFS_DIR");
-  if (libhdfs_dir != nullptr) { search_paths.push_back(fs::path(libhdfs_dir)); }
-
-  // All paths with file name
-  for (auto& path : search_paths) {
-    libhdfs_potential_paths.push_back(path / file_name);
-  }
-
-  return libhdfs_potential_paths;
-}
-
-static std::vector<fs::path> get_potential_libjvm_paths() {
-  std::vector<fs::path> libjvm_potential_paths;
-
-  std::vector<fs::path> search_prefixes;
-  std::vector<fs::path> search_suffixes;
-  std::string file_name;
-
-// From heuristics
-#ifdef __WIN32
-  search_prefixes = {""};
-  search_suffixes = {"/jre/bin/server", "/bin/server"};
-  file_name = "jvm.dll";
-#elif __APPLE__
-  search_prefixes = {""};
-  search_suffixes = {"", "/jre/lib/server"};
-  file_name = "libjvm.dylib";
-
-// SFrame uses /usr/libexec/java_home to find JAVA_HOME; for now we are
-// expecting users to set an environment variable
-#else
-  search_prefixes = {
-      "/usr/lib/jvm/default-java",                // ubuntu / debian distros
-      "/usr/lib/jvm/java",                        // rhel6
-      "/usr/lib/jvm",                             // centos6
-      "/usr/lib64/jvm",                           // opensuse 13
-      "/usr/local/lib/jvm/default-java",          // alt ubuntu / debian 
distros
-      "/usr/local/lib/jvm/java",                  // alt rhel6
-      "/usr/local/lib/jvm",                       // alt centos6
-      "/usr/local/lib64/jvm",                     // alt opensuse 13
-      "/usr/local/lib/jvm/java-7-openjdk-amd64",  // alt ubuntu / debian 
distros
-      "/usr/lib/jvm/java-7-openjdk-amd64",        // alt ubuntu / debian 
distros
-      "/usr/local/lib/jvm/java-6-openjdk-amd64",  // alt ubuntu / debian 
distros
-      "/usr/lib/jvm/java-6-openjdk-amd64",        // alt ubuntu / debian 
distros
-      "/usr/lib/jvm/java-7-oracle",               // alt ubuntu
-      "/usr/lib/jvm/java-8-oracle",               // alt ubuntu
-      "/usr/lib/jvm/java-6-oracle",               // alt ubuntu
-      "/usr/local/lib/jvm/java-7-oracle",         // alt ubuntu
-      "/usr/local/lib/jvm/java-8-oracle",         // alt ubuntu
-      "/usr/local/lib/jvm/java-6-oracle",         // alt ubuntu
-      "/usr/lib/jvm/default",                     // alt centos
-      "/usr/java/latest",                         // alt centos
-  };
-  search_suffixes = {"/jre/lib/amd64/server"};
-  file_name = "libjvm.so";
-#endif
-  // From direct environment variable
-  char* env_value = NULL;
-  if ((env_value = getenv("JAVA_HOME")) != NULL) {
-    // logstream(LOG_INFO) << "Found environment variable " << env_name << ": 
" <<
-    // env_value << std::endl;
-    search_prefixes.insert(search_prefixes.begin(), env_value);
-  }
-
-  // Generate cross product between search_prefixes, search_suffixes, and file_name
-  for (auto& prefix : search_prefixes) {
-    for (auto& suffix : search_suffixes) {
-      auto path = (fs::path(prefix) / fs::path(suffix) / fs::path(file_name));
-      libjvm_potential_paths.push_back(path);
-    }
-  }
-
-  return libjvm_potential_paths;
-}
-
-#ifndef _WIN32
-static arrow::Status try_dlopen(
-    std::vector<fs::path> potential_paths, const char* name, void*& out_handle) {
-  std::vector<std::string> error_messages;
-
-  for (auto& i : potential_paths) {
-    i.make_preferred();
-    // logstream(LOG_INFO) << "Trying " << i.string().c_str() << std::endl;
-    out_handle = dlopen(i.native().c_str(), RTLD_NOW | RTLD_LOCAL);
-
-    if (out_handle != NULL) {
-      // logstream(LOG_INFO) << "Success!" << std::endl;
-      break;
-    } else {
-      const char* err_msg = dlerror();
-      if (err_msg != NULL) {
-        error_messages.push_back(std::string(err_msg));
-      } else {
-        error_messages.push_back(std::string(" returned NULL"));
-      }
-    }
-  }
-
-  if (out_handle == NULL) {
-    std::stringstream ss;
-    ss << "Unable to load " << name;
-    return arrow::Status::IOError(ss.str());
-  }
-
-  return arrow::Status::OK();
-}
-
-#else
-static arrow::Status try_dlopen(
-    std::vector<fs::path> potential_paths, const char* name, HINSTANCE& out_handle) {
-  std::vector<std::string> error_messages;
-
-  for (auto& i : potential_paths) {
-    i.make_preferred();
-    // logstream(LOG_INFO) << "Trying " << i.string().c_str() << std::endl;
-
-    out_handle = LoadLibrary(i.string().c_str());
-
-    if (out_handle != NULL) {
-      // logstream(LOG_INFO) << "Success!" << std::endl;
-      break;
-    } else {
-      // error_messages.push_back(get_last_err_str(GetLastError()));
-    }
-  }
-
-  if (out_handle == NULL) {
-    std::stringstream ss;
-    ss << "Unable to load " << name;
-    return arrow::Status::IOError(ss.str());
-  }
-
-  return arrow::Status::OK();
-}
-#endif  // _WIN32
-
-}  // extern "C"
-
-#define GET_SYMBOL_REQUIRED(SYMBOL_NAME)                                           \
-  do {                                                                             \
-    if (!ptr_##SYMBOL_NAME) {                                                      \
-      *reinterpret_cast<void**>(&ptr_##SYMBOL_NAME) = get_symbol("" #SYMBOL_NAME); \
-    }                                                                              \
-    if (!ptr_##SYMBOL_NAME)                                                        \
-      return Status::IOError("Getting symbol " #SYMBOL_NAME "failed");             \
-  } while (0)
-
-namespace arrow {
-namespace io {
-
-Status ARROW_EXPORT ConnectLibHdfs() {
-  static std::mutex lock;
-  std::lock_guard<std::mutex> guard(lock);
-
-  static bool shim_attempted = false;
-  if (!shim_attempted) {
-    shim_attempted = true;
-
-    std::vector<fs::path> libjvm_potential_paths = get_potential_libjvm_paths();
-    RETURN_NOT_OK(try_dlopen(libjvm_potential_paths, "libjvm", libjvm_handle));
-
-    std::vector<fs::path> libhdfs_potential_paths = get_potential_libhdfs_paths();
-    RETURN_NOT_OK(try_dlopen(libhdfs_potential_paths, "libhdfs", libhdfs_handle));
-  } else if (libhdfs_handle == nullptr) {
-    return Status::IOError("Prior attempt to load libhdfs failed");
-  }
-
-  GET_SYMBOL_REQUIRED(hdfsNewBuilder);
-  GET_SYMBOL_REQUIRED(hdfsBuilderSetNameNode);
-  GET_SYMBOL_REQUIRED(hdfsBuilderSetNameNodePort);
-  GET_SYMBOL_REQUIRED(hdfsBuilderSetUserName);
-  GET_SYMBOL_REQUIRED(hdfsBuilderSetKerbTicketCachePath);
-  GET_SYMBOL_REQUIRED(hdfsBuilderConnect);
-  GET_SYMBOL_REQUIRED(hdfsCreateDirectory);
-  GET_SYMBOL_REQUIRED(hdfsDelete);
-  GET_SYMBOL_REQUIRED(hdfsDisconnect);
-  GET_SYMBOL_REQUIRED(hdfsExists);
-  GET_SYMBOL_REQUIRED(hdfsFreeFileInfo);
-  GET_SYMBOL_REQUIRED(hdfsGetCapacity);
-  GET_SYMBOL_REQUIRED(hdfsGetUsed);
-  GET_SYMBOL_REQUIRED(hdfsGetPathInfo);
-  GET_SYMBOL_REQUIRED(hdfsListDirectory);
-
-  // File methods
-  GET_SYMBOL_REQUIRED(hdfsCloseFile);
-  GET_SYMBOL_REQUIRED(hdfsFlush);
-  GET_SYMBOL_REQUIRED(hdfsOpenFile);
-  GET_SYMBOL_REQUIRED(hdfsRead);
-  GET_SYMBOL_REQUIRED(hdfsPread);
-  GET_SYMBOL_REQUIRED(hdfsSeek);
-  GET_SYMBOL_REQUIRED(hdfsTell);
-  GET_SYMBOL_REQUIRED(hdfsWrite);
-
-  return Status::OK();
-}
-
-}  // namespace io
-}  // namespace arrow
-
-#endif  // HAS_HADOOP
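
The shim deleted above (its logic now lives in hdfs-internal.cc) keeps one function pointer per hdfs* entry point and resolves each from a shared library loaded at runtime. Below is a minimal, self-contained sketch of that pattern, not Arrow's code; libm and `cos` are stand-ins chosen only because they exist on any Linux system:

```c++
// Resolve a symbol from a shared library at runtime and call it through
// a typed function pointer, as the shim's GET_SYMBOL macro did.
#include <dlfcn.h>

#include <iostream>

static double (*ptr_cos)(double) = nullptr;  // analogous to the ptr_hdfs* globals

int main() {
  void* handle = dlopen("libm.so.6", RTLD_NOW | RTLD_LOCAL);
  if (handle == nullptr) {
    std::cerr << "dlopen failed: " << dlerror() << std::endl;
    return 1;
  }
  // dlsym returns void*; the cast assigns it into the typed pointer,
  // the same trick GET_SYMBOL uses to avoid a compiler warning.
  *reinterpret_cast<void**>(&ptr_cos) = dlsym(handle, "cos");
  if (ptr_cos == nullptr) {
    std::cerr << "dlsym failed: " << dlerror() << std::endl;
    return 1;
  }
  std::cout << ptr_cos(0.0) << std::endl;  // prints 1
  dlclose(handle);
  return 0;
}
```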

http://git-wip-us.apache.org/repos/asf/arrow/blob/cfde4607/python/.gitignore
----------------------------------------------------------------------
diff --git a/python/.gitignore b/python/.gitignore
index c37efc4..4ab8020 100644
--- a/python/.gitignore
+++ b/python/.gitignore
@@ -16,6 +16,7 @@ Testing/
 *.c
 *.cpp
 pyarrow/version.py
+pyarrow/table_api.h
 # Python files
 
 # setup.py working directory

http://git-wip-us.apache.org/repos/asf/arrow/blob/cfde4607/python/pyarrow/includes/libarrow_io.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow_io.pxd b/python/pyarrow/includes/libarrow_io.pxd
index 7703415..99f88ad 100644
--- a/python/pyarrow/includes/libarrow_io.pxd
+++ b/python/pyarrow/includes/libarrow_io.pxd
@@ -87,13 +87,19 @@ cdef extern from "arrow/io/file.h" namespace "arrow::io" nogil:
 
 
 cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil:
-    CStatus ConnectLibHdfs()
+    CStatus HaveLibHdfs()
+    CStatus HaveLibHdfs3()
+
+    enum HdfsDriver" arrow::io::HdfsDriver":
+        HdfsDriver_LIBHDFS" arrow::io::HdfsDriver::LIBHDFS"
+        HdfsDriver_LIBHDFS3" arrow::io::HdfsDriver::LIBHDFS3"
 
     cdef cppclass HdfsConnectionConfig:
         c_string host
         int port
         c_string user
         c_string kerb_ticket
+        HdfsDriver driver
 
     cdef cppclass HdfsPathInfo:
         ObjectType kind;
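
These declarations mirror the C++ API in arrow/io/hdfs.h: HaveLibHdfs() and HaveLibHdfs3() probe whether each driver's shared library can be loaded, and HdfsConnectionConfig::driver picks one at connect time. Here is a hedged C++ sketch of a libhdfs3-first fallback; the Connect signature is inferred from the io.pyx call below, not quoted from the header:

```c++
#include <memory>

#include "arrow/io/hdfs.h"
#include "arrow/status.h"

// Sketch only: prefer libhdfs3 when available, else fall back to the
// JNI-based libhdfs driver, failing if neither library can be loaded.
arrow::Status ConnectPreferringLibHdfs3(
    std::shared_ptr<arrow::io::HdfsClient>* client) {
  arrow::io::HdfsConnectionConfig conf;
  conf.host = "default";  // resolve the namenode from the Hadoop config
  conf.port = 0;
  if (arrow::io::HaveLibHdfs3().ok()) {
    conf.driver = arrow::io::HdfsDriver::LIBHDFS3;
  } else {
    RETURN_NOT_OK(arrow::io::HaveLibHdfs());
    conf.driver = arrow::io::HdfsDriver::LIBHDFS;
  }
  return arrow::io::HdfsClient::Connect(&conf, client);
}
```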

http://git-wip-us.apache.org/repos/asf/arrow/blob/cfde4607/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index 2fa5fb6..6b0e392 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -35,6 +35,7 @@ cimport cpython as cp
 import re
 import sys
 import threading
+import time
 
 
 cdef class NativeFile:
@@ -269,7 +270,15 @@ except ImportError:
 
 def have_libhdfs():
     try:
-        check_status(ConnectLibHdfs())
+        check_status(HaveLibHdfs())
+        return True
+    except:
+        return False
+
+
+def have_libhdfs3():
+    try:
+        check_status(HaveLibHdfs3())
         return True
     except:
         return False
@@ -313,7 +322,8 @@ cdef class HdfsClient:
             raise IOError('HDFS client is closed')
 
     @classmethod
-    def connect(cls, host="default", port=0, user=None, kerb_ticket=None):
+    def connect(cls, host="default", port=0, user=None, kerb_ticket=None,
+                driver='libhdfs'):
         """
         Connect to an HDFS cluster. All parameters are optional and should
         only be set if the defaults need to be overridden.
@@ -328,6 +338,9 @@ cdef class HdfsClient:
         port : NameNode's port. Set to 0 for default or logical (HA) nodes.
         user : Username when connecting to HDFS; None implies login user.
         kerb_ticket : Path to Kerberos ticket cache.
+        driver : {'libhdfs', 'libhdfs3'}, default 'libhdfs'
+          Connect using libhdfs (JNI-based) or libhdfs3 (3rd-party C++
+          library from Pivotal Labs)
 
         Notes
         -----
@@ -350,6 +363,13 @@ cdef class HdfsClient:
         if kerb_ticket is not None:
             conf.kerb_ticket = tobytes(kerb_ticket)
 
+        if driver == 'libhdfs':
+            check_status(HaveLibHdfs())
+            conf.driver = HdfsDriver_LIBHDFS
+        else:
+            check_status(HaveLibHdfs3())
+            conf.driver = HdfsDriver_LIBHDFS3
+
         with nogil:
             check_status(CHdfsClient.Connect(&conf, &out.client))
         out.is_open = True
@@ -541,6 +561,12 @@ cdef class HdfsClient:
                     if not buf:
                         break
 
+                    if writer_thread.is_alive():
+                        while write_queue.full():
+                            time.sleep(0.01)
+                    else:
+                        break
+
                     write_queue.put_nowait(buf)
             finally:
                 done = True
@@ -609,22 +635,13 @@ cdef class HdfsFile(NativeFile):
 
         cdef int64_t total_bytes = 0
 
-        cdef int rpc_chunksize = min(self.buffer_size, nbytes)
-
         try:
             with nogil:
-                while total_bytes < nbytes:
-                    check_status(self.rd_file.get()
-                                 .Read(rpc_chunksize, &bytes_read,
-                                       buf + total_bytes))
-
-                    total_bytes += bytes_read
+                check_status(self.rd_file.get()
+                             .Read(nbytes, &bytes_read, buf))
 
-                    # EOF
-                    if bytes_read == 0:
-                        break
             result = cp.PyBytes_FromStringAndSize(<const char*>buf,
-                                                  total_bytes)
+                                                  bytes_read)
         finally:
             free(buf)
 
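Two behavioral changes land in io.pyx above: upload() now applies backpressure, sleeping while the bounded write_queue is full and bailing out if the writer thread has died, and the read path issues a single Read call instead of looping over RPC-sized chunks. The backpressure idea generalizes beyond Python; below is a hedged C++ sketch using a condition variable in place of the sleep-poll (none of these names come from Arrow):

```c++
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <queue>
#include <string>
#include <thread>

// A bounded queue: Put() blocks the producer while full, mirroring the
// `while write_queue.full(): time.sleep(0.01)` loop in upload() above.
class BoundedQueue {
 public:
  explicit BoundedQueue(size_t capacity) : capacity_(capacity) {}

  void Put(std::string buf) {
    std::unique_lock<std::mutex> lock(mutex_);
    not_full_.wait(lock, [this] { return queue_.size() < capacity_; });
    queue_.push(std::move(buf));
    not_empty_.notify_one();
  }

  // Returns false once Close() was called and the queue is drained.
  bool Get(std::string* buf) {
    std::unique_lock<std::mutex> lock(mutex_);
    not_empty_.wait(lock, [this] { return !queue_.empty() || done_; });
    if (queue_.empty()) return false;
    *buf = std::move(queue_.front());
    queue_.pop();
    not_full_.notify_one();
    return true;
  }

  void Close() {
    std::lock_guard<std::mutex> lock(mutex_);
    done_ = true;
    not_empty_.notify_all();
  }

 private:
  std::mutex mutex_;
  std::condition_variable not_full_;
  std::condition_variable not_empty_;
  std::queue<std::string> queue_;
  const size_t capacity_;
  bool done_ = false;
};

int main() {
  BoundedQueue queue(4);  // same spirit as the Cython write_queue
  std::thread writer([&queue] {
    std::string buf;
    while (queue.Get(&buf)) { std::printf("wrote %zu bytes\n", buf.size()); }
  });
  for (int i = 0; i < 16; ++i) { queue.Put(std::string(1024, 'x')); }
  queue.Close();
  writer.join();
  return 0;
}
```

The condition variable wakes the producer as soon as space frees up rather than every 10 milliseconds; the Cython version polls, presumably because a sleep loop is the simpler thing to get right from Python threads.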

http://git-wip-us.apache.org/repos/asf/arrow/blob/cfde4607/python/pyarrow/tests/test_hdfs.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_hdfs.py b/python/pyarrow/tests/test_hdfs.py
index c23543b..73d5a66 100644
--- a/python/pyarrow/tests/test_hdfs.py
+++ b/python/pyarrow/tests/test_hdfs.py
@@ -19,6 +19,7 @@ from io import BytesIO
 from os.path import join as pjoin
 import os
 import random
+import unittest
 
 import pytest
 
@@ -28,7 +29,7 @@ import pyarrow.io as io
 # HDFS tests
 
 
-def hdfs_test_client():
+def hdfs_test_client(driver='libhdfs'):
     host = os.environ.get('ARROW_HDFS_TEST_HOST', 'localhost')
     user = os.environ['ARROW_HDFS_TEST_USER']
     try:
@@ -37,115 +38,119 @@ def hdfs_test_client():
         raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not '
                          'an integer')
 
-    return io.HdfsClient.connect(host, port, user)
+    return io.HdfsClient.connect(host, port, user, driver=driver)
 
 
-libhdfs = pytest.mark.skipif(not io.have_libhdfs(),
-                             reason='No libhdfs available on system')
+class HdfsTestCases(object):
 
+    def _make_test_file(self, hdfs, test_name, test_path, test_data):
+        base_path = pjoin(self.tmp_path, test_name)
+        hdfs.mkdir(base_path)
 
-HDFS_TMP_PATH = '/tmp/pyarrow-test-{0}'.format(random.randint(0, 1000))
+        full_path = pjoin(base_path, test_path)
 
+        with hdfs.open(full_path, 'wb') as f:
+            f.write(test_data)
 
-@pytest.fixture(scope='session')
-def hdfs(request):
-    fixture = hdfs_test_client()
+        return full_path
 
-    def teardown():
-        fixture.delete(HDFS_TMP_PATH, recursive=True)
-        fixture.close()
-    request.addfinalizer(teardown)
-    return fixture
+    @classmethod
+    def setUpClass(cls):
+        cls.check_driver()
+        cls.hdfs = hdfs_test_client(cls.DRIVER)
+        cls.tmp_path = '/tmp/pyarrow-test-{0}'.format(random.randint(0, 1000))
+        cls.hdfs.mkdir(cls.tmp_path)
 
+    @classmethod
+    def tearDownClass(cls):
+        cls.hdfs.delete(cls.tmp_path, recursive=True)
+        cls.hdfs.close()
 
-@libhdfs
-def test_hdfs_close():
-    client = hdfs_test_client()
-    assert client.is_open
-    client.close()
-    assert not client.is_open
+    def test_hdfs_close(self):
+        client = hdfs_test_client()
+        assert client.is_open
+        client.close()
+        assert not client.is_open
 
-    with pytest.raises(Exception):
-        client.ls('/')
+        with pytest.raises(Exception):
+            client.ls('/')
 
+    def test_hdfs_mkdir(self):
+        path = pjoin(self.tmp_path, 'test-dir/test-dir')
+        parent_path = pjoin(self.tmp_path, 'test-dir')
 
-@libhdfs
-def test_hdfs_mkdir(hdfs):
-    path = pjoin(HDFS_TMP_PATH, 'test-dir/test-dir')
-    parent_path = pjoin(HDFS_TMP_PATH, 'test-dir')
+        self.hdfs.mkdir(path)
+        assert self.hdfs.exists(path)
 
-    hdfs.mkdir(path)
-    assert hdfs.exists(path)
+        self.hdfs.delete(parent_path, recursive=True)
+        assert not self.hdfs.exists(path)
 
-    hdfs.delete(parent_path, recursive=True)
-    assert not hdfs.exists(path)
+    def test_hdfs_ls(self):
+        base_path = pjoin(self.tmp_path, 'ls-test')
+        self.hdfs.mkdir(base_path)
 
+        dir_path = pjoin(base_path, 'a-dir')
+        f1_path = pjoin(base_path, 'a-file-1')
 
-@libhdfs
-def test_hdfs_ls(hdfs):
-    base_path = pjoin(HDFS_TMP_PATH, 'ls-test')
-    hdfs.mkdir(base_path)
+        self.hdfs.mkdir(dir_path)
 
-    dir_path = pjoin(base_path, 'a-dir')
-    f1_path = pjoin(base_path, 'a-file-1')
+        f = self.hdfs.open(f1_path, 'wb')
+        f.write('a' * 10)
 
-    hdfs.mkdir(dir_path)
+        contents = sorted(self.hdfs.ls(base_path, False))
+        assert contents == [dir_path, f1_path]
 
-    f = hdfs.open(f1_path, 'wb')
-    f.write('a' * 10)
+    def test_hdfs_download_upload(self):
+        base_path = pjoin(self.tmp_path, 'upload-test')
 
-    contents = sorted(hdfs.ls(base_path, False))
-    assert contents == [dir_path, f1_path]
+        data = b'foobarbaz'
+        buf = BytesIO(data)
+        buf.seek(0)
 
+        self.hdfs.upload(base_path, buf)
 
-def _make_test_file(hdfs, test_name, test_path, test_data):
-    base_path = pjoin(HDFS_TMP_PATH, test_name)
-    hdfs.mkdir(base_path)
+        out_buf = BytesIO()
+        self.hdfs.download(base_path, out_buf)
+        out_buf.seek(0)
+        assert out_buf.getvalue() == data
 
-    full_path = pjoin(base_path, test_path)
+    def test_hdfs_file_context_manager(self):
+        path = pjoin(self.tmp_path, 'ctx-manager')
 
-    f = hdfs.open(full_path, 'wb')
-    f.write(test_data)
+        data = b'foo'
+        with self.hdfs.open(path, 'wb') as f:
+            f.write(data)
 
-    return full_path
+        with self.hdfs.open(path, 'rb') as f:
+            assert f.size() == 3
+            result = f.read(10)
+            assert result == data
 
 
-@libhdfs
-def test_hdfs_orphaned_file():
-    hdfs = hdfs_test_client()
-    file_path = _make_test_file(hdfs, 'orphaned_file_test', 'fname',
-                                'foobarbaz')
+class TestLibHdfs(HdfsTestCases, unittest.TestCase):
 
-    f = hdfs.open(file_path)
-    hdfs = None
-    f = None  # noqa
+    DRIVER = 'libhdfs'
 
+    @classmethod
+    def check_driver(cls):
+        if not io.have_libhdfs():
+            pytest.skip('No libhdfs available on system')
 
-@libhdfs
-def test_hdfs_download_upload(hdfs):
-    base_path = pjoin(HDFS_TMP_PATH, 'upload-test')
+    def test_hdfs_orphaned_file(self):
+        hdfs = hdfs_test_client()
+        file_path = self._make_test_file(hdfs, 'orphaned_file_test', 'fname',
+                                         'foobarbaz')
 
-    data = b'foobarbaz'
-    buf = BytesIO(data)
-    buf.seek(0)
+        f = hdfs.open(file_path)
+        hdfs = None
+        f = None  # noqa
 
-    hdfs.upload(base_path, buf)
 
-    out_buf = BytesIO()
-    hdfs.download(base_path, out_buf)
-    out_buf.seek(0)
-    assert out_buf.getvalue() == data
+class TestLibHdfs3(HdfsTestCases, unittest.TestCase):
 
+    DRIVER = 'libhdfs3'
 
-@libhdfs
-def test_hdfs_file_context_manager(hdfs):
-    path = pjoin(HDFS_TMP_PATH, 'ctx-manager')
-
-    data = b'foo'
-    with hdfs.open(path, 'wb') as f:
-        f.write(data)
-
-    with hdfs.open(path, 'rb') as f:
-        assert f.size() == 3
-        result = f.read(10)
-        assert result == data
+    @classmethod
+    def check_driver(cls):
+        if not io.have_libhdfs3():
+            pytest.skip('No libhdfs3 available on system')
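
The test module now shares one suite across both drivers: HdfsTestCases holds the tests, and each concrete class contributes a DRIVER name plus a check_driver() skip. The C++ tests in io-hdfs-test.cc are reworked along similar lines in this commit; as a hedged illustration (the tag-type and fixture names below are made up, not Arrow's), the same parameterization with googletest typed tests could look like:

```c++
#include <gtest/gtest.h>

#include "arrow/io/hdfs.h"

// One tag type per driver, reporting availability and its enum value.
struct JniDriver {
  static arrow::Status Have() { return arrow::io::HaveLibHdfs(); }
  static arrow::io::HdfsDriver driver() { return arrow::io::HdfsDriver::LIBHDFS; }
};

struct Hdfs3Driver {
  static arrow::Status Have() { return arrow::io::HaveLibHdfs3(); }
  static arrow::io::HdfsDriver driver() { return arrow::io::HdfsDriver::LIBHDFS3; }
};

template <typename DriverTag>
class TestHdfsClient : public ::testing::Test {
 protected:
  void SetUp() override { available_ = DriverTag::Have().ok(); }
  bool available_ = false;
};

typedef ::testing::Types<JniDriver, Hdfs3Driver> DriverTypes;
TYPED_TEST_CASE(TestHdfsClient, DriverTypes);

// Each TYPED_TEST runs once per driver, like HdfsTestCases above.
TYPED_TEST(TestHdfsClient, ConfigCarriesDriver) {
  if (!this->available_) return;  // analogous to pytest.skip
  arrow::io::HdfsConnectionConfig conf;
  conf.driver = TypeParam::driver();
  // ... connect with conf and run the shared assertions
}
```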
