pitrou commented on a change in pull request #7991:
URL: https://github.com/apache/arrow/pull/7991#discussion_r481241805



##########
File path: python/pyarrow/fs.py
##########
@@ -79,15 +83,44 @@ def _ensure_filesystem(filesystem, use_mmap=False):
             return PyFileSystem(FSSpecHandler(filesystem))
 
     # map old filesystems to new ones
-    from pyarrow.filesystem import LocalFileSystem as LegacyLocalFileSystem
+    from pyarrow.filesystem import (
+        FileSystem as LegacyFileSystem,
+        LocalFileSystem as LegacyLocalFileSystem
+    )
 
     if isinstance(filesystem, LegacyLocalFileSystem):
         return LocalFileSystem(use_mmap=use_mmap)
     # TODO handle HDFS?
+    if allow_legacy_filesystem and isinstance(filesystem, LegacyFileSystem):
+        return filesystem
 
     raise TypeError("Unrecognized filesystem: {}".format(type(filesystem)))
 
 
+def _resolve_filesystem_and_path(
+    path, filesystem=None, allow_legacy_filesystem=False
+):
+    """
+    Return filesystem/path from path which could be an URI or a plain
+    filesystem path.
+    """
+    if not _is_path_like(path):
+        if filesystem is not None:
+            raise ValueError("filesystem passed but where is file-like, so"

Review comment:
       "where"?

##########
File path: python/pyarrow/parquet.py
##########
@@ -35,8 +35,9 @@
                               FileMetaData, RowGroupMetaData,
                               ColumnChunkMetaData,
                               ParquetSchema, ColumnSchema)
-from pyarrow.filesystem import (LocalFileSystem, _ensure_filesystem,
-                                resolve_filesystem_and_path)
+from pyarrow.filesystem import (

Review comment:
       Perhaps `import pyarrow.filesystem as legacyfs` would make the code 
easier to read below.

##########
File path: python/pyarrow/parquet.py
##########
@@ -35,8 +35,9 @@
                               FileMetaData, RowGroupMetaData,
                               ColumnChunkMetaData,
                               ParquetSchema, ColumnSchema)
-from pyarrow.filesystem import (LocalFileSystem, _ensure_filesystem,
-                                resolve_filesystem_and_path)
+from pyarrow.filesystem import (

Review comment:
       These are legacy filesystem imports? Do we still need them?

##########
File path: python/pyarrow/tests/test_parquet.py
##########
@@ -3510,6 +3511,75 @@ def 
test_parquet_file_pass_directory_instead_of_file(tempdir):
         pq.ParquetFile(path)
 
 
+@pytest.mark.pandas
+@pytest.mark.parametrize("filesystem", [
+    None,
+    LocalFileSystem.get_instance(),
+    fs.LocalFileSystem(),
+])
+def test_parquet_writer_filesystem_local(tempdir, filesystem):
+    df = _test_dataframe(100)
+    table = pa.Table.from_pandas(df, preserve_index=False)
+    path = str(tempdir / 'data.parquet')
+
+    with pq.ParquetWriter(
+        path, table.schema, filesystem=filesystem, version='2.0'
+    ) as writer:
+        writer.write_table(table)
+
+    result = _read_table(path).to_pandas()
+    tm.assert_frame_equal(result, df)
+
+
+@pytest.fixture
+def s3_example_fs(s3_connection, s3_server):
+    from pyarrow.fs import FileSystem
+
+    host, port, access_key, secret_key = s3_connection
+    uri = (
+        "s3://{}:{}@mybucket/data.parquet?scheme=http&endpoint_override={}:{}"
+        .format(access_key, secret_key, host, port)
+    )
+    fs, path = FileSystem.from_uri(uri)
+
+    fs.create_dir("mybucket")
+
+    yield fs, uri, path
+
+
+@pytest.mark.pandas
+@pytest.mark.s3
+def test_parquet_writer_filesystem_s3(s3_example_fs):
+    df = _test_dataframe(100)
+    table = pa.Table.from_pandas(df, preserve_index=False)
+
+    fs, uri, path = s3_example_fs
+
+    with pq.ParquetWriter(
+        path, table.schema, filesystem=fs, version='2.0'
+    ) as writer:
+        writer.write_table(table)
+
+    result = _read_table(uri).to_pandas()
+    tm.assert_frame_equal(result, df)
+
+
+@pytest.mark.pandas
+def test_parquet_writer_filesystem_buffer_raises():
+    df = _test_dataframe(100)
+    table = pa.Table.from_pandas(df, preserve_index=False)
+    filesystem = fs.LocalFileSystem()
+
+    # Should raise ValueError when filesystem is passed with file-like object
+    with pytest.raises(ValueError) as err_info:
+        pq.ParquetWriter(
+            pa.BufferOutputStream(), table.schema, filesystem=filesystem
+        )
+        expected_msg = ("filesystem passed but where is file-like, so"

Review comment:
       This must be lifted out of the `with` block.

##########
File path: python/pyarrow/fs.py
##########
@@ -79,15 +83,44 @@ def _ensure_filesystem(filesystem, use_mmap=False):
             return PyFileSystem(FSSpecHandler(filesystem))
 
     # map old filesystems to new ones
-    from pyarrow.filesystem import LocalFileSystem as LegacyLocalFileSystem
+    from pyarrow.filesystem import (
+        FileSystem as LegacyFileSystem,
+        LocalFileSystem as LegacyLocalFileSystem
+    )
 
     if isinstance(filesystem, LegacyLocalFileSystem):
         return LocalFileSystem(use_mmap=use_mmap)
     # TODO handle HDFS?
+    if allow_legacy_filesystem and isinstance(filesystem, LegacyFileSystem):
+        return filesystem
 
     raise TypeError("Unrecognized filesystem: {}".format(type(filesystem)))
 
 
+def _resolve_filesystem_and_path(
+    path, filesystem=None, allow_legacy_filesystem=False
+):
+    """
+    Return filesystem/path from path which could be an URI or a plain
+    filesystem path.
+    """
+    if not _is_path_like(path):
+        if filesystem is not None:
+            raise ValueError("filesystem passed but where is file-like, so"

Review comment:
       If it's the name of an argument, then put backquotes around it.

##########
File path: python/pyarrow/tests/test_parquet.py
##########
@@ -3510,6 +3511,75 @@ def 
test_parquet_file_pass_directory_instead_of_file(tempdir):
         pq.ParquetFile(path)
 
 
+@pytest.mark.pandas
+@pytest.mark.parametrize("filesystem", [
+    None,
+    LocalFileSystem.get_instance(),
+    fs.LocalFileSystem(),
+])
+def test_parquet_writer_filesystem_local(tempdir, filesystem):
+    df = _test_dataframe(100)
+    table = pa.Table.from_pandas(df, preserve_index=False)
+    path = str(tempdir / 'data.parquet')
+
+    with pq.ParquetWriter(
+        path, table.schema, filesystem=filesystem, version='2.0'
+    ) as writer:
+        writer.write_table(table)
+
+    result = _read_table(path).to_pandas()
+    tm.assert_frame_equal(result, df)
+
+
+@pytest.fixture
+def s3_example_fs(s3_connection, s3_server):
+    from pyarrow.fs import FileSystem
+
+    host, port, access_key, secret_key = s3_connection
+    uri = (
+        "s3://{}:{}@mybucket/data.parquet?scheme=http&endpoint_override={}:{}"
+        .format(access_key, secret_key, host, port)
+    )
+    fs, path = FileSystem.from_uri(uri)
+
+    fs.create_dir("mybucket")
+
+    yield fs, uri, path
+
+
+@pytest.mark.pandas
+@pytest.mark.s3
+def test_parquet_writer_filesystem_s3(s3_example_fs):
+    df = _test_dataframe(100)
+    table = pa.Table.from_pandas(df, preserve_index=False)
+
+    fs, uri, path = s3_example_fs
+
+    with pq.ParquetWriter(
+        path, table.schema, filesystem=fs, version='2.0'
+    ) as writer:
+        writer.write_table(table)
+
+    result = _read_table(uri).to_pandas()
+    tm.assert_frame_equal(result, df)
+
+
+@pytest.mark.pandas
+def test_parquet_writer_filesystem_buffer_raises():
+    df = _test_dataframe(100)
+    table = pa.Table.from_pandas(df, preserve_index=False)
+    filesystem = fs.LocalFileSystem()
+
+    # Should raise ValueError when filesystem is passed with file-like object
+    with pytest.raises(ValueError) as err_info:
+        pq.ParquetWriter(
+            pa.BufferOutputStream(), table.schema, filesystem=filesystem
+        )
+        expected_msg = ("filesystem passed but where is file-like, so"

Review comment:
       Note that it may be simpler to use `pytest.raises(ValueError, 
match="...")`

##########
File path: python/pyarrow/tests/test_parquet.py
##########
@@ -31,6 +31,7 @@
 from pyarrow.tests import util
 from pyarrow.util import guid
 from pyarrow.filesystem import LocalFileSystem, FileSystem
+from pyarrow import fs

Review comment:
       Also `from pyarrow import filesystem as legacyfs`?

##########
File path: python/pyarrow/tests/test_parquet.py
##########
@@ -3510,6 +3511,75 @@ def 
test_parquet_file_pass_directory_instead_of_file(tempdir):
         pq.ParquetFile(path)
 
 
+@pytest.mark.pandas
+@pytest.mark.parametrize("filesystem", [
+    None,
+    LocalFileSystem.get_instance(),
+    fs.LocalFileSystem(),
+])
+def test_parquet_writer_filesystem_local(tempdir, filesystem):
+    df = _test_dataframe(100)
+    table = pa.Table.from_pandas(df, preserve_index=False)
+    path = str(tempdir / 'data.parquet')
+
+    with pq.ParquetWriter(
+        path, table.schema, filesystem=filesystem, version='2.0'
+    ) as writer:
+        writer.write_table(table)
+
+    result = _read_table(path).to_pandas()
+    tm.assert_frame_equal(result, df)
+
+
+@pytest.fixture
+def s3_example_fs(s3_connection, s3_server):
+    from pyarrow.fs import FileSystem
+
+    host, port, access_key, secret_key = s3_connection
+    uri = (
+        "s3://{}:{}@mybucket/data.parquet?scheme=http&endpoint_override={}:{}"
+        .format(access_key, secret_key, host, port)
+    )
+    fs, path = FileSystem.from_uri(uri)
+
+    fs.create_dir("mybucket")
+
+    yield fs, uri, path
+
+
+@pytest.mark.pandas
+@pytest.mark.s3
+def test_parquet_writer_filesystem_s3(s3_example_fs):
+    df = _test_dataframe(100)
+    table = pa.Table.from_pandas(df, preserve_index=False)
+
+    fs, uri, path = s3_example_fs
+
+    with pq.ParquetWriter(
+        path, table.schema, filesystem=fs, version='2.0'
+    ) as writer:
+        writer.write_table(table)

Review comment:
       Should we also test `ParquetWriter(path=uri)`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to