This is an automated email from the ASF dual-hosted git repository.
jorisvandenbossche pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 2b194ad222 GH-20127: [Python] Remove deprecated pyarrow.filesystem
legacy implementations (#39825)
2b194ad222 is described below
commit 2b194ad222f4dc8ecf2eb73539ab8cab5b1fc5e7
Author: Alenka Frim <[email protected]>
AuthorDate: Mon Mar 4 13:33:18 2024 +0100
GH-20127: [Python] Remove deprecated pyarrow.filesystem legacy
implementations (#39825)
This PR removes the `pyarrow.filesystem` and `pyarrow.hdfs` filesystems
that have been deprecated since 2.0.0.
* Closes: #20127
Lead-authored-by: AlenkaF <[email protected]>
Co-authored-by: Alenka Frim <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Co-authored-by: Joris Van den Bossche <[email protected]>
Signed-off-by: Joris Van den Bossche <[email protected]>
---
docs/source/python/filesystems_deprecated.rst | 88 ----
docs/source/python/index.rst | 1 -
python/CMakeLists.txt | 1 -
python/pyarrow/__init__.py | 47 +-
python/pyarrow/_hdfsio.pyx | 478 -------------------
python/pyarrow/filesystem.py | 511 ---------------------
python/pyarrow/fs.py | 25 +-
python/pyarrow/hdfs.py | 240 ----------
python/pyarrow/io.pxi | 13 +
python/pyarrow/parquet/core.py | 30 +-
python/pyarrow/tests/parquet/test_basic.py | 5 +-
python/pyarrow/tests/parquet/test_dataset.py | 137 +++---
.../pyarrow/tests/parquet/test_parquet_writer.py | 43 --
python/pyarrow/tests/test_filesystem.py | 75 ---
python/pyarrow/tests/test_hdfs.py | 451 ------------------
python/setup.py | 1 -
16 files changed, 93 insertions(+), 2053 deletions(-)
diff --git a/docs/source/python/filesystems_deprecated.rst b/docs/source/python/filesystems_deprecated.rst
deleted file mode 100644
index c51245341b..0000000000
--- a/docs/source/python/filesystems_deprecated.rst
+++ /dev/null
@@ -1,88 +0,0 @@
-.. Licensed to the Apache Software Foundation (ASF) under one
-.. or more contributor license agreements. See the NOTICE file
-.. distributed with this work for additional information
-.. regarding copyright ownership. The ASF licenses this file
-.. to you under the Apache License, Version 2.0 (the
-.. "License"); you may not use this file except in compliance
-.. with the License. You may obtain a copy of the License at
-
-.. http://www.apache.org/licenses/LICENSE-2.0
-
-.. Unless required by applicable law or agreed to in writing,
-.. software distributed under the License is distributed on an
-.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-.. KIND, either express or implied. See the License for the
-.. specific language governing permissions and limitations
-.. under the License.
-
-Filesystem Interface (legacy)
-=============================
-
-.. warning::
- This section documents the deprecated filesystem layer. You should
- use the :ref:`new filesystem layer <filesystem>` instead.
-
-.. _hdfs:
-
-Hadoop File System (HDFS)
--------------------------
-
-PyArrow comes with bindings to a C++-based interface to the Hadoop File
-System. You connect like so:
-
-.. code-block:: python
-
- import pyarrow as pa
- fs = pa.hdfs.connect(host, port, user=user, kerb_ticket=ticket_cache_path)
- with fs.open(path, 'rb') as f:
- # Do something with f
-
-By default, ``pyarrow.hdfs.HadoopFileSystem`` uses libhdfs, a JNI-based
-interface to the Java Hadoop client. This library is loaded **at runtime**
-(rather than at link / library load time, since the library may not be in your
-LD_LIBRARY_PATH), and relies on some environment variables.
-
-* ``HADOOP_HOME``: the root of your installed Hadoop distribution. Often has
- `lib/native/libhdfs.so`.
-
-* ``JAVA_HOME``: the location of your Java SDK installation.
-
-* ``ARROW_LIBHDFS_DIR`` (optional): explicit location of ``libhdfs.so`` if it is
- installed somewhere other than ``$HADOOP_HOME/lib/native``.
-
-* ``CLASSPATH``: must contain the Hadoop jars. You can set these using:
-
-.. code-block:: shell
-
- export CLASSPATH=`$HADOOP_HOME/bin/hdfs classpath --glob`
-
-If ``CLASSPATH`` is not set, then it will be set automatically if the
-``hadoop`` executable is in your system path, or if ``HADOOP_HOME`` is set.
-
-HDFS API
-~~~~~~~~
-
-.. currentmodule:: pyarrow
-
-.. autosummary::
- :toctree: generated/
-
- hdfs.connect
- HadoopFileSystem.cat
- HadoopFileSystem.chmod
- HadoopFileSystem.chown
- HadoopFileSystem.delete
- HadoopFileSystem.df
- HadoopFileSystem.disk_usage
- HadoopFileSystem.download
- HadoopFileSystem.exists
- HadoopFileSystem.get_capacity
- HadoopFileSystem.get_space_used
- HadoopFileSystem.info
- HadoopFileSystem.ls
- HadoopFileSystem.mkdir
- HadoopFileSystem.open
- HadoopFileSystem.rename
- HadoopFileSystem.rm
- HadoopFileSystem.upload
- HdfsFile
diff --git a/docs/source/python/index.rst b/docs/source/python/index.rst
index 08939bc760..7acff940ba 100644
--- a/docs/source/python/index.rst
+++ b/docs/source/python/index.rst
@@ -49,7 +49,6 @@ files into Arrow structures.
memory
ipc
filesystems
- filesystems_deprecated
numpy
pandas
interchange_protocol
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 1d6524373a..c3a1c57868 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -545,7 +545,6 @@ set(CYTHON_EXTENSIONS
_csv
_feather
_fs
- _hdfsio
_json
_pyarrow_cpp_tests)
set_source_files_properties(pyarrow/lib.pyx PROPERTIES CYTHON_API TRUE)
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 2ee97ddb66..7ede69da66 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -255,9 +255,8 @@ from pyarrow.lib import (NativeFile, PythonFile,
BufferReader, BufferOutputStream,
OSFile, MemoryMappedFile, memory_map,
create_memory_map, MockOutputStream,
- input_stream, output_stream)
-
-from pyarrow._hdfsio import HdfsFile, have_libhdfs
+ input_stream, output_stream,
+ have_libhdfs)
from pyarrow.lib import (ChunkedArray, RecordBatch, Table, table,
concat_arrays, concat_tables, TableGroupBy,
@@ -276,54 +275,12 @@ from pyarrow.lib import (ArrowCancelled,
ArrowTypeError,
ArrowSerializationError)
-import pyarrow.hdfs as hdfs
-
from pyarrow.ipc import serialize_pandas, deserialize_pandas
import pyarrow.ipc as ipc
import pyarrow.types as types
-# deprecated top-level access
-
-
-from pyarrow.filesystem import FileSystem as _FileSystem
-from pyarrow.filesystem import LocalFileSystem as _LocalFileSystem
-from pyarrow.hdfs import HadoopFileSystem as _HadoopFileSystem
-
-
-_localfs = _LocalFileSystem._get_instance()
-
-
-_msg = (
- "pyarrow.{0} is deprecated as of 2.0.0, please use pyarrow.fs.{1} instead."
-)
-
-_serialization_msg = (
- "'pyarrow.{0}' is deprecated and will be removed in a future version. "
- "Use pickle or the pyarrow IPC functionality instead."
-)
-
-_deprecated = {
- "localfs": (_localfs, "LocalFileSystem"),
- "FileSystem": (_FileSystem, "FileSystem"),
- "LocalFileSystem": (_LocalFileSystem, "LocalFileSystem"),
- "HadoopFileSystem": (_HadoopFileSystem, "HadoopFileSystem"),
-}
-
-
-def __getattr__(name):
- if name in _deprecated:
- obj, new_name = _deprecated[name]
- _warnings.warn(_msg.format(name, new_name),
- FutureWarning, stacklevel=2)
- return obj
-
- raise AttributeError(
- "module 'pyarrow' has no attribute '{0}'".format(name)
- )
-
-
# ----------------------------------------------------------------------
# Deprecations
diff --git a/python/pyarrow/_hdfsio.pyx b/python/pyarrow/_hdfsio.pyx
deleted file mode 100644
index cbcc5d28ca..0000000000
--- a/python/pyarrow/_hdfsio.pyx
+++ /dev/null
@@ -1,478 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# ----------------------------------------------------------------------
-# HDFS IO implementation
-
-# cython: language_level = 3
-
-import re
-
-from pyarrow.lib cimport check_status, _Weakrefable, NativeFile
-from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport *
-from pyarrow.includes.libarrow_fs cimport *
-from pyarrow.lib import frombytes, tobytes, ArrowIOError
-
-
-_HDFS_PATH_RE = re.compile(r'hdfs://(.*):(\d+)(.*)')
-
-
-def have_libhdfs():
- try:
- with nogil:
- check_status(HaveLibHdfs())
- return True
- except Exception:
- return False
-
-
-def strip_hdfs_abspath(path):
- m = _HDFS_PATH_RE.match(path)
- if m:
- return m.group(3)
- else:
- return path
-
-
-cdef class HadoopFileSystem(_Weakrefable):
- cdef:
- shared_ptr[CIOHadoopFileSystem] client
-
- cdef readonly:
- bint is_open
- object host
- object user
- object kerb_ticket
- int port
- dict extra_conf
-
- def _connect(self, host, port, user, kerb_ticket, extra_conf):
- cdef HdfsConnectionConfig conf
-
- if host is not None:
- conf.host = tobytes(host)
- self.host = host
-
- conf.port = port
- self.port = port
-
- if user is not None:
- conf.user = tobytes(user)
- self.user = user
-
- if kerb_ticket is not None:
- conf.kerb_ticket = tobytes(kerb_ticket)
- self.kerb_ticket = kerb_ticket
-
- with nogil:
- check_status(HaveLibHdfs())
-
- if extra_conf is not None and isinstance(extra_conf, dict):
- conf.extra_conf = {tobytes(k): tobytes(v)
- for k, v in extra_conf.items()}
- self.extra_conf = extra_conf
-
- with nogil:
- check_status(CIOHadoopFileSystem.Connect(&conf, &self.client))
- self.is_open = True
-
- @classmethod
- def connect(cls, *args, **kwargs):
- return cls(*args, **kwargs)
-
- def __dealloc__(self):
- if self.is_open:
- self.close()
-
- def close(self):
- """
- Disconnect from the HDFS cluster
- """
- self._ensure_client()
- with nogil:
- check_status(self.client.get().Disconnect())
- self.is_open = False
-
- cdef _ensure_client(self):
- if self.client.get() == NULL:
- raise IOError('HDFS client improperly initialized')
- elif not self.is_open:
- raise IOError('HDFS client is closed')
-
- def exists(self, path):
- """
- Returns True if the path is known to the cluster, False if it does not
- (or there is an RPC error)
- """
- self._ensure_client()
-
- cdef c_string c_path = tobytes(path)
- cdef c_bool result
- with nogil:
- result = self.client.get().Exists(c_path)
- return result
-
- def isdir(self, path):
- cdef HdfsPathInfo info
- try:
- self._path_info(path, &info)
- except ArrowIOError:
- return False
- return info.kind == ObjectType_DIRECTORY
-
- def isfile(self, path):
- cdef HdfsPathInfo info
- try:
- self._path_info(path, &info)
- except ArrowIOError:
- return False
- return info.kind == ObjectType_FILE
-
- def get_capacity(self):
- """
- Get reported total capacity of file system
-
- Returns
- -------
- capacity : int
- """
- cdef int64_t capacity = 0
- with nogil:
- check_status(self.client.get().GetCapacity(&capacity))
- return capacity
-
- def get_space_used(self):
- """
- Get space used on file system
-
- Returns
- -------
- space_used : int
- """
- cdef int64_t space_used = 0
- with nogil:
- check_status(self.client.get().GetUsed(&space_used))
- return space_used
-
- def df(self):
- """
- Return free space on disk, like the UNIX df command
-
- Returns
- -------
- space : int
- """
- return self.get_capacity() - self.get_space_used()
-
- def rename(self, path, new_path):
- cdef c_string c_path = tobytes(path)
- cdef c_string c_new_path = tobytes(new_path)
- with nogil:
- check_status(self.client.get().Rename(c_path, c_new_path))
-
- def info(self, path):
- """
- Return detailed HDFS information for path
-
- Parameters
- ----------
- path : string
- Path to file or directory
-
- Returns
- -------
- path_info : dict
- """
- cdef HdfsPathInfo info
- self._path_info(path, &info)
- return {
- 'path': frombytes(info.name),
- 'owner': frombytes(info.owner),
- 'group': frombytes(info.group),
- 'size': info.size,
- 'block_size': info.block_size,
- 'last_modified': info.last_modified_time,
- 'last_accessed': info.last_access_time,
- 'replication': info.replication,
- 'permissions': info.permissions,
- 'kind': ('directory' if info.kind == ObjectType_DIRECTORY
- else 'file')
- }
-
- def stat(self, path):
- """
- Return basic file system statistics about path
-
- Parameters
- ----------
- path : string
- Path to file or directory
-
- Returns
- -------
- stat : dict
- """
- cdef FileStatistics info
- cdef c_string c_path = tobytes(path)
- with nogil:
- check_status(self.client.get()
- .Stat(c_path, &info))
- return {
- 'size': info.size,
- 'kind': ('directory' if info.kind == ObjectType_DIRECTORY
- else 'file')
- }
-
- cdef _path_info(self, path, HdfsPathInfo* info):
- cdef c_string c_path = tobytes(path)
-
- with nogil:
- check_status(self.client.get()
- .GetPathInfo(c_path, info))
-
- def ls(self, path, bint full_info):
- cdef:
- c_string c_path = tobytes(path)
- vector[HdfsPathInfo] listing
- list results = []
- int i
-
- self._ensure_client()
-
- with nogil:
- check_status(self.client.get()
- .ListDirectory(c_path, &listing))
-
- cdef const HdfsPathInfo* info
- for i in range(<int> listing.size()):
- info = &listing[i]
-
- # Try to trim off the hdfs://HOST:PORT piece
- name = strip_hdfs_abspath(frombytes(info.name))
-
- if full_info:
- kind = ('file' if info.kind == ObjectType_FILE
- else 'directory')
-
- results.append({
- 'kind': kind,
- 'name': name,
- 'owner': frombytes(info.owner),
- 'group': frombytes(info.group),
- 'last_modified_time': info.last_modified_time,
- 'last_access_time': info.last_access_time,
- 'size': info.size,
- 'replication': info.replication,
- 'block_size': info.block_size,
- 'permissions': info.permissions
- })
- else:
- results.append(name)
-
- return results
-
- def chmod(self, path, mode):
- """
- Change file permissions
-
- Parameters
- ----------
- path : string
- absolute path to file or directory
- mode : int
- POSIX-like bitmask
- """
- self._ensure_client()
- cdef c_string c_path = tobytes(path)
- cdef int c_mode = mode
- with nogil:
- check_status(self.client.get()
- .Chmod(c_path, c_mode))
-
- def chown(self, path, owner=None, group=None):
- """
- Change file permissions
-
- Parameters
- ----------
- path : string
- absolute path to file or directory
- owner : string, default None
- New owner, None for no change
- group : string, default None
- New group, None for no change
- """
- cdef:
- c_string c_path
- c_string c_owner
- c_string c_group
- const char* c_owner_ptr = NULL
- const char* c_group_ptr = NULL
-
- self._ensure_client()
-
- c_path = tobytes(path)
- if owner is not None:
- c_owner = tobytes(owner)
- c_owner_ptr = c_owner.c_str()
-
- if group is not None:
- c_group = tobytes(group)
- c_group_ptr = c_group.c_str()
-
- with nogil:
- check_status(self.client.get()
- .Chown(c_path, c_owner_ptr, c_group_ptr))
-
- def mkdir(self, path):
- """
- Create indicated directory and any necessary parent directories
- """
- self._ensure_client()
- cdef c_string c_path = tobytes(path)
- with nogil:
- check_status(self.client.get()
- .MakeDirectory(c_path))
-
- def delete(self, path, bint recursive=False):
- """
- Delete the indicated file or directory
-
- Parameters
- ----------
- path : string
- recursive : boolean, default False
- If True, also delete child paths for directories
- """
- self._ensure_client()
-
- cdef c_string c_path = tobytes(path)
- with nogil:
- check_status(self.client.get()
- .Delete(c_path, recursive == 1))
-
- def open(self, path, mode='rb', buffer_size=None, replication=None,
- default_block_size=None):
- """
- Open HDFS file for reading or writing
-
- Parameters
- ----------
- mode : string
- Must be one of 'rb', 'wb', 'ab'
-
- Returns
- -------
- handle : HdfsFile
- """
- self._ensure_client()
-
- cdef HdfsFile out = HdfsFile()
-
- if mode not in ('rb', 'wb', 'ab'):
- raise Exception("Mode must be 'rb' (read), "
- "'wb' (write, new file), or 'ab' (append)")
-
- cdef c_string c_path = tobytes(path)
- cdef c_bool append = False
-
- # 0 in libhdfs means "use the default"
- cdef int32_t c_buffer_size = buffer_size or 0
- cdef int16_t c_replication = replication or 0
- cdef int64_t c_default_block_size = default_block_size or 0
-
- cdef shared_ptr[HdfsOutputStream] wr_handle
- cdef shared_ptr[HdfsReadableFile] rd_handle
-
- if mode in ('wb', 'ab'):
- if mode == 'ab':
- append = True
-
- with nogil:
- check_status(
- self.client.get()
- .OpenWritable(c_path, append, c_buffer_size,
- c_replication, c_default_block_size,
- &wr_handle))
-
- out.set_output_stream(<shared_ptr[COutputStream]> wr_handle)
- out.is_writable = True
- else:
- with nogil:
- check_status(self.client.get()
- .OpenReadable(c_path, &rd_handle))
-
- out.set_random_access_file(
- <shared_ptr[CRandomAccessFile]> rd_handle)
- out.is_readable = True
-
- assert not out.closed
-
- if c_buffer_size == 0:
- c_buffer_size = 2 ** 16
-
- out.mode = mode
- out.buffer_size = c_buffer_size
- out.parent = _HdfsFileNanny(self, out)
- out.own_file = True
-
- return out
-
- def download(self, path, stream, buffer_size=None):
- with self.open(path, 'rb') as f:
- f.download(stream, buffer_size=buffer_size)
-
- def upload(self, path, stream, buffer_size=None):
- """
- Upload file-like object to HDFS path
- """
- with self.open(path, 'wb') as f:
- f.upload(stream, buffer_size=buffer_size)
-
-
-# ARROW-404: Helper class to ensure that files are closed before the
-# client. During deallocation of the extension class, the attributes are
-# decref'd which can cause the client to get closed first if the file has the
-# last remaining reference
-cdef class _HdfsFileNanny(_Weakrefable):
- cdef:
- object client
- object file_handle_ref
-
- def __cinit__(self, client, file_handle):
- import weakref
- self.client = client
- self.file_handle_ref = weakref.ref(file_handle)
-
- def __dealloc__(self):
- fh = self.file_handle_ref()
- if fh:
- fh.close()
- # avoid cyclic GC
- self.file_handle_ref = None
- self.client = None
-
-
-cdef class HdfsFile(NativeFile):
- cdef readonly:
- int32_t buffer_size
- object mode
- object parent
-
- def __dealloc__(self):
- self.parent = None
diff --git a/python/pyarrow/filesystem.py b/python/pyarrow/filesystem.py
deleted file mode 100644
index c1e70a1ee6..0000000000
--- a/python/pyarrow/filesystem.py
+++ /dev/null
@@ -1,511 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-
-import os
-import posixpath
-import sys
-import urllib.parse
-import warnings
-
-from os.path import join as pjoin
-
-import pyarrow as pa
-from pyarrow.util import doc, _stringify_path, _is_path_like, _DEPR_MSG
-
-
-_FS_DEPR_MSG = _DEPR_MSG.format(
- "filesystem.LocalFileSystem", "2.0.0", "fs.LocalFileSystem"
-)
-
-
-class FileSystem:
- """
- Abstract filesystem interface.
- """
-
- def cat(self, path):
- """
- Return contents of file as a bytes object.
-
- Parameters
- ----------
- path : str
- File path to read content from.
-
- Returns
- -------
- contents : bytes
- """
- with self.open(path, 'rb') as f:
- return f.read()
-
- def ls(self, path):
- """
- Return list of file paths.
-
- Parameters
- ----------
- path : str
- Directory to list contents from.
- """
- raise NotImplementedError
-
- def delete(self, path, recursive=False):
- """
- Delete the indicated file or directory.
-
- Parameters
- ----------
- path : str
- Path to delete.
- recursive : bool, default False
- If True, also delete child paths for directories.
- """
- raise NotImplementedError
-
- def disk_usage(self, path):
- """
- Compute bytes used by all contents under indicated path in file tree.
-
- Parameters
- ----------
- path : str
- Can be a file path or directory.
-
- Returns
- -------
- usage : int
- """
- path = _stringify_path(path)
- path_info = self.stat(path)
- if path_info['kind'] == 'file':
- return path_info['size']
-
- total = 0
- for root, directories, files in self.walk(path):
- for child_path in files:
- abspath = self._path_join(root, child_path)
- total += self.stat(abspath)['size']
-
- return total
-
- def _path_join(self, *args):
- return self.pathsep.join(args)
-
- def stat(self, path):
- """
- Information about a filesystem entry.
-
- Returns
- -------
- stat : dict
- """
- raise NotImplementedError('FileSystem.stat')
-
- def rm(self, path, recursive=False):
- """
- Alias for FileSystem.delete.
- """
- return self.delete(path, recursive=recursive)
-
- def mv(self, path, new_path):
- """
- Alias for FileSystem.rename.
- """
- return self.rename(path, new_path)
-
- def rename(self, path, new_path):
- """
- Rename file, like UNIX mv command.
-
- Parameters
- ----------
- path : str
- Path to alter.
- new_path : str
- Path to move to.
- """
- raise NotImplementedError('FileSystem.rename')
-
- def mkdir(self, path, create_parents=True):
- """
- Create a directory.
-
- Parameters
- ----------
- path : str
- Path to the directory.
- create_parents : bool, default True
- If the parent directories don't exists create them as well.
- """
- raise NotImplementedError
-
- def exists(self, path):
- """
- Return True if path exists.
-
- Parameters
- ----------
- path : str
- Path to check.
- """
- raise NotImplementedError
-
- def isdir(self, path):
- """
- Return True if path is a directory.
-
- Parameters
- ----------
- path : str
- Path to check.
- """
- raise NotImplementedError
-
- def isfile(self, path):
- """
- Return True if path is a file.
-
- Parameters
- ----------
- path : str
- Path to check.
- """
- raise NotImplementedError
-
- def _isfilestore(self):
- """
- Returns True if this FileSystem is a unix-style file store with
- directories.
- """
- raise NotImplementedError
-
- def read_parquet(self, path, columns=None, metadata=None, schema=None,
- use_threads=True, use_pandas_metadata=False):
- """
- Read Parquet data from path in file system. Can read from a single file
- or a directory of files.
-
- Parameters
- ----------
- path : str
- Single file path or directory
- columns : List[str], optional
- Subset of columns to read.
- metadata : pyarrow.parquet.FileMetaData
- Known metadata to validate files against.
- schema : pyarrow.parquet.Schema
- Known schema to validate files against. Alternative to metadata
- argument.
- use_threads : bool, default True
- Perform multi-threaded column reads.
- use_pandas_metadata : bool, default False
- If True and file has custom pandas schema metadata, ensure that
- index columns are also loaded.
-
- Returns
- -------
- table : pyarrow.Table
- """
- from pyarrow.parquet import ParquetDataset
- dataset = ParquetDataset(path, schema=schema, metadata=metadata,
- filesystem=self)
- return dataset.read(columns=columns, use_threads=use_threads,
- use_pandas_metadata=use_pandas_metadata)
-
- def open(self, path, mode='rb'):
- """
- Open file for reading or writing.
- """
- raise NotImplementedError
-
- @property
- def pathsep(self):
- return '/'
-
-
-class LocalFileSystem(FileSystem):
-
- _instance = None
-
- def __init__(self):
- warnings.warn(_FS_DEPR_MSG, FutureWarning, stacklevel=2)
- super().__init__()
-
- @classmethod
- def _get_instance(cls):
- if cls._instance is None:
- with warnings.catch_warnings():
- warnings.simplefilter("ignore")
- cls._instance = LocalFileSystem()
- return cls._instance
-
- @classmethod
- def get_instance(cls):
- warnings.warn(_FS_DEPR_MSG, FutureWarning, stacklevel=2)
- return cls._get_instance()
-
- @doc(FileSystem.ls)
- def ls(self, path):
- path = _stringify_path(path)
- return sorted(pjoin(path, x) for x in os.listdir(path))
-
- @doc(FileSystem.mkdir)
- def mkdir(self, path, create_parents=True):
- path = _stringify_path(path)
- if create_parents:
- os.makedirs(path)
- else:
- os.mkdir(path)
-
- @doc(FileSystem.isdir)
- def isdir(self, path):
- path = _stringify_path(path)
- return os.path.isdir(path)
-
- @doc(FileSystem.isfile)
- def isfile(self, path):
- path = _stringify_path(path)
- return os.path.isfile(path)
-
- @doc(FileSystem._isfilestore)
- def _isfilestore(self):
- return True
-
- @doc(FileSystem.exists)
- def exists(self, path):
- path = _stringify_path(path)
- return os.path.exists(path)
-
- @doc(FileSystem.open)
- def open(self, path, mode='rb'):
- """
- Open file for reading or writing.
- """
- path = _stringify_path(path)
- return open(path, mode=mode)
-
- @property
- def pathsep(self):
- return os.path.sep
-
- def walk(self, path):
- """
- Directory tree generator, see os.walk.
- """
- path = _stringify_path(path)
- return os.walk(path)
-
-
-class DaskFileSystem(FileSystem):
- """
- Wraps s3fs Dask filesystem implementation like s3fs, gcsfs, etc.
- """
-
- def __init__(self, fs):
- warnings.warn(
- "The pyarrow.filesystem.DaskFileSystem/S3FSWrapper are deprecated "
- "as of pyarrow 3.0.0, and will be removed in a future version.",
- FutureWarning, stacklevel=2)
- self.fs = fs
-
- @doc(FileSystem.isdir)
- def isdir(self, path):
- raise NotImplementedError("Unsupported file system API")
-
- @doc(FileSystem.isfile)
- def isfile(self, path):
- raise NotImplementedError("Unsupported file system API")
-
- @doc(FileSystem._isfilestore)
- def _isfilestore(self):
- """
- Object Stores like S3 and GCSFS are based on key lookups, not true
- file-paths.
- """
- return False
-
- @doc(FileSystem.delete)
- def delete(self, path, recursive=False):
- path = _stringify_path(path)
- return self.fs.rm(path, recursive=recursive)
-
- @doc(FileSystem.exists)
- def exists(self, path):
- path = _stringify_path(path)
- return self.fs.exists(path)
-
- @doc(FileSystem.mkdir)
- def mkdir(self, path, create_parents=True):
- path = _stringify_path(path)
- if create_parents:
- return self.fs.mkdirs(path)
- else:
- return self.fs.mkdir(path)
-
- @doc(FileSystem.open)
- def open(self, path, mode='rb'):
- """
- Open file for reading or writing.
- """
- path = _stringify_path(path)
- return self.fs.open(path, mode=mode)
-
- def ls(self, path, detail=False):
- path = _stringify_path(path)
- return self.fs.ls(path, detail=detail)
-
- def walk(self, path):
- """
- Directory tree generator, like os.walk.
- """
- path = _stringify_path(path)
- return self.fs.walk(path)
-
-
-class S3FSWrapper(DaskFileSystem):
-
- @doc(FileSystem.isdir)
- def isdir(self, path):
- path = _sanitize_s3(_stringify_path(path))
- try:
- contents = self.fs.ls(path)
- if len(contents) == 1 and contents[0] == path:
- return False
- else:
- return True
- except OSError:
- return False
-
- @doc(FileSystem.isfile)
- def isfile(self, path):
- path = _sanitize_s3(_stringify_path(path))
- try:
- contents = self.fs.ls(path)
- return len(contents) == 1 and contents[0] == path
- except OSError:
- return False
-
- def walk(self, path, refresh=False):
- """
- Directory tree generator, like os.walk.
-
- Generator version of what is in s3fs, which yields a flattened list of
- files.
- """
- path = _sanitize_s3(_stringify_path(path))
- directories = set()
- files = set()
-
- for key in list(self.fs._ls(path, refresh=refresh)):
- path = key['Key']
- if key['StorageClass'] == 'DIRECTORY':
- directories.add(path)
- elif key['StorageClass'] == 'BUCKET':
- pass
- else:
- files.add(path)
-
- # s3fs creates duplicate 'DIRECTORY' entries
- files = sorted([posixpath.split(f)[1] for f in files
- if f not in directories])
- directories = sorted([posixpath.split(x)[1]
- for x in directories])
-
- yield path, directories, files
-
- for directory in directories:
- yield from self.walk(directory, refresh=refresh)
-
-
-def _sanitize_s3(path):
- if path.startswith('s3://'):
- return path.replace('s3://', '')
- else:
- return path
-
-
-def _ensure_filesystem(fs):
- fs_type = type(fs)
-
- # If the arrow filesystem was subclassed, assume it supports the full
- # interface and return it
- if not issubclass(fs_type, FileSystem):
- if "fsspec" in sys.modules:
- fsspec = sys.modules["fsspec"]
- if isinstance(fs, fsspec.AbstractFileSystem):
- # for recent fsspec versions that stop inheriting from
- # pyarrow.filesystem.FileSystem, still allow fsspec
- # filesystems (which should be compatible with our legacy fs)
- return fs
-
- raise OSError('Unrecognized filesystem: {}'.format(fs_type))
- else:
- return fs
-
-
-def resolve_filesystem_and_path(where, filesystem=None):
- """
- Return filesystem from path which could be an HDFS URI, a local URI,
- or a plain filesystem path.
- """
- if not _is_path_like(where):
- if filesystem is not None:
- raise ValueError("filesystem passed but where is file-like, so"
- " there is nothing to open with filesystem.")
- return filesystem, where
-
- if filesystem is not None:
- filesystem = _ensure_filesystem(filesystem)
- if isinstance(filesystem, LocalFileSystem):
- path = _stringify_path(where)
- elif not isinstance(where, str):
- raise TypeError(
- "Expected string path; path-like objects are only allowed "
- "with a local filesystem"
- )
- else:
- path = where
- return filesystem, path
-
- path = _stringify_path(where)
-
- parsed_uri = urllib.parse.urlparse(path)
- if parsed_uri.scheme == 'hdfs' or parsed_uri.scheme == 'viewfs':
- # Input is hdfs URI such as hdfs://host:port/myfile.parquet
- netloc_split = parsed_uri.netloc.split(':')
- host = netloc_split[0]
- if host == '':
- host = 'default'
- else:
- host = parsed_uri.scheme + "://" + host
- port = 0
- if len(netloc_split) == 2 and netloc_split[1].isnumeric():
- port = int(netloc_split[1])
- fs = pa.hdfs._connect(host=host, port=port)
- fs_path = parsed_uri.path
- elif parsed_uri.scheme == 'file':
- # Input is local URI such as file:///home/user/myfile.parquet
- fs = LocalFileSystem._get_instance()
- fs_path = parsed_uri.path
- else:
- # Input is local path such as /home/user/myfile.parquet
- fs = LocalFileSystem._get_instance()
- fs_path = path
-
- return fs, fs_path
diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py
index ead750ca44..a256cc540f 100644
--- a/python/pyarrow/fs.py
+++ b/python/pyarrow/fs.py
@@ -98,9 +98,7 @@ def _filesystem_from_str(uri):
return filesystem
-def _ensure_filesystem(
- filesystem, use_mmap=False, allow_legacy_filesystem=False
-):
+def _ensure_filesystem(filesystem, *, use_mmap=False):
if isinstance(filesystem, FileSystem):
return filesystem
elif isinstance(filesystem, str):
@@ -123,15 +121,6 @@ def _ensure_filesystem(
return LocalFileSystem(use_mmap=use_mmap)
return PyFileSystem(FSSpecHandler(filesystem))
- # map old filesystems to new ones
- import pyarrow.filesystem as legacyfs
-
- if isinstance(filesystem, legacyfs.LocalFileSystem):
- return LocalFileSystem(use_mmap=use_mmap)
- # TODO handle HDFS?
- if allow_legacy_filesystem and isinstance(filesystem, legacyfs.FileSystem):
- return filesystem
-
raise TypeError(
"Unrecognized filesystem: {}. `filesystem` argument must be a "
"FileSystem instance or a valid file system URI'".format(
@@ -139,9 +128,7 @@ def _ensure_filesystem(
)
-def _resolve_filesystem_and_path(
- path, filesystem=None, allow_legacy_filesystem=False, memory_map=False
-):
+def _resolve_filesystem_and_path(path, filesystem=None, *, memory_map=False):
"""
Return filesystem/path from path which could be an URI or a plain
filesystem path.
@@ -155,10 +142,7 @@ def _resolve_filesystem_and_path(
return filesystem, path
if filesystem is not None:
- filesystem = _ensure_filesystem(
- filesystem, use_mmap=memory_map,
- allow_legacy_filesystem=allow_legacy_filesystem
- )
+ filesystem = _ensure_filesystem(filesystem, use_mmap=memory_map)
if isinstance(filesystem, LocalFileSystem):
path = _stringify_path(path)
elif not isinstance(path, str):
@@ -166,8 +150,7 @@ def _resolve_filesystem_and_path(
"Expected string path; path-like objects are only allowed "
"with a local filesystem"
)
- if not allow_legacy_filesystem:
- path = filesystem.normalize_path(path)
+ path = filesystem.normalize_path(path)
return filesystem, path
path = _stringify_path(path)
diff --git a/python/pyarrow/hdfs.py b/python/pyarrow/hdfs.py
deleted file mode 100644
index 2e6c387a8f..0000000000
--- a/python/pyarrow/hdfs.py
+++ /dev/null
@@ -1,240 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-
-import os
-import posixpath
-import sys
-import warnings
-
-from pyarrow.util import doc, _DEPR_MSG
-from pyarrow.filesystem import FileSystem
-import pyarrow._hdfsio as _hdfsio
-
-
-class HadoopFileSystem(_hdfsio.HadoopFileSystem, FileSystem):
- """
- DEPRECATED: FileSystem interface for HDFS cluster.
-
- See pyarrow.hdfs.connect for full connection details
-
- .. deprecated:: 2.0
- ``pyarrow.hdfs.HadoopFileSystem`` is deprecated,
- please use ``pyarrow.fs.HadoopFileSystem`` instead.
- """
-
- def __init__(self, host="default", port=0, user=None, kerb_ticket=None,
- driver='libhdfs', extra_conf=None):
- warnings.warn(
- _DEPR_MSG.format(
- "hdfs.HadoopFileSystem", "2.0.0", "fs.HadoopFileSystem"),
- FutureWarning, stacklevel=2)
- if driver == 'libhdfs':
- _maybe_set_hadoop_classpath()
-
- self._connect(host, port, user, kerb_ticket, extra_conf)
-
- def __reduce__(self):
- return (HadoopFileSystem, (self.host, self.port, self.user,
- self.kerb_ticket, self.extra_conf))
-
- def _isfilestore(self):
- """
- Return True if this is a Unix-style file store with directories.
- """
- return True
-
- @doc(FileSystem.isdir)
- def isdir(self, path):
- return super().isdir(path)
-
- @doc(FileSystem.isfile)
- def isfile(self, path):
- return super().isfile(path)
-
- @doc(FileSystem.delete)
- def delete(self, path, recursive=False):
- return super().delete(path, recursive)
-
- def mkdir(self, path, **kwargs):
- """
- Create directory in HDFS.
-
- Parameters
- ----------
- path : str
- Directory path to create, including any parent directories.
-
- Notes
- -----
- libhdfs does not support create_parents=False, so we ignore this here
- """
- return super().mkdir(path)
-
- @doc(FileSystem.rename)
- def rename(self, path, new_path):
- return super().rename(path, new_path)
-
- @doc(FileSystem.exists)
- def exists(self, path):
- return super().exists(path)
-
- def ls(self, path, detail=False):
- """
- Retrieve directory contents and metadata, if requested.
-
- Parameters
- ----------
- path : str
- HDFS path to retrieve contents of.
- detail : bool, default False
- If False, only return list of paths.
-
- Returns
- -------
- result : list of dicts (detail=True) or strings (detail=False)
- """
- return super().ls(path, detail)
-
- def walk(self, top_path):
- """
- Directory tree generator for HDFS, like os.walk.
-
- Parameters
- ----------
- top_path : str
- Root directory for tree traversal.
-
- Returns
- -------
- Generator yielding 3-tuple (dirpath, dirnames, filename)
- """
- contents = self.ls(top_path, detail=True)
-
- directories, files = _libhdfs_walk_files_dirs(top_path, contents)
- yield top_path, directories, files
- for dirname in directories:
- yield from self.walk(self._path_join(top_path, dirname))
-
-
-def _maybe_set_hadoop_classpath():
- import re
-
- if re.search(r'hadoop-common[^/]+.jar', os.environ.get('CLASSPATH', '')):
- return
-
- if 'HADOOP_HOME' in os.environ:
- if sys.platform != 'win32':
- classpath = _derive_hadoop_classpath()
- else:
- hadoop_bin = '{}/bin/hadoop'.format(os.environ['HADOOP_HOME'])
- classpath = _hadoop_classpath_glob(hadoop_bin)
- else:
- classpath = _hadoop_classpath_glob('hadoop')
-
- os.environ['CLASSPATH'] = classpath.decode('utf-8')
-
-
-def _derive_hadoop_classpath():
- import subprocess
-
- find_args = ('find', '-L', os.environ['HADOOP_HOME'], '-name', '*.jar')
- find = subprocess.Popen(find_args, stdout=subprocess.PIPE)
- xargs_echo = subprocess.Popen(('xargs', 'echo'),
- stdin=find.stdout,
- stdout=subprocess.PIPE)
- jars = subprocess.check_output(('tr', "' '", "':'"),
- stdin=xargs_echo.stdout)
- hadoop_conf = os.environ["HADOOP_CONF_DIR"] \
- if "HADOOP_CONF_DIR" in os.environ \
- else os.environ["HADOOP_HOME"] + "/etc/hadoop"
- return (hadoop_conf + ":").encode("utf-8") + jars
-
-
-def _hadoop_classpath_glob(hadoop_bin):
- import subprocess
-
- hadoop_classpath_args = (hadoop_bin, 'classpath', '--glob')
- return subprocess.check_output(hadoop_classpath_args)
-
-
-def _libhdfs_walk_files_dirs(top_path, contents):
- files = []
- directories = []
- for c in contents:
- scrubbed_name = posixpath.split(c['name'])[1]
- if c['kind'] == 'file':
- files.append(scrubbed_name)
- else:
- directories.append(scrubbed_name)
-
- return directories, files
-
-
-def connect(host="default", port=0, user=None, kerb_ticket=None,
- extra_conf=None):
- """
- DEPRECATED: Connect to an HDFS cluster.
-
- All parameters are optional and should only be set if the defaults need
- to be overridden.
-
- Authentication should be automatic if the HDFS cluster uses Kerberos.
- However, if a username is specified, then the ticket cache will likely
- be required.
-
- .. deprecated:: 2.0
- ``pyarrow.hdfs.connect`` is deprecated,
- please use ``pyarrow.fs.HadoopFileSystem`` instead.
-
- Parameters
- ----------
- host : NameNode. Set to "default" for fs.defaultFS from core-site.xml.
- port : NameNode's port. Set to 0 for default or logical (HA) nodes.
- user : Username when connecting to HDFS; None implies login user.
- kerb_ticket : Path to Kerberos ticket cache.
- extra_conf : dict, default None
- extra Key/Value pairs for config; Will override any
- hdfs-site.xml properties
-
- Notes
- -----
- The first time you call this method, it will take longer than usual due
- to JNI spin-up time.
-
- Returns
- -------
- filesystem : HadoopFileSystem
- """
- warnings.warn(
- _DEPR_MSG.format("hdfs.connect", "2.0.0", "fs.HadoopFileSystem"),
- FutureWarning, stacklevel=2
- )
- return _connect(
- host=host, port=port, user=user, kerb_ticket=kerb_ticket,
- extra_conf=extra_conf
- )
-
-
-def _connect(host="default", port=0, user=None, kerb_ticket=None,
- extra_conf=None):
- with warnings.catch_warnings():
- warnings.simplefilter("ignore")
- fs = HadoopFileSystem(host=host, port=port, user=user,
- kerb_ticket=kerb_ticket,
- extra_conf=extra_conf)
- return fs
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index b57980b3d6..7890bf4b2d 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -30,6 +30,7 @@ import warnings
from io import BufferedIOBase, IOBase, TextIOBase, UnsupportedOperation
from queue import Queue, Empty as QueueEmpty
+from pyarrow.lib cimport check_status, HaveLibHdfs
from pyarrow.util import _is_path_like, _stringify_path
@@ -46,6 +47,18 @@ cdef extern from "Python.h":
bytearray PyByteArray_FromStringAndSize(char *string, Py_ssize_t len)
+def have_libhdfs():
+ """
+ Return true if HDFS (HadoopFileSystem) library is set up correctly.
+ """
+ try:
+ with nogil:
+ check_status(HaveLibHdfs())
+ return True
+ except Exception:
+ return False
+
+
def io_thread_count():
"""
Return the number of threads to use for I/O operations.
diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py
index 98a4b2a113..69a1c9d19a 100644
--- a/python/pyarrow/parquet/core.py
+++ b/python/pyarrow/parquet/core.py
@@ -47,7 +47,6 @@ from pyarrow._parquet import (ParquetReader, Statistics, #
noqa
SortingColumn)
from pyarrow.fs import (LocalFileSystem, FileSystem, FileType,
_resolve_filesystem_and_path, _ensure_filesystem)
-from pyarrow import filesystem as legacyfs
from pyarrow.util import guid, _is_path_like, _stringify_path, _deprecate_api
@@ -309,7 +308,7 @@ class ParquetFile:
self._close_source = getattr(source, 'closed', True)
filesystem, source = _resolve_filesystem_and_path(
- source, filesystem, memory_map)
+ source, filesystem, memory_map=memory_map)
if filesystem is not None:
source = filesystem.open_input_file(source)
self._close_source = True # We opened it here, ensure we close it.
@@ -989,20 +988,13 @@ Examples
# sure to close it when `self.close` is called.
self.file_handle = None
- filesystem, path = _resolve_filesystem_and_path(
- where, filesystem, allow_legacy_filesystem=True
- )
+ filesystem, path = _resolve_filesystem_and_path(where, filesystem)
if filesystem is not None:
- if isinstance(filesystem, legacyfs.FileSystem):
- # legacy filesystem (eg custom subclass)
- # TODO deprecate
- sink = self.file_handle = filesystem.open(path, 'wb')
- else:
- # ARROW-10480: do not auto-detect compression. While
- # a filename like foo.parquet.gz is nonconforming, it
- # shouldn't implicitly apply compression.
- sink = self.file_handle = filesystem.open_output_stream(
- path, compression=None)
+ # ARROW-10480: do not auto-detect compression. While
+ # a filename like foo.parquet.gz is nonconforming, it
+ # shouldn't implicitly apply compression.
+ sink = self.file_handle = filesystem.open_output_stream(
+ path, compression=None)
else:
sink = where
self._metadata_collector = options.pop('metadata_collector', None)
@@ -1124,12 +1116,6 @@ def _get_pandas_index_columns(keyvalues):
EXCLUDED_PARQUET_PATHS = {'_SUCCESS'}
-def _is_local_file_system(fs):
- return isinstance(fs, LocalFileSystem) or isinstance(
- fs, legacyfs.LocalFileSystem
- )
-
-
_read_docstring_common = """\
read_dictionary : list, default None
List of names or column paths (for nested types) to read directly
@@ -1306,7 +1292,7 @@ Examples
if (
hasattr(path_or_paths, "__fspath__") and
filesystem is not None and
- not _is_local_file_system(filesystem)
+ not isinstance(filesystem, LocalFileSystem)
):
raise TypeError(
"Path-like objects with __fspath__ must only be used with "
diff --git a/python/pyarrow/tests/parquet/test_basic.py
b/python/pyarrow/tests/parquet/test_basic.py
index 3c867776ac..bc21d709ec 100644
--- a/python/pyarrow/tests/parquet/test_basic.py
+++ b/python/pyarrow/tests/parquet/test_basic.py
@@ -25,7 +25,6 @@ import pytest
import pyarrow as pa
from pyarrow import fs
-from pyarrow.filesystem import LocalFileSystem, FileSystem
from pyarrow.tests import util
from pyarrow.tests.parquet.common import (_check_roundtrip, _roundtrip_table,
_test_dataframe)
@@ -259,11 +258,11 @@ def test_fspath(tempdir):
# combined with non-local filesystem raises
with pytest.raises(TypeError):
- _read_table(fs_protocol_obj, filesystem=FileSystem())
+ _read_table(fs_protocol_obj, filesystem=fs.FileSystem())
@pytest.mark.parametrize("filesystem", [
- None, fs.LocalFileSystem(), LocalFileSystem._get_instance()
+ None, fs.LocalFileSystem()
])
@pytest.mark.parametrize("name", ("data.parquet", "δΎ‹.parquet"))
def test_relative_paths(tempdir, filesystem, name):
diff --git a/python/pyarrow/tests/parquet/test_dataset.py
b/python/pyarrow/tests/parquet/test_dataset.py
index b6e351bdef..30dae05124 100644
--- a/python/pyarrow/tests/parquet/test_dataset.py
+++ b/python/pyarrow/tests/parquet/test_dataset.py
@@ -26,11 +26,10 @@ import unittest.mock as mock
import pyarrow as pa
import pyarrow.compute as pc
-from pyarrow import fs
-from pyarrow.filesystem import LocalFileSystem
+from pyarrow.fs import (FileSelector, FileSystem, LocalFileSystem,
+ PyFileSystem, SubTreeFileSystem, FSSpecHandler)
from pyarrow.tests import util
from pyarrow.util import guid
-from pyarrow.vendored.version import Version
try:
import pyarrow.parquet as pq
@@ -63,7 +62,7 @@ def test_filesystem_uri(tempdir):
# filesystem object
result = pq.read_table(
- path, filesystem=fs.LocalFileSystem())
+ path, filesystem=LocalFileSystem())
assert result.equals(table)
# filesystem URI
@@ -74,17 +73,17 @@ def test_filesystem_uri(tempdir):
@pytest.mark.pandas
def test_read_partitioned_directory(tempdir):
- fs = LocalFileSystem._get_instance()
- _partition_test_for_filesystem(fs, tempdir)
+ local = LocalFileSystem()
+ _partition_test_for_filesystem(local, tempdir)
@pytest.mark.pandas
def test_read_partitioned_columns_selection(tempdir):
# ARROW-3861 - do not include partition columns in resulting table when
# `columns` keyword was passed without those columns
- fs = LocalFileSystem._get_instance()
+ local = LocalFileSystem()
base_path = tempdir
- _partition_test_for_filesystem(fs, base_path)
+ _partition_test_for_filesystem(local, base_path)
dataset = pq.ParquetDataset(base_path)
result = dataset.read(columns=["values"])
@@ -93,7 +92,7 @@ def test_read_partitioned_columns_selection(tempdir):
@pytest.mark.pandas
def test_filters_equivalency(tempdir):
- fs = LocalFileSystem._get_instance()
+ local = LocalFileSystem()
base_path = tempdir
integer_keys = [0, 1]
@@ -112,12 +111,12 @@ def test_filters_equivalency(tempdir):
3),
}, columns=['integer', 'string', 'boolean'])
- _generate_partition_directories(fs, base_path, partition_spec, df)
+ _generate_partition_directories(local, base_path, partition_spec, df)
# Old filters syntax:
# integer == 1 AND string != b AND boolean == True
dataset = pq.ParquetDataset(
- base_path, filesystem=fs,
+ base_path, filesystem=local,
filters=[('integer', '=', 1), ('string', '!=', 'b'),
('boolean', '==', 'True')],
)
@@ -141,7 +140,7 @@ def test_filters_equivalency(tempdir):
[('integer', '=', 0), ('boolean', '==', 'False')]
]
dataset = pq.ParquetDataset(
- base_path, filesystem=fs, filters=filters)
+ base_path, filesystem=local, filters=filters)
table = dataset.read()
result_df = table.to_pandas().reset_index(drop=True)
@@ -158,13 +157,13 @@ def test_filters_equivalency(tempdir):
for filters in [[[('string', '==', b'1\0a')]],
[[('string', '==', '1\0a')]]]:
dataset = pq.ParquetDataset(
- base_path, filesystem=fs, filters=filters)
+ base_path, filesystem=local, filters=filters)
assert dataset.read().num_rows == 0
@pytest.mark.pandas
def test_filters_cutoff_exclusive_integer(tempdir):
- fs = LocalFileSystem._get_instance()
+ local = LocalFileSystem()
base_path = tempdir
integer_keys = [0, 1, 2, 3, 4]
@@ -178,10 +177,10 @@ def test_filters_cutoff_exclusive_integer(tempdir):
'integers': np.array(integer_keys, dtype='i4'),
}, columns=['index', 'integers'])
- _generate_partition_directories(fs, base_path, partition_spec, df)
+ _generate_partition_directories(local, base_path, partition_spec, df)
dataset = pq.ParquetDataset(
- base_path, filesystem=fs,
+ base_path, filesystem=local,
filters=[
('integers', '<', 4),
('integers', '>', 1),
@@ -204,7 +203,7 @@ def test_filters_cutoff_exclusive_integer(tempdir):
)
@pytest.mark.pandas
def test_filters_cutoff_exclusive_datetime(tempdir):
- fs = LocalFileSystem._get_instance()
+ local = LocalFileSystem()
base_path = tempdir
date_keys = [
@@ -224,10 +223,10 @@ def test_filters_cutoff_exclusive_datetime(tempdir):
'dates': np.array(date_keys, dtype='datetime64'),
}, columns=['index', 'dates'])
- _generate_partition_directories(fs, base_path, partition_spec, df)
+ _generate_partition_directories(local, base_path, partition_spec, df)
dataset = pq.ParquetDataset(
- base_path, filesystem=fs,
+ base_path, filesystem=local,
filters=[
('dates', '<', "2018-04-12"),
('dates', '>', "2018-04-10")
@@ -264,7 +263,7 @@ def test_filters_inclusive_datetime(tempdir):
@pytest.mark.pandas
def test_filters_inclusive_integer(tempdir):
- fs = LocalFileSystem._get_instance()
+ local = LocalFileSystem()
base_path = tempdir
integer_keys = [0, 1, 2, 3, 4]
@@ -278,10 +277,10 @@ def test_filters_inclusive_integer(tempdir):
'integers': np.array(integer_keys, dtype='i4'),
}, columns=['index', 'integers'])
- _generate_partition_directories(fs, base_path, partition_spec, df)
+ _generate_partition_directories(local, base_path, partition_spec, df)
dataset = pq.ParquetDataset(
- base_path, filesystem=fs,
+ base_path, filesystem=local,
filters=[
('integers', '<=', 3),
('integers', '>=', 2),
@@ -298,7 +297,7 @@ def test_filters_inclusive_integer(tempdir):
@pytest.mark.pandas
def test_filters_inclusive_set(tempdir):
- fs = LocalFileSystem._get_instance()
+ local = LocalFileSystem()
base_path = tempdir
integer_keys = [0, 1]
@@ -317,10 +316,10 @@ def test_filters_inclusive_set(tempdir):
3),
}, columns=['integer', 'string', 'boolean'])
- _generate_partition_directories(fs, base_path, partition_spec, df)
+ _generate_partition_directories(local, base_path, partition_spec, df)
dataset = pq.ParquetDataset(
- base_path, filesystem=fs,
+ base_path, filesystem=local,
filters=[('string', 'in', 'ab')],
)
table = dataset.read()
@@ -331,7 +330,7 @@ def test_filters_inclusive_set(tempdir):
assert 'c' not in result_df['string'].values
dataset = pq.ParquetDataset(
- base_path, filesystem=fs,
+ base_path, filesystem=local,
filters=[('integer', 'in', [1]), ('string', 'in', ('a', 'b')),
('boolean', 'not in', {'False'})],
)
@@ -345,7 +344,7 @@ def test_filters_inclusive_set(tempdir):
@pytest.mark.pandas
def test_filters_invalid_pred_op(tempdir):
- fs = LocalFileSystem._get_instance()
+ local = LocalFileSystem()
base_path = tempdir
integer_keys = [0, 1, 2, 3, 4]
@@ -359,26 +358,26 @@ def test_filters_invalid_pred_op(tempdir):
'integers': np.array(integer_keys, dtype='i4'),
}, columns=['index', 'integers'])
- _generate_partition_directories(fs, base_path, partition_spec, df)
+ _generate_partition_directories(local, base_path, partition_spec, df)
with pytest.raises(TypeError):
pq.ParquetDataset(base_path,
- filesystem=fs,
+ filesystem=local,
filters=[('integers', 'in', 3), ])
with pytest.raises(ValueError):
pq.ParquetDataset(base_path,
- filesystem=fs,
+ filesystem=local,
filters=[('integers', '=<', 3), ])
# Dataset API returns empty table
dataset = pq.ParquetDataset(base_path,
- filesystem=fs,
+ filesystem=local,
filters=[('integers', 'in', set()), ])
assert dataset.read().num_rows == 0
dataset = pq.ParquetDataset(base_path,
- filesystem=fs,
+ filesystem=local,
filters=[('integers', '!=', {3})])
with pytest.raises(NotImplementedError):
assert dataset.read().num_rows == 0
@@ -388,7 +387,7 @@ def test_filters_invalid_pred_op(tempdir):
def test_filters_invalid_column(tempdir):
# ARROW-5572 - raise error on invalid name in filter specification
# works with new dataset
- fs = LocalFileSystem._get_instance()
+ local = LocalFileSystem()
base_path = tempdir
integer_keys = [0, 1, 2, 3, 4]
@@ -400,11 +399,11 @@ def test_filters_invalid_column(tempdir):
'integers': np.array(integer_keys, dtype='i4'),
}, columns=['index', 'integers'])
- _generate_partition_directories(fs, base_path, partition_spec, df)
+ _generate_partition_directories(local, base_path, partition_spec, df)
msg = r"No match for FieldRef.Name\(non_existent_column\)"
with pytest.raises(ValueError, match=msg):
- pq.ParquetDataset(base_path, filesystem=fs,
+ pq.ParquetDataset(base_path, filesystem=local,
filters=[('non_existent_column', '<', 3), ]).read()
@@ -419,7 +418,7 @@ def test_filters_invalid_column(tempdir):
def test_filters_read_table(tempdir, filters, read_method):
read = getattr(pq, read_method)
# test that filters keyword is passed through in read_table
- fs = LocalFileSystem._get_instance()
+ local = LocalFileSystem()
base_path = tempdir
integer_keys = [0, 1, 2, 3, 4]
@@ -434,9 +433,9 @@ def test_filters_read_table(tempdir, filters, read_method):
'nested': np.array([{'a': i, 'b': str(i)} for i in range(N)])
})
- _generate_partition_directories(fs, base_path, partition_spec, df)
+ _generate_partition_directories(local, base_path, partition_spec, df)
- kwargs = dict(filesystem=fs, filters=filters)
+ kwargs = dict(filesystem=local, filters=filters)
table = read(base_path, **kwargs)
assert table.num_rows == 3
@@ -445,7 +444,7 @@ def test_filters_read_table(tempdir, filters, read_method):
@pytest.mark.pandas
def test_partition_keys_with_underscores(tempdir):
# ARROW-5666 - partition field values with underscores preserve underscores
- fs = LocalFileSystem._get_instance()
+ local = LocalFileSystem()
base_path = tempdir
string_keys = ["2019_2", "2019_3"]
@@ -459,7 +458,7 @@ def test_partition_keys_with_underscores(tempdir):
'year_week': np.array(string_keys, dtype='object'),
}, columns=['index', 'year_week'])
- _generate_partition_directories(fs, base_path, partition_spec, df)
+ _generate_partition_directories(local, base_path, partition_spec, df)
dataset = pq.ParquetDataset(base_path)
result = dataset.read()
@@ -499,26 +498,6 @@ def test_read_single_file_list(tempdir):
assert result.equals(table)
[email protected]
[email protected]
-def test_read_partitioned_directory_s3fs_wrapper(s3_example_s3fs):
- import s3fs
-
- from pyarrow.filesystem import S3FSWrapper
-
- if Version(s3fs.__version__) >= Version("0.5"):
- pytest.skip("S3FSWrapper no longer working for s3fs 0.5+")
-
- fs, path = s3_example_s3fs
- with pytest.warns(FutureWarning):
- wrapper = S3FSWrapper(fs)
- _partition_test_for_filesystem(wrapper, path)
-
- # Check that we can auto-wrap
- dataset = pq.ParquetDataset(path, filesystem=fs)
- dataset.read()
-
-
@pytest.mark.pandas
@pytest.mark.s3
def test_read_partitioned_directory_s3fs(s3_example_s3fs):
@@ -569,6 +548,9 @@ def _generate_partition_directories(fs, base_dir,
partition_spec, df):
# partition_spec : list of lists, e.g. [['foo', [0, 1, 2],
# ['bar', ['a', 'b', 'c']]
# part_table : a pyarrow.Table to write to each partition
+ if not isinstance(fs, FileSystem):
+ fs = PyFileSystem(FSSpecHandler(fs))
+
DEPTH = len(partition_spec)
pathsep = getattr(fs, "pathsep", getattr(fs, "sep", "/"))
@@ -582,24 +564,27 @@ def _generate_partition_directories(fs, base_dir,
partition_spec, df):
str(base_dir),
'{}={}'.format(name, value)
])
- fs.mkdir(level_dir)
+ fs.create_dir(level_dir)
if level == DEPTH - 1:
# Generate example data
+ from pyarrow.fs import FileType
+
file_path = pathsep.join([level_dir, guid()])
filtered_df = _filter_partition(df, this_part_keys)
part_table = pa.Table.from_pandas(filtered_df)
- with fs.open(file_path, 'wb') as f:
+ with fs.open_output_stream(file_path) as f:
_write_table(part_table, f)
- assert fs.exists(file_path)
+ assert fs.get_file_info(file_path).type != FileType.NotFound
+ assert fs.get_file_info(file_path).type == FileType.File
file_success = pathsep.join([level_dir, '_SUCCESS'])
- with fs.open(file_success, 'wb') as f:
+ with fs.open_output_stream(file_success) as f:
pass
else:
_visit_level(level_dir, level + 1, this_part_keys)
file_success = pathsep.join([level_dir, '_SUCCESS'])
- with fs.open(file_success, 'wb') as f:
+ with fs.open_output_stream(file_success) as f:
pass
_visit_level(base_dir, 0, [])
@@ -1009,15 +994,21 @@ def _test_write_to_dataset_no_partitions(base_path,
output_table = pa.Table.from_pandas(output_df)
if filesystem is None:
- filesystem = LocalFileSystem._get_instance()
+ filesystem = LocalFileSystem()
+ elif not isinstance(filesystem, FileSystem):
+ filesystem = PyFileSystem(FSSpecHandler(filesystem))
# Without partitions, append files to root_path
n = 5
for i in range(n):
pq.write_to_dataset(output_table, base_path,
filesystem=filesystem)
- output_files = [file for file in filesystem.ls(str(base_path))
- if file.endswith(".parquet")]
+
+ selector = FileSelector(str(base_path), allow_not_found=False,
+ recursive=True)
+
+ infos = filesystem.get_file_info(selector)
+ output_files = [info for info in infos if info.path.endswith(".parquet")]
assert len(output_files) == n
# Deduplicated incoming DataFrame should match
@@ -1103,14 +1094,14 @@ def test_write_to_dataset_filesystem(tempdir):
table = pa.Table.from_pandas(df)
path = str(tempdir)
- pq.write_to_dataset(table, path, filesystem=fs.LocalFileSystem())
+ pq.write_to_dataset(table, path, filesystem=LocalFileSystem())
result = pq.read_table(path)
assert result.equals(table)
def _make_dataset_for_pickling(tempdir, N=100):
path = tempdir / 'data.parquet'
- fs = LocalFileSystem._get_instance()
+ local = LocalFileSystem()
df = pd.DataFrame({
'index': np.arange(N),
@@ -1127,11 +1118,11 @@ def _make_dataset_for_pickling(tempdir, N=100):
assert reader.metadata.num_row_groups == num_groups
metadata_path = tempdir / '_metadata'
- with fs.open(metadata_path, 'wb') as f:
+ with local.open_output_stream(str(metadata_path)) as f:
pq.write_metadata(table.schema, f)
dataset = pq.ParquetDataset(
- tempdir, filesystem=fs)
+ tempdir, filesystem=local)
return dataset
@@ -1249,7 +1240,7 @@ def test_parquet_dataset_new_filesystem(tempdir):
# Ensure we can pass new FileSystem object to ParquetDataset
table = pa.table({'a': [1, 2, 3]})
pq.write_table(table, tempdir / 'data.parquet')
- filesystem = fs.SubTreeFileSystem(str(tempdir), fs.LocalFileSystem())
+ filesystem = SubTreeFileSystem(str(tempdir), LocalFileSystem())
dataset = pq.ParquetDataset('.', filesystem=filesystem)
result = dataset.read()
assert result.equals(table)
diff --git a/python/pyarrow/tests/parquet/test_parquet_writer.py
b/python/pyarrow/tests/parquet/test_parquet_writer.py
index 16584684f5..f4ee7529ae 100644
--- a/python/pyarrow/tests/parquet/test_parquet_writer.py
+++ b/python/pyarrow/tests/parquet/test_parquet_writer.py
@@ -19,7 +19,6 @@ import pytest
import pyarrow as pa
from pyarrow import fs
-from pyarrow.filesystem import FileSystem, LocalFileSystem
try:
import pyarrow.parquet as pq
@@ -161,7 +160,6 @@ def test_parquet_writer_context_obj_with_exception(tempdir):
@pytest.mark.pandas
@pytest.mark.parametrize("filesystem", [
None,
- LocalFileSystem._get_instance(),
fs.LocalFileSystem(),
])
def test_parquet_writer_write_wrappers(tempdir, filesystem):
@@ -250,7 +248,6 @@ def test_parquet_writer_chunk_size(tempdir):
@pytest.mark.pandas
@pytest.mark.parametrize("filesystem", [
None,
- LocalFileSystem._get_instance(),
fs.LocalFileSystem(),
])
def test_parquet_writer_filesystem_local(tempdir, filesystem):
@@ -330,46 +327,6 @@ def test_parquet_writer_filesystem_buffer_raises():
)
[email protected]
-def test_parquet_writer_with_caller_provided_filesystem():
- out = pa.BufferOutputStream()
-
- class CustomFS(FileSystem):
- def __init__(self):
- self.path = None
- self.mode = None
-
- def open(self, path, mode='rb'):
- self.path = path
- self.mode = mode
- return out
-
- fs = CustomFS()
- fname = 'expected_fname.parquet'
- df = _test_dataframe(100)
- table = pa.Table.from_pandas(df, preserve_index=False)
-
- with pq.ParquetWriter(fname, table.schema, filesystem=fs, version='2.6') \
- as writer:
- writer.write_table(table)
-
- assert fs.path == fname
- assert fs.mode == 'wb'
- assert out.closed
-
- buf = out.getvalue()
- table_read = _read_table(pa.BufferReader(buf))
- df_read = table_read.to_pandas()
- tm.assert_frame_equal(df_read, df)
-
- # Should raise ValueError when filesystem is passed with file-like object
- with pytest.raises(ValueError) as err_info:
- pq.ParquetWriter(pa.BufferOutputStream(), table.schema, filesystem=fs)
- expected_msg = ("filesystem passed but where is file-like, so"
- " there is nothing to open with filesystem.")
- assert str(err_info) == expected_msg
-
-
def test_parquet_writer_store_schema(tempdir):
table = pa.table({'a': [1, 2, 3]})
diff --git a/python/pyarrow/tests/test_filesystem.py
b/python/pyarrow/tests/test_filesystem.py
deleted file mode 100644
index 9862c5990d..0000000000
--- a/python/pyarrow/tests/test_filesystem.py
+++ /dev/null
@@ -1,75 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import pyarrow as pa
-from pyarrow import filesystem
-
-import os
-import pytest
-
-
-def test_filesystem_deprecated():
- with pytest.warns(FutureWarning):
- filesystem.LocalFileSystem()
-
- with pytest.warns(FutureWarning):
- filesystem.LocalFileSystem.get_instance()
-
-
-def test_filesystem_deprecated_toplevel():
- with pytest.warns(FutureWarning):
- pa.localfs
-
- with pytest.warns(FutureWarning):
- pa.FileSystem
-
- with pytest.warns(FutureWarning):
- pa.LocalFileSystem
-
- with pytest.warns(FutureWarning):
- pa.HadoopFileSystem
-
-
-def test_resolve_uri():
- uri = "file:///home/user/myfile.parquet"
- fs, path = filesystem.resolve_filesystem_and_path(uri)
- assert isinstance(fs, filesystem.LocalFileSystem)
- assert path == "/home/user/myfile.parquet"
-
-
-def test_resolve_local_path():
- for uri in ['/home/user/myfile.parquet',
- 'myfile.parquet',
- 'my # file ? parquet',
- 'C:/Windows/myfile.parquet',
- r'C:\\Windows\\myfile.parquet',
- ]:
- fs, path = filesystem.resolve_filesystem_and_path(uri)
- assert isinstance(fs, filesystem.LocalFileSystem)
- assert path == uri
-
-
[email protected]("ignore:pyarrow.filesystem.LocalFileSystem")
-def test_resolve_home_directory():
- uri = '~/myfile.parquet'
- fs, path = filesystem.resolve_filesystem_and_path(uri)
- assert isinstance(fs, filesystem.LocalFileSystem)
- assert path == os.path.expanduser(uri)
-
- local_fs = filesystem.LocalFileSystem()
- fs, path = filesystem.resolve_filesystem_and_path(uri, local_fs)
- assert path == os.path.expanduser(uri)
diff --git a/python/pyarrow/tests/test_hdfs.py
b/python/pyarrow/tests/test_hdfs.py
deleted file mode 100644
index 5b94c200f3..0000000000
--- a/python/pyarrow/tests/test_hdfs.py
+++ /dev/null
@@ -1,451 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import os
-import random
-from io import BytesIO
-from os.path import join as pjoin
-
-import numpy as np
-import pytest
-
-import pyarrow as pa
-from pyarrow.tests import util
-from pyarrow.tests.parquet.common import _test_dataframe
-from pyarrow.tests.parquet.test_dataset import (
- _test_write_to_dataset_with_partitions,
- _test_write_to_dataset_no_partitions
-)
-from pyarrow.util import guid
-
-try:
- from pandas.testing import assert_frame_equal
-except ImportError:
- pass
-
-
-# ----------------------------------------------------------------------
-# HDFS tests
-
-
-def check_libhdfs_present():
- if not pa.have_libhdfs():
- message = 'No libhdfs available on system'
- if os.environ.get('PYARROW_HDFS_TEST_LIBHDFS_REQUIRE'):
- pytest.fail(message)
- else:
- pytest.skip(message)
-
-
-def hdfs_test_client():
- host = os.environ.get('ARROW_HDFS_TEST_HOST', 'default')
- user = os.environ.get('ARROW_HDFS_TEST_USER', None)
- try:
- port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 0))
- except ValueError:
- raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not '
- 'an integer')
-
- with pytest.warns(FutureWarning):
- return pa.hdfs.connect(host, port, user)
-
-
[email protected]
-class HdfsTestCases:
-
- def _make_test_file(self, hdfs, test_name, test_path, test_data):
- base_path = pjoin(self.tmp_path, test_name)
- hdfs.mkdir(base_path)
-
- full_path = pjoin(base_path, test_path)
-
- with hdfs.open(full_path, 'wb') as f:
- f.write(test_data)
-
- return full_path
-
- @classmethod
- def setup_class(cls):
- cls.check_driver()
- cls.hdfs = hdfs_test_client()
- cls.tmp_path = '/tmp/pyarrow-test-{}'.format(random.randint(0, 1000))
- cls.hdfs.mkdir(cls.tmp_path)
-
- @classmethod
- def teardown_class(cls):
- cls.hdfs.delete(cls.tmp_path, recursive=True)
- cls.hdfs.close()
-
- def test_pickle(self, pickle_module):
- s = pickle_module.dumps(self.hdfs)
- h2 = pickle_module.loads(s)
- assert h2.is_open
- assert h2.host == self.hdfs.host
- assert h2.port == self.hdfs.port
- assert h2.user == self.hdfs.user
- assert h2.kerb_ticket == self.hdfs.kerb_ticket
- # smoketest unpickled client works
- h2.ls(self.tmp_path)
-
- def test_cat(self):
- path = pjoin(self.tmp_path, 'cat-test')
-
- data = b'foobarbaz'
- with self.hdfs.open(path, 'wb') as f:
- f.write(data)
-
- contents = self.hdfs.cat(path)
- assert contents == data
-
- def test_capacity_space(self):
- capacity = self.hdfs.get_capacity()
- space_used = self.hdfs.get_space_used()
- disk_free = self.hdfs.df()
-
- assert capacity > 0
- assert capacity > space_used
- assert disk_free == (capacity - space_used)
-
- def test_close(self):
- client = hdfs_test_client()
- assert client.is_open
- client.close()
- assert not client.is_open
-
- with pytest.raises(Exception):
- client.ls('/')
-
- def test_mkdir(self):
- path = pjoin(self.tmp_path, 'test-dir/test-dir')
- parent_path = pjoin(self.tmp_path, 'test-dir')
-
- self.hdfs.mkdir(path)
- assert self.hdfs.exists(path)
-
- self.hdfs.delete(parent_path, recursive=True)
- assert not self.hdfs.exists(path)
-
- def test_mv_rename(self):
- path = pjoin(self.tmp_path, 'mv-test')
- new_path = pjoin(self.tmp_path, 'mv-new-test')
-
- data = b'foobarbaz'
- with self.hdfs.open(path, 'wb') as f:
- f.write(data)
-
- assert self.hdfs.exists(path)
- self.hdfs.mv(path, new_path)
- assert not self.hdfs.exists(path)
- assert self.hdfs.exists(new_path)
-
- assert self.hdfs.cat(new_path) == data
-
- self.hdfs.rename(new_path, path)
- assert self.hdfs.cat(path) == data
-
- def test_info(self):
- path = pjoin(self.tmp_path, 'info-base')
- file_path = pjoin(path, 'ex')
- self.hdfs.mkdir(path)
-
- data = b'foobarbaz'
- with self.hdfs.open(file_path, 'wb') as f:
- f.write(data)
-
- path_info = self.hdfs.info(path)
- file_path_info = self.hdfs.info(file_path)
-
- assert path_info['kind'] == 'directory'
-
- assert file_path_info['kind'] == 'file'
- assert file_path_info['size'] == len(data)
-
- def test_exists_isdir_isfile(self):
- dir_path = pjoin(self.tmp_path, 'info-base')
- file_path = pjoin(dir_path, 'ex')
- missing_path = pjoin(dir_path, 'this-path-is-missing')
-
- self.hdfs.mkdir(dir_path)
- with self.hdfs.open(file_path, 'wb') as f:
- f.write(b'foobarbaz')
-
- assert self.hdfs.exists(dir_path)
- assert self.hdfs.exists(file_path)
- assert not self.hdfs.exists(missing_path)
-
- assert self.hdfs.isdir(dir_path)
- assert not self.hdfs.isdir(file_path)
- assert not self.hdfs.isdir(missing_path)
-
- assert not self.hdfs.isfile(dir_path)
- assert self.hdfs.isfile(file_path)
- assert not self.hdfs.isfile(missing_path)
-
- def test_disk_usage(self):
- path = pjoin(self.tmp_path, 'disk-usage-base')
- p1 = pjoin(path, 'p1')
- p2 = pjoin(path, 'p2')
-
- subdir = pjoin(path, 'subdir')
- p3 = pjoin(subdir, 'p3')
-
- if self.hdfs.exists(path):
- self.hdfs.delete(path, True)
-
- self.hdfs.mkdir(path)
- self.hdfs.mkdir(subdir)
-
- data = b'foobarbaz'
-
- for file_path in [p1, p2, p3]:
- with self.hdfs.open(file_path, 'wb') as f:
- f.write(data)
-
- assert self.hdfs.disk_usage(path) == len(data) * 3
-
- def test_ls(self):
- base_path = pjoin(self.tmp_path, 'ls-test')
- self.hdfs.mkdir(base_path)
-
- dir_path = pjoin(base_path, 'a-dir')
- f1_path = pjoin(base_path, 'a-file-1')
-
- self.hdfs.mkdir(dir_path)
-
- f = self.hdfs.open(f1_path, 'wb')
- f.write(b'a' * 10)
-
- contents = sorted(self.hdfs.ls(base_path, False))
- assert contents == [dir_path, f1_path]
-
- def test_chmod_chown(self):
- path = pjoin(self.tmp_path, 'chmod-test')
- with self.hdfs.open(path, 'wb') as f:
- f.write(b'a' * 10)
-
- def test_download_upload(self):
- base_path = pjoin(self.tmp_path, 'upload-test')
-
- data = b'foobarbaz'
- buf = BytesIO(data)
- buf.seek(0)
-
- self.hdfs.upload(base_path, buf)
-
- out_buf = BytesIO()
- self.hdfs.download(base_path, out_buf)
- out_buf.seek(0)
- assert out_buf.getvalue() == data
-
- def test_file_context_manager(self):
- path = pjoin(self.tmp_path, 'ctx-manager')
-
- data = b'foo'
- with self.hdfs.open(path, 'wb') as f:
- f.write(data)
-
- with self.hdfs.open(path, 'rb') as f:
- assert f.size() == 3
- result = f.read(10)
- assert result == data
-
- def test_open_not_exist(self):
- path = pjoin(self.tmp_path, 'does-not-exist-123')
-
- with pytest.raises(FileNotFoundError):
- self.hdfs.open(path)
-
- def test_open_write_error(self):
- with pytest.raises((FileExistsError, IsADirectoryError)):
- self.hdfs.open('/', 'wb')
-
- def test_read_whole_file(self):
- path = pjoin(self.tmp_path, 'read-whole-file')
-
- data = b'foo' * 1000
- with self.hdfs.open(path, 'wb') as f:
- f.write(data)
-
- with self.hdfs.open(path, 'rb') as f:
- result = f.read()
-
- assert result == data
-
- def _write_multiple_hdfs_pq_files(self, tmpdir):
- import pyarrow.parquet as pq
- nfiles = 10
- size = 5
- test_data = []
- for i in range(nfiles):
- df = _test_dataframe(size, seed=i)
-
- df['index'] = np.arange(i * size, (i + 1) * size)
-
- # Hack so that we don't have a dtype cast in v1 files
- df['uint32'] = df['uint32'].astype(np.int64)
-
- path = pjoin(tmpdir, '{}.parquet'.format(i))
-
- table = pa.Table.from_pandas(df, preserve_index=False)
- with self.hdfs.open(path, 'wb') as f:
- pq.write_table(table, f)
-
- test_data.append(table)
-
- expected = pa.concat_tables(test_data)
- return expected
-
- @pytest.mark.xfail(reason="legacy.FileSystem not supported with ParquetDataset "
- "due to legacy path being removed in PyArrow 15.0.0.",
- raises=TypeError)
- @pytest.mark.pandas
- @pytest.mark.parquet
- def test_read_multiple_parquet_files(self):
-
- tmpdir = pjoin(self.tmp_path, 'multi-parquet-' + guid())
-
- self.hdfs.mkdir(tmpdir)
-
- expected = self._write_multiple_hdfs_pq_files(tmpdir)
- result = self.hdfs.read_parquet(tmpdir)
-
- assert_frame_equal(
- result.to_pandas().sort_values(by='index').reset_index(drop=True),
- expected.to_pandas()
- )
-
- @pytest.mark.pandas
- @pytest.mark.parquet
- def test_read_multiple_parquet_files_with_uri(self):
- import pyarrow.parquet as pq
-
- tmpdir = pjoin(self.tmp_path, 'multi-parquet-uri-' + guid())
-
- self.hdfs.mkdir(tmpdir)
-
- expected = self._write_multiple_hdfs_pq_files(tmpdir)
- path = _get_hdfs_uri(tmpdir)
- result = pq.read_table(path)
-
- assert_frame_equal(
- result.to_pandas().sort_values(by='index').reset_index(drop=True),
- expected.to_pandas()
- )
-
- @pytest.mark.xfail(reason="legacy.FileSystem not supported with ParquetDataset "
- "due to legacy path being removed in PyArrow 15.0.0.",
- raises=TypeError)
- @pytest.mark.pandas
- @pytest.mark.parquet
- def test_read_write_parquet_files_with_uri(self):
- import pyarrow.parquet as pq
-
- tmpdir = pjoin(self.tmp_path, 'uri-parquet-' + guid())
- self.hdfs.mkdir(tmpdir)
- path = _get_hdfs_uri(pjoin(tmpdir, 'test.parquet'))
-
- size = 5
- df = _test_dataframe(size, seed=0)
- # Hack so that we don't have a dtype cast in v1 files
- df['uint32'] = df['uint32'].astype(np.int64)
- table = pa.Table.from_pandas(df, preserve_index=False)
-
- pq.write_table(table, path, filesystem=self.hdfs)
-
- result = pq.read_table(path, filesystem=self.hdfs).to_pandas()
-
- assert_frame_equal(result, df)
-
- @pytest.mark.xfail(reason="legacy.FileSystem not supported with ParquetDataset "
- "due to legacy path being removed in PyArrow 15.0.0.",
- raises=TypeError)
- @pytest.mark.parquet
- @pytest.mark.pandas
- def test_write_to_dataset_with_partitions(self):
- tmpdir = pjoin(self.tmp_path, 'write-partitions-' + guid())
- self.hdfs.mkdir(tmpdir)
- _test_write_to_dataset_with_partitions(
- tmpdir, filesystem=self.hdfs)
-
- @pytest.mark.xfail(reason="legacy.FileSystem not supported with ParquetDataset "
- "due to legacy path being removed in PyArrow 15.0.0.",
- raises=TypeError)
- @pytest.mark.parquet
- @pytest.mark.pandas
- def test_write_to_dataset_no_partitions(self):
- tmpdir = pjoin(self.tmp_path, 'write-no_partitions-' + guid())
- self.hdfs.mkdir(tmpdir)
- _test_write_to_dataset_no_partitions(
- tmpdir, filesystem=self.hdfs)
-
-
-class TestLibHdfs(HdfsTestCases):
-
- @classmethod
- def check_driver(cls):
- check_libhdfs_present()
-
- def test_orphaned_file(self):
- hdfs = hdfs_test_client()
- file_path = self._make_test_file(hdfs, 'orphaned_file_test', 'fname',
- b'foobarbaz')
-
- f = hdfs.open(file_path)
- hdfs = None
- f = None # noqa
-
-
-def _get_hdfs_uri(path):
- host = os.environ.get('ARROW_HDFS_TEST_HOST', 'localhost')
- try:
- port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 0))
- except ValueError:
- raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not '
- 'an integer')
- uri = "hdfs://{}:{}{}".format(host, port, path)
-
- return uri
-
-
[email protected]
[email protected]
[email protected]
[email protected]
-def test_fastparquet_read_with_hdfs():
- check_libhdfs_present()
- try:
- import snappy # noqa
- except ImportError:
- pytest.skip('fastparquet test requires snappy')
-
- import pyarrow.parquet as pq
- fastparquet = pytest.importorskip('fastparquet')
-
- fs = hdfs_test_client()
-
- df = util.make_dataframe()
-
- table = pa.Table.from_pandas(df)
-
- path = '/tmp/testing.parquet'
- with fs.open(path, 'wb') as f:
- pq.write_table(table, f)
-
- parquet_file = fastparquet.ParquetFile(path, open_with=fs.open)
-
- result = parquet_file.to_pandas()
- assert_frame_equal(result, df)
diff --git a/python/setup.py b/python/setup.py
index 423de708e8..798bd6b05f 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -211,7 +211,6 @@ class build_ext(_build_ext):
'_s3fs',
'_substrait',
'_hdfs',
- '_hdfsio',
'gandiva']
def _run_cmake(self):