Repository: arrow
Updated Branches:
  refs/heads/master 68e39c686 -> cfde4607d


ARROW-243: [C++] Add option to switch between libhdfs and libhdfs3 when creating HdfsClient

Closes #108

Some users will not have a full Java Hadoop distribution and may wish to use
the libhdfs3 package from Pivotal
(https://github.com/Pivotal-Data-Attic/pivotalrd-libhdfs3), part of Apache
HAWQ (incubating).

In C++, you can switch by setting:

```c++
HdfsConnectionConfig conf;
conf.driver = HdfsDriver::LIBHDFS3;
```

In Python, you can run:

```python
con = arrow.io.HdfsClient.connect(..., driver='libhdfs3')
```

Author: Wes McKinney <wes.mckin...@twosigma.com>

Closes #244 from wesm/ARROW-243 and squashes the following commits:

7ae197a [Wes McKinney] Refactor HdfsClient code to support both libhdfs and libhdfs3 at runtime. Add driver option to Python interface
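As a fuller illustration of how the pieces introduced by this patch fit together, here is a minimal C++ sketch combining `HdfsConnectionConfig`, the new `HdfsDriver` enum, the `HaveLibHdfs3()` check, and `HdfsClient::Connect()`. This is a sketch only, not part of the patch; the namenode host, port, and user name are placeholder values.

```c++
#include <memory>

#include "arrow/io/hdfs.h"
#include "arrow/status.h"  // Status and RETURN_NOT_OK, as used by hdfs.cc below

using arrow::Status;
using arrow::io::HdfsClient;
using arrow::io::HdfsConnectionConfig;
using arrow::io::HdfsDriver;

// Minimal sketch: connect through libhdfs3 rather than the JNI-based libhdfs.
Status ConnectWithLibHdfs3(std::shared_ptr<HdfsClient>* client) {
  // Fail fast if the libhdfs3 shared library (or one of its required
  // symbols) cannot be loaded; no HDFS RPC is attempted here.
  RETURN_NOT_OK(arrow::io::HaveLibHdfs3());

  HdfsConnectionConfig conf;
  conf.host = "namenode.example.com";  // placeholder namenode host
  conf.port = 8020;                    // placeholder namenode port
  conf.user = "hadoop";                // placeholder user; kerb_ticket left empty
  conf.driver = HdfsDriver::LIBHDFS3;

  return HdfsClient::Connect(&conf, client);
}
```

Note that `HaveLibHdfs3()` only verifies that the shared library and its required symbols can be dlopen'd (see `ConnectLibHdfs3()` in the diff below); the actual connection happens in `Connect()`. Setting the `ARROW_LIBHDFS3_DIR` environment variable adds a directory to the library search path, per `get_potential_libhdfs3_paths()` in the new hdfs-internal.cc.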
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/cfde4607
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/cfde4607
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/cfde4607

Branch: refs/heads/master
Commit: cfde4607df453e4b97560e64caff744fb3ba3d1f
Parents: 68e39c6
Author: Wes McKinney <wes.mckin...@twosigma.com>
Authored: Mon Dec 19 18:26:17 2016 -0500
Committer: Wes McKinney <wes.mckin...@twosigma.com>
Committed: Mon Dec 19 18:26:17 2016 -0500

----------------------------------------------------------------------
 cpp/src/arrow/io/CMakeLists.txt         |   2 +-
 cpp/src/arrow/io/hdfs-internal.cc       | 590 +++++++++++++++++++++++++++
 cpp/src/arrow/io/hdfs-internal.h        | 203 +++++++++
 cpp/src/arrow/io/hdfs.cc                | 102 +++--
 cpp/src/arrow/io/hdfs.h                 |   6 +-
 cpp/src/arrow/io/io-hdfs-test.cc        | 211 +++++----
 cpp/src/arrow/io/libhdfs_shim.cc        | 582 --------------------------
 python/.gitignore                       |   1 +
 python/pyarrow/includes/libarrow_io.pxd |   8 +-
 python/pyarrow/io.pyx                   |  45 +-
 python/pyarrow/tests/test_hdfs.py       | 161 ++++----
 11 files changed, 1109 insertions(+), 802 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/cfde4607/cpp/src/arrow/io/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt
index f285180..e2b6496 100644
--- a/cpp/src/arrow/io/CMakeLists.txt
+++ b/cpp/src/arrow/io/CMakeLists.txt
@@ -75,7 +75,7 @@ if(ARROW_HDFS)
   set(ARROW_HDFS_SRCS
       hdfs.cc
-      libhdfs_shim.cc)
+      hdfs-internal.cc)
 
   set_property(SOURCE ${ARROW_HDFS_SRCS}
                APPEND_STRING
                PROPERTY

http://git-wip-us.apache.org/repos/asf/arrow/blob/cfde4607/cpp/src/arrow/io/hdfs-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs-internal.cc b/cpp/src/arrow/io/hdfs-internal.cc
new file mode 100644
index 0000000..7094785
--- /dev/null
+++ b/cpp/src/arrow/io/hdfs-internal.cc
@@ -0,0 +1,590 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This shim interface to libhdfs (for runtime shared library loading) has been
+// adapted from the SFrame project, released under the ASF-compatible 3-clause
+// BSD license
+//
+// Using this requires having the $JAVA_HOME and $HADOOP_HOME environment
+// variables set, so that libjvm and libhdfs can be located easily
+
+// Copyright (C) 2015 Dato, Inc.
+// All rights reserved.
+//
+// This software may be modified and distributed under the terms
+// of the BSD license. See the LICENSE file for details.
+
+#ifdef HAS_HADOOP
+
+#ifndef _WIN32
+#include <dlfcn.h>
+#else
+#include <windows.h>
+#include <winsock2.h>
+
+// TODO(wesm): address when/if we add windows support
+// #include <util/syserr_reporting.hpp>
+#endif
+
+extern "C" {
+#include <hdfs.h>
+}
+
+#include <iostream>
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <type_traits>
+#include <vector>
+
+#include <boost/filesystem.hpp>  // NOLINT
+
+#include "arrow/io/hdfs-internal.h"
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+namespace fs = boost::filesystem;
+
+#ifndef _WIN32
+static void* libjvm_handle = NULL;
+#else
+static HINSTANCE libjvm_handle = NULL;
+#endif
+/*
+ * All the shim pointers
+ */
+
+// Helper functions for dlopens
+static std::vector<fs::path> get_potential_libjvm_paths();
+static std::vector<fs::path> get_potential_libhdfs_paths();
+static std::vector<fs::path> get_potential_libhdfs3_paths();
+static arrow::Status try_dlopen(std::vector<fs::path> potential_paths, const char* name,
+#ifndef _WIN32
+    void*& out_handle);
+#else
+    HINSTANCE& out_handle);
+#endif
+
+static std::vector<fs::path> get_potential_libhdfs_paths() {
+  std::vector<fs::path> libhdfs_potential_paths;
+  std::string file_name;
+
+// OS-specific file name
+#ifdef __WIN32
+  file_name = "hdfs.dll";
+#elif __APPLE__
+  file_name = "libhdfs.dylib";
+#else
+  file_name = "libhdfs.so";
+#endif
+
+  // Common paths
+  std::vector<fs::path> search_paths = {fs::path(""), fs::path(".")};
+
+  // Path from environment variable
+  const char* hadoop_home = std::getenv("HADOOP_HOME");
+  if (hadoop_home != nullptr) {
+    auto path = fs::path(hadoop_home) / "lib/native";
+    search_paths.push_back(path);
+  }
+
+  const char* libhdfs_dir = std::getenv("ARROW_LIBHDFS_DIR");
+  if (libhdfs_dir != nullptr) { search_paths.push_back(fs::path(libhdfs_dir)); }
+
+  // All paths with file name
+  for (auto& path : search_paths) {
+    libhdfs_potential_paths.push_back(path / file_name);
+  }
+
+  return libhdfs_potential_paths;
+}
+
+static std::vector<fs::path> get_potential_libhdfs3_paths() {
+  std::vector<fs::path> potential_paths;
+  std::string file_name;
+
+// OS-specific file name
+#ifdef __WIN32
+  file_name = "hdfs3.dll";
+#elif __APPLE__
+  file_name = "libhdfs3.dylib";
+#else
+  file_name = "libhdfs3.so";
+#endif
+
+  // Common paths
+  std::vector<fs::path> search_paths = {fs::path(""), fs::path(".")};
+
+  const char* libhdfs3_dir = std::getenv("ARROW_LIBHDFS3_DIR");
+  if (libhdfs3_dir != nullptr) { search_paths.push_back(fs::path(libhdfs3_dir)); }
+
+  // All paths with file name
+  for (auto& path : search_paths) {
potential_paths.push_back(path / file_name); + } + + return potential_paths; +} + +static std::vector<fs::path> get_potential_libjvm_paths() { + std::vector<fs::path> libjvm_potential_paths; + + std::vector<fs::path> search_prefixes; + std::vector<fs::path> search_suffixes; + std::string file_name; + +// From heuristics +#ifdef __WIN32 + search_prefixes = {""}; + search_suffixes = {"/jre/bin/server", "/bin/server"}; + file_name = "jvm.dll"; +#elif __APPLE__ + search_prefixes = {""}; + search_suffixes = {"", "/jre/lib/server"}; + file_name = "libjvm.dylib"; + +// SFrame uses /usr/libexec/java_home to find JAVA_HOME; for now we are +// expecting users to set an environment variable +#else + search_prefixes = { + "/usr/lib/jvm/default-java", // ubuntu / debian distros + "/usr/lib/jvm/java", // rhel6 + "/usr/lib/jvm", // centos6 + "/usr/lib64/jvm", // opensuse 13 + "/usr/local/lib/jvm/default-java", // alt ubuntu / debian distros + "/usr/local/lib/jvm/java", // alt rhel6 + "/usr/local/lib/jvm", // alt centos6 + "/usr/local/lib64/jvm", // alt opensuse 13 + "/usr/local/lib/jvm/java-7-openjdk-amd64", // alt ubuntu / debian distros + "/usr/lib/jvm/java-7-openjdk-amd64", // alt ubuntu / debian distros + "/usr/local/lib/jvm/java-6-openjdk-amd64", // alt ubuntu / debian distros + "/usr/lib/jvm/java-6-openjdk-amd64", // alt ubuntu / debian distros + "/usr/lib/jvm/java-7-oracle", // alt ubuntu + "/usr/lib/jvm/java-8-oracle", // alt ubuntu + "/usr/lib/jvm/java-6-oracle", // alt ubuntu + "/usr/local/lib/jvm/java-7-oracle", // alt ubuntu + "/usr/local/lib/jvm/java-8-oracle", // alt ubuntu + "/usr/local/lib/jvm/java-6-oracle", // alt ubuntu + "/usr/lib/jvm/default", // alt centos + "/usr/java/latest", // alt centos + }; + search_suffixes = {"/jre/lib/amd64/server"}; + file_name = "libjvm.so"; +#endif + // From direct environment variable + char* env_value = NULL; + if ((env_value = getenv("JAVA_HOME")) != NULL) { + search_prefixes.insert(search_prefixes.begin(), env_value); + } + + // Generate cross product between search_prefixes, search_suffixes, and file_name + for (auto& prefix : search_prefixes) { + for (auto& suffix : search_suffixes) { + auto path = (fs::path(prefix) / fs::path(suffix) / fs::path(file_name)); + libjvm_potential_paths.push_back(path); + } + } + + return libjvm_potential_paths; +} + +#ifndef _WIN32 +static arrow::Status try_dlopen( + std::vector<fs::path> potential_paths, const char* name, void*& out_handle) { + std::vector<std::string> error_messages; + + for (auto& i : potential_paths) { + i.make_preferred(); + out_handle = dlopen(i.native().c_str(), RTLD_NOW | RTLD_LOCAL); + + if (out_handle != NULL) { + // std::cout << "Loaded " << i << std::endl; + break; + } else { + const char* err_msg = dlerror(); + if (err_msg != NULL) { + error_messages.push_back(std::string(err_msg)); + } else { + error_messages.push_back(std::string(" returned NULL")); + } + } + } + + if (out_handle == NULL) { + std::stringstream ss; + ss << "Unable to load " << name; + return arrow::Status::IOError(ss.str()); + } + + return arrow::Status::OK(); +} + +#else +static arrow::Status try_dlopen( + std::vector<fs::path> potential_paths, const char* name, HINSTANCE& out_handle) { + std::vector<std::string> error_messages; + + for (auto& i : potential_paths) { + i.make_preferred(); + out_handle = LoadLibrary(i.string().c_str()); + + if (out_handle != NULL) { + break; + } else { + // error_messages.push_back(get_last_err_str(GetLastError())); + } + } + + if (out_handle == NULL) { + std::stringstream ss; + ss << 
"Unable to load " << name; + return arrow::Status::IOError(ss.str()); + } + + return arrow::Status::OK(); +} +#endif // _WIN32 + +static inline void* GetLibrarySymbol(void* handle, const char* symbol) { + if (handle == NULL) return NULL; +#ifndef _WIN32 + return dlsym(handle, symbol); +#else + + void* ret = reinterpret_cast<void*>(GetProcAddress(handle, symbol)); + if (ret == NULL) { + // logstream(LOG_INFO) << "GetProcAddress error: " + // << get_last_err_str(GetLastError()) << std::endl; + } + return ret; +#endif +} + +#define GET_SYMBOL_REQUIRED(SHIM, SYMBOL_NAME) \ + do { \ + if (!SHIM->SYMBOL_NAME) { \ + *reinterpret_cast<void**>(&SHIM->SYMBOL_NAME) = \ + GetLibrarySymbol(SHIM->handle, "" #SYMBOL_NAME); \ + } \ + if (!SHIM->SYMBOL_NAME) \ + return Status::IOError("Getting symbol " #SYMBOL_NAME "failed"); \ + } while (0) + +#define GET_SYMBOL(SHIM, SYMBOL_NAME) \ + if (!SHIM->SYMBOL_NAME) { \ + *reinterpret_cast<void**>(&SHIM->SYMBOL_NAME) = \ + GetLibrarySymbol(SHIM->handle, "" #SYMBOL_NAME); \ + } + +namespace arrow { +namespace io { + +static LibHdfsShim libhdfs_shim; +static LibHdfsShim libhdfs3_shim; + +hdfsBuilder* LibHdfsShim::NewBuilder(void) { + return this->hdfsNewBuilder(); +} + +void LibHdfsShim::BuilderSetNameNode(hdfsBuilder* bld, const char* nn) { + this->hdfsBuilderSetNameNode(bld, nn); +} + +void LibHdfsShim::BuilderSetNameNodePort(hdfsBuilder* bld, tPort port) { + this->hdfsBuilderSetNameNodePort(bld, port); +} + +void LibHdfsShim::BuilderSetUserName(hdfsBuilder* bld, const char* userName) { + this->hdfsBuilderSetUserName(bld, userName); +} + +void LibHdfsShim::BuilderSetKerbTicketCachePath( + hdfsBuilder* bld, const char* kerbTicketCachePath) { + this->hdfsBuilderSetKerbTicketCachePath(bld, kerbTicketCachePath); +} + +hdfsFS LibHdfsShim::BuilderConnect(hdfsBuilder* bld) { + return this->hdfsBuilderConnect(bld); +} + +int LibHdfsShim::Disconnect(hdfsFS fs) { + return this->hdfsDisconnect(fs); +} + +hdfsFile LibHdfsShim::OpenFile(hdfsFS fs, const char* path, int flags, int bufferSize, + short replication, tSize blocksize) { // NOLINT + return this->hdfsOpenFile(fs, path, flags, bufferSize, replication, blocksize); +} + +int LibHdfsShim::CloseFile(hdfsFS fs, hdfsFile file) { + return this->hdfsCloseFile(fs, file); +} + +int LibHdfsShim::Exists(hdfsFS fs, const char* path) { + return this->hdfsExists(fs, path); +} + +int LibHdfsShim::Seek(hdfsFS fs, hdfsFile file, tOffset desiredPos) { + return this->hdfsSeek(fs, file, desiredPos); +} + +tOffset LibHdfsShim::Tell(hdfsFS fs, hdfsFile file) { + return this->hdfsTell(fs, file); +} + +tSize LibHdfsShim::Read(hdfsFS fs, hdfsFile file, void* buffer, tSize length) { + return this->hdfsRead(fs, file, buffer, length); +} + +bool LibHdfsShim::HasPread() { + GET_SYMBOL(this, hdfsPread); + return this->hdfsPread != nullptr; +} + +tSize LibHdfsShim::Pread( + hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length) { + GET_SYMBOL(this, hdfsPread); + return this->hdfsPread(fs, file, position, buffer, length); +} + +tSize LibHdfsShim::Write(hdfsFS fs, hdfsFile file, const void* buffer, tSize length) { + return this->hdfsWrite(fs, file, buffer, length); +} + +int LibHdfsShim::Flush(hdfsFS fs, hdfsFile file) { + return this->hdfsFlush(fs, file); +} + +int LibHdfsShim::Available(hdfsFS fs, hdfsFile file) { + GET_SYMBOL(this, hdfsAvailable); + if (this->hdfsAvailable) + return this->hdfsAvailable(fs, file); + else + return 0; +} + +int LibHdfsShim::Copy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) { + 
GET_SYMBOL(this, hdfsCopy); + if (this->hdfsCopy) + return this->hdfsCopy(srcFS, src, dstFS, dst); + else + return 0; +} + +int LibHdfsShim::Move(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) { + GET_SYMBOL(this, hdfsMove); + if (this->hdfsMove) + return this->hdfsMove(srcFS, src, dstFS, dst); + else + return 0; +} + +int LibHdfsShim::Delete(hdfsFS fs, const char* path, int recursive) { + return this->hdfsDelete(fs, path, recursive); +} + +int LibHdfsShim::Rename(hdfsFS fs, const char* oldPath, const char* newPath) { + GET_SYMBOL(this, hdfsRename); + if (this->hdfsRename) + return this->hdfsRename(fs, oldPath, newPath); + else + return 0; +} + +char* LibHdfsShim::GetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize) { + GET_SYMBOL(this, hdfsGetWorkingDirectory); + if (this->hdfsGetWorkingDirectory) { + return this->hdfsGetWorkingDirectory(fs, buffer, bufferSize); + } else { + return NULL; + } +} + +int LibHdfsShim::SetWorkingDirectory(hdfsFS fs, const char* path) { + GET_SYMBOL(this, hdfsSetWorkingDirectory); + if (this->hdfsSetWorkingDirectory) { + return this->hdfsSetWorkingDirectory(fs, path); + } else { + return 0; + } +} + +int LibHdfsShim::CreateDirectory(hdfsFS fs, const char* path) { + return this->hdfsCreateDirectory(fs, path); +} + +int LibHdfsShim::SetReplication(hdfsFS fs, const char* path, int16_t replication) { + GET_SYMBOL(this, hdfsSetReplication); + if (this->hdfsSetReplication) { + return this->hdfsSetReplication(fs, path, replication); + } else { + return 0; + } +} + +hdfsFileInfo* LibHdfsShim::ListDirectory(hdfsFS fs, const char* path, int* numEntries) { + return this->hdfsListDirectory(fs, path, numEntries); +} + +hdfsFileInfo* LibHdfsShim::GetPathInfo(hdfsFS fs, const char* path) { + return this->hdfsGetPathInfo(fs, path); +} + +void LibHdfsShim::FreeFileInfo(hdfsFileInfo* hdfsFileInfo, int numEntries) { + this->hdfsFreeFileInfo(hdfsFileInfo, numEntries); +} + +char*** LibHdfsShim::GetHosts( + hdfsFS fs, const char* path, tOffset start, tOffset length) { + GET_SYMBOL(this, hdfsGetHosts); + if (this->hdfsGetHosts) { + return this->hdfsGetHosts(fs, path, start, length); + } else { + return NULL; + } +} + +void LibHdfsShim::FreeHosts(char*** blockHosts) { + GET_SYMBOL(this, hdfsFreeHosts); + if (this->hdfsFreeHosts) { this->hdfsFreeHosts(blockHosts); } +} + +tOffset LibHdfsShim::GetDefaultBlockSize(hdfsFS fs) { + GET_SYMBOL(this, hdfsGetDefaultBlockSize); + if (this->hdfsGetDefaultBlockSize) { + return this->hdfsGetDefaultBlockSize(fs); + } else { + return 0; + } +} + +tOffset LibHdfsShim::GetCapacity(hdfsFS fs) { + return this->hdfsGetCapacity(fs); +} + +tOffset LibHdfsShim::GetUsed(hdfsFS fs) { + return this->hdfsGetUsed(fs); +} + +int LibHdfsShim::Chown( + hdfsFS fs, const char* path, const char* owner, const char* group) { + GET_SYMBOL(this, hdfsChown); + if (this->hdfsChown) { + return this->hdfsChown(fs, path, owner, group); + } else { + return 0; + } +} + +int LibHdfsShim::Chmod(hdfsFS fs, const char* path, short mode) { // NOLINT + GET_SYMBOL(this, hdfsChmod); + if (this->hdfsChmod) { + return this->hdfsChmod(fs, path, mode); + } else { + return 0; + } +} + +int LibHdfsShim::Utime(hdfsFS fs, const char* path, tTime mtime, tTime atime) { + GET_SYMBOL(this, hdfsUtime); + if (this->hdfsUtime) { + return this->hdfsUtime(fs, path, mtime, atime); + } else { + return 0; + } +} + +Status LibHdfsShim::GetRequiredSymbols() { + GET_SYMBOL_REQUIRED(this, hdfsNewBuilder); + GET_SYMBOL_REQUIRED(this, hdfsBuilderSetNameNode); + 
GET_SYMBOL_REQUIRED(this, hdfsBuilderSetNameNodePort); + GET_SYMBOL_REQUIRED(this, hdfsBuilderSetUserName); + GET_SYMBOL_REQUIRED(this, hdfsBuilderSetKerbTicketCachePath); + GET_SYMBOL_REQUIRED(this, hdfsBuilderConnect); + GET_SYMBOL_REQUIRED(this, hdfsCreateDirectory); + GET_SYMBOL_REQUIRED(this, hdfsDelete); + GET_SYMBOL_REQUIRED(this, hdfsDisconnect); + GET_SYMBOL_REQUIRED(this, hdfsExists); + GET_SYMBOL_REQUIRED(this, hdfsFreeFileInfo); + GET_SYMBOL_REQUIRED(this, hdfsGetCapacity); + GET_SYMBOL_REQUIRED(this, hdfsGetUsed); + GET_SYMBOL_REQUIRED(this, hdfsGetPathInfo); + GET_SYMBOL_REQUIRED(this, hdfsListDirectory); + + // File methods + GET_SYMBOL_REQUIRED(this, hdfsCloseFile); + GET_SYMBOL_REQUIRED(this, hdfsFlush); + GET_SYMBOL_REQUIRED(this, hdfsOpenFile); + GET_SYMBOL_REQUIRED(this, hdfsRead); + GET_SYMBOL_REQUIRED(this, hdfsSeek); + GET_SYMBOL_REQUIRED(this, hdfsTell); + GET_SYMBOL_REQUIRED(this, hdfsWrite); + + return Status::OK(); +} + +Status ARROW_EXPORT ConnectLibHdfs(LibHdfsShim** driver) { + static std::mutex lock; + std::lock_guard<std::mutex> guard(lock); + + LibHdfsShim* shim = &libhdfs_shim; + + static bool shim_attempted = false; + if (!shim_attempted) { + shim_attempted = true; + + shim->Initialize(); + + std::vector<fs::path> libjvm_potential_paths = get_potential_libjvm_paths(); + RETURN_NOT_OK(try_dlopen(libjvm_potential_paths, "libjvm", libjvm_handle)); + + std::vector<fs::path> libhdfs_potential_paths = get_potential_libhdfs_paths(); + RETURN_NOT_OK(try_dlopen(libhdfs_potential_paths, "libhdfs", shim->handle)); + } else if (shim->handle == nullptr) { + return Status::IOError("Prior attempt to load libhdfs failed"); + } + + *driver = shim; + return shim->GetRequiredSymbols(); +} + +Status ARROW_EXPORT ConnectLibHdfs3(LibHdfsShim** driver) { + static std::mutex lock; + std::lock_guard<std::mutex> guard(lock); + + LibHdfsShim* shim = &libhdfs3_shim; + + static bool shim_attempted = false; + if (!shim_attempted) { + shim_attempted = true; + + shim->Initialize(); + + std::vector<fs::path> libhdfs3_potential_paths = get_potential_libhdfs3_paths(); + RETURN_NOT_OK(try_dlopen(libhdfs3_potential_paths, "libhdfs3", shim->handle)); + } else if (shim->handle == nullptr) { + return Status::IOError("Prior attempt to load libhdfs3 failed"); + } + + *driver = shim; + return shim->GetRequiredSymbols(); +} + +} // namespace io +} // namespace arrow + +#endif // HAS_HADOOP http://git-wip-us.apache.org/repos/asf/arrow/blob/cfde4607/cpp/src/arrow/io/hdfs-internal.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/hdfs-internal.h b/cpp/src/arrow/io/hdfs-internal.h new file mode 100644 index 0000000..0ff118a --- /dev/null +++ b/cpp/src/arrow/io/hdfs-internal.h @@ -0,0 +1,203 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef ARROW_IO_HDFS_INTERNAL +#define ARROW_IO_HDFS_INTERNAL + +#include <hdfs.h> + +namespace arrow { + +class Status; + +namespace io { + +// NOTE(wesm): cpplint does not like use of short and other imprecise C types +struct LibHdfsShim { +#ifndef _WIN32 + void* handle; +#else + HINSTANCE handle; +#endif + + hdfsBuilder* (*hdfsNewBuilder)(void); + void (*hdfsBuilderSetNameNode)(hdfsBuilder* bld, const char* nn); + void (*hdfsBuilderSetNameNodePort)(hdfsBuilder* bld, tPort port); + void (*hdfsBuilderSetUserName)(hdfsBuilder* bld, const char* userName); + void (*hdfsBuilderSetKerbTicketCachePath)( + hdfsBuilder* bld, const char* kerbTicketCachePath); + hdfsFS (*hdfsBuilderConnect)(hdfsBuilder* bld); + + int (*hdfsDisconnect)(hdfsFS fs); + + hdfsFile (*hdfsOpenFile)(hdfsFS fs, const char* path, int flags, int bufferSize, + short replication, tSize blocksize); // NOLINT + + int (*hdfsCloseFile)(hdfsFS fs, hdfsFile file); + int (*hdfsExists)(hdfsFS fs, const char* path); + int (*hdfsSeek)(hdfsFS fs, hdfsFile file, tOffset desiredPos); + tOffset (*hdfsTell)(hdfsFS fs, hdfsFile file); + tSize (*hdfsRead)(hdfsFS fs, hdfsFile file, void* buffer, tSize length); + tSize (*hdfsPread)( + hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length); + tSize (*hdfsWrite)(hdfsFS fs, hdfsFile file, const void* buffer, tSize length); + int (*hdfsFlush)(hdfsFS fs, hdfsFile file); + int (*hdfsAvailable)(hdfsFS fs, hdfsFile file); + int (*hdfsCopy)(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); + int (*hdfsMove)(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); + int (*hdfsDelete)(hdfsFS fs, const char* path, int recursive); + int (*hdfsRename)(hdfsFS fs, const char* oldPath, const char* newPath); + char* (*hdfsGetWorkingDirectory)(hdfsFS fs, char* buffer, size_t bufferSize); + int (*hdfsSetWorkingDirectory)(hdfsFS fs, const char* path); + int (*hdfsCreateDirectory)(hdfsFS fs, const char* path); + int (*hdfsSetReplication)(hdfsFS fs, const char* path, int16_t replication); + hdfsFileInfo* (*hdfsListDirectory)(hdfsFS fs, const char* path, int* numEntries); + hdfsFileInfo* (*hdfsGetPathInfo)(hdfsFS fs, const char* path); + void (*hdfsFreeFileInfo)(hdfsFileInfo* hdfsFileInfo, int numEntries); + char*** (*hdfsGetHosts)(hdfsFS fs, const char* path, tOffset start, tOffset length); + void (*hdfsFreeHosts)(char*** blockHosts); + tOffset (*hdfsGetDefaultBlockSize)(hdfsFS fs); + tOffset (*hdfsGetCapacity)(hdfsFS fs); + tOffset (*hdfsGetUsed)(hdfsFS fs); + int (*hdfsChown)(hdfsFS fs, const char* path, const char* owner, const char* group); + int (*hdfsChmod)(hdfsFS fs, const char* path, short mode); // NOLINT + int (*hdfsUtime)(hdfsFS fs, const char* path, tTime mtime, tTime atime); + + void Initialize() { + this->handle = nullptr; + this->hdfsNewBuilder = nullptr; + this->hdfsBuilderSetNameNode = nullptr; + this->hdfsBuilderSetNameNodePort = nullptr; + this->hdfsBuilderSetUserName = nullptr; + this->hdfsBuilderSetKerbTicketCachePath = nullptr; + this->hdfsBuilderConnect = nullptr; + this->hdfsDisconnect = nullptr; + this->hdfsOpenFile = nullptr; + this->hdfsCloseFile = nullptr; + this->hdfsExists = nullptr; + this->hdfsSeek = nullptr; + this->hdfsTell = nullptr; + this->hdfsRead = nullptr; + this->hdfsPread = nullptr; + this->hdfsWrite = nullptr; + this->hdfsFlush = nullptr; + this->hdfsAvailable = nullptr; + this->hdfsCopy = nullptr; + this->hdfsMove = nullptr; + 
this->hdfsDelete = nullptr; + this->hdfsRename = nullptr; + this->hdfsGetWorkingDirectory = nullptr; + this->hdfsSetWorkingDirectory = nullptr; + this->hdfsCreateDirectory = nullptr; + this->hdfsSetReplication = nullptr; + this->hdfsListDirectory = nullptr; + this->hdfsGetPathInfo = nullptr; + this->hdfsFreeFileInfo = nullptr; + this->hdfsGetHosts = nullptr; + this->hdfsFreeHosts = nullptr; + this->hdfsGetDefaultBlockSize = nullptr; + this->hdfsGetCapacity = nullptr; + this->hdfsGetUsed = nullptr; + this->hdfsChown = nullptr; + this->hdfsChmod = nullptr; + this->hdfsUtime = nullptr; + } + + hdfsBuilder* NewBuilder(void); + + void BuilderSetNameNode(hdfsBuilder* bld, const char* nn); + + void BuilderSetNameNodePort(hdfsBuilder* bld, tPort port); + + void BuilderSetUserName(hdfsBuilder* bld, const char* userName); + + void BuilderSetKerbTicketCachePath(hdfsBuilder* bld, const char* kerbTicketCachePath); + + hdfsFS BuilderConnect(hdfsBuilder* bld); + + int Disconnect(hdfsFS fs); + + hdfsFile OpenFile(hdfsFS fs, const char* path, int flags, int bufferSize, + short replication, tSize blocksize); // NOLINT + + int CloseFile(hdfsFS fs, hdfsFile file); + + int Exists(hdfsFS fs, const char* path); + + int Seek(hdfsFS fs, hdfsFile file, tOffset desiredPos); + + tOffset Tell(hdfsFS fs, hdfsFile file); + + tSize Read(hdfsFS fs, hdfsFile file, void* buffer, tSize length); + + bool HasPread(); + + tSize Pread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length); + + tSize Write(hdfsFS fs, hdfsFile file, const void* buffer, tSize length); + + int Flush(hdfsFS fs, hdfsFile file); + + int Available(hdfsFS fs, hdfsFile file); + + int Copy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); + + int Move(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); + + int Delete(hdfsFS fs, const char* path, int recursive); + + int Rename(hdfsFS fs, const char* oldPath, const char* newPath); + + char* GetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize); + + int SetWorkingDirectory(hdfsFS fs, const char* path); + + int CreateDirectory(hdfsFS fs, const char* path); + + int SetReplication(hdfsFS fs, const char* path, int16_t replication); + + hdfsFileInfo* ListDirectory(hdfsFS fs, const char* path, int* numEntries); + + hdfsFileInfo* GetPathInfo(hdfsFS fs, const char* path); + + void FreeFileInfo(hdfsFileInfo* hdfsFileInfo, int numEntries); + + char*** GetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length); + + void FreeHosts(char*** blockHosts); + + tOffset GetDefaultBlockSize(hdfsFS fs); + tOffset GetCapacity(hdfsFS fs); + + tOffset GetUsed(hdfsFS fs); + + int Chown(hdfsFS fs, const char* path, const char* owner, const char* group); + + int Chmod(hdfsFS fs, const char* path, short mode); // NOLINT + + int Utime(hdfsFS fs, const char* path, tTime mtime, tTime atime); + + Status GetRequiredSymbols(); +}; + +Status ConnectLibHdfs(LibHdfsShim** driver); +Status ConnectLibHdfs3(LibHdfsShim** driver); + +} // namespace io +} // namespace arrow + +#endif // ARROW_IO_HDFS_INTERNAL http://git-wip-us.apache.org/repos/asf/arrow/blob/cfde4607/cpp/src/arrow/io/hdfs.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc index b8e2120..44e503f 100644 --- a/cpp/src/arrow/io/hdfs.cc +++ b/cpp/src/arrow/io/hdfs.cc @@ -23,6 +23,7 @@ #include <string> #include "arrow/buffer.h" +#include "arrow/io/hdfs-internal.h" #include "arrow/io/hdfs.h" #include "arrow/memory_pool.h" #include 
"arrow/status.h" @@ -59,21 +60,23 @@ static constexpr int kDefaultHdfsBufferSize = 1 << 16; class HdfsAnyFileImpl { public: - void set_members(const std::string& path, hdfsFS fs, hdfsFile handle) { + void set_members( + const std::string& path, LibHdfsShim* driver, hdfsFS fs, hdfsFile handle) { path_ = path; + driver_ = driver; fs_ = fs; file_ = handle; is_open_ = true; } Status Seek(int64_t position) { - int ret = hdfsSeek(fs_, file_, position); + int ret = driver_->Seek(fs_, file_, position); CHECK_FAILURE(ret, "seek"); return Status::OK(); } Status Tell(int64_t* offset) { - int64_t ret = hdfsTell(fs_, file_); + int64_t ret = driver_->Tell(fs_, file_); CHECK_FAILURE(ret, "tell"); *offset = ret; return Status::OK(); @@ -84,6 +87,8 @@ class HdfsAnyFileImpl { protected: std::string path_; + LibHdfsShim* driver_; + // These are pointers in libhdfs, so OK to copy hdfsFS fs_; hdfsFile file_; @@ -98,7 +103,7 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { Status Close() { if (is_open_) { - int ret = hdfsCloseFile(fs_, file_); + int ret = driver_->CloseFile(fs_, file_); CHECK_FAILURE(ret, "CloseFile"); is_open_ = false; } @@ -106,8 +111,14 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { } Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) { - tSize ret = hdfsPread(fs_, file_, static_cast<tOffset>(position), - reinterpret_cast<void*>(buffer), nbytes); + tSize ret; + if (driver_->HasPread()) { + ret = driver_->Pread(fs_, file_, static_cast<tOffset>(position), + reinterpret_cast<void*>(buffer), nbytes); + } else { + RETURN_NOT_OK(Seek(position)); + return Read(nbytes, bytes_read, buffer); + } RETURN_NOT_OK(CheckReadResult(ret)); *bytes_read = ret; return Status::OK(); @@ -129,7 +140,7 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) { int64_t total_bytes = 0; while (total_bytes < nbytes) { - tSize ret = hdfsRead(fs_, file_, reinterpret_cast<void*>(buffer + total_bytes), + tSize ret = driver_->Read(fs_, file_, reinterpret_cast<void*>(buffer + total_bytes), std::min<int64_t>(buffer_size_, nbytes - total_bytes)); RETURN_NOT_OK(CheckReadResult(ret)); total_bytes += ret; @@ -153,11 +164,11 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { } Status GetSize(int64_t* size) { - hdfsFileInfo* entry = hdfsGetPathInfo(fs_, path_.c_str()); + hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path_.c_str()); if (entry == nullptr) { return Status::IOError("HDFS: GetPathInfo failed"); } *size = entry->mSize; - hdfsFreeFileInfo(entry, 1); + driver_->FreeFileInfo(entry, 1); return Status::OK(); } @@ -227,9 +238,9 @@ class HdfsOutputStream::HdfsOutputStreamImpl : public HdfsAnyFileImpl { Status Close() { if (is_open_) { - int ret = hdfsFlush(fs_, file_); + int ret = driver_->Flush(fs_, file_); CHECK_FAILURE(ret, "Flush"); - ret = hdfsCloseFile(fs_, file_); + ret = driver_->CloseFile(fs_, file_); CHECK_FAILURE(ret, "CloseFile"); is_open_ = false; } @@ -237,7 +248,7 @@ class HdfsOutputStream::HdfsOutputStreamImpl : public HdfsAnyFileImpl { } Status Write(const uint8_t* buffer, int64_t nbytes, int64_t* bytes_written) { - tSize ret = hdfsWrite(fs_, file_, reinterpret_cast<const void*>(buffer), nbytes); + tSize ret = driver_->Write(fs_, file_, reinterpret_cast<const void*>(buffer), nbytes); CHECK_FAILURE(ret, "Write"); *bytes_written = ret; return Status::OK(); @@ -297,17 +308,25 @@ class HdfsClient::HdfsClientImpl { 
HdfsClientImpl() {} Status Connect(const HdfsConnectionConfig* config) { - RETURN_NOT_OK(ConnectLibHdfs()); + if (config->driver == HdfsDriver::LIBHDFS3) { + RETURN_NOT_OK(ConnectLibHdfs3(&driver_)); + } else { + RETURN_NOT_OK(ConnectLibHdfs(&driver_)); + } // connect to HDFS with the builder object - hdfsBuilder* builder = hdfsNewBuilder(); - if (!config->host.empty()) { hdfsBuilderSetNameNode(builder, config->host.c_str()); } - hdfsBuilderSetNameNodePort(builder, config->port); - if (!config->user.empty()) { hdfsBuilderSetUserName(builder, config->user.c_str()); } + hdfsBuilder* builder = driver_->NewBuilder(); + if (!config->host.empty()) { + driver_->BuilderSetNameNode(builder, config->host.c_str()); + } + driver_->BuilderSetNameNodePort(builder, config->port); + if (!config->user.empty()) { + driver_->BuilderSetUserName(builder, config->user.c_str()); + } if (!config->kerb_ticket.empty()) { - hdfsBuilderSetKerbTicketCachePath(builder, config->kerb_ticket.c_str()); + driver_->BuilderSetKerbTicketCachePath(builder, config->kerb_ticket.c_str()); } - fs_ = hdfsBuilderConnect(builder); + fs_ = driver_->BuilderConnect(builder); if (fs_ == nullptr) { return Status::IOError("HDFS connection failed"); } namenode_host_ = config->host; @@ -319,19 +338,19 @@ class HdfsClient::HdfsClientImpl { } Status CreateDirectory(const std::string& path) { - int ret = hdfsCreateDirectory(fs_, path.c_str()); + int ret = driver_->CreateDirectory(fs_, path.c_str()); CHECK_FAILURE(ret, "create directory"); return Status::OK(); } Status Delete(const std::string& path, bool recursive) { - int ret = hdfsDelete(fs_, path.c_str(), static_cast<int>(recursive)); + int ret = driver_->Delete(fs_, path.c_str(), static_cast<int>(recursive)); CHECK_FAILURE(ret, "delete"); return Status::OK(); } Status Disconnect() { - int ret = hdfsDisconnect(fs_); + int ret = driver_->Disconnect(fs_); CHECK_FAILURE(ret, "hdfsFS::Disconnect"); return Status::OK(); } @@ -339,38 +358,38 @@ class HdfsClient::HdfsClientImpl { bool Exists(const std::string& path) { // hdfsExists does not distinguish between RPC failure and the file not // existing - int ret = hdfsExists(fs_, path.c_str()); + int ret = driver_->Exists(fs_, path.c_str()); return ret == 0; } Status GetCapacity(int64_t* nbytes) { - tOffset ret = hdfsGetCapacity(fs_); + tOffset ret = driver_->GetCapacity(fs_); CHECK_FAILURE(ret, "GetCapacity"); *nbytes = ret; return Status::OK(); } Status GetUsed(int64_t* nbytes) { - tOffset ret = hdfsGetUsed(fs_); + tOffset ret = driver_->GetUsed(fs_); CHECK_FAILURE(ret, "GetUsed"); *nbytes = ret; return Status::OK(); } Status GetPathInfo(const std::string& path, HdfsPathInfo* info) { - hdfsFileInfo* entry = hdfsGetPathInfo(fs_, path.c_str()); + hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path.c_str()); if (entry == nullptr) { return Status::IOError("HDFS: GetPathInfo failed"); } SetPathInfo(entry, info); - hdfsFreeFileInfo(entry, 1); + driver_->FreeFileInfo(entry, 1); return Status::OK(); } Status ListDirectory(const std::string& path, std::vector<HdfsPathInfo>* listing) { int num_entries = 0; - hdfsFileInfo* entries = hdfsListDirectory(fs_, path.c_str(), &num_entries); + hdfsFileInfo* entries = driver_->ListDirectory(fs_, path.c_str(), &num_entries); if (entries == nullptr) { // If the directory is empty, entries is NULL but errno is 0. 
Non-zero @@ -391,14 +410,14 @@ class HdfsClient::HdfsClientImpl { } // Free libhdfs file info - hdfsFreeFileInfo(entries, num_entries); + driver_->FreeFileInfo(entries, num_entries); return Status::OK(); } Status OpenReadable(const std::string& path, int32_t buffer_size, std::shared_ptr<HdfsReadableFile>* file) { - hdfsFile handle = hdfsOpenFile(fs_, path.c_str(), O_RDONLY, buffer_size, 0, 0); + hdfsFile handle = driver_->OpenFile(fs_, path.c_str(), O_RDONLY, buffer_size, 0, 0); if (handle == nullptr) { // TODO(wesm): determine cause of failure @@ -409,7 +428,7 @@ class HdfsClient::HdfsClientImpl { // std::make_shared does not work with private ctors *file = std::shared_ptr<HdfsReadableFile>(new HdfsReadableFile()); - (*file)->impl_->set_members(path, fs_, handle); + (*file)->impl_->set_members(path, driver_, fs_, handle); (*file)->impl_->set_buffer_size(buffer_size); return Status::OK(); @@ -421,7 +440,7 @@ class HdfsClient::HdfsClientImpl { int flags = O_WRONLY; if (append) flags |= O_APPEND; - hdfsFile handle = hdfsOpenFile( + hdfsFile handle = driver_->OpenFile( fs_, path.c_str(), flags, buffer_size, replication, default_block_size); if (handle == nullptr) { @@ -433,18 +452,20 @@ class HdfsClient::HdfsClientImpl { // std::make_shared does not work with private ctors *file = std::shared_ptr<HdfsOutputStream>(new HdfsOutputStream()); - (*file)->impl_->set_members(path, fs_, handle); + (*file)->impl_->set_members(path, driver_, fs_, handle); return Status::OK(); } Status Rename(const std::string& src, const std::string& dst) { - int ret = hdfsRename(fs_, src.c_str(), dst.c_str()); + int ret = driver_->Rename(fs_, src.c_str(), dst.c_str()); CHECK_FAILURE(ret, "Rename"); return Status::OK(); } private: + LibHdfsShim* driver_; + std::string namenode_host_; std::string user_; int port_; @@ -530,5 +551,18 @@ Status HdfsClient::Rename(const std::string& src, const std::string& dst) { return impl_->Rename(src, dst); } +// ---------------------------------------------------------------------- +// Allow public API users to check whether we are set up correctly + +Status HaveLibHdfs() { + LibHdfsShim* driver; + return ConnectLibHdfs(&driver); +} + +Status HaveLibHdfs3() { + LibHdfsShim* driver; + return ConnectLibHdfs3(&driver); +} + } // namespace io } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/cfde4607/cpp/src/arrow/io/hdfs.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h index 1c76f15..5cc783e 100644 --- a/cpp/src/arrow/io/hdfs.h +++ b/cpp/src/arrow/io/hdfs.h @@ -56,11 +56,14 @@ struct HdfsPathInfo { int16_t permissions; }; +enum class HdfsDriver : char { LIBHDFS, LIBHDFS3 }; + struct HdfsConnectionConfig { std::string host; int port; std::string user; std::string kerb_ticket; + HdfsDriver driver; }; class ARROW_EXPORT HdfsClient : public FileSystemClient { @@ -218,7 +221,8 @@ class ARROW_EXPORT HdfsOutputStream : public OutputStream { DISALLOW_COPY_AND_ASSIGN(HdfsOutputStream); }; -Status ARROW_EXPORT ConnectLibHdfs(); +Status ARROW_EXPORT HaveLibHdfs(); +Status ARROW_EXPORT HaveLibHdfs3(); } // namespace io } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/cfde4607/cpp/src/arrow/io/io-hdfs-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/io-hdfs-test.cc b/cpp/src/arrow/io/io-hdfs-test.cc index e07eaa3..4ef47b8 100644 --- a/cpp/src/arrow/io/io-hdfs-test.cc +++ b/cpp/src/arrow/io/io-hdfs-test.cc @@ -24,6 
+24,7 @@ #include <boost/filesystem.hpp> // NOLINT +#include "arrow/io/hdfs-internal.h" #include "arrow/io/hdfs.h" #include "arrow/status.h" #include "arrow/test-util.h" @@ -37,6 +38,7 @@ std::vector<uint8_t> RandomData(int64_t size) { return buffer; } +template <typename DRIVER> class TestHdfsClient : public ::testing::Test { public: Status MakeScratchDir() { @@ -71,15 +73,34 @@ class TestHdfsClient : public ::testing::Test { return ss.str(); } - protected: // Set up shared state between unit tests - static void SetUpTestCase() { - if (!ConnectLibHdfs().ok()) { - std::cout << "Loading libhdfs failed, skipping tests gracefully" << std::endl; - return; + void SetUp() { + LibHdfsShim* driver_shim; + + client_ = nullptr; + scratch_dir_ = + boost::filesystem::unique_path("/tmp/arrow-hdfs/scratch-%%%%").native(); + + loaded_driver_ = false; + + Status msg; + + if (DRIVER::type == HdfsDriver::LIBHDFS) { + msg = ConnectLibHdfs(&driver_shim); + if (!msg.ok()) { + std::cout << "Loading libhdfs failed, skipping tests gracefully" << std::endl; + return; + } + } else { + msg = ConnectLibHdfs3(&driver_shim); + if (!msg.ok()) { + std::cout << "Loading libhdfs3 failed, skipping tests gracefully. " + << msg.ToString() << std::endl; + return; + } } - loaded_libhdfs_ = true; + loaded_driver_ = true; const char* host = std::getenv("ARROW_HDFS_TEST_HOST"); const char* port = std::getenv("ARROW_HDFS_TEST_PORT"); @@ -94,151 +115,159 @@ class TestHdfsClient : public ::testing::Test { ASSERT_OK(HdfsClient::Connect(&conf_, &client_)); } - static void TearDownTestCase() { + void TearDown() { if (client_) { - EXPECT_OK(client_->Delete(scratch_dir_, true)); + if (client_->Exists(scratch_dir_)) { + EXPECT_OK(client_->Delete(scratch_dir_, true)); + } EXPECT_OK(client_->Disconnect()); } } - static bool loaded_libhdfs_; + HdfsConnectionConfig conf_; + bool loaded_driver_; // Resources shared amongst unit tests - static HdfsConnectionConfig conf_; - static std::string scratch_dir_; - static std::shared_ptr<HdfsClient> client_; + std::string scratch_dir_; + std::shared_ptr<HdfsClient> client_; }; -bool TestHdfsClient::loaded_libhdfs_ = false; -HdfsConnectionConfig TestHdfsClient::conf_ = HdfsConnectionConfig(); +#define SKIP_IF_NO_DRIVER() \ + if (!this->loaded_driver_) { \ + std::cout << "Driver not loaded, skipping" << std::endl; \ + return; \ + } -std::string TestHdfsClient::scratch_dir_ = - boost::filesystem::unique_path("/tmp/arrow-hdfs/scratch-%%%%").native(); +struct JNIDriver { + static HdfsDriver type; +}; -std::shared_ptr<HdfsClient> TestHdfsClient::client_ = nullptr; +struct PivotalDriver { + static HdfsDriver type; +}; -#define SKIP_IF_NO_LIBHDFS() \ - if (!loaded_libhdfs_) { \ - std::cout << "No libhdfs, skipping" << std::endl; \ - return; \ - } +HdfsDriver JNIDriver::type = HdfsDriver::LIBHDFS; +HdfsDriver PivotalDriver::type = HdfsDriver::LIBHDFS3; + +typedef ::testing::Types<JNIDriver, PivotalDriver> DriverTypes; +TYPED_TEST_CASE(TestHdfsClient, DriverTypes); -TEST_F(TestHdfsClient, ConnectsAgain) { - SKIP_IF_NO_LIBHDFS(); +TYPED_TEST(TestHdfsClient, ConnectsAgain) { + SKIP_IF_NO_DRIVER(); std::shared_ptr<HdfsClient> client; - ASSERT_OK(HdfsClient::Connect(&conf_, &client)); + ASSERT_OK(HdfsClient::Connect(&this->conf_, &client)); ASSERT_OK(client->Disconnect()); } -TEST_F(TestHdfsClient, CreateDirectory) { - SKIP_IF_NO_LIBHDFS(); +TYPED_TEST(TestHdfsClient, CreateDirectory) { + SKIP_IF_NO_DRIVER(); - std::string path = ScratchPath("create-directory"); + std::string path = 
this->ScratchPath("create-directory"); - if (client_->Exists(path)) { ASSERT_OK(client_->Delete(path, true)); } + if (this->client_->Exists(path)) { ASSERT_OK(this->client_->Delete(path, true)); } - ASSERT_OK(client_->CreateDirectory(path)); - ASSERT_TRUE(client_->Exists(path)); - EXPECT_OK(client_->Delete(path, true)); - ASSERT_FALSE(client_->Exists(path)); + ASSERT_OK(this->client_->CreateDirectory(path)); + ASSERT_TRUE(this->client_->Exists(path)); + EXPECT_OK(this->client_->Delete(path, true)); + ASSERT_FALSE(this->client_->Exists(path)); } -TEST_F(TestHdfsClient, GetCapacityUsed) { - SKIP_IF_NO_LIBHDFS(); +TYPED_TEST(TestHdfsClient, GetCapacityUsed) { + SKIP_IF_NO_DRIVER(); // Who knows what is actually in your DFS cluster, but expect it to have // positive used bytes and capacity int64_t nbytes = 0; - ASSERT_OK(client_->GetCapacity(&nbytes)); + ASSERT_OK(this->client_->GetCapacity(&nbytes)); ASSERT_LT(0, nbytes); - ASSERT_OK(client_->GetUsed(&nbytes)); + ASSERT_OK(this->client_->GetUsed(&nbytes)); ASSERT_LT(0, nbytes); } -TEST_F(TestHdfsClient, GetPathInfo) { - SKIP_IF_NO_LIBHDFS(); +TYPED_TEST(TestHdfsClient, GetPathInfo) { + SKIP_IF_NO_DRIVER(); HdfsPathInfo info; - ASSERT_OK(MakeScratchDir()); + ASSERT_OK(this->MakeScratchDir()); // Directory info - ASSERT_OK(client_->GetPathInfo(scratch_dir_, &info)); + ASSERT_OK(this->client_->GetPathInfo(this->scratch_dir_, &info)); ASSERT_EQ(ObjectType::DIRECTORY, info.kind); - ASSERT_EQ(HdfsAbsPath(scratch_dir_), info.name); - ASSERT_EQ(conf_.user, info.owner); + ASSERT_EQ(this->HdfsAbsPath(this->scratch_dir_), info.name); + ASSERT_EQ(this->conf_.user, info.owner); // TODO(wesm): test group, other attrs - auto path = ScratchPath("test-file"); + auto path = this->ScratchPath("test-file"); const int size = 100; std::vector<uint8_t> buffer = RandomData(size); - ASSERT_OK(WriteDummyFile(path, buffer.data(), size)); - ASSERT_OK(client_->GetPathInfo(path, &info)); + ASSERT_OK(this->WriteDummyFile(path, buffer.data(), size)); + ASSERT_OK(this->client_->GetPathInfo(path, &info)); ASSERT_EQ(ObjectType::FILE, info.kind); - ASSERT_EQ(HdfsAbsPath(path), info.name); - ASSERT_EQ(conf_.user, info.owner); + ASSERT_EQ(this->HdfsAbsPath(path), info.name); + ASSERT_EQ(this->conf_.user, info.owner); ASSERT_EQ(size, info.size); } -TEST_F(TestHdfsClient, AppendToFile) { - SKIP_IF_NO_LIBHDFS(); +TYPED_TEST(TestHdfsClient, AppendToFile) { + SKIP_IF_NO_DRIVER(); - ASSERT_OK(MakeScratchDir()); + ASSERT_OK(this->MakeScratchDir()); - auto path = ScratchPath("test-file"); + auto path = this->ScratchPath("test-file"); const int size = 100; std::vector<uint8_t> buffer = RandomData(size); - ASSERT_OK(WriteDummyFile(path, buffer.data(), size)); + ASSERT_OK(this->WriteDummyFile(path, buffer.data(), size)); // now append - ASSERT_OK(WriteDummyFile(path, buffer.data(), size, true)); + ASSERT_OK(this->WriteDummyFile(path, buffer.data(), size, true)); HdfsPathInfo info; - ASSERT_OK(client_->GetPathInfo(path, &info)); + ASSERT_OK(this->client_->GetPathInfo(path, &info)); ASSERT_EQ(size * 2, info.size); } -TEST_F(TestHdfsClient, ListDirectory) { - SKIP_IF_NO_LIBHDFS(); +TYPED_TEST(TestHdfsClient, ListDirectory) { + SKIP_IF_NO_DRIVER(); const int size = 100; std::vector<uint8_t> data = RandomData(size); - auto p1 = ScratchPath("test-file-1"); - auto p2 = ScratchPath("test-file-2"); - auto d1 = ScratchPath("test-dir-1"); + auto p1 = this->ScratchPath("test-file-1"); + auto p2 = this->ScratchPath("test-file-2"); + auto d1 = this->ScratchPath("test-dir-1"); - 
ASSERT_OK(MakeScratchDir()); - ASSERT_OK(WriteDummyFile(p1, data.data(), size)); - ASSERT_OK(WriteDummyFile(p2, data.data(), size / 2)); - ASSERT_OK(client_->CreateDirectory(d1)); + ASSERT_OK(this->MakeScratchDir()); + ASSERT_OK(this->WriteDummyFile(p1, data.data(), size)); + ASSERT_OK(this->WriteDummyFile(p2, data.data(), size / 2)); + ASSERT_OK(this->client_->CreateDirectory(d1)); std::vector<HdfsPathInfo> listing; - ASSERT_OK(client_->ListDirectory(scratch_dir_, &listing)); + ASSERT_OK(this->client_->ListDirectory(this->scratch_dir_, &listing)); // Do it again, appends! - ASSERT_OK(client_->ListDirectory(scratch_dir_, &listing)); + ASSERT_OK(this->client_->ListDirectory(this->scratch_dir_, &listing)); ASSERT_EQ(6, static_cast<int>(listing.size())); // Argh, well, shouldn't expect the listing to be in any particular order for (size_t i = 0; i < listing.size(); ++i) { const HdfsPathInfo& info = listing[i]; - if (info.name == HdfsAbsPath(p1)) { + if (info.name == this->HdfsAbsPath(p1)) { ASSERT_EQ(ObjectType::FILE, info.kind); ASSERT_EQ(size, info.size); - } else if (info.name == HdfsAbsPath(p2)) { + } else if (info.name == this->HdfsAbsPath(p2)) { ASSERT_EQ(ObjectType::FILE, info.kind); ASSERT_EQ(size / 2, info.size); - } else if (info.name == HdfsAbsPath(d1)) { + } else if (info.name == this->HdfsAbsPath(d1)) { ASSERT_EQ(ObjectType::DIRECTORY, info.kind); } else { FAIL() << "Unexpected path: " << info.name; @@ -246,19 +275,19 @@ TEST_F(TestHdfsClient, ListDirectory) { } } -TEST_F(TestHdfsClient, ReadableMethods) { - SKIP_IF_NO_LIBHDFS(); +TYPED_TEST(TestHdfsClient, ReadableMethods) { + SKIP_IF_NO_DRIVER(); - ASSERT_OK(MakeScratchDir()); + ASSERT_OK(this->MakeScratchDir()); - auto path = ScratchPath("test-file"); + auto path = this->ScratchPath("test-file"); const int size = 100; std::vector<uint8_t> data = RandomData(size); - ASSERT_OK(WriteDummyFile(path, data.data(), size)); + ASSERT_OK(this->WriteDummyFile(path, data.data(), size)); std::shared_ptr<HdfsReadableFile> file; - ASSERT_OK(client_->OpenReadable(path, &file)); + ASSERT_OK(this->client_->OpenReadable(path, &file)); // Test GetSize -- move this into its own unit test if ever needed int64_t file_size; @@ -293,19 +322,19 @@ TEST_F(TestHdfsClient, ReadableMethods) { ASSERT_EQ(60, position); } -TEST_F(TestHdfsClient, LargeFile) { - SKIP_IF_NO_LIBHDFS(); +TYPED_TEST(TestHdfsClient, LargeFile) { + SKIP_IF_NO_DRIVER(); - ASSERT_OK(MakeScratchDir()); + ASSERT_OK(this->MakeScratchDir()); - auto path = ScratchPath("test-large-file"); + auto path = this->ScratchPath("test-large-file"); const int size = 1000000; std::vector<uint8_t> data = RandomData(size); - ASSERT_OK(WriteDummyFile(path, data.data(), size)); + ASSERT_OK(this->WriteDummyFile(path, data.data(), size)); std::shared_ptr<HdfsReadableFile> file; - ASSERT_OK(client_->OpenReadable(path, &file)); + ASSERT_OK(this->client_->OpenReadable(path, &file)); auto buffer = std::make_shared<PoolBuffer>(); ASSERT_OK(buffer->Resize(size)); @@ -317,7 +346,7 @@ TEST_F(TestHdfsClient, LargeFile) { // explicit buffer size std::shared_ptr<HdfsReadableFile> file2; - ASSERT_OK(client_->OpenReadable(path, 1 << 18, &file2)); + ASSERT_OK(this->client_->OpenReadable(path, 1 << 18, &file2)); auto buffer2 = std::make_shared<PoolBuffer>(); ASSERT_OK(buffer2->Resize(size)); @@ -326,22 +355,22 @@ TEST_F(TestHdfsClient, LargeFile) { ASSERT_EQ(size, bytes_read); } -TEST_F(TestHdfsClient, RenameFile) { - SKIP_IF_NO_LIBHDFS(); +TYPED_TEST(TestHdfsClient, RenameFile) { + SKIP_IF_NO_DRIVER(); - 
ASSERT_OK(MakeScratchDir()); + ASSERT_OK(this->MakeScratchDir()); - auto src_path = ScratchPath("src-file"); - auto dst_path = ScratchPath("dst-file"); + auto src_path = this->ScratchPath("src-file"); + auto dst_path = this->ScratchPath("dst-file"); const int size = 100; std::vector<uint8_t> data = RandomData(size); - ASSERT_OK(WriteDummyFile(src_path, data.data(), size)); + ASSERT_OK(this->WriteDummyFile(src_path, data.data(), size)); - ASSERT_OK(client_->Rename(src_path, dst_path)); + ASSERT_OK(this->client_->Rename(src_path, dst_path)); - ASSERT_FALSE(client_->Exists(src_path)); - ASSERT_TRUE(client_->Exists(dst_path)); + ASSERT_FALSE(this->client_->Exists(src_path)); + ASSERT_TRUE(this->client_->Exists(dst_path)); } } // namespace io http://git-wip-us.apache.org/repos/asf/arrow/blob/cfde4607/cpp/src/arrow/io/libhdfs_shim.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/libhdfs_shim.cc b/cpp/src/arrow/io/libhdfs_shim.cc deleted file mode 100644 index 3715376..0000000 --- a/cpp/src/arrow/io/libhdfs_shim.cc +++ /dev/null @@ -1,582 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -// This shim interface to libhdfs (for runtime shared library loading) has been -// adapted from the SFrame project, released under the ASF-compatible 3-clause -// BSD license -// -// Using this required having the $JAVA_HOME and $HADOOP_HOME environment -// variables set, so that libjvm and libhdfs can be located easily - -// Copyright (C) 2015 Dato, Inc. -// All rights reserved. -// -// This software may be modified and distributed under the terms -// of the BSD license. See the LICENSE file for details. 
-
-#ifdef HAS_HADOOP
-
-#ifndef _WIN32
-#include <dlfcn.h>
-#else
-#include <windows.h>
-#include <winsock2.h>
-
-// TODO(wesm): address when/if we add windows support
-// #include <util/syserr_reporting.hpp>
-#endif
-
-extern "C" {
-#include <hdfs.h>
-}
-
-#include <iostream>
-#include <mutex>
-#include <sstream>
-#include <string>
-#include <type_traits>
-#include <vector>
-
-#include <boost/filesystem.hpp>  // NOLINT
-
-#include "arrow/status.h"
-#include "arrow/util/visibility.h"
-
-namespace fs = boost::filesystem;
-
-extern "C" {
-
-#ifndef _WIN32
-static void* libhdfs_handle = NULL;
-static void* libjvm_handle = NULL;
-#else
-static HINSTANCE libhdfs_handle = NULL;
-static HINSTANCE libjvm_handle = NULL;
-#endif
-/*
- * All the shim pointers
- */
-
-// NOTE(wesm): cpplint does not like use of short and other imprecise C types
-
-static hdfsBuilder* (*ptr_hdfsNewBuilder)(void) = NULL;
-static void (*ptr_hdfsBuilderSetNameNode)(hdfsBuilder* bld, const char* nn) = NULL;
-static void (*ptr_hdfsBuilderSetNameNodePort)(hdfsBuilder* bld, tPort port) = NULL;
-static void (*ptr_hdfsBuilderSetUserName)(hdfsBuilder* bld, const char* userName) = NULL;
-static void (*ptr_hdfsBuilderSetKerbTicketCachePath)(
-    hdfsBuilder* bld, const char* kerbTicketCachePath) = NULL;
-static hdfsFS (*ptr_hdfsBuilderConnect)(hdfsBuilder* bld) = NULL;
-
-static int (*ptr_hdfsDisconnect)(hdfsFS fs) = NULL;
-
-static hdfsFile (*ptr_hdfsOpenFile)(hdfsFS fs, const char* path, int flags,
-    int bufferSize, short replication, tSize blocksize) = NULL;  // NOLINT
-
-static int (*ptr_hdfsCloseFile)(hdfsFS fs, hdfsFile file) = NULL;
-static int (*ptr_hdfsExists)(hdfsFS fs, const char* path) = NULL;
-static int (*ptr_hdfsSeek)(hdfsFS fs, hdfsFile file, tOffset desiredPos) = NULL;
-static tOffset (*ptr_hdfsTell)(hdfsFS fs, hdfsFile file) = NULL;
-static tSize (*ptr_hdfsRead)(hdfsFS fs, hdfsFile file, void* buffer, tSize length) = NULL;
-static tSize (*ptr_hdfsPread)(
-    hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length) = NULL;
-static tSize (*ptr_hdfsWrite)(
-    hdfsFS fs, hdfsFile file, const void* buffer, tSize length) = NULL;
-static int (*ptr_hdfsFlush)(hdfsFS fs, hdfsFile file) = NULL;
-static int (*ptr_hdfsAvailable)(hdfsFS fs, hdfsFile file) = NULL;
-static int (*ptr_hdfsCopy)(
-    hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) = NULL;
-static int (*ptr_hdfsMove)(
-    hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) = NULL;
-static int (*ptr_hdfsDelete)(hdfsFS fs, const char* path, int recursive) = NULL;
-static int (*ptr_hdfsRename)(hdfsFS fs, const char* oldPath, const char* newPath) = NULL;
-static char* (*ptr_hdfsGetWorkingDirectory)(
-    hdfsFS fs, char* buffer, size_t bufferSize) = NULL;
-static int (*ptr_hdfsSetWorkingDirectory)(hdfsFS fs, const char* path) = NULL;
-static int (*ptr_hdfsCreateDirectory)(hdfsFS fs, const char* path) = NULL;
-static int (*ptr_hdfsSetReplication)(
-    hdfsFS fs, const char* path, int16_t replication) = NULL;
-static hdfsFileInfo* (*ptr_hdfsListDirectory)(
-    hdfsFS fs, const char* path, int* numEntries) = NULL;
-static hdfsFileInfo* (*ptr_hdfsGetPathInfo)(hdfsFS fs, const char* path) = NULL;
-static void (*ptr_hdfsFreeFileInfo)(hdfsFileInfo* hdfsFileInfo, int numEntries) = NULL;
-static char*** (*ptr_hdfsGetHosts)(
-    hdfsFS fs, const char* path, tOffset start, tOffset length) = NULL;
-static void (*ptr_hdfsFreeHosts)(char*** blockHosts) = NULL;
-static tOffset (*ptr_hdfsGetDefaultBlockSize)(hdfsFS fs) = NULL;
-static tOffset (*ptr_hdfsGetCapacity)(hdfsFS fs) = NULL;
-static tOffset (*ptr_hdfsGetUsed)(hdfsFS fs) = NULL;
-static int (*ptr_hdfsChown)(
-    hdfsFS fs, const char* path, const char* owner, const char* group) = NULL;
-static int (*ptr_hdfsChmod)(hdfsFS fs, const char* path, short mode) = NULL;  // NOLINT
-static int (*ptr_hdfsUtime)(hdfsFS fs, const char* path, tTime mtime, tTime atime) = NULL;
-
-// Helper functions for dlopens
-static std::vector<fs::path> get_potential_libjvm_paths();
-static std::vector<fs::path> get_potential_libhdfs_paths();
-static arrow::Status try_dlopen(std::vector<fs::path> potential_paths, const char* name,
-#ifndef _WIN32
-    void*& out_handle);
-#else
-    HINSTANCE& out_handle);
-#endif
-
-#define GET_SYMBOL(SYMBOL_NAME)                                                  \
-  if (!ptr_##SYMBOL_NAME) {                                                      \
-    *reinterpret_cast<void**>(&ptr_##SYMBOL_NAME) = get_symbol("" #SYMBOL_NAME); \
-  }
-
-static void* get_symbol(const char* symbol) {
-  if (libhdfs_handle == NULL) return NULL;
-#ifndef _WIN32
-  return dlsym(libhdfs_handle, symbol);
-#else
-
-  void* ret = reinterpret_cast<void*>(GetProcAddress(libhdfs_handle, symbol));
-  if (ret == NULL) {
-    // logstream(LOG_INFO) << "GetProcAddress error: "
-    //                     << get_last_err_str(GetLastError()) << std::endl;
-  }
-  return ret;
-#endif
-}
-
-hdfsBuilder* hdfsNewBuilder(void) {
-  return ptr_hdfsNewBuilder();
-}
-
-void hdfsBuilderSetNameNode(hdfsBuilder* bld, const char* nn) {
-  ptr_hdfsBuilderSetNameNode(bld, nn);
-}
-
-void hdfsBuilderSetNameNodePort(hdfsBuilder* bld, tPort port) {
-  ptr_hdfsBuilderSetNameNodePort(bld, port);
-}
-
-void hdfsBuilderSetUserName(hdfsBuilder* bld, const char* userName) {
-  ptr_hdfsBuilderSetUserName(bld, userName);
-}
-
-void hdfsBuilderSetKerbTicketCachePath(
-    hdfsBuilder* bld, const char* kerbTicketCachePath) {
-  ptr_hdfsBuilderSetKerbTicketCachePath(bld, kerbTicketCachePath);
-}
-
-hdfsFS hdfsBuilderConnect(hdfsBuilder* bld) {
-  return ptr_hdfsBuilderConnect(bld);
-}
-
-int hdfsDisconnect(hdfsFS fs) {
-  return ptr_hdfsDisconnect(fs);
-}
-
-hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags, int bufferSize,
-    short replication, tSize blocksize) {  // NOLINT
-  return ptr_hdfsOpenFile(fs, path, flags, bufferSize, replication, blocksize);
-}
-
-int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
-  return ptr_hdfsCloseFile(fs, file);
-}
-
-int hdfsExists(hdfsFS fs, const char* path) {
-  return ptr_hdfsExists(fs, path);
-}
-
-int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) {
-  return ptr_hdfsSeek(fs, file, desiredPos);
-}
-
-tOffset hdfsTell(hdfsFS fs, hdfsFile file) {
-  return ptr_hdfsTell(fs, file);
-}
-
-tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length) {
-  return ptr_hdfsRead(fs, file, buffer, length);
-}
-
-tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length) {
-  return ptr_hdfsPread(fs, file, position, buffer, length);
-}
-
-tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer, tSize length) {
-  return ptr_hdfsWrite(fs, file, buffer, length);
-}
-
-int hdfsFlush(hdfsFS fs, hdfsFile file) {
-  return ptr_hdfsFlush(fs, file);
-}
-
-int hdfsAvailable(hdfsFS fs, hdfsFile file) {
-  GET_SYMBOL(hdfsAvailable);
-  if (ptr_hdfsAvailable)
-    return ptr_hdfsAvailable(fs, file);
-  else
-    return 0;
-}
-
-int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) {
-  GET_SYMBOL(hdfsCopy);
-  if (ptr_hdfsCopy)
-    return ptr_hdfsCopy(srcFS, src, dstFS, dst);
-  else
-    return 0;
-}
-
-int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) {
-  GET_SYMBOL(hdfsMove);
-  if (ptr_hdfsMove)
-    return ptr_hdfsMove(srcFS, src, dstFS, dst);
-  else
-    return 0;
-}
-
-int hdfsDelete(hdfsFS fs, const char* path, int recursive) {
-  return ptr_hdfsDelete(fs, path, recursive);
-}
-
-int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) {
-  GET_SYMBOL(hdfsRename);
-  if (ptr_hdfsRename)
-    return ptr_hdfsRename(fs, oldPath, newPath);
-  else
-    return 0;
-}
-
-char* hdfsGetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize) {
-  GET_SYMBOL(hdfsGetWorkingDirectory);
-  if (ptr_hdfsGetWorkingDirectory) {
-    return ptr_hdfsGetWorkingDirectory(fs, buffer, bufferSize);
-  } else {
-    return NULL;
-  }
-}
-
-int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) {
-  GET_SYMBOL(hdfsSetWorkingDirectory);
-  if (ptr_hdfsSetWorkingDirectory) {
-    return ptr_hdfsSetWorkingDirectory(fs, path);
-  } else {
-    return 0;
-  }
-}
-
-int hdfsCreateDirectory(hdfsFS fs, const char* path) {
-  return ptr_hdfsCreateDirectory(fs, path);
-}
-
-int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) {
-  GET_SYMBOL(hdfsSetReplication);
-  if (ptr_hdfsSetReplication) {
-    return ptr_hdfsSetReplication(fs, path, replication);
-  } else {
-    return 0;
-  }
-}
-
-hdfsFileInfo* hdfsListDirectory(hdfsFS fs, const char* path, int* numEntries) {
-  return ptr_hdfsListDirectory(fs, path, numEntries);
-}
-
-hdfsFileInfo* hdfsGetPathInfo(hdfsFS fs, const char* path) {
-  return ptr_hdfsGetPathInfo(fs, path);
-}
-
-void hdfsFreeFileInfo(hdfsFileInfo* hdfsFileInfo, int numEntries) {
-  ptr_hdfsFreeFileInfo(hdfsFileInfo, numEntries);
-}
-
-char*** hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length) {
-  GET_SYMBOL(hdfsGetHosts);
-  if (ptr_hdfsGetHosts) {
-    return ptr_hdfsGetHosts(fs, path, start, length);
-  } else {
-    return NULL;
-  }
-}
-
-void hdfsFreeHosts(char*** blockHosts) {
-  GET_SYMBOL(hdfsFreeHosts);
-  if (ptr_hdfsFreeHosts) { ptr_hdfsFreeHosts(blockHosts); }
-}
-
-tOffset hdfsGetDefaultBlockSize(hdfsFS fs) {
-  GET_SYMBOL(hdfsGetDefaultBlockSize);
-  if (ptr_hdfsGetDefaultBlockSize) {
-    return ptr_hdfsGetDefaultBlockSize(fs);
-  } else {
-    return 0;
-  }
-}
-
-tOffset hdfsGetCapacity(hdfsFS fs) {
-  return ptr_hdfsGetCapacity(fs);
-}
-
-tOffset hdfsGetUsed(hdfsFS fs) {
-  return ptr_hdfsGetUsed(fs);
-}
-
-int hdfsChown(hdfsFS fs, const char* path, const char* owner, const char* group) {
-  GET_SYMBOL(hdfsChown);
-  if (ptr_hdfsChown) {
-    return ptr_hdfsChown(fs, path, owner, group);
-  } else {
-    return 0;
-  }
-}
-
-int hdfsChmod(hdfsFS fs, const char* path, short mode) {  // NOLINT
-  GET_SYMBOL(hdfsChmod);
-  if (ptr_hdfsChmod) {
-    return ptr_hdfsChmod(fs, path, mode);
-  } else {
-    return 0;
-  }
-}
-
-int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) {
-  GET_SYMBOL(hdfsUtime);
-  if (ptr_hdfsUtime) {
-    return ptr_hdfsUtime(fs, path, mtime, atime);
-  } else {
-    return 0;
-  }
-}
-
-static std::vector<fs::path> get_potential_libhdfs_paths() {
-  std::vector<fs::path> libhdfs_potential_paths;
-  std::string file_name;
-
-// OS-specific file name
-#ifdef __WIN32
-  file_name = "hdfs.dll";
-#elif __APPLE__
-  file_name = "libhdfs.dylib";
-#else
-  file_name = "libhdfs.so";
-#endif
-
-  // Common paths
-  std::vector<fs::path> search_paths = {fs::path(""), fs::path(".")};
-
-  // Path from environment variable
-  const char* hadoop_home = std::getenv("HADOOP_HOME");
-  if (hadoop_home != nullptr) {
-    auto path = fs::path(hadoop_home) / "lib/native";
-    search_paths.push_back(path);
-  }
-
-  const char* libhdfs_dir = std::getenv("ARROW_LIBHDFS_DIR");
-  if (libhdfs_dir != nullptr) { search_paths.push_back(fs::path(libhdfs_dir)); }
-
-  // All paths with file name
-  for (auto& path : search_paths) {
-    libhdfs_potential_paths.push_back(path / file_name);
-  }
-
-  return libhdfs_potential_paths;
-}
-
-static std::vector<fs::path> get_potential_libjvm_paths() {
-  std::vector<fs::path> libjvm_potential_paths;
-
-  std::vector<fs::path> search_prefixes;
-  std::vector<fs::path> search_suffixes;
-  std::string file_name;
-
-// From heuristics
-#ifdef __WIN32
-  search_prefixes = {""};
-  search_suffixes = {"/jre/bin/server", "/bin/server"};
-  file_name = "jvm.dll";
-#elif __APPLE__
-  search_prefixes = {""};
-  search_suffixes = {"", "/jre/lib/server"};
-  file_name = "libjvm.dylib";
-
-// SFrame uses /usr/libexec/java_home to find JAVA_HOME; for now we are
-// expecting users to set an environment variable
-#else
-  search_prefixes = {
-      "/usr/lib/jvm/default-java",                // ubuntu / debian distros
-      "/usr/lib/jvm/java",                        // rhel6
-      "/usr/lib/jvm",                             // centos6
-      "/usr/lib64/jvm",                           // opensuse 13
-      "/usr/local/lib/jvm/default-java",          // alt ubuntu / debian distros
-      "/usr/local/lib/jvm/java",                  // alt rhel6
-      "/usr/local/lib/jvm",                       // alt centos6
-      "/usr/local/lib64/jvm",                     // alt opensuse 13
-      "/usr/local/lib/jvm/java-7-openjdk-amd64",  // alt ubuntu / debian distros
-      "/usr/lib/jvm/java-7-openjdk-amd64",        // alt ubuntu / debian distros
-      "/usr/local/lib/jvm/java-6-openjdk-amd64",  // alt ubuntu / debian distros
-      "/usr/lib/jvm/java-6-openjdk-amd64",        // alt ubuntu / debian distros
-      "/usr/lib/jvm/java-7-oracle",               // alt ubuntu
-      "/usr/lib/jvm/java-8-oracle",               // alt ubuntu
-      "/usr/lib/jvm/java-6-oracle",               // alt ubuntu
-      "/usr/local/lib/jvm/java-7-oracle",         // alt ubuntu
-      "/usr/local/lib/jvm/java-8-oracle",         // alt ubuntu
-      "/usr/local/lib/jvm/java-6-oracle",         // alt ubuntu
-      "/usr/lib/jvm/default",                     // alt centos
-      "/usr/java/latest",                         // alt centos
-  };
-  search_suffixes = {"/jre/lib/amd64/server"};
-  file_name = "libjvm.so";
-#endif
-  // From direct environment variable
-  char* env_value = NULL;
-  if ((env_value = getenv("JAVA_HOME")) != NULL) {
-    // logstream(LOG_INFO) << "Found environment variable " << env_name << ": " <<
-    // env_value << std::endl;
-    search_prefixes.insert(search_prefixes.begin(), env_value);
-  }
-
-  // Generate cross product between search_prefixes, search_suffixes, and file_name
-  for (auto& prefix : search_prefixes) {
-    for (auto& suffix : search_suffixes) {
-      auto path = (fs::path(prefix) / fs::path(suffix) / fs::path(file_name));
-      libjvm_potential_paths.push_back(path);
-    }
-  }
-
-  return libjvm_potential_paths;
-}
-
-#ifndef _WIN32
-static arrow::Status try_dlopen(
-    std::vector<fs::path> potential_paths, const char* name, void*& out_handle) {
-  std::vector<std::string> error_messages;
-
-  for (auto& i : potential_paths) {
-    i.make_preferred();
-    // logstream(LOG_INFO) << "Trying " << i.string().c_str() << std::endl;
-    out_handle = dlopen(i.native().c_str(), RTLD_NOW | RTLD_LOCAL);
-
-    if (out_handle != NULL) {
-      // logstream(LOG_INFO) << "Success!" << std::endl;
-      break;
-    } else {
-      const char* err_msg = dlerror();
-      if (err_msg != NULL) {
-        error_messages.push_back(std::string(err_msg));
-      } else {
-        error_messages.push_back(std::string(" returned NULL"));
-      }
-    }
-  }
-
-  if (out_handle == NULL) {
-    std::stringstream ss;
-    ss << "Unable to load " << name;
-    return arrow::Status::IOError(ss.str());
-  }
-
-  return arrow::Status::OK();
-}
-
-#else
-static arrow::Status try_dlopen(
-    std::vector<fs::path> potential_paths, const char* name, HINSTANCE& out_handle) {
-  std::vector<std::string> error_messages;
-
-  for (auto& i : potential_paths) {
-    i.make_preferred();
-    // logstream(LOG_INFO) << "Trying " << i.string().c_str() << std::endl;
-
-    out_handle = LoadLibrary(i.string().c_str());
-
-    if (out_handle != NULL) {
-      // logstream(LOG_INFO) << "Success!" << std::endl;
-      break;
-    } else {
-      // error_messages.push_back(get_last_err_str(GetLastError()));
-    }
-  }
-
-  if (out_handle == NULL) {
-    std::stringstream ss;
-    ss << "Unable to load " << name;
-    return arrow::Status::IOError(ss.str());
-  }
-
-  return arrow::Status::OK();
-}
-#endif  // _WIN32
-
-}  // extern "C"
-
-#define GET_SYMBOL_REQUIRED(SYMBOL_NAME)                                            \
-  do {                                                                              \
-    if (!ptr_##SYMBOL_NAME) {                                                       \
-      *reinterpret_cast<void**>(&ptr_##SYMBOL_NAME) = get_symbol("" #SYMBOL_NAME);  \
-    }                                                                               \
-    if (!ptr_##SYMBOL_NAME)                                                         \
-      return Status::IOError("Getting symbol " #SYMBOL_NAME "failed");              \
-  } while (0)
-
-namespace arrow {
-namespace io {
-
-Status ARROW_EXPORT ConnectLibHdfs() {
-  static std::mutex lock;
-  std::lock_guard<std::mutex> guard(lock);
-
-  static bool shim_attempted = false;
-  if (!shim_attempted) {
-    shim_attempted = true;
-
-    std::vector<fs::path> libjvm_potential_paths = get_potential_libjvm_paths();
-    RETURN_NOT_OK(try_dlopen(libjvm_potential_paths, "libjvm", libjvm_handle));
-
-    std::vector<fs::path> libhdfs_potential_paths = get_potential_libhdfs_paths();
-    RETURN_NOT_OK(try_dlopen(libhdfs_potential_paths, "libhdfs", libhdfs_handle));
-  } else if (libhdfs_handle == nullptr) {
-    return Status::IOError("Prior attempt to load libhdfs failed");
-  }
-
-  GET_SYMBOL_REQUIRED(hdfsNewBuilder);
-  GET_SYMBOL_REQUIRED(hdfsBuilderSetNameNode);
-  GET_SYMBOL_REQUIRED(hdfsBuilderSetNameNodePort);
-  GET_SYMBOL_REQUIRED(hdfsBuilderSetUserName);
-  GET_SYMBOL_REQUIRED(hdfsBuilderSetKerbTicketCachePath);
-  GET_SYMBOL_REQUIRED(hdfsBuilderConnect);
-  GET_SYMBOL_REQUIRED(hdfsCreateDirectory);
-  GET_SYMBOL_REQUIRED(hdfsDelete);
-  GET_SYMBOL_REQUIRED(hdfsDisconnect);
-  GET_SYMBOL_REQUIRED(hdfsExists);
-  GET_SYMBOL_REQUIRED(hdfsFreeFileInfo);
-  GET_SYMBOL_REQUIRED(hdfsGetCapacity);
-  GET_SYMBOL_REQUIRED(hdfsGetUsed);
-  GET_SYMBOL_REQUIRED(hdfsGetPathInfo);
-  GET_SYMBOL_REQUIRED(hdfsListDirectory);
-
-  // File methods
-  GET_SYMBOL_REQUIRED(hdfsCloseFile);
-  GET_SYMBOL_REQUIRED(hdfsFlush);
-  GET_SYMBOL_REQUIRED(hdfsOpenFile);
-  GET_SYMBOL_REQUIRED(hdfsRead);
-  GET_SYMBOL_REQUIRED(hdfsPread);
-  GET_SYMBOL_REQUIRED(hdfsSeek);
-  GET_SYMBOL_REQUIRED(hdfsTell);
-  GET_SYMBOL_REQUIRED(hdfsWrite);
-
-  return Status::OK();
-}
-
-}  // namespace io
-}  // namespace arrow
-
-#endif  // HAS_HADOOP

http://git-wip-us.apache.org/repos/asf/arrow/blob/cfde4607/python/.gitignore
----------------------------------------------------------------------
diff --git a/python/.gitignore b/python/.gitignore
index c37efc4..4ab8020 100644
--- a/python/.gitignore
+++ b/python/.gitignore
@@ -16,6 +16,7 @@ Testing/
 *.c
 *.cpp
 pyarrow/version.py
+pyarrow/table_api.h

 # Python files
 # setup.py working directory
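[Editor's note] The deleted shim above (preserved in the new hdfs-internal.cc) implements two ideas: probe a list of candidate paths until one dlopen succeeds, and resolve symbols lazily, treating some as required and some as optional. For readers who want the pattern without the C++ plumbing, here is a minimal illustrative sketch in Python using ctypes; the helper name `try_dlopen` and the candidate list are taken from the code above, but this snippet is not part of the patch and assumes a Unix-like system with libhdfs installed:

```python
import ctypes
import os


def try_dlopen(candidates, name):
    # Mirrors the C++ try_dlopen(): keep the first candidate that loads,
    # raise IOError("Unable to load ...") when none do.
    for path in candidates:
        try:
            return ctypes.CDLL(path)
        except OSError:
            continue  # try the next candidate, as the C++ loop does
    raise IOError('Unable to load ' + name)


# ARROW_LIBHDFS_DIR extends the search path, as in get_potential_libhdfs_paths()
candidates = ['libhdfs.so']
if 'ARROW_LIBHDFS_DIR' in os.environ:
    candidates.insert(0,
                      os.path.join(os.environ['ARROW_LIBHDFS_DIR'], 'libhdfs.so'))

lib = try_dlopen(candidates, 'libhdfs')

# GET_SYMBOL_REQUIRED: a missing required symbol is a hard error...
if not hasattr(lib, 'hdfsBuilderConnect'):
    raise IOError('Getting symbol hdfsBuilderConnect failed')

# ...while GET_SYMBOL leaves optional symbols unresolved (None) so callers
# can fall back to a default return value
hdfs_available = getattr(lib, 'hdfsAvailable', None)
```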
http://git-wip-us.apache.org/repos/asf/arrow/blob/cfde4607/python/pyarrow/includes/libarrow_io.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow_io.pxd b/python/pyarrow/includes/libarrow_io.pxd
index 7703415..99f88ad 100644
--- a/python/pyarrow/includes/libarrow_io.pxd
+++ b/python/pyarrow/includes/libarrow_io.pxd
@@ -87,13 +87,19 @@ cdef extern from "arrow/io/file.h" namespace "arrow::io" nogil:

 cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil:
-    CStatus ConnectLibHdfs()
+    CStatus HaveLibHdfs()
+    CStatus HaveLibHdfs3()
+
+    enum HdfsDriver" arrow::io::HdfsDriver":
+        HdfsDriver_LIBHDFS" arrow::io::HdfsDriver::LIBHDFS"
+        HdfsDriver_LIBHDFS3" arrow::io::HdfsDriver::LIBHDFS3"

     cdef cppclass HdfsConnectionConfig:
         c_string host
         int port
         c_string user
         c_string kerb_ticket
+        HdfsDriver driver

     cdef cppclass HdfsPathInfo:
         ObjectType kind;

http://git-wip-us.apache.org/repos/asf/arrow/blob/cfde4607/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index 2fa5fb6..6b0e392 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -35,6 +35,7 @@ cimport cpython as cp
 import re
 import sys
 import threading
+import time

 cdef class NativeFile:
@@ -269,7 +270,15 @@ except ImportError:

 def have_libhdfs():
     try:
-        check_status(ConnectLibHdfs())
+        check_status(HaveLibHdfs())
+        return True
+    except:
+        return False
+
+
+def have_libhdfs3():
+    try:
+        check_status(HaveLibHdfs3())
         return True
     except:
         return False
@@ -313,7 +322,8 @@ cdef class HdfsClient:
             raise IOError('HDFS client is closed')

     @classmethod
-    def connect(cls, host="default", port=0, user=None, kerb_ticket=None):
+    def connect(cls, host="default", port=0, user=None, kerb_ticket=None,
+                driver='libhdfs'):
         """
         Connect to an HDFS cluster. All parameters are optional and should
         only be set if the defaults need to be overridden.
@@ -328,6 +338,9 @@ cdef class HdfsClient:
         port : NameNode's port. Set to 0 for default or logical (HA) nodes.
         user : Username when connecting to HDFS; None implies login user.
         kerb_ticket : Path to Kerberos ticket cache.
+        driver : {'libhdfs', 'libhdfs3'}, default 'libhdfs'
+            Connect using libhdfs (JNI-based) or libhdfs3 (3rd-party C++
+            library from Pivotal Labs)

         Notes
         -----
@@ -350,6 +363,13 @@ cdef class HdfsClient:
         if kerb_ticket is not None:
             conf.kerb_ticket = tobytes(kerb_ticket)

+        if driver == 'libhdfs':
+            check_status(HaveLibHdfs())
+            conf.driver = HdfsDriver_LIBHDFS
+        else:
+            check_status(HaveLibHdfs3())
+            conf.driver = HdfsDriver_LIBHDFS3
+
         with nogil:
             check_status(CHdfsClient.Connect(&conf, &out.client))
         out.is_open = True
@@ -541,6 +561,12 @@ cdef class HdfsClient:
                     if not buf:
                         break

+                    if writer_thread.is_alive():
+                        while write_queue.full():
+                            time.sleep(0.01)
+                    else:
+                        break
+
                     write_queue.put_nowait(buf)
             finally:
                 done = True
@@ -609,22 +635,13 @@ cdef class HdfsFile(NativeFile):

         cdef int64_t total_bytes = 0

-        cdef int rpc_chunksize = min(self.buffer_size, nbytes)
-
         try:
             with nogil:
-                while total_bytes < nbytes:
-                    check_status(self.rd_file.get()
-                                 .Read(rpc_chunksize, &bytes_read,
-                                       buf + total_bytes))
-
-                    total_bytes += bytes_read
+                check_status(self.rd_file.get()
+                             .Read(nbytes, &bytes_read, buf))

-                # EOF
-                if bytes_read == 0:
-                    break
             result = cp.PyBytes_FromStringAndSize(<const char*>buf,
-                                                  total_bytes)
+                                                  bytes_read)
         finally:
             free(buf)

http://git-wip-us.apache.org/repos/asf/arrow/blob/cfde4607/python/pyarrow/tests/test_hdfs.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_hdfs.py b/python/pyarrow/tests/test_hdfs.py
index c23543b..73d5a66 100644
--- a/python/pyarrow/tests/test_hdfs.py
+++ b/python/pyarrow/tests/test_hdfs.py
@@ -19,6 +19,7 @@ from io import BytesIO
 from os.path import join as pjoin
 import os
 import random
+import unittest

 import pytest

@@ -28,7 +29,7 @@ import pyarrow.io as io

 # HDFS tests

-def hdfs_test_client():
+def hdfs_test_client(driver='libhdfs'):
     host = os.environ.get('ARROW_HDFS_TEST_HOST', 'localhost')
     user = os.environ['ARROW_HDFS_TEST_USER']
     try:
@@ -37,115 +38,119 @@ def hdfs_test_client():
         raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not '
                          'an integer')

-    return io.HdfsClient.connect(host, port, user)
+    return io.HdfsClient.connect(host, port, user, driver=driver)

-libhdfs = pytest.mark.skipif(not io.have_libhdfs(),
-                             reason='No libhdfs available on system')
+class HdfsTestCases(object):

+    def _make_test_file(self, hdfs, test_name, test_path, test_data):
+        base_path = pjoin(self.tmp_path, test_name)
+        hdfs.mkdir(base_path)

-HDFS_TMP_PATH = '/tmp/pyarrow-test-{0}'.format(random.randint(0, 1000))
+        full_path = pjoin(base_path, test_path)
+        with hdfs.open(full_path, 'wb') as f:
+            f.write(test_data)

-@pytest.fixture(scope='session')
-def hdfs(request):
-    fixture = hdfs_test_client()
+        return full_path

-    def teardown():
-        fixture.delete(HDFS_TMP_PATH, recursive=True)
-        fixture.close()
-    request.addfinalizer(teardown)
-    return fixture
+    @classmethod
+    def setUpClass(cls):
+        cls.check_driver()
+        cls.hdfs = hdfs_test_client(cls.DRIVER)
+        cls.tmp_path = '/tmp/pyarrow-test-{0}'.format(random.randint(0, 1000))
+        cls.hdfs.mkdir(cls.tmp_path)

+    @classmethod
+    def tearDownClass(cls):
+        cls.hdfs.delete(cls.tmp_path, recursive=True)
+        cls.hdfs.close()

-@libhdfs
-def test_hdfs_close():
-    client = hdfs_test_client()
-    assert client.is_open
-    client.close()
-    assert not client.is_open
+    def test_hdfs_close(self):
+        client = hdfs_test_client()
+        assert client.is_open
+        client.close()
+        assert not client.is_open

-    with pytest.raises(Exception):
-        client.ls('/')
+        with pytest.raises(Exception):
+            client.ls('/')

+    def test_hdfs_mkdir(self):
+        path = pjoin(self.tmp_path, 'test-dir/test-dir')
+        parent_path = pjoin(self.tmp_path, 'test-dir')

-@libhdfs
-def test_hdfs_mkdir(hdfs):
-    path = pjoin(HDFS_TMP_PATH, 'test-dir/test-dir')
-    parent_path = pjoin(HDFS_TMP_PATH, 'test-dir')
+        self.hdfs.mkdir(path)
+        assert self.hdfs.exists(path)

-    hdfs.mkdir(path)
-    assert hdfs.exists(path)
+        self.hdfs.delete(parent_path, recursive=True)
+        assert not self.hdfs.exists(path)

-    hdfs.delete(parent_path, recursive=True)
-    assert not hdfs.exists(path)
+    def test_hdfs_ls(self):
+        base_path = pjoin(self.tmp_path, 'ls-test')
+        self.hdfs.mkdir(base_path)

+        dir_path = pjoin(base_path, 'a-dir')
+        f1_path = pjoin(base_path, 'a-file-1')

-@libhdfs
-def test_hdfs_ls(hdfs):
-    base_path = pjoin(HDFS_TMP_PATH, 'ls-test')
-    hdfs.mkdir(base_path)
+        self.hdfs.mkdir(dir_path)

-    dir_path = pjoin(base_path, 'a-dir')
-    f1_path = pjoin(base_path, 'a-file-1')
+        f = self.hdfs.open(f1_path, 'wb')
+        f.write('a' * 10)

-    hdfs.mkdir(dir_path)
+        contents = sorted(self.hdfs.ls(base_path, False))
+        assert contents == [dir_path, f1_path]

-    f = hdfs.open(f1_path, 'wb')
-    f.write('a' * 10)
+    def test_hdfs_download_upload(self):
+        base_path = pjoin(self.tmp_path, 'upload-test')

-    contents = sorted(hdfs.ls(base_path, False))
-    assert contents == [dir_path, f1_path]
+        data = b'foobarbaz'
+        buf = BytesIO(data)
+        buf.seek(0)

+        self.hdfs.upload(base_path, buf)

-def _make_test_file(hdfs, test_name, test_path, test_data):
-    base_path = pjoin(HDFS_TMP_PATH, test_name)
-    hdfs.mkdir(base_path)
+        out_buf = BytesIO()
+        self.hdfs.download(base_path, out_buf)
+        out_buf.seek(0)
+        assert out_buf.getvalue() == data

-    full_path = pjoin(base_path, test_path)
+    def test_hdfs_file_context_manager(self):
+        path = pjoin(self.tmp_path, 'ctx-manager')

-    f = hdfs.open(full_path, 'wb')
-    f.write(test_data)
+        data = b'foo'
+        with self.hdfs.open(path, 'wb') as f:
+            f.write(data)

-    return full_path
+        with self.hdfs.open(path, 'rb') as f:
+            assert f.size() == 3
+            result = f.read(10)
+            assert result == data

-@libhdfs
-def test_hdfs_orphaned_file():
-    hdfs = hdfs_test_client()
-    file_path = _make_test_file(hdfs, 'orphaned_file_test', 'fname',
-                                'foobarbaz')
+class TestLibHdfs(HdfsTestCases, unittest.TestCase):

-    f = hdfs.open(file_path)
-    hdfs = None
-    f = None  # noqa
+    DRIVER = 'libhdfs'

+    @classmethod
+    def check_driver(cls):
+        if not io.have_libhdfs():
+            pytest.skip('No libhdfs available on system')

-@libhdfs
-def test_hdfs_download_upload(hdfs):
-    base_path = pjoin(HDFS_TMP_PATH, 'upload-test')
+    def test_hdfs_orphaned_file(self):
+        hdfs = hdfs_test_client()
+        file_path = self._make_test_file(hdfs, 'orphaned_file_test', 'fname',
+                                         'foobarbaz')

-    data = b'foobarbaz'
-    buf = BytesIO(data)
-    buf.seek(0)
+        f = hdfs.open(file_path)
+        hdfs = None
+        f = None  # noqa

-    hdfs.upload(base_path, buf)

-    out_buf = BytesIO()
-    hdfs.download(base_path, out_buf)
-    out_buf.seek(0)
-    assert out_buf.getvalue() == data
+class TestLibHdfs3(HdfsTestCases, unittest.TestCase):

+    DRIVER = 'libhdfs3'

-@libhdfs
-def test_hdfs_file_context_manager(hdfs):
-    path = pjoin(HDFS_TMP_PATH, 'ctx-manager')
-
-    data = b'foo'
-    with hdfs.open(path, 'wb') as f:
-        f.write(data)
-
-    with hdfs.open(path, 'rb') as f:
-        assert f.size() == 3
-        result = f.read(10)
-        assert result == data
+    @classmethod
+    def check_driver(cls):
+        if not io.have_libhdfs3():
+            pytest.skip('No libhdfs3 available on system')
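[Editor's note] With the test refactor above, the same suite now runs once per driver: `HdfsTestCases` holds the driver-agnostic tests, and each concrete subclass pins a `DRIVER` and skips itself when that library is absent. The same switch works outside the tests; the following is an illustrative smoke check (not part of the commit) that mirrors `hdfs_test_client()` and `test_hdfs_download_upload` above, where the path `/tmp/pyarrow-driver-smoke-test` is an arbitrary example:

```python
from io import BytesIO
import os

import pyarrow.io as io

# Same environment variables the test suite reads; port 0 means
# "default or logical (HA) nodes" per the connect() docstring
host = os.environ.get('ARROW_HDFS_TEST_HOST', 'localhost')
port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 0))
user = os.environ.get('ARROW_HDFS_TEST_USER')

# driver='libhdfs' uses the JNI library (requires JAVA_HOME/HADOOP_HOME so
# libjvm and libhdfs can be located); 'libhdfs3' uses the Pivotal C++ library
client = io.HdfsClient.connect(host, port, user, driver='libhdfs3')

# Round-trip a small payload, as test_hdfs_download_upload does
path = '/tmp/pyarrow-driver-smoke-test'
client.upload(path, BytesIO(b'foobarbaz'))

out = BytesIO()
client.download(path, out)
assert out.getvalue() == b'foobarbaz'

client.delete(path)
client.close()
```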