This is an automated email from the ASF dual-hosted git repository.

jorisvandenbossche pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b194ad222 GH-20127: [Python] Remove deprecated pyarrow.filesystem legacy implementations (#39825)
2b194ad222 is described below

commit 2b194ad222f4dc8ecf2eb73539ab8cab5b1fc5e7
Author: Alenka Frim <[email protected]>
AuthorDate: Mon Mar 4 13:33:18 2024 +0100

    GH-20127: [Python] Remove deprecated pyarrow.filesystem legacy implementations (#39825)
    
    This PR removes the legacy `pyarrow.filesystem` and `pyarrow.hdfs` modules, which have been deprecated since 2.0.0.
    * Closes: #20127
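    
    For users migrating off the removed APIs, a minimal sketch of the equivalent code using the `pyarrow.fs` layer is shown below (the host, port, user and path are illustrative placeholders, not values taken from this change):
    
    ```python
    import pyarrow.fs as fs
    
    # Legacy API removed by this PR:
    #   import pyarrow as pa
    #   hdfs = pa.hdfs.connect("namenode", 8020, user="hadoop")
    #   with hdfs.open("/data/example.parquet", "rb") as f:
    #       data = f.read()
    
    # Equivalent code with the pyarrow.fs filesystem layer:
    hdfs = fs.HadoopFileSystem("namenode", port=8020, user="hadoop")
    with hdfs.open_input_file("/data/example.parquet") as f:
        data = f.read()
    ```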
    
    Lead-authored-by: AlenkaF <[email protected]>
    Co-authored-by: Alenka Frim <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Co-authored-by: Joris Van den Bossche <[email protected]>
    Signed-off-by: Joris Van den Bossche <[email protected]>
---
 docs/source/python/filesystems_deprecated.rst      |  88 ----
 docs/source/python/index.rst                       |   1 -
 python/CMakeLists.txt                              |   1 -
 python/pyarrow/__init__.py                         |  47 +-
 python/pyarrow/_hdfsio.pyx                         | 478 -------------------
 python/pyarrow/filesystem.py                       | 511 ---------------------
 python/pyarrow/fs.py                               |  25 +-
 python/pyarrow/hdfs.py                             | 240 ----------
 python/pyarrow/io.pxi                              |  13 +
 python/pyarrow/parquet/core.py                     |  30 +-
 python/pyarrow/tests/parquet/test_basic.py         |   5 +-
 python/pyarrow/tests/parquet/test_dataset.py       | 137 +++---
 .../pyarrow/tests/parquet/test_parquet_writer.py   |  43 --
 python/pyarrow/tests/test_filesystem.py            |  75 ---
 python/pyarrow/tests/test_hdfs.py                  | 451 ------------------
 python/setup.py                                    |   1 -
 16 files changed, 93 insertions(+), 2053 deletions(-)

diff --git a/docs/source/python/filesystems_deprecated.rst b/docs/source/python/filesystems_deprecated.rst
deleted file mode 100644
index c51245341b..0000000000
--- a/docs/source/python/filesystems_deprecated.rst
+++ /dev/null
@@ -1,88 +0,0 @@
-.. Licensed to the Apache Software Foundation (ASF) under one
-.. or more contributor license agreements.  See the NOTICE file
-.. distributed with this work for additional information
-.. regarding copyright ownership.  The ASF licenses this file
-.. to you under the Apache License, Version 2.0 (the
-.. "License"); you may not use this file except in compliance
-.. with the License.  You may obtain a copy of the License at
-
-..   http://www.apache.org/licenses/LICENSE-2.0
-
-.. Unless required by applicable law or agreed to in writing,
-.. software distributed under the License is distributed on an
-.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-.. KIND, either express or implied.  See the License for the
-.. specific language governing permissions and limitations
-.. under the License.
-
-Filesystem Interface (legacy)
-=============================
-
-.. warning::
-   This section documents the deprecated filesystem layer.  You should
-   use the :ref:`new filesystem layer <filesystem>` instead.
-
-.. _hdfs:
-
-Hadoop File System (HDFS)
--------------------------
-
-PyArrow comes with bindings to a C++-based interface to the Hadoop File
-System. You connect like so:
-
-.. code-block:: python
-
-   import pyarrow as pa
-   fs = pa.hdfs.connect(host, port, user=user, kerb_ticket=ticket_cache_path)
-   with fs.open(path, 'rb') as f:
-       # Do something with f
-
-By default, ``pyarrow.hdfs.HadoopFileSystem`` uses libhdfs, a JNI-based
-interface to the Java Hadoop client. This library is loaded **at runtime**
-(rather than at link / library load time, since the library may not be in your
-LD_LIBRARY_PATH), and relies on some environment variables.
-
-* ``HADOOP_HOME``: the root of your installed Hadoop distribution. Often has
-  `lib/native/libhdfs.so`.
-
-* ``JAVA_HOME``: the location of your Java SDK installation.
-
-* ``ARROW_LIBHDFS_DIR`` (optional): explicit location of ``libhdfs.so`` if it is
-  installed somewhere other than ``$HADOOP_HOME/lib/native``.
-
-* ``CLASSPATH``: must contain the Hadoop jars. You can set these using:
-
-.. code-block:: shell
-
-    export CLASSPATH=`$HADOOP_HOME/bin/hdfs classpath --glob`
-
-If ``CLASSPATH`` is not set, then it will be set automatically if the
-``hadoop`` executable is in your system path, or if ``HADOOP_HOME`` is set.
-
-HDFS API
-~~~~~~~~
-
-.. currentmodule:: pyarrow
-
-.. autosummary::
-   :toctree: generated/
-
-   hdfs.connect
-   HadoopFileSystem.cat
-   HadoopFileSystem.chmod
-   HadoopFileSystem.chown
-   HadoopFileSystem.delete
-   HadoopFileSystem.df
-   HadoopFileSystem.disk_usage
-   HadoopFileSystem.download
-   HadoopFileSystem.exists
-   HadoopFileSystem.get_capacity
-   HadoopFileSystem.get_space_used
-   HadoopFileSystem.info
-   HadoopFileSystem.ls
-   HadoopFileSystem.mkdir
-   HadoopFileSystem.open
-   HadoopFileSystem.rename
-   HadoopFileSystem.rm
-   HadoopFileSystem.upload
-   HdfsFile
diff --git a/docs/source/python/index.rst b/docs/source/python/index.rst
index 08939bc760..7acff940ba 100644
--- a/docs/source/python/index.rst
+++ b/docs/source/python/index.rst
@@ -49,7 +49,6 @@ files into Arrow structures.
    memory
    ipc
    filesystems
-   filesystems_deprecated
    numpy
    pandas
    interchange_protocol
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 1d6524373a..c3a1c57868 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -545,7 +545,6 @@ set(CYTHON_EXTENSIONS
     _csv
     _feather
     _fs
-    _hdfsio
     _json
     _pyarrow_cpp_tests)
 set_source_files_properties(pyarrow/lib.pyx PROPERTIES CYTHON_API TRUE)
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 2ee97ddb66..7ede69da66 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -255,9 +255,8 @@ from pyarrow.lib import (NativeFile, PythonFile,
                          BufferReader, BufferOutputStream,
                          OSFile, MemoryMappedFile, memory_map,
                          create_memory_map, MockOutputStream,
-                         input_stream, output_stream)
-
-from pyarrow._hdfsio import HdfsFile, have_libhdfs
+                         input_stream, output_stream,
+                         have_libhdfs)
 
 from pyarrow.lib import (ChunkedArray, RecordBatch, Table, table,
                          concat_arrays, concat_tables, TableGroupBy,
@@ -276,54 +275,12 @@ from pyarrow.lib import (ArrowCancelled,
                          ArrowTypeError,
                          ArrowSerializationError)
 
-import pyarrow.hdfs as hdfs
-
 from pyarrow.ipc import serialize_pandas, deserialize_pandas
 import pyarrow.ipc as ipc
 
 import pyarrow.types as types
 
 
-# deprecated top-level access
-
-
-from pyarrow.filesystem import FileSystem as _FileSystem
-from pyarrow.filesystem import LocalFileSystem as _LocalFileSystem
-from pyarrow.hdfs import HadoopFileSystem as _HadoopFileSystem
-
-
-_localfs = _LocalFileSystem._get_instance()
-
-
-_msg = (
-    "pyarrow.{0} is deprecated as of 2.0.0, please use pyarrow.fs.{1} instead."
-)
-
-_serialization_msg = (
-    "'pyarrow.{0}' is deprecated and will be removed in a future version. "
-    "Use pickle or the pyarrow IPC functionality instead."
-)
-
-_deprecated = {
-    "localfs": (_localfs, "LocalFileSystem"),
-    "FileSystem": (_FileSystem, "FileSystem"),
-    "LocalFileSystem": (_LocalFileSystem, "LocalFileSystem"),
-    "HadoopFileSystem": (_HadoopFileSystem, "HadoopFileSystem"),
-}
-
-
-def __getattr__(name):
-    if name in _deprecated:
-        obj, new_name = _deprecated[name]
-        _warnings.warn(_msg.format(name, new_name),
-                       FutureWarning, stacklevel=2)
-        return obj
-
-    raise AttributeError(
-        "module 'pyarrow' has no attribute '{0}'".format(name)
-    )
-
-
 # ----------------------------------------------------------------------
 # Deprecations
 
diff --git a/python/pyarrow/_hdfsio.pyx b/python/pyarrow/_hdfsio.pyx
deleted file mode 100644
index cbcc5d28ca..0000000000
--- a/python/pyarrow/_hdfsio.pyx
+++ /dev/null
@@ -1,478 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# ----------------------------------------------------------------------
-# HDFS IO implementation
-
-# cython: language_level = 3
-
-import re
-
-from pyarrow.lib cimport check_status, _Weakrefable, NativeFile
-from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport *
-from pyarrow.includes.libarrow_fs cimport *
-from pyarrow.lib import frombytes, tobytes, ArrowIOError
-
-
-_HDFS_PATH_RE = re.compile(r'hdfs://(.*):(\d+)(.*)')
-
-
-def have_libhdfs():
-    try:
-        with nogil:
-            check_status(HaveLibHdfs())
-        return True
-    except Exception:
-        return False
-
-
-def strip_hdfs_abspath(path):
-    m = _HDFS_PATH_RE.match(path)
-    if m:
-        return m.group(3)
-    else:
-        return path
-
-
-cdef class HadoopFileSystem(_Weakrefable):
-    cdef:
-        shared_ptr[CIOHadoopFileSystem] client
-
-    cdef readonly:
-        bint is_open
-        object host
-        object user
-        object kerb_ticket
-        int port
-        dict extra_conf
-
-    def _connect(self, host, port, user, kerb_ticket, extra_conf):
-        cdef HdfsConnectionConfig conf
-
-        if host is not None:
-            conf.host = tobytes(host)
-        self.host = host
-
-        conf.port = port
-        self.port = port
-
-        if user is not None:
-            conf.user = tobytes(user)
-        self.user = user
-
-        if kerb_ticket is not None:
-            conf.kerb_ticket = tobytes(kerb_ticket)
-        self.kerb_ticket = kerb_ticket
-
-        with nogil:
-            check_status(HaveLibHdfs())
-
-        if extra_conf is not None and isinstance(extra_conf, dict):
-            conf.extra_conf = {tobytes(k): tobytes(v)
-                               for k, v in extra_conf.items()}
-        self.extra_conf = extra_conf
-
-        with nogil:
-            check_status(CIOHadoopFileSystem.Connect(&conf, &self.client))
-        self.is_open = True
-
-    @classmethod
-    def connect(cls, *args, **kwargs):
-        return cls(*args, **kwargs)
-
-    def __dealloc__(self):
-        if self.is_open:
-            self.close()
-
-    def close(self):
-        """
-        Disconnect from the HDFS cluster
-        """
-        self._ensure_client()
-        with nogil:
-            check_status(self.client.get().Disconnect())
-        self.is_open = False
-
-    cdef _ensure_client(self):
-        if self.client.get() == NULL:
-            raise IOError('HDFS client improperly initialized')
-        elif not self.is_open:
-            raise IOError('HDFS client is closed')
-
-    def exists(self, path):
-        """
-        Returns True if the path is known to the cluster, False if it does not
-        (or there is an RPC error)
-        """
-        self._ensure_client()
-
-        cdef c_string c_path = tobytes(path)
-        cdef c_bool result
-        with nogil:
-            result = self.client.get().Exists(c_path)
-        return result
-
-    def isdir(self, path):
-        cdef HdfsPathInfo info
-        try:
-            self._path_info(path, &info)
-        except ArrowIOError:
-            return False
-        return info.kind == ObjectType_DIRECTORY
-
-    def isfile(self, path):
-        cdef HdfsPathInfo info
-        try:
-            self._path_info(path, &info)
-        except ArrowIOError:
-            return False
-        return info.kind == ObjectType_FILE
-
-    def get_capacity(self):
-        """
-        Get reported total capacity of file system
-
-        Returns
-        -------
-        capacity : int
-        """
-        cdef int64_t capacity = 0
-        with nogil:
-            check_status(self.client.get().GetCapacity(&capacity))
-        return capacity
-
-    def get_space_used(self):
-        """
-        Get space used on file system
-
-        Returns
-        -------
-        space_used : int
-        """
-        cdef int64_t space_used = 0
-        with nogil:
-            check_status(self.client.get().GetUsed(&space_used))
-        return space_used
-
-    def df(self):
-        """
-        Return free space on disk, like the UNIX df command
-
-        Returns
-        -------
-        space : int
-        """
-        return self.get_capacity() - self.get_space_used()
-
-    def rename(self, path, new_path):
-        cdef c_string c_path = tobytes(path)
-        cdef c_string c_new_path = tobytes(new_path)
-        with nogil:
-            check_status(self.client.get().Rename(c_path, c_new_path))
-
-    def info(self, path):
-        """
-        Return detailed HDFS information for path
-
-        Parameters
-        ----------
-        path : string
-            Path to file or directory
-
-        Returns
-        -------
-        path_info : dict
-        """
-        cdef HdfsPathInfo info
-        self._path_info(path, &info)
-        return {
-            'path': frombytes(info.name),
-            'owner': frombytes(info.owner),
-            'group': frombytes(info.group),
-            'size': info.size,
-            'block_size': info.block_size,
-            'last_modified': info.last_modified_time,
-            'last_accessed': info.last_access_time,
-            'replication': info.replication,
-            'permissions': info.permissions,
-            'kind': ('directory' if info.kind == ObjectType_DIRECTORY
-                     else 'file')
-        }
-
-    def stat(self, path):
-        """
-        Return basic file system statistics about path
-
-        Parameters
-        ----------
-        path : string
-            Path to file or directory
-
-        Returns
-        -------
-        stat : dict
-        """
-        cdef FileStatistics info
-        cdef c_string c_path = tobytes(path)
-        with nogil:
-            check_status(self.client.get()
-                         .Stat(c_path, &info))
-        return {
-            'size': info.size,
-            'kind': ('directory' if info.kind == ObjectType_DIRECTORY
-                     else 'file')
-        }
-
-    cdef _path_info(self, path, HdfsPathInfo* info):
-        cdef c_string c_path = tobytes(path)
-
-        with nogil:
-            check_status(self.client.get()
-                         .GetPathInfo(c_path, info))
-
-    def ls(self, path, bint full_info):
-        cdef:
-            c_string c_path = tobytes(path)
-            vector[HdfsPathInfo] listing
-            list results = []
-            int i
-
-        self._ensure_client()
-
-        with nogil:
-            check_status(self.client.get()
-                         .ListDirectory(c_path, &listing))
-
-        cdef const HdfsPathInfo* info
-        for i in range(<int> listing.size()):
-            info = &listing[i]
-
-            # Try to trim off the hdfs://HOST:PORT piece
-            name = strip_hdfs_abspath(frombytes(info.name))
-
-            if full_info:
-                kind = ('file' if info.kind == ObjectType_FILE
-                        else 'directory')
-
-                results.append({
-                    'kind': kind,
-                    'name': name,
-                    'owner': frombytes(info.owner),
-                    'group': frombytes(info.group),
-                    'last_modified_time': info.last_modified_time,
-                    'last_access_time': info.last_access_time,
-                    'size': info.size,
-                    'replication': info.replication,
-                    'block_size': info.block_size,
-                    'permissions': info.permissions
-                })
-            else:
-                results.append(name)
-
-        return results
-
-    def chmod(self, path, mode):
-        """
-        Change file permissions
-
-        Parameters
-        ----------
-        path : string
-            absolute path to file or directory
-        mode : int
-            POSIX-like bitmask
-        """
-        self._ensure_client()
-        cdef c_string c_path = tobytes(path)
-        cdef int c_mode = mode
-        with nogil:
-            check_status(self.client.get()
-                         .Chmod(c_path, c_mode))
-
-    def chown(self, path, owner=None, group=None):
-        """
-        Change file owner and group
-
-        Parameters
-        ----------
-        path : string
-            absolute path to file or directory
-        owner : string, default None
-            New owner, None for no change
-        group : string, default None
-            New group, None for no change
-        """
-        cdef:
-            c_string c_path
-            c_string c_owner
-            c_string c_group
-            const char* c_owner_ptr = NULL
-            const char* c_group_ptr = NULL
-
-        self._ensure_client()
-
-        c_path = tobytes(path)
-        if owner is not None:
-            c_owner = tobytes(owner)
-            c_owner_ptr = c_owner.c_str()
-
-        if group is not None:
-            c_group = tobytes(group)
-            c_group_ptr = c_group.c_str()
-
-        with nogil:
-            check_status(self.client.get()
-                         .Chown(c_path, c_owner_ptr, c_group_ptr))
-
-    def mkdir(self, path):
-        """
-        Create indicated directory and any necessary parent directories
-        """
-        self._ensure_client()
-        cdef c_string c_path = tobytes(path)
-        with nogil:
-            check_status(self.client.get()
-                         .MakeDirectory(c_path))
-
-    def delete(self, path, bint recursive=False):
-        """
-        Delete the indicated file or directory
-
-        Parameters
-        ----------
-        path : string
-        recursive : boolean, default False
-            If True, also delete child paths for directories
-        """
-        self._ensure_client()
-
-        cdef c_string c_path = tobytes(path)
-        with nogil:
-            check_status(self.client.get()
-                         .Delete(c_path, recursive == 1))
-
-    def open(self, path, mode='rb', buffer_size=None, replication=None,
-             default_block_size=None):
-        """
-        Open HDFS file for reading or writing
-
-        Parameters
-        ----------
-        mode : string
-            Must be one of 'rb', 'wb', 'ab'
-
-        Returns
-        -------
-        handle : HdfsFile
-        """
-        self._ensure_client()
-
-        cdef HdfsFile out = HdfsFile()
-
-        if mode not in ('rb', 'wb', 'ab'):
-            raise Exception("Mode must be 'rb' (read), "
-                            "'wb' (write, new file), or 'ab' (append)")
-
-        cdef c_string c_path = tobytes(path)
-        cdef c_bool append = False
-
-        # 0 in libhdfs means "use the default"
-        cdef int32_t c_buffer_size = buffer_size or 0
-        cdef int16_t c_replication = replication or 0
-        cdef int64_t c_default_block_size = default_block_size or 0
-
-        cdef shared_ptr[HdfsOutputStream] wr_handle
-        cdef shared_ptr[HdfsReadableFile] rd_handle
-
-        if mode in ('wb', 'ab'):
-            if mode == 'ab':
-                append = True
-
-            with nogil:
-                check_status(
-                    self.client.get()
-                    .OpenWritable(c_path, append, c_buffer_size,
-                                  c_replication, c_default_block_size,
-                                  &wr_handle))
-
-            out.set_output_stream(<shared_ptr[COutputStream]> wr_handle)
-            out.is_writable = True
-        else:
-            with nogil:
-                check_status(self.client.get()
-                             .OpenReadable(c_path, &rd_handle))
-
-            out.set_random_access_file(
-                <shared_ptr[CRandomAccessFile]> rd_handle)
-            out.is_readable = True
-
-        assert not out.closed
-
-        if c_buffer_size == 0:
-            c_buffer_size = 2 ** 16
-
-        out.mode = mode
-        out.buffer_size = c_buffer_size
-        out.parent = _HdfsFileNanny(self, out)
-        out.own_file = True
-
-        return out
-
-    def download(self, path, stream, buffer_size=None):
-        with self.open(path, 'rb') as f:
-            f.download(stream, buffer_size=buffer_size)
-
-    def upload(self, path, stream, buffer_size=None):
-        """
-        Upload file-like object to HDFS path
-        """
-        with self.open(path, 'wb') as f:
-            f.upload(stream, buffer_size=buffer_size)
-
-
-# ARROW-404: Helper class to ensure that files are closed before the
-# client. During deallocation of the extension class, the attributes are
-# decref'd which can cause the client to get closed first if the file has the
-# last remaining reference
-cdef class _HdfsFileNanny(_Weakrefable):
-    cdef:
-        object client
-        object file_handle_ref
-
-    def __cinit__(self, client, file_handle):
-        import weakref
-        self.client = client
-        self.file_handle_ref = weakref.ref(file_handle)
-
-    def __dealloc__(self):
-        fh = self.file_handle_ref()
-        if fh:
-            fh.close()
-        # avoid cyclic GC
-        self.file_handle_ref = None
-        self.client = None
-
-
-cdef class HdfsFile(NativeFile):
-    cdef readonly:
-        int32_t buffer_size
-        object mode
-        object parent
-
-    def __dealloc__(self):
-        self.parent = None
diff --git a/python/pyarrow/filesystem.py b/python/pyarrow/filesystem.py
deleted file mode 100644
index c1e70a1ee6..0000000000
--- a/python/pyarrow/filesystem.py
+++ /dev/null
@@ -1,511 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-
-import os
-import posixpath
-import sys
-import urllib.parse
-import warnings
-
-from os.path import join as pjoin
-
-import pyarrow as pa
-from pyarrow.util import doc, _stringify_path, _is_path_like, _DEPR_MSG
-
-
-_FS_DEPR_MSG = _DEPR_MSG.format(
-    "filesystem.LocalFileSystem", "2.0.0", "fs.LocalFileSystem"
-)
-
-
-class FileSystem:
-    """
-    Abstract filesystem interface.
-    """
-
-    def cat(self, path):
-        """
-        Return contents of file as a bytes object.
-
-        Parameters
-        ----------
-        path : str
-            File path to read content from.
-
-        Returns
-        -------
-        contents : bytes
-        """
-        with self.open(path, 'rb') as f:
-            return f.read()
-
-    def ls(self, path):
-        """
-        Return list of file paths.
-
-        Parameters
-        ----------
-        path : str
-            Directory to list contents from.
-        """
-        raise NotImplementedError
-
-    def delete(self, path, recursive=False):
-        """
-        Delete the indicated file or directory.
-
-        Parameters
-        ----------
-        path : str
-            Path to delete.
-        recursive : bool, default False
-            If True, also delete child paths for directories.
-        """
-        raise NotImplementedError
-
-    def disk_usage(self, path):
-        """
-        Compute bytes used by all contents under indicated path in file tree.
-
-        Parameters
-        ----------
-        path : str
-            Can be a file path or directory.
-
-        Returns
-        -------
-        usage : int
-        """
-        path = _stringify_path(path)
-        path_info = self.stat(path)
-        if path_info['kind'] == 'file':
-            return path_info['size']
-
-        total = 0
-        for root, directories, files in self.walk(path):
-            for child_path in files:
-                abspath = self._path_join(root, child_path)
-                total += self.stat(abspath)['size']
-
-        return total
-
-    def _path_join(self, *args):
-        return self.pathsep.join(args)
-
-    def stat(self, path):
-        """
-        Information about a filesystem entry.
-
-        Returns
-        -------
-        stat : dict
-        """
-        raise NotImplementedError('FileSystem.stat')
-
-    def rm(self, path, recursive=False):
-        """
-        Alias for FileSystem.delete.
-        """
-        return self.delete(path, recursive=recursive)
-
-    def mv(self, path, new_path):
-        """
-        Alias for FileSystem.rename.
-        """
-        return self.rename(path, new_path)
-
-    def rename(self, path, new_path):
-        """
-        Rename file, like UNIX mv command.
-
-        Parameters
-        ----------
-        path : str
-            Path to alter.
-        new_path : str
-            Path to move to.
-        """
-        raise NotImplementedError('FileSystem.rename')
-
-    def mkdir(self, path, create_parents=True):
-        """
-        Create a directory.
-
-        Parameters
-        ----------
-        path : str
-            Path to the directory.
-        create_parents : bool, default True
-            If the parent directories don't exist, create them as well.
-        """
-        raise NotImplementedError
-
-    def exists(self, path):
-        """
-        Return True if path exists.
-
-        Parameters
-        ----------
-        path : str
-            Path to check.
-        """
-        raise NotImplementedError
-
-    def isdir(self, path):
-        """
-        Return True if path is a directory.
-
-        Parameters
-        ----------
-        path : str
-            Path to check.
-        """
-        raise NotImplementedError
-
-    def isfile(self, path):
-        """
-        Return True if path is a file.
-
-        Parameters
-        ----------
-        path : str
-            Path to check.
-        """
-        raise NotImplementedError
-
-    def _isfilestore(self):
-        """
-        Returns True if this FileSystem is a unix-style file store with
-        directories.
-        """
-        raise NotImplementedError
-
-    def read_parquet(self, path, columns=None, metadata=None, schema=None,
-                     use_threads=True, use_pandas_metadata=False):
-        """
-        Read Parquet data from path in file system. Can read from a single file
-        or a directory of files.
-
-        Parameters
-        ----------
-        path : str
-            Single file path or directory
-        columns : List[str], optional
-            Subset of columns to read.
-        metadata : pyarrow.parquet.FileMetaData
-            Known metadata to validate files against.
-        schema : pyarrow.parquet.Schema
-            Known schema to validate files against. Alternative to metadata
-            argument.
-        use_threads : bool, default True
-            Perform multi-threaded column reads.
-        use_pandas_metadata : bool, default False
-            If True and file has custom pandas schema metadata, ensure that
-            index columns are also loaded.
-
-        Returns
-        -------
-        table : pyarrow.Table
-        """
-        from pyarrow.parquet import ParquetDataset
-        dataset = ParquetDataset(path, schema=schema, metadata=metadata,
-                                 filesystem=self)
-        return dataset.read(columns=columns, use_threads=use_threads,
-                            use_pandas_metadata=use_pandas_metadata)
-
-    def open(self, path, mode='rb'):
-        """
-        Open file for reading or writing.
-        """
-        raise NotImplementedError
-
-    @property
-    def pathsep(self):
-        return '/'
-
-
-class LocalFileSystem(FileSystem):
-
-    _instance = None
-
-    def __init__(self):
-        warnings.warn(_FS_DEPR_MSG, FutureWarning, stacklevel=2)
-        super().__init__()
-
-    @classmethod
-    def _get_instance(cls):
-        if cls._instance is None:
-            with warnings.catch_warnings():
-                warnings.simplefilter("ignore")
-                cls._instance = LocalFileSystem()
-        return cls._instance
-
-    @classmethod
-    def get_instance(cls):
-        warnings.warn(_FS_DEPR_MSG, FutureWarning, stacklevel=2)
-        return cls._get_instance()
-
-    @doc(FileSystem.ls)
-    def ls(self, path):
-        path = _stringify_path(path)
-        return sorted(pjoin(path, x) for x in os.listdir(path))
-
-    @doc(FileSystem.mkdir)
-    def mkdir(self, path, create_parents=True):
-        path = _stringify_path(path)
-        if create_parents:
-            os.makedirs(path)
-        else:
-            os.mkdir(path)
-
-    @doc(FileSystem.isdir)
-    def isdir(self, path):
-        path = _stringify_path(path)
-        return os.path.isdir(path)
-
-    @doc(FileSystem.isfile)
-    def isfile(self, path):
-        path = _stringify_path(path)
-        return os.path.isfile(path)
-
-    @doc(FileSystem._isfilestore)
-    def _isfilestore(self):
-        return True
-
-    @doc(FileSystem.exists)
-    def exists(self, path):
-        path = _stringify_path(path)
-        return os.path.exists(path)
-
-    @doc(FileSystem.open)
-    def open(self, path, mode='rb'):
-        """
-        Open file for reading or writing.
-        """
-        path = _stringify_path(path)
-        return open(path, mode=mode)
-
-    @property
-    def pathsep(self):
-        return os.path.sep
-
-    def walk(self, path):
-        """
-        Directory tree generator, see os.walk.
-        """
-        path = _stringify_path(path)
-        return os.walk(path)
-
-
-class DaskFileSystem(FileSystem):
-    """
-    Wraps s3fs Dask filesystem implementation like s3fs, gcsfs, etc.
-    """
-
-    def __init__(self, fs):
-        warnings.warn(
-            "The pyarrow.filesystem.DaskFileSystem/S3FSWrapper are deprecated "
-            "as of pyarrow 3.0.0, and will be removed in a future version.",
-            FutureWarning, stacklevel=2)
-        self.fs = fs
-
-    @doc(FileSystem.isdir)
-    def isdir(self, path):
-        raise NotImplementedError("Unsupported file system API")
-
-    @doc(FileSystem.isfile)
-    def isfile(self, path):
-        raise NotImplementedError("Unsupported file system API")
-
-    @doc(FileSystem._isfilestore)
-    def _isfilestore(self):
-        """
-        Object Stores like S3 and GCSFS are based on key lookups, not true
-        file-paths.
-        """
-        return False
-
-    @doc(FileSystem.delete)
-    def delete(self, path, recursive=False):
-        path = _stringify_path(path)
-        return self.fs.rm(path, recursive=recursive)
-
-    @doc(FileSystem.exists)
-    def exists(self, path):
-        path = _stringify_path(path)
-        return self.fs.exists(path)
-
-    @doc(FileSystem.mkdir)
-    def mkdir(self, path, create_parents=True):
-        path = _stringify_path(path)
-        if create_parents:
-            return self.fs.mkdirs(path)
-        else:
-            return self.fs.mkdir(path)
-
-    @doc(FileSystem.open)
-    def open(self, path, mode='rb'):
-        """
-        Open file for reading or writing.
-        """
-        path = _stringify_path(path)
-        return self.fs.open(path, mode=mode)
-
-    def ls(self, path, detail=False):
-        path = _stringify_path(path)
-        return self.fs.ls(path, detail=detail)
-
-    def walk(self, path):
-        """
-        Directory tree generator, like os.walk.
-        """
-        path = _stringify_path(path)
-        return self.fs.walk(path)
-
-
-class S3FSWrapper(DaskFileSystem):
-
-    @doc(FileSystem.isdir)
-    def isdir(self, path):
-        path = _sanitize_s3(_stringify_path(path))
-        try:
-            contents = self.fs.ls(path)
-            if len(contents) == 1 and contents[0] == path:
-                return False
-            else:
-                return True
-        except OSError:
-            return False
-
-    @doc(FileSystem.isfile)
-    def isfile(self, path):
-        path = _sanitize_s3(_stringify_path(path))
-        try:
-            contents = self.fs.ls(path)
-            return len(contents) == 1 and contents[0] == path
-        except OSError:
-            return False
-
-    def walk(self, path, refresh=False):
-        """
-        Directory tree generator, like os.walk.
-
-        Generator version of what is in s3fs, which yields a flattened list of
-        files.
-        """
-        path = _sanitize_s3(_stringify_path(path))
-        directories = set()
-        files = set()
-
-        for key in list(self.fs._ls(path, refresh=refresh)):
-            path = key['Key']
-            if key['StorageClass'] == 'DIRECTORY':
-                directories.add(path)
-            elif key['StorageClass'] == 'BUCKET':
-                pass
-            else:
-                files.add(path)
-
-        # s3fs creates duplicate 'DIRECTORY' entries
-        files = sorted([posixpath.split(f)[1] for f in files
-                        if f not in directories])
-        directories = sorted([posixpath.split(x)[1]
-                              for x in directories])
-
-        yield path, directories, files
-
-        for directory in directories:
-            yield from self.walk(directory, refresh=refresh)
-
-
-def _sanitize_s3(path):
-    if path.startswith('s3://'):
-        return path.replace('s3://', '')
-    else:
-        return path
-
-
-def _ensure_filesystem(fs):
-    fs_type = type(fs)
-
-    # If the arrow filesystem was subclassed, assume it supports the full
-    # interface and return it
-    if not issubclass(fs_type, FileSystem):
-        if "fsspec" in sys.modules:
-            fsspec = sys.modules["fsspec"]
-            if isinstance(fs, fsspec.AbstractFileSystem):
-                # for recent fsspec versions that stop inheriting from
-                # pyarrow.filesystem.FileSystem, still allow fsspec
-                # filesystems (which should be compatible with our legacy fs)
-                return fs
-
-        raise OSError('Unrecognized filesystem: {}'.format(fs_type))
-    else:
-        return fs
-
-
-def resolve_filesystem_and_path(where, filesystem=None):
-    """
-    Return filesystem from path which could be an HDFS URI, a local URI,
-    or a plain filesystem path.
-    """
-    if not _is_path_like(where):
-        if filesystem is not None:
-            raise ValueError("filesystem passed but where is file-like, so"
-                             " there is nothing to open with filesystem.")
-        return filesystem, where
-
-    if filesystem is not None:
-        filesystem = _ensure_filesystem(filesystem)
-        if isinstance(filesystem, LocalFileSystem):
-            path = _stringify_path(where)
-        elif not isinstance(where, str):
-            raise TypeError(
-                "Expected string path; path-like objects are only allowed "
-                "with a local filesystem"
-            )
-        else:
-            path = where
-        return filesystem, path
-
-    path = _stringify_path(where)
-
-    parsed_uri = urllib.parse.urlparse(path)
-    if parsed_uri.scheme == 'hdfs' or parsed_uri.scheme == 'viewfs':
-        # Input is hdfs URI such as hdfs://host:port/myfile.parquet
-        netloc_split = parsed_uri.netloc.split(':')
-        host = netloc_split[0]
-        if host == '':
-            host = 'default'
-        else:
-            host = parsed_uri.scheme + "://" + host
-        port = 0
-        if len(netloc_split) == 2 and netloc_split[1].isnumeric():
-            port = int(netloc_split[1])
-        fs = pa.hdfs._connect(host=host, port=port)
-        fs_path = parsed_uri.path
-    elif parsed_uri.scheme == 'file':
-        # Input is local URI such as file:///home/user/myfile.parquet
-        fs = LocalFileSystem._get_instance()
-        fs_path = parsed_uri.path
-    else:
-        # Input is local path such as /home/user/myfile.parquet
-        fs = LocalFileSystem._get_instance()
-        fs_path = path
-
-    return fs, fs_path
diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py
index ead750ca44..a256cc540f 100644
--- a/python/pyarrow/fs.py
+++ b/python/pyarrow/fs.py
@@ -98,9 +98,7 @@ def _filesystem_from_str(uri):
     return filesystem
 
 
-def _ensure_filesystem(
-    filesystem, use_mmap=False, allow_legacy_filesystem=False
-):
+def _ensure_filesystem(filesystem, *, use_mmap=False):
     if isinstance(filesystem, FileSystem):
         return filesystem
     elif isinstance(filesystem, str):
@@ -123,15 +121,6 @@ def _ensure_filesystem(
                 return LocalFileSystem(use_mmap=use_mmap)
             return PyFileSystem(FSSpecHandler(filesystem))
 
-    # map old filesystems to new ones
-    import pyarrow.filesystem as legacyfs
-
-    if isinstance(filesystem, legacyfs.LocalFileSystem):
-        return LocalFileSystem(use_mmap=use_mmap)
-    # TODO handle HDFS?
-    if allow_legacy_filesystem and isinstance(filesystem, legacyfs.FileSystem):
-        return filesystem
-
     raise TypeError(
         "Unrecognized filesystem: {}. `filesystem` argument must be a "
         "FileSystem instance or a valid file system URI'".format(
@@ -139,9 +128,7 @@ def _ensure_filesystem(
     )
 
 
-def _resolve_filesystem_and_path(
-    path, filesystem=None, allow_legacy_filesystem=False, memory_map=False
-):
+def _resolve_filesystem_and_path(path, filesystem=None, *, memory_map=False):
     """
     Return filesystem/path from path which could be an URI or a plain
     filesystem path.
@@ -155,10 +142,7 @@ def _resolve_filesystem_and_path(
         return filesystem, path
 
     if filesystem is not None:
-        filesystem = _ensure_filesystem(
-            filesystem, use_mmap=memory_map,
-            allow_legacy_filesystem=allow_legacy_filesystem
-        )
+        filesystem = _ensure_filesystem(filesystem, use_mmap=memory_map)
         if isinstance(filesystem, LocalFileSystem):
             path = _stringify_path(path)
         elif not isinstance(path, str):
@@ -166,8 +150,7 @@ def _resolve_filesystem_and_path(
                 "Expected string path; path-like objects are only allowed "
                 "with a local filesystem"
             )
-        if not allow_legacy_filesystem:
-            path = filesystem.normalize_path(path)
+        path = filesystem.normalize_path(path)
         return filesystem, path
 
     path = _stringify_path(path)
diff --git a/python/pyarrow/hdfs.py b/python/pyarrow/hdfs.py
deleted file mode 100644
index 2e6c387a8f..0000000000
--- a/python/pyarrow/hdfs.py
+++ /dev/null
@@ -1,240 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-
-import os
-import posixpath
-import sys
-import warnings
-
-from pyarrow.util import doc, _DEPR_MSG
-from pyarrow.filesystem import FileSystem
-import pyarrow._hdfsio as _hdfsio
-
-
-class HadoopFileSystem(_hdfsio.HadoopFileSystem, FileSystem):
-    """
-    DEPRECATED: FileSystem interface for HDFS cluster.
-
-    See pyarrow.hdfs.connect for full connection details
-
-    .. deprecated:: 2.0
-        ``pyarrow.hdfs.HadoopFileSystem`` is deprecated,
-        please use ``pyarrow.fs.HadoopFileSystem`` instead.
-    """
-
-    def __init__(self, host="default", port=0, user=None, kerb_ticket=None,
-                 driver='libhdfs', extra_conf=None):
-        warnings.warn(
-            _DEPR_MSG.format(
-                "hdfs.HadoopFileSystem", "2.0.0", "fs.HadoopFileSystem"),
-            FutureWarning, stacklevel=2)
-        if driver == 'libhdfs':
-            _maybe_set_hadoop_classpath()
-
-        self._connect(host, port, user, kerb_ticket, extra_conf)
-
-    def __reduce__(self):
-        return (HadoopFileSystem, (self.host, self.port, self.user,
-                                   self.kerb_ticket, self.extra_conf))
-
-    def _isfilestore(self):
-        """
-        Return True if this is a Unix-style file store with directories.
-        """
-        return True
-
-    @doc(FileSystem.isdir)
-    def isdir(self, path):
-        return super().isdir(path)
-
-    @doc(FileSystem.isfile)
-    def isfile(self, path):
-        return super().isfile(path)
-
-    @doc(FileSystem.delete)
-    def delete(self, path, recursive=False):
-        return super().delete(path, recursive)
-
-    def mkdir(self, path, **kwargs):
-        """
-        Create directory in HDFS.
-
-        Parameters
-        ----------
-        path : str
-            Directory path to create, including any parent directories.
-
-        Notes
-        -----
-        libhdfs does not support create_parents=False, so we ignore this here
-        """
-        return super().mkdir(path)
-
-    @doc(FileSystem.rename)
-    def rename(self, path, new_path):
-        return super().rename(path, new_path)
-
-    @doc(FileSystem.exists)
-    def exists(self, path):
-        return super().exists(path)
-
-    def ls(self, path, detail=False):
-        """
-        Retrieve directory contents and metadata, if requested.
-
-        Parameters
-        ----------
-        path : str
-            HDFS path to retrieve contents of.
-        detail : bool, default False
-            If False, only return list of paths.
-
-        Returns
-        -------
-        result : list of dicts (detail=True) or strings (detail=False)
-        """
-        return super().ls(path, detail)
-
-    def walk(self, top_path):
-        """
-        Directory tree generator for HDFS, like os.walk.
-
-        Parameters
-        ----------
-        top_path : str
-            Root directory for tree traversal.
-
-        Returns
-        -------
-        Generator yielding 3-tuple (dirpath, dirnames, filename)
-        """
-        contents = self.ls(top_path, detail=True)
-
-        directories, files = _libhdfs_walk_files_dirs(top_path, contents)
-        yield top_path, directories, files
-        for dirname in directories:
-            yield from self.walk(self._path_join(top_path, dirname))
-
-
-def _maybe_set_hadoop_classpath():
-    import re
-
-    if re.search(r'hadoop-common[^/]+.jar', os.environ.get('CLASSPATH', '')):
-        return
-
-    if 'HADOOP_HOME' in os.environ:
-        if sys.platform != 'win32':
-            classpath = _derive_hadoop_classpath()
-        else:
-            hadoop_bin = '{}/bin/hadoop'.format(os.environ['HADOOP_HOME'])
-            classpath = _hadoop_classpath_glob(hadoop_bin)
-    else:
-        classpath = _hadoop_classpath_glob('hadoop')
-
-    os.environ['CLASSPATH'] = classpath.decode('utf-8')
-
-
-def _derive_hadoop_classpath():
-    import subprocess
-
-    find_args = ('find', '-L', os.environ['HADOOP_HOME'], '-name', '*.jar')
-    find = subprocess.Popen(find_args, stdout=subprocess.PIPE)
-    xargs_echo = subprocess.Popen(('xargs', 'echo'),
-                                  stdin=find.stdout,
-                                  stdout=subprocess.PIPE)
-    jars = subprocess.check_output(('tr', "' '", "':'"),
-                                   stdin=xargs_echo.stdout)
-    hadoop_conf = os.environ["HADOOP_CONF_DIR"] \
-        if "HADOOP_CONF_DIR" in os.environ \
-        else os.environ["HADOOP_HOME"] + "/etc/hadoop"
-    return (hadoop_conf + ":").encode("utf-8") + jars
-
-
-def _hadoop_classpath_glob(hadoop_bin):
-    import subprocess
-
-    hadoop_classpath_args = (hadoop_bin, 'classpath', '--glob')
-    return subprocess.check_output(hadoop_classpath_args)
-
-
-def _libhdfs_walk_files_dirs(top_path, contents):
-    files = []
-    directories = []
-    for c in contents:
-        scrubbed_name = posixpath.split(c['name'])[1]
-        if c['kind'] == 'file':
-            files.append(scrubbed_name)
-        else:
-            directories.append(scrubbed_name)
-
-    return directories, files
-
-
-def connect(host="default", port=0, user=None, kerb_ticket=None,
-            extra_conf=None):
-    """
-    DEPRECATED: Connect to an HDFS cluster.
-
-    All parameters are optional and should only be set if the defaults need
-    to be overridden.
-
-    Authentication should be automatic if the HDFS cluster uses Kerberos.
-    However, if a username is specified, then the ticket cache will likely
-    be required.
-
-    .. deprecated:: 2.0
-        ``pyarrow.hdfs.connect`` is deprecated,
-        please use ``pyarrow.fs.HadoopFileSystem`` instead.
-
-    Parameters
-    ----------
-    host : NameNode. Set to "default" for fs.defaultFS from core-site.xml.
-    port : NameNode's port. Set to 0 for default or logical (HA) nodes.
-    user : Username when connecting to HDFS; None implies login user.
-    kerb_ticket : Path to Kerberos ticket cache.
-    extra_conf : dict, default None
-      extra Key/Value pairs for config; Will override any
-      hdfs-site.xml properties
-
-    Notes
-    -----
-    The first time you call this method, it will take longer than usual due
-    to JNI spin-up time.
-
-    Returns
-    -------
-    filesystem : HadoopFileSystem
-    """
-    warnings.warn(
-        _DEPR_MSG.format("hdfs.connect", "2.0.0", "fs.HadoopFileSystem"),
-        FutureWarning, stacklevel=2
-    )
-    return _connect(
-        host=host, port=port, user=user, kerb_ticket=kerb_ticket,
-        extra_conf=extra_conf
-    )
-
-
-def _connect(host="default", port=0, user=None, kerb_ticket=None,
-             extra_conf=None):
-    with warnings.catch_warnings():
-        warnings.simplefilter("ignore")
-        fs = HadoopFileSystem(host=host, port=port, user=user,
-                              kerb_ticket=kerb_ticket,
-                              extra_conf=extra_conf)
-    return fs
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index b57980b3d6..7890bf4b2d 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -30,6 +30,7 @@ import warnings
 from io import BufferedIOBase, IOBase, TextIOBase, UnsupportedOperation
 from queue import Queue, Empty as QueueEmpty
 
+from pyarrow.lib cimport check_status, HaveLibHdfs
 from pyarrow.util import _is_path_like, _stringify_path
 
 
@@ -46,6 +47,18 @@ cdef extern from "Python.h":
     bytearray PyByteArray_FromStringAndSize(char *string, Py_ssize_t len)
 
 
+def have_libhdfs():
+    """
+    Return true if HDFS (HadoopFileSystem) library is set up correctly.
+    """
+    try:
+        with nogil:
+            check_status(HaveLibHdfs())
+        return True
+    except Exception:
+        return False
+
+
 def io_thread_count():
     """
     Return the number of threads to use for I/O operations.
diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py
index 98a4b2a113..69a1c9d19a 100644
--- a/python/pyarrow/parquet/core.py
+++ b/python/pyarrow/parquet/core.py
@@ -47,7 +47,6 @@ from pyarrow._parquet import (ParquetReader, Statistics,  # 
noqa
                               SortingColumn)
 from pyarrow.fs import (LocalFileSystem, FileSystem, FileType,
                         _resolve_filesystem_and_path, _ensure_filesystem)
-from pyarrow import filesystem as legacyfs
 from pyarrow.util import guid, _is_path_like, _stringify_path, _deprecate_api
 
 
@@ -309,7 +308,7 @@ class ParquetFile:
         self._close_source = getattr(source, 'closed', True)
 
         filesystem, source = _resolve_filesystem_and_path(
-            source, filesystem, memory_map)
+            source, filesystem, memory_map=memory_map)
         if filesystem is not None:
             source = filesystem.open_input_file(source)
             self._close_source = True  # We opened it here, ensure we close it.
@@ -989,20 +988,13 @@ Examples
         # sure to close it when `self.close` is called.
         self.file_handle = None
 
-        filesystem, path = _resolve_filesystem_and_path(
-            where, filesystem, allow_legacy_filesystem=True
-        )
+        filesystem, path = _resolve_filesystem_and_path(where, filesystem)
         if filesystem is not None:
-            if isinstance(filesystem, legacyfs.FileSystem):
-                # legacy filesystem (eg custom subclass)
-                # TODO deprecate
-                sink = self.file_handle = filesystem.open(path, 'wb')
-            else:
-                # ARROW-10480: do not auto-detect compression.  While
-                # a filename like foo.parquet.gz is nonconforming, it
-                # shouldn't implicitly apply compression.
-                sink = self.file_handle = filesystem.open_output_stream(
-                    path, compression=None)
+            # ARROW-10480: do not auto-detect compression.  While
+            # a filename like foo.parquet.gz is nonconforming, it
+            # shouldn't implicitly apply compression.
+            sink = self.file_handle = filesystem.open_output_stream(
+                path, compression=None)
         else:
             sink = where
         self._metadata_collector = options.pop('metadata_collector', None)
@@ -1124,12 +1116,6 @@ def _get_pandas_index_columns(keyvalues):
 EXCLUDED_PARQUET_PATHS = {'_SUCCESS'}
 
 
-def _is_local_file_system(fs):
-    return isinstance(fs, LocalFileSystem) or isinstance(
-        fs, legacyfs.LocalFileSystem
-    )
-
-
 _read_docstring_common = """\
 read_dictionary : list, default None
     List of names or column paths (for nested types) to read directly
@@ -1306,7 +1292,7 @@ Examples
         if (
             hasattr(path_or_paths, "__fspath__") and
             filesystem is not None and
-            not _is_local_file_system(filesystem)
+            not isinstance(filesystem, LocalFileSystem)
         ):
             raise TypeError(
                 "Path-like objects with __fspath__ must only be used with "
diff --git a/python/pyarrow/tests/parquet/test_basic.py b/python/pyarrow/tests/parquet/test_basic.py
index 3c867776ac..bc21d709ec 100644
--- a/python/pyarrow/tests/parquet/test_basic.py
+++ b/python/pyarrow/tests/parquet/test_basic.py
@@ -25,7 +25,6 @@ import pytest
 
 import pyarrow as pa
 from pyarrow import fs
-from pyarrow.filesystem import LocalFileSystem, FileSystem
 from pyarrow.tests import util
 from pyarrow.tests.parquet.common import (_check_roundtrip, _roundtrip_table,
                                           _test_dataframe)
@@ -259,11 +258,11 @@ def test_fspath(tempdir):
 
     # combined with non-local filesystem raises
     with pytest.raises(TypeError):
-        _read_table(fs_protocol_obj, filesystem=FileSystem())
+        _read_table(fs_protocol_obj, filesystem=fs.FileSystem())
 
 
 @pytest.mark.parametrize("filesystem", [
-    None, fs.LocalFileSystem(), LocalFileSystem._get_instance()
+    None, fs.LocalFileSystem()
 ])
 @pytest.mark.parametrize("name", ("data.parquet", "δΎ‹.parquet"))
 def test_relative_paths(tempdir, filesystem, name):
diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py
index b6e351bdef..30dae05124 100644
--- a/python/pyarrow/tests/parquet/test_dataset.py
+++ b/python/pyarrow/tests/parquet/test_dataset.py
@@ -26,11 +26,10 @@ import unittest.mock as mock
 
 import pyarrow as pa
 import pyarrow.compute as pc
-from pyarrow import fs
-from pyarrow.filesystem import LocalFileSystem
+from pyarrow.fs import (FileSelector, FileSystem, LocalFileSystem,
+                        PyFileSystem, SubTreeFileSystem, FSSpecHandler)
 from pyarrow.tests import util
 from pyarrow.util import guid
-from pyarrow.vendored.version import Version
 
 try:
     import pyarrow.parquet as pq
@@ -63,7 +62,7 @@ def test_filesystem_uri(tempdir):
 
     # filesystem object
     result = pq.read_table(
-        path, filesystem=fs.LocalFileSystem())
+        path, filesystem=LocalFileSystem())
     assert result.equals(table)
 
     # filesystem URI
@@ -74,17 +73,17 @@ def test_filesystem_uri(tempdir):
 
 @pytest.mark.pandas
 def test_read_partitioned_directory(tempdir):
-    fs = LocalFileSystem._get_instance()
-    _partition_test_for_filesystem(fs, tempdir)
+    local = LocalFileSystem()
+    _partition_test_for_filesystem(local, tempdir)
 
 
 @pytest.mark.pandas
 def test_read_partitioned_columns_selection(tempdir):
     # ARROW-3861 - do not include partition columns in resulting table when
     # `columns` keyword was passed without those columns
-    fs = LocalFileSystem._get_instance()
+    local = LocalFileSystem()
     base_path = tempdir
-    _partition_test_for_filesystem(fs, base_path)
+    _partition_test_for_filesystem(local, base_path)
 
     dataset = pq.ParquetDataset(base_path)
     result = dataset.read(columns=["values"])
@@ -93,7 +92,7 @@ def test_read_partitioned_columns_selection(tempdir):
 
 @pytest.mark.pandas
 def test_filters_equivalency(tempdir):
-    fs = LocalFileSystem._get_instance()
+    local = LocalFileSystem()
     base_path = tempdir
 
     integer_keys = [0, 1]
@@ -112,12 +111,12 @@ def test_filters_equivalency(tempdir):
                            3),
     }, columns=['integer', 'string', 'boolean'])
 
-    _generate_partition_directories(fs, base_path, partition_spec, df)
+    _generate_partition_directories(local, base_path, partition_spec, df)
 
     # Old filters syntax:
     #  integer == 1 AND string != b AND boolean == True
     dataset = pq.ParquetDataset(
-        base_path, filesystem=fs,
+        base_path, filesystem=local,
         filters=[('integer', '=', 1), ('string', '!=', 'b'),
                  ('boolean', '==', 'True')],
     )
@@ -141,7 +140,7 @@ def test_filters_equivalency(tempdir):
         [('integer', '=', 0), ('boolean', '==', 'False')]
     ]
     dataset = pq.ParquetDataset(
-        base_path, filesystem=fs, filters=filters)
+        base_path, filesystem=local, filters=filters)
     table = dataset.read()
     result_df = table.to_pandas().reset_index(drop=True)
 
@@ -158,13 +157,13 @@ def test_filters_equivalency(tempdir):
     for filters in [[[('string', '==', b'1\0a')]],
                     [[('string', '==', '1\0a')]]]:
         dataset = pq.ParquetDataset(
-            base_path, filesystem=fs, filters=filters)
+            base_path, filesystem=local, filters=filters)
         assert dataset.read().num_rows == 0
 
 
 @pytest.mark.pandas
 def test_filters_cutoff_exclusive_integer(tempdir):
-    fs = LocalFileSystem._get_instance()
+    local = LocalFileSystem()
     base_path = tempdir
 
     integer_keys = [0, 1, 2, 3, 4]
@@ -178,10 +177,10 @@ def test_filters_cutoff_exclusive_integer(tempdir):
         'integers': np.array(integer_keys, dtype='i4'),
     }, columns=['index', 'integers'])
 
-    _generate_partition_directories(fs, base_path, partition_spec, df)
+    _generate_partition_directories(local, base_path, partition_spec, df)
 
     dataset = pq.ParquetDataset(
-        base_path, filesystem=fs,
+        base_path, filesystem=local,
         filters=[
             ('integers', '<', 4),
             ('integers', '>', 1),
@@ -204,7 +203,7 @@ def test_filters_cutoff_exclusive_integer(tempdir):
 )
 @pytest.mark.pandas
 def test_filters_cutoff_exclusive_datetime(tempdir):
-    fs = LocalFileSystem._get_instance()
+    local = LocalFileSystem()
     base_path = tempdir
 
     date_keys = [
@@ -224,10 +223,10 @@ def test_filters_cutoff_exclusive_datetime(tempdir):
         'dates': np.array(date_keys, dtype='datetime64'),
     }, columns=['index', 'dates'])
 
-    _generate_partition_directories(fs, base_path, partition_spec, df)
+    _generate_partition_directories(local, base_path, partition_spec, df)
 
     dataset = pq.ParquetDataset(
-        base_path, filesystem=fs,
+        base_path, filesystem=local,
         filters=[
             ('dates', '<', "2018-04-12"),
             ('dates', '>', "2018-04-10")
@@ -264,7 +263,7 @@ def test_filters_inclusive_datetime(tempdir):
 
 @pytest.mark.pandas
 def test_filters_inclusive_integer(tempdir):
-    fs = LocalFileSystem._get_instance()
+    local = LocalFileSystem()
     base_path = tempdir
 
     integer_keys = [0, 1, 2, 3, 4]
@@ -278,10 +277,10 @@ def test_filters_inclusive_integer(tempdir):
         'integers': np.array(integer_keys, dtype='i4'),
     }, columns=['index', 'integers'])
 
-    _generate_partition_directories(fs, base_path, partition_spec, df)
+    _generate_partition_directories(local, base_path, partition_spec, df)
 
     dataset = pq.ParquetDataset(
-        base_path, filesystem=fs,
+        base_path, filesystem=local,
         filters=[
             ('integers', '<=', 3),
             ('integers', '>=', 2),
@@ -298,7 +297,7 @@ def test_filters_inclusive_integer(tempdir):
 
 @pytest.mark.pandas
 def test_filters_inclusive_set(tempdir):
-    fs = LocalFileSystem._get_instance()
+    local = LocalFileSystem()
     base_path = tempdir
 
     integer_keys = [0, 1]
@@ -317,10 +316,10 @@ def test_filters_inclusive_set(tempdir):
                            3),
     }, columns=['integer', 'string', 'boolean'])
 
-    _generate_partition_directories(fs, base_path, partition_spec, df)
+    _generate_partition_directories(local, base_path, partition_spec, df)
 
     dataset = pq.ParquetDataset(
-        base_path, filesystem=fs,
+        base_path, filesystem=local,
         filters=[('string', 'in', 'ab')],
     )
     table = dataset.read()
@@ -331,7 +330,7 @@ def test_filters_inclusive_set(tempdir):
     assert 'c' not in result_df['string'].values
 
     dataset = pq.ParquetDataset(
-        base_path, filesystem=fs,
+        base_path, filesystem=local,
         filters=[('integer', 'in', [1]), ('string', 'in', ('a', 'b')),
                  ('boolean', 'not in', {'False'})],
     )
@@ -345,7 +344,7 @@ def test_filters_inclusive_set(tempdir):
 
 @pytest.mark.pandas
 def test_filters_invalid_pred_op(tempdir):
-    fs = LocalFileSystem._get_instance()
+    local = LocalFileSystem()
     base_path = tempdir
 
     integer_keys = [0, 1, 2, 3, 4]
@@ -359,26 +358,26 @@ def test_filters_invalid_pred_op(tempdir):
         'integers': np.array(integer_keys, dtype='i4'),
     }, columns=['index', 'integers'])
 
-    _generate_partition_directories(fs, base_path, partition_spec, df)
+    _generate_partition_directories(local, base_path, partition_spec, df)
 
     with pytest.raises(TypeError):
         pq.ParquetDataset(base_path,
-                          filesystem=fs,
+                          filesystem=local,
                           filters=[('integers', 'in', 3), ])
 
     with pytest.raises(ValueError):
         pq.ParquetDataset(base_path,
-                          filesystem=fs,
+                          filesystem=local,
                           filters=[('integers', '=<', 3), ])
 
     # Dataset API returns empty table
     dataset = pq.ParquetDataset(base_path,
-                                filesystem=fs,
+                                filesystem=local,
                                 filters=[('integers', 'in', set()), ])
     assert dataset.read().num_rows == 0
 
     dataset = pq.ParquetDataset(base_path,
-                                filesystem=fs,
+                                filesystem=local,
                                 filters=[('integers', '!=', {3})])
     with pytest.raises(NotImplementedError):
         assert dataset.read().num_rows == 0
@@ -388,7 +387,7 @@ def test_filters_invalid_pred_op(tempdir):
 def test_filters_invalid_column(tempdir):
     # ARROW-5572 - raise error on invalid name in filter specification
     # works with new dataset
-    fs = LocalFileSystem._get_instance()
+    local = LocalFileSystem()
     base_path = tempdir
 
     integer_keys = [0, 1, 2, 3, 4]
@@ -400,11 +399,11 @@ def test_filters_invalid_column(tempdir):
         'integers': np.array(integer_keys, dtype='i4'),
     }, columns=['index', 'integers'])
 
-    _generate_partition_directories(fs, base_path, partition_spec, df)
+    _generate_partition_directories(local, base_path, partition_spec, df)
 
     msg = r"No match for FieldRef.Name\(non_existent_column\)"
     with pytest.raises(ValueError, match=msg):
-        pq.ParquetDataset(base_path, filesystem=fs,
+        pq.ParquetDataset(base_path, filesystem=local,
                           filters=[('non_existent_column', '<', 3), ]).read()
 
 
@@ -419,7 +418,7 @@ def test_filters_invalid_column(tempdir):
 def test_filters_read_table(tempdir, filters, read_method):
     read = getattr(pq, read_method)
     # test that filters keyword is passed through in read_table
-    fs = LocalFileSystem._get_instance()
+    local = LocalFileSystem()
     base_path = tempdir
 
     integer_keys = [0, 1, 2, 3, 4]
@@ -434,9 +433,9 @@ def test_filters_read_table(tempdir, filters, read_method):
         'nested': np.array([{'a': i, 'b': str(i)} for i in range(N)])
     })
 
-    _generate_partition_directories(fs, base_path, partition_spec, df)
+    _generate_partition_directories(local, base_path, partition_spec, df)
 
-    kwargs = dict(filesystem=fs, filters=filters)
+    kwargs = dict(filesystem=local, filters=filters)
 
     table = read(base_path, **kwargs)
     assert table.num_rows == 3
@@ -445,7 +444,7 @@ def test_filters_read_table(tempdir, filters, read_method):
 @pytest.mark.pandas
 def test_partition_keys_with_underscores(tempdir):
     # ARROW-5666 - partition field values with underscores preserve underscores
-    fs = LocalFileSystem._get_instance()
+    local = LocalFileSystem()
     base_path = tempdir
 
     string_keys = ["2019_2", "2019_3"]
@@ -459,7 +458,7 @@ def test_partition_keys_with_underscores(tempdir):
         'year_week': np.array(string_keys, dtype='object'),
     }, columns=['index', 'year_week'])
 
-    _generate_partition_directories(fs, base_path, partition_spec, df)
+    _generate_partition_directories(local, base_path, partition_spec, df)
 
     dataset = pq.ParquetDataset(base_path)
     result = dataset.read()
@@ -499,26 +498,6 @@ def test_read_single_file_list(tempdir):
     assert result.equals(table)
 
 
[email protected]
[email protected]
-def test_read_partitioned_directory_s3fs_wrapper(s3_example_s3fs):
-    import s3fs
-
-    from pyarrow.filesystem import S3FSWrapper
-
-    if Version(s3fs.__version__) >= Version("0.5"):
-        pytest.skip("S3FSWrapper no longer working for s3fs 0.5+")
-
-    fs, path = s3_example_s3fs
-    with pytest.warns(FutureWarning):
-        wrapper = S3FSWrapper(fs)
-    _partition_test_for_filesystem(wrapper, path)
-
-    # Check that we can auto-wrap
-    dataset = pq.ParquetDataset(path, filesystem=fs)
-    dataset.read()
-
-
 @pytest.mark.pandas
 @pytest.mark.s3
 def test_read_partitioned_directory_s3fs(s3_example_s3fs):
@@ -569,6 +548,9 @@ def _generate_partition_directories(fs, base_dir, partition_spec, df):
     # partition_spec : list of lists, e.g. [['foo', [0, 1, 2],
     #                                       ['bar', ['a', 'b', 'c']]
     # part_table : a pyarrow.Table to write to each partition
+    if not isinstance(fs, FileSystem):
+        fs = PyFileSystem(FSSpecHandler(fs))
+
     DEPTH = len(partition_spec)
 
     pathsep = getattr(fs, "pathsep", getattr(fs, "sep", "/"))
@@ -582,24 +564,27 @@ def _generate_partition_directories(fs, base_dir, partition_spec, df):
                 str(base_dir),
                 '{}={}'.format(name, value)
             ])
-            fs.mkdir(level_dir)
+            fs.create_dir(level_dir)
 
             if level == DEPTH - 1:
                 # Generate example data
+                from pyarrow.fs import FileType
+
                 file_path = pathsep.join([level_dir, guid()])
                 filtered_df = _filter_partition(df, this_part_keys)
                 part_table = pa.Table.from_pandas(filtered_df)
-                with fs.open(file_path, 'wb') as f:
+                with fs.open_output_stream(file_path) as f:
                     _write_table(part_table, f)
-                assert fs.exists(file_path)
+                assert fs.get_file_info(file_path).type != FileType.NotFound
+                assert fs.get_file_info(file_path).type == FileType.File
 
                 file_success = pathsep.join([level_dir, '_SUCCESS'])
-                with fs.open(file_success, 'wb') as f:
+                with fs.open_output_stream(file_success) as f:
                     pass
             else:
                 _visit_level(level_dir, level + 1, this_part_keys)
                 file_success = pathsep.join([level_dir, '_SUCCESS'])
-                with fs.open(file_success, 'wb') as f:
+                with fs.open_output_stream(file_success) as f:
                     pass
 
     _visit_level(base_dir, 0, [])
@@ -1009,15 +994,21 @@ def _test_write_to_dataset_no_partitions(base_path,
     output_table = pa.Table.from_pandas(output_df)
 
     if filesystem is None:
-        filesystem = LocalFileSystem._get_instance()
+        filesystem = LocalFileSystem()
+    elif not isinstance(filesystem, FileSystem):
+        filesystem = PyFileSystem(FSSpecHandler(filesystem))
 
     # Without partitions, append files to root_path
     n = 5
     for i in range(n):
         pq.write_to_dataset(output_table, base_path,
                             filesystem=filesystem)
-    output_files = [file for file in filesystem.ls(str(base_path))
-                    if file.endswith(".parquet")]
+
+    selector = FileSelector(str(base_path), allow_not_found=False,
+                            recursive=True)
+
+    infos = filesystem.get_file_info(selector)
+    output_files = [info for info in infos if info.path.endswith(".parquet")]
     assert len(output_files) == n
 
     # Deduplicated incoming DataFrame should match
@@ -1103,14 +1094,14 @@ def test_write_to_dataset_filesystem(tempdir):
     table = pa.Table.from_pandas(df)
     path = str(tempdir)
 
-    pq.write_to_dataset(table, path, filesystem=fs.LocalFileSystem())
+    pq.write_to_dataset(table, path, filesystem=LocalFileSystem())
     result = pq.read_table(path)
     assert result.equals(table)
 
 
 def _make_dataset_for_pickling(tempdir, N=100):
     path = tempdir / 'data.parquet'
-    fs = LocalFileSystem._get_instance()
+    local = LocalFileSystem()
 
     df = pd.DataFrame({
         'index': np.arange(N),
@@ -1127,11 +1118,11 @@ def _make_dataset_for_pickling(tempdir, N=100):
     assert reader.metadata.num_row_groups == num_groups
 
     metadata_path = tempdir / '_metadata'
-    with fs.open(metadata_path, 'wb') as f:
+    with local.open_output_stream(str(metadata_path)) as f:
         pq.write_metadata(table.schema, f)
 
     dataset = pq.ParquetDataset(
-        tempdir, filesystem=fs)
+        tempdir, filesystem=local)
 
     return dataset
 
@@ -1249,7 +1240,7 @@ def test_parquet_dataset_new_filesystem(tempdir):
     # Ensure we can pass new FileSystem object to ParquetDataset
     table = pa.table({'a': [1, 2, 3]})
     pq.write_table(table, tempdir / 'data.parquet')
-    filesystem = fs.SubTreeFileSystem(str(tempdir), fs.LocalFileSystem())
+    filesystem = SubTreeFileSystem(str(tempdir), LocalFileSystem())
     dataset = pq.ParquetDataset('.', filesystem=filesystem)
     result = dataset.read()
     assert result.equals(table)
diff --git a/python/pyarrow/tests/parquet/test_parquet_writer.py b/python/pyarrow/tests/parquet/test_parquet_writer.py
index 16584684f5..f4ee7529ae 100644
--- a/python/pyarrow/tests/parquet/test_parquet_writer.py
+++ b/python/pyarrow/tests/parquet/test_parquet_writer.py
@@ -19,7 +19,6 @@ import pytest
 
 import pyarrow as pa
 from pyarrow import fs
-from pyarrow.filesystem import FileSystem, LocalFileSystem
 
 try:
     import pyarrow.parquet as pq
@@ -161,7 +160,6 @@ def test_parquet_writer_context_obj_with_exception(tempdir):
 @pytest.mark.pandas
 @pytest.mark.parametrize("filesystem", [
     None,
-    LocalFileSystem._get_instance(),
     fs.LocalFileSystem(),
 ])
 def test_parquet_writer_write_wrappers(tempdir, filesystem):
@@ -250,7 +248,6 @@ def test_parquet_writer_chunk_size(tempdir):
 @pytest.mark.pandas
 @pytest.mark.parametrize("filesystem", [
     None,
-    LocalFileSystem._get_instance(),
     fs.LocalFileSystem(),
 ])
 def test_parquet_writer_filesystem_local(tempdir, filesystem):
@@ -330,46 +327,6 @@ def test_parquet_writer_filesystem_buffer_raises():
         )
 
 
[email protected]
-def test_parquet_writer_with_caller_provided_filesystem():
-    out = pa.BufferOutputStream()
-
-    class CustomFS(FileSystem):
-        def __init__(self):
-            self.path = None
-            self.mode = None
-
-        def open(self, path, mode='rb'):
-            self.path = path
-            self.mode = mode
-            return out
-
-    fs = CustomFS()
-    fname = 'expected_fname.parquet'
-    df = _test_dataframe(100)
-    table = pa.Table.from_pandas(df, preserve_index=False)
-
-    with pq.ParquetWriter(fname, table.schema, filesystem=fs, version='2.6') \
-            as writer:
-        writer.write_table(table)
-
-    assert fs.path == fname
-    assert fs.mode == 'wb'
-    assert out.closed
-
-    buf = out.getvalue()
-    table_read = _read_table(pa.BufferReader(buf))
-    df_read = table_read.to_pandas()
-    tm.assert_frame_equal(df_read, df)
-
-    # Should raise ValueError when filesystem is passed with file-like object
-    with pytest.raises(ValueError) as err_info:
-        pq.ParquetWriter(pa.BufferOutputStream(), table.schema, filesystem=fs)
-        expected_msg = ("filesystem passed but where is file-like, so"
-                        " there is nothing to open with filesystem.")
-        assert str(err_info) == expected_msg
-
-
 def test_parquet_writer_store_schema(tempdir):
     table = pa.table({'a': [1, 2, 3]})
 
diff --git a/python/pyarrow/tests/test_filesystem.py b/python/pyarrow/tests/test_filesystem.py
deleted file mode 100644
index 9862c5990d..0000000000
--- a/python/pyarrow/tests/test_filesystem.py
+++ /dev/null
@@ -1,75 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import pyarrow as pa
-from pyarrow import filesystem
-
-import os
-import pytest
-
-
-def test_filesystem_deprecated():
-    with pytest.warns(FutureWarning):
-        filesystem.LocalFileSystem()
-
-    with pytest.warns(FutureWarning):
-        filesystem.LocalFileSystem.get_instance()
-
-
-def test_filesystem_deprecated_toplevel():
-    with pytest.warns(FutureWarning):
-        pa.localfs
-
-    with pytest.warns(FutureWarning):
-        pa.FileSystem
-
-    with pytest.warns(FutureWarning):
-        pa.LocalFileSystem
-
-    with pytest.warns(FutureWarning):
-        pa.HadoopFileSystem
-
-
-def test_resolve_uri():
-    uri = "file:///home/user/myfile.parquet"
-    fs, path = filesystem.resolve_filesystem_and_path(uri)
-    assert isinstance(fs, filesystem.LocalFileSystem)
-    assert path == "/home/user/myfile.parquet"
-
-
-def test_resolve_local_path():
-    for uri in ['/home/user/myfile.parquet',
-                'myfile.parquet',
-                'my # file ? parquet',
-                'C:/Windows/myfile.parquet',
-                r'C:\\Windows\\myfile.parquet',
-                ]:
-        fs, path = filesystem.resolve_filesystem_and_path(uri)
-        assert isinstance(fs, filesystem.LocalFileSystem)
-        assert path == uri
-
-
[email protected]("ignore:pyarrow.filesystem.LocalFileSystem")
-def test_resolve_home_directory():
-    uri = '~/myfile.parquet'
-    fs, path = filesystem.resolve_filesystem_and_path(uri)
-    assert isinstance(fs, filesystem.LocalFileSystem)
-    assert path == os.path.expanduser(uri)
-
-    local_fs = filesystem.LocalFileSystem()
-    fs, path = filesystem.resolve_filesystem_and_path(uri, local_fs)
-    assert path == os.path.expanduser(uri)
diff --git a/python/pyarrow/tests/test_hdfs.py b/python/pyarrow/tests/test_hdfs.py
deleted file mode 100644
index 5b94c200f3..0000000000
--- a/python/pyarrow/tests/test_hdfs.py
+++ /dev/null
@@ -1,451 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import os
-import random
-from io import BytesIO
-from os.path import join as pjoin
-
-import numpy as np
-import pytest
-
-import pyarrow as pa
-from pyarrow.tests import util
-from pyarrow.tests.parquet.common import _test_dataframe
-from pyarrow.tests.parquet.test_dataset import (
-    _test_write_to_dataset_with_partitions,
-    _test_write_to_dataset_no_partitions
-)
-from pyarrow.util import guid
-
-try:
-    from pandas.testing import assert_frame_equal
-except ImportError:
-    pass
-
-
-# ----------------------------------------------------------------------
-# HDFS tests
-
-
-def check_libhdfs_present():
-    if not pa.have_libhdfs():
-        message = 'No libhdfs available on system'
-        if os.environ.get('PYARROW_HDFS_TEST_LIBHDFS_REQUIRE'):
-            pytest.fail(message)
-        else:
-            pytest.skip(message)
-
-
-def hdfs_test_client():
-    host = os.environ.get('ARROW_HDFS_TEST_HOST', 'default')
-    user = os.environ.get('ARROW_HDFS_TEST_USER', None)
-    try:
-        port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 0))
-    except ValueError:
-        raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not '
-                         'an integer')
-
-    with pytest.warns(FutureWarning):
-        return pa.hdfs.connect(host, port, user)
-
-
[email protected]
-class HdfsTestCases:
-
-    def _make_test_file(self, hdfs, test_name, test_path, test_data):
-        base_path = pjoin(self.tmp_path, test_name)
-        hdfs.mkdir(base_path)
-
-        full_path = pjoin(base_path, test_path)
-
-        with hdfs.open(full_path, 'wb') as f:
-            f.write(test_data)
-
-        return full_path
-
-    @classmethod
-    def setup_class(cls):
-        cls.check_driver()
-        cls.hdfs = hdfs_test_client()
-        cls.tmp_path = '/tmp/pyarrow-test-{}'.format(random.randint(0, 1000))
-        cls.hdfs.mkdir(cls.tmp_path)
-
-    @classmethod
-    def teardown_class(cls):
-        cls.hdfs.delete(cls.tmp_path, recursive=True)
-        cls.hdfs.close()
-
-    def test_pickle(self, pickle_module):
-        s = pickle_module.dumps(self.hdfs)
-        h2 = pickle_module.loads(s)
-        assert h2.is_open
-        assert h2.host == self.hdfs.host
-        assert h2.port == self.hdfs.port
-        assert h2.user == self.hdfs.user
-        assert h2.kerb_ticket == self.hdfs.kerb_ticket
-        # smoketest unpickled client works
-        h2.ls(self.tmp_path)
-
-    def test_cat(self):
-        path = pjoin(self.tmp_path, 'cat-test')
-
-        data = b'foobarbaz'
-        with self.hdfs.open(path, 'wb') as f:
-            f.write(data)
-
-        contents = self.hdfs.cat(path)
-        assert contents == data
-
-    def test_capacity_space(self):
-        capacity = self.hdfs.get_capacity()
-        space_used = self.hdfs.get_space_used()
-        disk_free = self.hdfs.df()
-
-        assert capacity > 0
-        assert capacity > space_used
-        assert disk_free == (capacity - space_used)
-
-    def test_close(self):
-        client = hdfs_test_client()
-        assert client.is_open
-        client.close()
-        assert not client.is_open
-
-        with pytest.raises(Exception):
-            client.ls('/')
-
-    def test_mkdir(self):
-        path = pjoin(self.tmp_path, 'test-dir/test-dir')
-        parent_path = pjoin(self.tmp_path, 'test-dir')
-
-        self.hdfs.mkdir(path)
-        assert self.hdfs.exists(path)
-
-        self.hdfs.delete(parent_path, recursive=True)
-        assert not self.hdfs.exists(path)
-
-    def test_mv_rename(self):
-        path = pjoin(self.tmp_path, 'mv-test')
-        new_path = pjoin(self.tmp_path, 'mv-new-test')
-
-        data = b'foobarbaz'
-        with self.hdfs.open(path, 'wb') as f:
-            f.write(data)
-
-        assert self.hdfs.exists(path)
-        self.hdfs.mv(path, new_path)
-        assert not self.hdfs.exists(path)
-        assert self.hdfs.exists(new_path)
-
-        assert self.hdfs.cat(new_path) == data
-
-        self.hdfs.rename(new_path, path)
-        assert self.hdfs.cat(path) == data
-
-    def test_info(self):
-        path = pjoin(self.tmp_path, 'info-base')
-        file_path = pjoin(path, 'ex')
-        self.hdfs.mkdir(path)
-
-        data = b'foobarbaz'
-        with self.hdfs.open(file_path, 'wb') as f:
-            f.write(data)
-
-        path_info = self.hdfs.info(path)
-        file_path_info = self.hdfs.info(file_path)
-
-        assert path_info['kind'] == 'directory'
-
-        assert file_path_info['kind'] == 'file'
-        assert file_path_info['size'] == len(data)
-
-    def test_exists_isdir_isfile(self):
-        dir_path = pjoin(self.tmp_path, 'info-base')
-        file_path = pjoin(dir_path, 'ex')
-        missing_path = pjoin(dir_path, 'this-path-is-missing')
-
-        self.hdfs.mkdir(dir_path)
-        with self.hdfs.open(file_path, 'wb') as f:
-            f.write(b'foobarbaz')
-
-        assert self.hdfs.exists(dir_path)
-        assert self.hdfs.exists(file_path)
-        assert not self.hdfs.exists(missing_path)
-
-        assert self.hdfs.isdir(dir_path)
-        assert not self.hdfs.isdir(file_path)
-        assert not self.hdfs.isdir(missing_path)
-
-        assert not self.hdfs.isfile(dir_path)
-        assert self.hdfs.isfile(file_path)
-        assert not self.hdfs.isfile(missing_path)
-
-    def test_disk_usage(self):
-        path = pjoin(self.tmp_path, 'disk-usage-base')
-        p1 = pjoin(path, 'p1')
-        p2 = pjoin(path, 'p2')
-
-        subdir = pjoin(path, 'subdir')
-        p3 = pjoin(subdir, 'p3')
-
-        if self.hdfs.exists(path):
-            self.hdfs.delete(path, True)
-
-        self.hdfs.mkdir(path)
-        self.hdfs.mkdir(subdir)
-
-        data = b'foobarbaz'
-
-        for file_path in [p1, p2, p3]:
-            with self.hdfs.open(file_path, 'wb') as f:
-                f.write(data)
-
-        assert self.hdfs.disk_usage(path) == len(data) * 3
-
-    def test_ls(self):
-        base_path = pjoin(self.tmp_path, 'ls-test')
-        self.hdfs.mkdir(base_path)
-
-        dir_path = pjoin(base_path, 'a-dir')
-        f1_path = pjoin(base_path, 'a-file-1')
-
-        self.hdfs.mkdir(dir_path)
-
-        f = self.hdfs.open(f1_path, 'wb')
-        f.write(b'a' * 10)
-
-        contents = sorted(self.hdfs.ls(base_path, False))
-        assert contents == [dir_path, f1_path]
-
-    def test_chmod_chown(self):
-        path = pjoin(self.tmp_path, 'chmod-test')
-        with self.hdfs.open(path, 'wb') as f:
-            f.write(b'a' * 10)
-
-    def test_download_upload(self):
-        base_path = pjoin(self.tmp_path, 'upload-test')
-
-        data = b'foobarbaz'
-        buf = BytesIO(data)
-        buf.seek(0)
-
-        self.hdfs.upload(base_path, buf)
-
-        out_buf = BytesIO()
-        self.hdfs.download(base_path, out_buf)
-        out_buf.seek(0)
-        assert out_buf.getvalue() == data
-
-    def test_file_context_manager(self):
-        path = pjoin(self.tmp_path, 'ctx-manager')
-
-        data = b'foo'
-        with self.hdfs.open(path, 'wb') as f:
-            f.write(data)
-
-        with self.hdfs.open(path, 'rb') as f:
-            assert f.size() == 3
-            result = f.read(10)
-            assert result == data
-
-    def test_open_not_exist(self):
-        path = pjoin(self.tmp_path, 'does-not-exist-123')
-
-        with pytest.raises(FileNotFoundError):
-            self.hdfs.open(path)
-
-    def test_open_write_error(self):
-        with pytest.raises((FileExistsError, IsADirectoryError)):
-            self.hdfs.open('/', 'wb')
-
-    def test_read_whole_file(self):
-        path = pjoin(self.tmp_path, 'read-whole-file')
-
-        data = b'foo' * 1000
-        with self.hdfs.open(path, 'wb') as f:
-            f.write(data)
-
-        with self.hdfs.open(path, 'rb') as f:
-            result = f.read()
-
-        assert result == data
-
-    def _write_multiple_hdfs_pq_files(self, tmpdir):
-        import pyarrow.parquet as pq
-        nfiles = 10
-        size = 5
-        test_data = []
-        for i in range(nfiles):
-            df = _test_dataframe(size, seed=i)
-
-            df['index'] = np.arange(i * size, (i + 1) * size)
-
-            # Hack so that we don't have a dtype cast in v1 files
-            df['uint32'] = df['uint32'].astype(np.int64)
-
-            path = pjoin(tmpdir, '{}.parquet'.format(i))
-
-            table = pa.Table.from_pandas(df, preserve_index=False)
-            with self.hdfs.open(path, 'wb') as f:
-                pq.write_table(table, f)
-
-            test_data.append(table)
-
-        expected = pa.concat_tables(test_data)
-        return expected
-
-    @pytest.mark.xfail(reason="legacy.FileSystem not supported with ParquetDataset "
-                       "due to legacy path being removed in PyArrow 15.0.0.",
-                       raises=TypeError)
-    @pytest.mark.pandas
-    @pytest.mark.parquet
-    def test_read_multiple_parquet_files(self):
-
-        tmpdir = pjoin(self.tmp_path, 'multi-parquet-' + guid())
-
-        self.hdfs.mkdir(tmpdir)
-
-        expected = self._write_multiple_hdfs_pq_files(tmpdir)
-        result = self.hdfs.read_parquet(tmpdir)
-
-        assert_frame_equal(
-            result.to_pandas().sort_values(by='index').reset_index(drop=True),
-            expected.to_pandas()
-        )
-
-    @pytest.mark.pandas
-    @pytest.mark.parquet
-    def test_read_multiple_parquet_files_with_uri(self):
-        import pyarrow.parquet as pq
-
-        tmpdir = pjoin(self.tmp_path, 'multi-parquet-uri-' + guid())
-
-        self.hdfs.mkdir(tmpdir)
-
-        expected = self._write_multiple_hdfs_pq_files(tmpdir)
-        path = _get_hdfs_uri(tmpdir)
-        result = pq.read_table(path)
-
-        assert_frame_equal(
-            result.to_pandas().sort_values(by='index').reset_index(drop=True),
-            expected.to_pandas()
-        )
-
-    @pytest.mark.xfail(reason="legacy.FileSystem not supported with ParquetDataset "
-                       "due to legacy path being removed in PyArrow 15.0.0.",
-                       raises=TypeError)
-    @pytest.mark.pandas
-    @pytest.mark.parquet
-    def test_read_write_parquet_files_with_uri(self):
-        import pyarrow.parquet as pq
-
-        tmpdir = pjoin(self.tmp_path, 'uri-parquet-' + guid())
-        self.hdfs.mkdir(tmpdir)
-        path = _get_hdfs_uri(pjoin(tmpdir, 'test.parquet'))
-
-        size = 5
-        df = _test_dataframe(size, seed=0)
-        # Hack so that we don't have a dtype cast in v1 files
-        df['uint32'] = df['uint32'].astype(np.int64)
-        table = pa.Table.from_pandas(df, preserve_index=False)
-
-        pq.write_table(table, path, filesystem=self.hdfs)
-
-        result = pq.read_table(path, filesystem=self.hdfs).to_pandas()
-
-        assert_frame_equal(result, df)
-
-    @pytest.mark.xfail(reason="legacy.FileSystem not supported with ParquetDataset "
-                       "due to legacy path being removed in PyArrow 15.0.0.",
-                       raises=TypeError)
-    @pytest.mark.parquet
-    @pytest.mark.pandas
-    def test_write_to_dataset_with_partitions(self):
-        tmpdir = pjoin(self.tmp_path, 'write-partitions-' + guid())
-        self.hdfs.mkdir(tmpdir)
-        _test_write_to_dataset_with_partitions(
-            tmpdir, filesystem=self.hdfs)
-
-    @pytest.mark.xfail(reason="legacy.FileSystem not supported with ParquetDataset "
-                       "due to legacy path being removed in PyArrow 15.0.0.",
-                       raises=TypeError)
-    @pytest.mark.parquet
-    @pytest.mark.pandas
-    def test_write_to_dataset_no_partitions(self):
-        tmpdir = pjoin(self.tmp_path, 'write-no_partitions-' + guid())
-        self.hdfs.mkdir(tmpdir)
-        _test_write_to_dataset_no_partitions(
-            tmpdir, filesystem=self.hdfs)
-
-
-class TestLibHdfs(HdfsTestCases):
-
-    @classmethod
-    def check_driver(cls):
-        check_libhdfs_present()
-
-    def test_orphaned_file(self):
-        hdfs = hdfs_test_client()
-        file_path = self._make_test_file(hdfs, 'orphaned_file_test', 'fname',
-                                         b'foobarbaz')
-
-        f = hdfs.open(file_path)
-        hdfs = None
-        f = None  # noqa
-
-
-def _get_hdfs_uri(path):
-    host = os.environ.get('ARROW_HDFS_TEST_HOST', 'localhost')
-    try:
-        port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 0))
-    except ValueError:
-        raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not '
-                         'an integer')
-    uri = "hdfs://{}:{}{}".format(host, port, path)
-
-    return uri
-
-
[email protected]
[email protected]
[email protected]
[email protected]
-def test_fastparquet_read_with_hdfs():
-    check_libhdfs_present()
-    try:
-        import snappy  # noqa
-    except ImportError:
-        pytest.skip('fastparquet test requires snappy')
-
-    import pyarrow.parquet as pq
-    fastparquet = pytest.importorskip('fastparquet')
-
-    fs = hdfs_test_client()
-
-    df = util.make_dataframe()
-
-    table = pa.Table.from_pandas(df)
-
-    path = '/tmp/testing.parquet'
-    with fs.open(path, 'wb') as f:
-        pq.write_table(table, f)
-
-    parquet_file = fastparquet.ParquetFile(path, open_with=fs.open)
-
-    result = parquet_file.to_pandas()
-    assert_frame_equal(result, df)
diff --git a/python/setup.py b/python/setup.py
index 423de708e8..798bd6b05f 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -211,7 +211,6 @@ class build_ext(_build_ext):
         '_s3fs',
         '_substrait',
         '_hdfs',
-        '_hdfsio',
         'gandiva']
 
     def _run_cmake(self):
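
Note for downstream code still importing the removed pyarrow.filesystem layer: the
test updates in this patch follow a consistent mapping onto pyarrow.fs. The snippet
below is a minimal, illustrative sketch of that mapping only; the paths, the example
table and the ensure_pyarrow_fs helper are invented for the example and are not part
of this commit.

    import pyarrow as pa
    import pyarrow.parquet as pq
    from pyarrow.fs import (FileSelector, FileSystem, FileType,
                            LocalFileSystem, PyFileSystem, FSSpecHandler)

    # Before: fs = pyarrow.filesystem.LocalFileSystem._get_instance()
    local = LocalFileSystem()

    # Before: fs.mkdir(...), fs.open(path, 'wb'), fs.exists(path), fs.ls(path)
    local.create_dir("/tmp/example-dataset")
    table = pa.table({"a": [1, 2, 3]})
    with local.open_output_stream("/tmp/example-dataset/data.parquet") as f:
        pq.write_table(table, f)

    info = local.get_file_info("/tmp/example-dataset/data.parquet")
    assert info.type == FileType.File  # replaces fs.exists(...)

    selector = FileSelector("/tmp/example-dataset", recursive=True)
    parquet_files = [i.path for i in local.get_file_info(selector)
                     if i.path.endswith(".parquet")]  # replaces fs.ls(...)

    # fsspec filesystems (e.g. s3fs) can be wrapped explicitly when a
    # pyarrow.fs.FileSystem object is needed, as the updated test helpers do.
    def ensure_pyarrow_fs(fs):
        if isinstance(fs, FileSystem):
            return fs
        return PyFileSystem(FSSpecHandler(fs))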

