Repository: arrow Updated Branches: refs/heads/master c8eb49e41 -> e139b8b7c
ARROW-404: [Python] Fix segfault caused by HdfsClient getting closed before an HdfsFile. The one downside of this patch is that HdfsFile handles don't get garbage-collected until the cyclic GC runs -- I tried to fix this but couldn't get it working. So bytes don't always get flushed to HDFS until `close()` is called. The flush issue should be addressed on the C++ side. Author: Wes McKinney <[email protected]> Closes #230 from wesm/ARROW-404 and squashes the following commits: 3a8e641 [Wes McKinney] Use weakref in _HdfsFileNanny to avoid cyclic gc 274d0c5 [Wes McKinney] amend comment 1539a2c [Wes McKinney] Ensure that HdfsClient does not get closed before an open file does when the last user-accessible client reference goes out of scope Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/e139b8b7 Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/e139b8b7 Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/e139b8b7 Branch: refs/heads/master Commit: e139b8b7c11b7f36fa57a625a39d9c8779d033f4 Parents: c8eb49e Author: Wes McKinney <[email protected]> Authored: Fri Dec 9 06:49:49 2016 +0100 Committer: Uwe L. 
Korn <[email protected]> Committed: Fri Dec 9 06:49:49 2016 +0100 ---------------------------------------------------------------------- python/pyarrow/io.pyx | 86 ++++++++++++++++++++++------------ python/pyarrow/tests/test_hdfs.py | 23 +++++++++ 2 files changed, 79 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/e139b8b7/python/pyarrow/io.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx index 0e6b81e..2fa5fb6 100644 --- a/python/pyarrow/io.pyx +++ b/python/pyarrow/io.pyx @@ -504,7 +504,7 @@ cdef class HdfsClient: out.mode = mode out.buffer_size = c_buffer_size - out.parent = self + out.parent = _HdfsFileNanny(self, out) out.is_open = True out.own_file = True @@ -516,48 +516,69 @@ cdef class HdfsClient: """ write_queue = Queue(50) - f = self.open(path, 'wb') + with self.open(path, 'wb') as f: + done = False + exc_info = None + def bg_write(): + try: + while not done or write_queue.qsize() > 0: + try: + buf = write_queue.get(timeout=0.01) + except QueueEmpty: + continue - done = False - exc_info = None - def bg_write(): - try: - while not done or write_queue.qsize() > 0: - try: - buf = write_queue.get(timeout=0.01) - except QueueEmpty: - continue + f.write(buf) - f.write(buf) + except Exception as e: + exc_info = sys.exc_info() - except Exception as e: - exc_info = sys.exc_info() - - writer_thread = threading.Thread(target=bg_write) - writer_thread.start() + writer_thread = threading.Thread(target=bg_write) + writer_thread.start() - try: - while True: - buf = stream.read(buffer_size) - if not buf: - break + try: + while True: + buf = stream.read(buffer_size) + if not buf: + break - write_queue.put_nowait(buf) - finally: - done = True + write_queue.put_nowait(buf) + finally: + done = True - writer_thread.join() - if exc_info is not None: - raise exc_info[0], exc_info[1], 
exc_info[2] + writer_thread.join() + if exc_info is not None: + raise exc_info[0], exc_info[1], exc_info[2] def download(self, path, stream, buffer_size=None): - f = self.open(path, 'rb', buffer_size=buffer_size) - f.download(stream) + with self.open(path, 'rb', buffer_size=buffer_size) as f: + f.download(stream) # ---------------------------------------------------------------------- # Specialization for HDFS +# ARROW-404: Helper class to ensure that files are closed before the +# client. During deallocation of the extension class, the attributes are +# decref'd which can cause the client to get closed first if the file has the +# last remaining reference +cdef class _HdfsFileNanny: + cdef: + object client + object file_handle_ref + + def __cinit__(self, client, file_handle): + import weakref + self.client = client + self.file_handle_ref = weakref.ref(file_handle) + + def __dealloc__(self): + fh = self.file_handle_ref() + if fh: + fh.close() + # avoid cyclic GC + self.file_handle_ref = None + self.client = None + cdef class HdfsFile(NativeFile): cdef readonly: @@ -565,6 +586,11 @@ cdef class HdfsFile(NativeFile): object mode object parent + cdef object __weakref__ + + def __dealloc__(self): + self.parent = None + def read(self, int nbytes): """ Read indicated number of bytes from the file, up to EOF http://git-wip-us.apache.org/repos/asf/arrow/blob/e139b8b7/python/pyarrow/tests/test_hdfs.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_hdfs.py b/python/pyarrow/tests/test_hdfs.py index ed8d419..c23543b 100644 --- a/python/pyarrow/tests/test_hdfs.py +++ b/python/pyarrow/tests/test_hdfs.py @@ -98,6 +98,29 @@ def test_hdfs_ls(hdfs): assert contents == [dir_path, f1_path] +def _make_test_file(hdfs, test_name, test_path, test_data): + base_path = pjoin(HDFS_TMP_PATH, test_name) + hdfs.mkdir(base_path) + + full_path = pjoin(base_path, test_path) + + f = hdfs.open(full_path, 'wb') + f.write(test_data) + + 
return full_path + + +@libhdfs +def test_hdfs_orphaned_file(): + hdfs = hdfs_test_client() + file_path = _make_test_file(hdfs, 'orphaned_file_test', 'fname', + 'foobarbaz') + + f = hdfs.open(file_path) + hdfs = None + f = None # noqa + + @libhdfs def test_hdfs_download_upload(hdfs): base_path = pjoin(HDFS_TMP_PATH, 'upload-test')
