This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new f66e365 Create directories on a local filesystem (#301)
f66e365 is described below
commit f66e3652fdf9720d6c63a6fcec7bcd08d5bb186c
Author: Kevin Liu <[email protected]>
AuthorDate: Mon Jan 29 11:05:30 2024 -0800
Create directories on a local filesystem (#301)
* add a test for writing data to disk
* create dir first
* add fsspec test
* make lint
* add fs tests
---
pyiceberg/io/__init__.py | 2 +-
pyiceberg/io/fsspec.py | 3 ++-
pyiceberg/io/pyarrow.py | 11 ++++++---
tests/catalog/test_sql.py | 62 +++++++++++++++++++++++++++++++++++++++++++++--
tests/io/test_fsspec.py | 23 ++++++++++++++++++
tests/io/test_pyarrow.py | 20 +++++++++++++++
6 files changed, 114 insertions(+), 7 deletions(-)
diff --git a/pyiceberg/io/__init__.py b/pyiceberg/io/__init__.py
index 1b0bc71..f03048c 100644
--- a/pyiceberg/io/__init__.py
+++ b/pyiceberg/io/__init__.py
@@ -275,7 +275,7 @@ SCHEMA_TO_FILE_IO: Dict[str, List[str]] = {
"s3a": [ARROW_FILE_IO, FSSPEC_FILE_IO],
"s3n": [ARROW_FILE_IO, FSSPEC_FILE_IO],
"gs": [ARROW_FILE_IO],
- "file": [ARROW_FILE_IO],
+ "file": [ARROW_FILE_IO, FSSPEC_FILE_IO],
"hdfs": [ARROW_FILE_IO],
"abfs": [FSSPEC_FILE_IO],
"abfss": [FSSPEC_FILE_IO],
diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py
index 6887007..957cac6 100644
--- a/pyiceberg/io/fsspec.py
+++ b/pyiceberg/io/fsspec.py
@@ -99,7 +99,7 @@ SIGNERS: Dict[str, Callable[[Properties, AWSRequest], AWSRequest]] = {"S3V4RestS
def _file(_: Properties) -> LocalFileSystem:
- return LocalFileSystem()
+ return LocalFileSystem(auto_mkdir=True)
def _s3(properties: Properties) -> AbstractFileSystem:
@@ -173,6 +173,7 @@ def _adlfs(properties: Properties) -> AbstractFileSystem:
SCHEME_TO_FS = {
+ "": _file,
"file": _file,
"s3": _s3,
"s3a": _s3,
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index cbfb9f6..1d7dcbe 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -173,6 +173,13 @@ DOC = "doc"
T = TypeVar("T")
+class PyArrowLocalFileSystem(pyarrow.fs.LocalFileSystem):
+ def open_output_stream(self, path: str, *args: Any, **kwargs: Any) -> pyarrow.NativeFile:
+ # In LocalFileSystem, parent directories must be first created before opening an output stream
+ self.create_dir(os.path.dirname(path), recursive=True)
+ return super().open_output_stream(path, *args, **kwargs)
+
+
class PyArrowFile(InputFile, OutputFile):
"""A combined InputFile and OutputFile implementation that uses a pyarrow filesystem to generate pyarrow.lib.NativeFile instances.
@@ -379,9 +386,7 @@ class PyArrowFileIO(FileIO):
return GcsFileSystem(**gcs_kwargs)
elif scheme == "file":
- from pyarrow.fs import LocalFileSystem
-
- return LocalFileSystem()
+ return PyArrowLocalFileSystem()
else:
raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")
diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py
index 217ea8f..9dbcf8f 100644
--- a/tests/catalog/test_sql.py
+++ b/tests/catalog/test_sql.py
@@ -19,6 +19,7 @@ import os
from pathlib import Path
from typing import Generator, List
+import pyarrow as pa
import pytest
from pytest import TempPathFactory
from pytest_lazyfixture import lazy_fixture
@@ -35,7 +36,10 @@ from pyiceberg.exceptions import (
NoSuchTableError,
TableAlreadyExistsError,
)
+from pyiceberg.io import FSSPEC_FILE_IO, PY_IO_IMPL
+from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.schema import Schema
+from pyiceberg.table.snapshots import Operation
from pyiceberg.table.sorting import (
NullOrder,
SortDirection,
@@ -80,7 +84,7 @@ def catalog_memory(warehouse: Path) -> Generator[SqlCatalog, None, None]:
@pytest.fixture(scope="module")
def catalog_sqlite(warehouse: Path) -> Generator[SqlCatalog, None, None]:
props = {
- "uri": "sqlite:////tmp/sql-catalog.db",
+ "uri": f"sqlite:////{warehouse}/sql-catalog.db",
"warehouse": f"file://{warehouse}",
}
catalog = SqlCatalog("test_sql_catalog", **props)
@@ -92,7 +96,7 @@ def catalog_sqlite(warehouse: Path) -> Generator[SqlCatalog, None, None]:
@pytest.fixture(scope="module")
def catalog_sqlite_without_rowcount(warehouse: Path) -> Generator[SqlCatalog, None, None]:
props = {
- "uri": "sqlite:////tmp/sql-catalog.db",
+ "uri": f"sqlite:////{warehouse}/sql-catalog.db",
"warehouse": f"file://{warehouse}",
}
catalog = SqlCatalog("test_sql_catalog", **props)
@@ -102,6 +106,19 @@ def catalog_sqlite_without_rowcount(warehouse: Path) -> Generator[SqlCatalog, No
catalog.destroy_tables()
+@pytest.fixture(scope="module")
+def catalog_sqlite_fsspec(warehouse: Path) -> Generator[SqlCatalog, None, None]:
+ props = {
+ "uri": f"sqlite:////{warehouse}/sql-catalog.db",
+ "warehouse": f"file://{warehouse}",
+ PY_IO_IMPL: FSSPEC_FILE_IO,
+ }
+ catalog = SqlCatalog("test_sql_catalog", **props)
+ catalog.create_tables()
+ yield catalog
+ catalog.destroy_tables()
+
+
def test_creation_with_no_uri() -> None:
with pytest.raises(NoSuchPropertyException):
SqlCatalog("test_ddb_catalog", not_uri="unused")
@@ -722,6 +739,47 @@ def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema, random_i
assert new_schema.find_field("b").field_type == IntegerType()
+@pytest.mark.parametrize(
+ 'catalog',
+ [
+ lazy_fixture('catalog_memory'),
+ lazy_fixture('catalog_sqlite'),
+ lazy_fixture('catalog_sqlite_without_rowcount'),
+ lazy_fixture('catalog_sqlite_fsspec'),
+ ],
+)
+def test_append_table(catalog: SqlCatalog, table_schema_simple: Schema, random_identifier: Identifier) -> None:
+ database_name, _table_name = random_identifier
+ catalog.create_namespace(database_name)
+ table = catalog.create_table(random_identifier, table_schema_simple)
+
+ df = pa.Table.from_pydict(
+ {
+ "foo": ["a"],
+ "bar": [1],
+ "baz": [True],
+ },
+ schema=schema_to_pyarrow(table_schema_simple),
+ )
+
+ table.append(df)
+
+ # new snapshot is written in APPEND mode
+ assert len(table.metadata.snapshots) == 1
+ assert table.metadata.snapshots[0].snapshot_id == table.metadata.current_snapshot_id
+ assert table.metadata.snapshots[0].parent_snapshot_id is None
+ assert table.metadata.snapshots[0].sequence_number == 1
+ assert table.metadata.snapshots[0].summary is not None
+ assert table.metadata.snapshots[0].summary.operation == Operation.APPEND
+ assert table.metadata.snapshots[0].summary['added-data-files'] == '1'
+ assert table.metadata.snapshots[0].summary['added-records'] == '1'
+ assert table.metadata.snapshots[0].summary['total-data-files'] == '1'
+ assert table.metadata.snapshots[0].summary['total-records'] == '1'
+
+ # read back the data
+ assert df == table.scan().to_arrow()
+
+
@pytest.mark.parametrize(
'catalog',
[
diff --git a/tests/io/test_fsspec.py b/tests/io/test_fsspec.py
index f83268b..9f04445 100644
--- a/tests/io/test_fsspec.py
+++ b/tests/io/test_fsspec.py
@@ -15,10 +15,13 @@
# specific language governing permissions and limitations
# under the License.
+import os
+import tempfile
import uuid
import pytest
from botocore.awsrequest import AWSRequest
+from fsspec.implementations.local import LocalFileSystem
from requests_mock import Mocker
from pyiceberg.exceptions import SignError
@@ -27,6 +30,26 @@ from pyiceberg.io.fsspec import FsspecFileIO, s3v4_rest_signer
from pyiceberg.io.pyarrow import PyArrowFileIO
+def test_fsspec_infer_local_fs_from_path(fsspec_fileio: FsspecFileIO) -> None:
+ """Test path with `file` scheme and no scheme both use LocalFileSystem"""
+ assert isinstance(fsspec_fileio.new_output("file://tmp/warehouse")._fs, LocalFileSystem)
+ assert isinstance(fsspec_fileio.new_output("/tmp/warehouse")._fs, LocalFileSystem)
+
+
+def test_fsspec_local_fs_can_create_path_without_parent_dir(fsspec_fileio: FsspecFileIO) -> None:
+ """Test LocalFileSystem can create path without first creating the parent directories"""
+ with tempfile.TemporaryDirectory() as tmpdirname:
+ file_path = f"{tmpdirname}/foo/bar/baz.txt"
+ output_file = fsspec_fileio.new_output(file_path)
+ parent_path = os.path.dirname(file_path)
+ assert output_file._fs.exists(parent_path) is False
+ try:
+ with output_file.create() as f:
+ f.write(b"foo")
+ except Exception:
+ pytest.fail("Failed to write to file without parent directory")
+
+
@pytest.mark.s3
def test_fsspec_new_input_file(fsspec_fileio: FsspecFileIO) -> None:
"""Test creating a new input file from a fsspec file-io"""
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index e6f4de3..0628ed4 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -92,6 +92,26 @@ from pyiceberg.types import (
)
+def test_pyarrow_infer_local_fs_from_path() -> None:
+ """Test path with `file` scheme and no scheme both use LocalFileSystem"""
+ assert isinstance(PyArrowFileIO().new_output("file://tmp/warehouse")._filesystem, LocalFileSystem)
+ assert isinstance(PyArrowFileIO().new_output("/tmp/warehouse")._filesystem, LocalFileSystem)
+
+
+def test_pyarrow_local_fs_can_create_path_without_parent_dir() -> None:
+ """Test LocalFileSystem can create path without first creating the parent directories"""
+ with tempfile.TemporaryDirectory() as tmpdirname:
+ file_path = f"{tmpdirname}/foo/bar/baz.txt"
+ output_file = PyArrowFileIO().new_output(file_path)
+ parent_path = os.path.dirname(file_path)
+ assert output_file._filesystem.get_file_info(parent_path).type == FileType.NotFound
+ try:
+ with output_file.create() as f:
+ f.write(b"foo")
+ except Exception:
+ pytest.fail("Failed to write to file without parent directory")
+
+
def test_pyarrow_input_file() -> None:
"""Test reading a file using PyArrowFile"""