This is an automated email from the ASF dual-hosted git repository. uwe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new b1d1633 ARROW-2661: [Python] Adding the ability to programmatically pass hdfs configuration key/value pairs via pyarrow b1d1633 is described below commit b1d1633c90d20d3cbcbcee9a29c3f057164bb53c Author: Matthew Topol <mto...@factset.com> AuthorDate: Mon Jun 4 18:32:45 2018 +0200 ARROW-2661: [Python] Adding the ability to programmatically pass hdfs configuration key/value pairs via pyarrow https://issues.apache.org/jira/browse/ARROW-2661 Both the JNI and libhdfs3 support hdfsBuilderConfSetStr so we can utilize that to allow passing arbitrary configuration values for hdfs connection similar to how https://hdfs3.readthedocs.io/en/latest/hdfs.html supports passing them. I've added a param called `extra_conf` to facilitate it in pyarrow, such as: ```python import pyarrow conf = {"dfs.nameservices": "nameservice1", "dfs.ha.namenodes.nameservice1": "namenode113,namenode188", "dfs.namenode.rpc-address.nameservice1.namenode113": "hostname_of_server1:8020", "dfs.namenode.rpc-address.nameservice1.namenode188": "hostname_of_server2:8020", "dfs.namenode.http-address.nameservice1.namenode113": "hostname_of_server1:50070", "dfs.namenode.http-address.nameservice1.namenode188": "hostname_of_server2:50070", "hadoop.security.authentication": "kerberos" } hdfs = pyarrow.hdfs.connect(host='nameservice1', driver='libhdfs3', extra_conf=conf) ``` Author: Matthew Topol <mto...@factset.com> Closes #2097 from zeroshade/configs and squashes the following commits: 047dd4b1 <Matthew Topol> forgot to use make format to fix the order of includes. 
oops d27e3c3e <Matthew Topol> switching to unordered_map 858b44bb <Matthew Topol> missed a flake8 spot 77eeae09 <Matthew Topol> Adding the ability to programmatically pass hdfs configuration key/value pairs in the C++ and via pyarrow --- cpp/src/arrow/io/hdfs-internal.cc | 5 +++++ cpp/src/arrow/io/hdfs-internal.h | 4 ++++ cpp/src/arrow/io/hdfs.cc | 6 ++++++ cpp/src/arrow/io/hdfs.h | 2 ++ python/pyarrow/hdfs.py | 12 +++++++----- python/pyarrow/includes/libarrow.pxd | 2 +- python/pyarrow/io-hdfs.pxi | 8 +++++++- 7 files changed, 32 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/io/hdfs-internal.cc b/cpp/src/arrow/io/hdfs-internal.cc index efceb8a..c8be516 100644 --- a/cpp/src/arrow/io/hdfs-internal.cc +++ b/cpp/src/arrow/io/hdfs-internal.cc @@ -318,6 +318,10 @@ hdfsFS LibHdfsShim::BuilderConnect(hdfsBuilder* bld) { return this->hdfsBuilderConnect(bld); } +int LibHdfsShim::BuilderConfSetStr(hdfsBuilder* bld, const char* key, const char* val) { + return this->hdfsBuilderConfSetStr(bld, key, val); +} + int LibHdfsShim::Disconnect(hdfsFS fs) { return this->hdfsDisconnect(fs); } hdfsFile LibHdfsShim::OpenFile(hdfsFS fs, const char* path, int flags, int bufferSize, @@ -495,6 +499,7 @@ Status LibHdfsShim::GetRequiredSymbols() { GET_SYMBOL_REQUIRED(this, hdfsBuilderSetUserName); GET_SYMBOL_REQUIRED(this, hdfsBuilderSetKerbTicketCachePath); GET_SYMBOL_REQUIRED(this, hdfsBuilderSetForceNewInstance); + GET_SYMBOL_REQUIRED(this, hdfsBuilderConfSetStr); GET_SYMBOL_REQUIRED(this, hdfsBuilderConnect); GET_SYMBOL_REQUIRED(this, hdfsCreateDirectory); GET_SYMBOL_REQUIRED(this, hdfsDelete); diff --git a/cpp/src/arrow/io/hdfs-internal.h b/cpp/src/arrow/io/hdfs-internal.h index f0fce23..9321b21 100644 --- a/cpp/src/arrow/io/hdfs-internal.h +++ b/cpp/src/arrow/io/hdfs-internal.h @@ -53,6 +53,7 @@ struct LibHdfsShim { const char* kerbTicketCachePath); void (*hdfsBuilderSetForceNewInstance)(hdfsBuilder* bld); hdfsFS (*hdfsBuilderConnect)(hdfsBuilder* bld); + int 
(*hdfsBuilderConfSetStr)(hdfsBuilder* bld, const char* key, const char* val); int (*hdfsDisconnect)(hdfsFS fs); @@ -97,6 +98,7 @@ struct LibHdfsShim { this->hdfsBuilderSetUserName = nullptr; this->hdfsBuilderSetKerbTicketCachePath = nullptr; this->hdfsBuilderSetForceNewInstance = nullptr; + this->hdfsBuilderConfSetStr = nullptr; this->hdfsBuilderConnect = nullptr; this->hdfsDisconnect = nullptr; this->hdfsOpenFile = nullptr; @@ -142,6 +144,8 @@ struct LibHdfsShim { void BuilderSetForceNewInstance(hdfsBuilder* bld); + int BuilderConfSetStr(hdfsBuilder* bld, const char* key, const char* val); + hdfsFS BuilderConnect(hdfsBuilder* bld); int Disconnect(hdfsFS fs); diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc index 6c569ae..ba89b48 100644 --- a/cpp/src/arrow/io/hdfs.cc +++ b/cpp/src/arrow/io/hdfs.cc @@ -335,6 +335,12 @@ class HadoopFileSystem::HadoopFileSystemImpl { if (!config->kerb_ticket.empty()) { driver_->BuilderSetKerbTicketCachePath(builder, config->kerb_ticket.c_str()); } + + for (auto& kv : config->extra_conf) { + int ret = driver_->BuilderConfSetStr(builder, kv.first.c_str(), kv.second.c_str()); + CHECK_FAILURE(ret, "confsetstr"); + } + driver_->BuilderSetForceNewInstance(builder); fs_ = driver_->BuilderConnect(builder); diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h index a52ec0b..446764e 100644 --- a/cpp/src/arrow/io/hdfs.h +++ b/cpp/src/arrow/io/hdfs.h @@ -21,6 +21,7 @@ #include <cstdint> #include <memory> #include <string> +#include <unordered_map> #include <vector> #include "arrow/io/interfaces.h" @@ -63,6 +64,7 @@ struct HdfsConnectionConfig { int port; std::string user; std::string kerb_ticket; + std::unordered_map<std::string, std::string> extra_conf; HdfsDriver driver; }; diff --git a/python/pyarrow/hdfs.py b/python/pyarrow/hdfs.py index 34ddfae..6eff0c3 100644 --- a/python/pyarrow/hdfs.py +++ b/python/pyarrow/hdfs.py @@ -30,15 +30,16 @@ class HadoopFileSystem(lib.HadoopFileSystem, FileSystem): """ def __init__(self, 
host="default", port=0, user=None, kerb_ticket=None, - driver='libhdfs'): + driver='libhdfs', extra_conf=None): if driver == 'libhdfs': _maybe_set_hadoop_classpath() - self._connect(host, port, user, kerb_ticket, driver) + self._connect(host, port, user, kerb_ticket, driver, extra_conf) def __reduce__(self): return (HadoopFileSystem, (self.host, self.port, self.user, - self.kerb_ticket, self.driver)) + self.kerb_ticket, self.driver, + self.extra_conf)) def _isfilestore(self): """ @@ -149,7 +150,7 @@ def _libhdfs_walk_files_dirs(top_path, contents): def connect(host="default", port=0, user=None, kerb_ticket=None, - driver='libhdfs'): + driver='libhdfs', extra_conf=None): """ Connect to an HDFS cluster. All parameters are optional and should only be set if the defaults need to be overridden. @@ -178,5 +179,6 @@ def connect(host="default", port=0, user=None, kerb_ticket=None, filesystem : HadoopFileSystem """ fs = HadoopFileSystem(host=host, port=port, user=user, - kerb_ticket=kerb_ticket, driver=driver) + kerb_ticket=kerb_ticket, driver=driver, + extra_conf=extra_conf) return fs diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 4c67256..a794444 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -19,7 +19,6 @@ from pyarrow.includes.common cimport * - cdef extern from "arrow/util/key_value_metadata.h" namespace "arrow" nogil: cdef cppclass CKeyValueMetadata" arrow::KeyValueMetadata": CKeyValueMetadata() @@ -656,6 +655,7 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil: int port c_string user c_string kerb_ticket + unordered_map[c_string, c_string] extra_conf HdfsDriver driver cdef cppclass HdfsPathInfo: diff --git a/python/pyarrow/io-hdfs.pxi b/python/pyarrow/io-hdfs.pxi index 31c0437..882cdf5 100644 --- a/python/pyarrow/io-hdfs.pxi +++ b/python/pyarrow/io-hdfs.pxi @@ -64,8 +64,9 @@ cdef class HadoopFileSystem: str kerb_ticket str driver int port + dict extra_conf 
- def _connect(self, host, port, user, kerb_ticket, driver): + def _connect(self, host, port, user, kerb_ticket, driver, extra_conf): cdef HdfsConnectionConfig conf if host is not None: @@ -95,6 +96,11 @@ cdef class HadoopFileSystem: raise ValueError("unknown driver: %r" % driver) self.driver = driver + if extra_conf is not None and isinstance(extra_conf, dict): + conf.extra_conf = {tobytes(k): tobytes(v) + for k, v in extra_conf.items()} + self.extra_conf = extra_conf + with nogil: check_status(CHadoopFileSystem.Connect(&conf, &self.client)) self.is_open = True -- To stop receiving notification emails like this one, please contact u...@apache.org.