pitrou commented on a change in pull request #7991:
URL: https://github.com/apache/arrow/pull/7991#discussion_r481241805
##########
File path: python/pyarrow/fs.py
##########
@@ -79,15 +83,44 @@ def _ensure_filesystem(filesystem, use_mmap=False):
return PyFileSystem(FSSpecHandler(filesystem))
# map old filesystems to new ones
- from pyarrow.filesystem import LocalFileSystem as LegacyLocalFileSystem
+ from pyarrow.filesystem import (
+ FileSystem as LegacyFileSystem,
+ LocalFileSystem as LegacyLocalFileSystem
+ )
if isinstance(filesystem, LegacyLocalFileSystem):
return LocalFileSystem(use_mmap=use_mmap)
# TODO handle HDFS?
+ if allow_legacy_filesystem and isinstance(filesystem, LegacyFileSystem):
+ return filesystem
raise TypeError("Unrecognized filesystem: {}".format(type(filesystem)))
+def _resolve_filesystem_and_path(
+ path, filesystem=None, allow_legacy_filesystem=False
+):
+ """
+ Return filesystem/path from path which could be an URI or a plain
+ filesystem path.
+ """
+ if not _is_path_like(path):
+ if filesystem is not None:
+ raise ValueError("filesystem passed but where is file-like, so"
Review comment:
"where"?
##########
File path: python/pyarrow/parquet.py
##########
@@ -35,8 +35,9 @@
FileMetaData, RowGroupMetaData,
ColumnChunkMetaData,
ParquetSchema, ColumnSchema)
-from pyarrow.filesystem import (LocalFileSystem, _ensure_filesystem,
- resolve_filesystem_and_path)
+from pyarrow.filesystem import (
Review comment:
Perhaps `import pyarrow.filesystem as legacyfs` would make the code
easier to read below.
##########
File path: python/pyarrow/parquet.py
##########
@@ -35,8 +35,9 @@
FileMetaData, RowGroupMetaData,
ColumnChunkMetaData,
ParquetSchema, ColumnSchema)
-from pyarrow.filesystem import (LocalFileSystem, _ensure_filesystem,
- resolve_filesystem_and_path)
+from pyarrow.filesystem import (
Review comment:
These are legacy filesystem imports? Do we still need them?
##########
File path: python/pyarrow/tests/test_parquet.py
##########
@@ -3510,6 +3511,75 @@ def test_parquet_file_pass_directory_instead_of_file(tempdir):
pq.ParquetFile(path)
+@pytest.mark.pandas
+@pytest.mark.parametrize("filesystem", [
+ None,
+ LocalFileSystem.get_instance(),
+ fs.LocalFileSystem(),
+])
+def test_parquet_writer_filesystem_local(tempdir, filesystem):
+ df = _test_dataframe(100)
+ table = pa.Table.from_pandas(df, preserve_index=False)
+ path = str(tempdir / 'data.parquet')
+
+ with pq.ParquetWriter(
+ path, table.schema, filesystem=filesystem, version='2.0'
+ ) as writer:
+ writer.write_table(table)
+
+ result = _read_table(path).to_pandas()
+ tm.assert_frame_equal(result, df)
+
+
+@pytest.fixture
+def s3_example_fs(s3_connection, s3_server):
+ from pyarrow.fs import FileSystem
+
+ host, port, access_key, secret_key = s3_connection
+ uri = (
+ "s3://{}:{}@mybucket/data.parquet?scheme=http&endpoint_override={}:{}"
+ .format(access_key, secret_key, host, port)
+ )
+ fs, path = FileSystem.from_uri(uri)
+
+ fs.create_dir("mybucket")
+
+ yield fs, uri, path
+
+
+@pytest.mark.pandas
+@pytest.mark.s3
+def test_parquet_writer_filesystem_s3(s3_example_fs):
+ df = _test_dataframe(100)
+ table = pa.Table.from_pandas(df, preserve_index=False)
+
+ fs, uri, path = s3_example_fs
+
+ with pq.ParquetWriter(
+ path, table.schema, filesystem=fs, version='2.0'
+ ) as writer:
+ writer.write_table(table)
+
+ result = _read_table(uri).to_pandas()
+ tm.assert_frame_equal(result, df)
+
+
+@pytest.mark.pandas
+def test_parquet_writer_filesystem_buffer_raises():
+ df = _test_dataframe(100)
+ table = pa.Table.from_pandas(df, preserve_index=False)
+ filesystem = fs.LocalFileSystem()
+
+ # Should raise ValueError when filesystem is passed with file-like object
+ with pytest.raises(ValueError) as err_info:
+ pq.ParquetWriter(
+ pa.BufferOutputStream(), table.schema, filesystem=filesystem
+ )
+ expected_msg = ("filesystem passed but where is file-like, so"
Review comment:
This must be lifted out of the `with` block.
##########
File path: python/pyarrow/fs.py
##########
@@ -79,15 +83,44 @@ def _ensure_filesystem(filesystem, use_mmap=False):
return PyFileSystem(FSSpecHandler(filesystem))
# map old filesystems to new ones
- from pyarrow.filesystem import LocalFileSystem as LegacyLocalFileSystem
+ from pyarrow.filesystem import (
+ FileSystem as LegacyFileSystem,
+ LocalFileSystem as LegacyLocalFileSystem
+ )
if isinstance(filesystem, LegacyLocalFileSystem):
return LocalFileSystem(use_mmap=use_mmap)
# TODO handle HDFS?
+ if allow_legacy_filesystem and isinstance(filesystem, LegacyFileSystem):
+ return filesystem
raise TypeError("Unrecognized filesystem: {}".format(type(filesystem)))
+def _resolve_filesystem_and_path(
+ path, filesystem=None, allow_legacy_filesystem=False
+):
+ """
+ Return filesystem/path from path which could be an URI or a plain
+ filesystem path.
+ """
+ if not _is_path_like(path):
+ if filesystem is not None:
+ raise ValueError("filesystem passed but where is file-like, so"
Review comment:
If it's the name of an argument, then put backquotes around it.
##########
File path: python/pyarrow/tests/test_parquet.py
##########
@@ -3510,6 +3511,75 @@ def test_parquet_file_pass_directory_instead_of_file(tempdir):
pq.ParquetFile(path)
+@pytest.mark.pandas
+@pytest.mark.parametrize("filesystem", [
+ None,
+ LocalFileSystem.get_instance(),
+ fs.LocalFileSystem(),
+])
+def test_parquet_writer_filesystem_local(tempdir, filesystem):
+ df = _test_dataframe(100)
+ table = pa.Table.from_pandas(df, preserve_index=False)
+ path = str(tempdir / 'data.parquet')
+
+ with pq.ParquetWriter(
+ path, table.schema, filesystem=filesystem, version='2.0'
+ ) as writer:
+ writer.write_table(table)
+
+ result = _read_table(path).to_pandas()
+ tm.assert_frame_equal(result, df)
+
+
+@pytest.fixture
+def s3_example_fs(s3_connection, s3_server):
+ from pyarrow.fs import FileSystem
+
+ host, port, access_key, secret_key = s3_connection
+ uri = (
+ "s3://{}:{}@mybucket/data.parquet?scheme=http&endpoint_override={}:{}"
+ .format(access_key, secret_key, host, port)
+ )
+ fs, path = FileSystem.from_uri(uri)
+
+ fs.create_dir("mybucket")
+
+ yield fs, uri, path
+
+
+@pytest.mark.pandas
+@pytest.mark.s3
+def test_parquet_writer_filesystem_s3(s3_example_fs):
+ df = _test_dataframe(100)
+ table = pa.Table.from_pandas(df, preserve_index=False)
+
+ fs, uri, path = s3_example_fs
+
+ with pq.ParquetWriter(
+ path, table.schema, filesystem=fs, version='2.0'
+ ) as writer:
+ writer.write_table(table)
+
+ result = _read_table(uri).to_pandas()
+ tm.assert_frame_equal(result, df)
+
+
+@pytest.mark.pandas
+def test_parquet_writer_filesystem_buffer_raises():
+ df = _test_dataframe(100)
+ table = pa.Table.from_pandas(df, preserve_index=False)
+ filesystem = fs.LocalFileSystem()
+
+ # Should raise ValueError when filesystem is passed with file-like object
+ with pytest.raises(ValueError) as err_info:
+ pq.ParquetWriter(
+ pa.BufferOutputStream(), table.schema, filesystem=filesystem
+ )
+ expected_msg = ("filesystem passed but where is file-like, so"
Review comment:
Note that it may be simpler to use `pytest.raises(ValueError,
match="...")`
##########
File path: python/pyarrow/tests/test_parquet.py
##########
@@ -31,6 +31,7 @@
from pyarrow.tests import util
from pyarrow.util import guid
from pyarrow.filesystem import LocalFileSystem, FileSystem
+from pyarrow import fs
Review comment:
Also `from pyarrow import filesystem as legacyfs`?
##########
File path: python/pyarrow/tests/test_parquet.py
##########
@@ -3510,6 +3511,75 @@ def test_parquet_file_pass_directory_instead_of_file(tempdir):
pq.ParquetFile(path)
+@pytest.mark.pandas
+@pytest.mark.parametrize("filesystem", [
+ None,
+ LocalFileSystem.get_instance(),
+ fs.LocalFileSystem(),
+])
+def test_parquet_writer_filesystem_local(tempdir, filesystem):
+ df = _test_dataframe(100)
+ table = pa.Table.from_pandas(df, preserve_index=False)
+ path = str(tempdir / 'data.parquet')
+
+ with pq.ParquetWriter(
+ path, table.schema, filesystem=filesystem, version='2.0'
+ ) as writer:
+ writer.write_table(table)
+
+ result = _read_table(path).to_pandas()
+ tm.assert_frame_equal(result, df)
+
+
+@pytest.fixture
+def s3_example_fs(s3_connection, s3_server):
+ from pyarrow.fs import FileSystem
+
+ host, port, access_key, secret_key = s3_connection
+ uri = (
+ "s3://{}:{}@mybucket/data.parquet?scheme=http&endpoint_override={}:{}"
+ .format(access_key, secret_key, host, port)
+ )
+ fs, path = FileSystem.from_uri(uri)
+
+ fs.create_dir("mybucket")
+
+ yield fs, uri, path
+
+
+@pytest.mark.pandas
+@pytest.mark.s3
+def test_parquet_writer_filesystem_s3(s3_example_fs):
+ df = _test_dataframe(100)
+ table = pa.Table.from_pandas(df, preserve_index=False)
+
+ fs, uri, path = s3_example_fs
+
+ with pq.ParquetWriter(
+ path, table.schema, filesystem=fs, version='2.0'
+ ) as writer:
+ writer.write_table(table)
Review comment:
Should we also test `ParquetWriter(path=uri)`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]