This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new f66e365  Create directories on a local filesystem (#301)
f66e365 is described below

commit f66e3652fdf9720d6c63a6fcec7bcd08d5bb186c
Author: Kevin Liu <[email protected]>
AuthorDate: Mon Jan 29 11:05:30 2024 -0800

    Create directories on a local filesystem (#301)
    
    * add a test for writing data to disk
    
    * create dir first
    
    * add fsspec test
    
    * make lint
    
    * add fs tests
---
 pyiceberg/io/__init__.py  |  2 +-
 pyiceberg/io/fsspec.py    |  3 ++-
 pyiceberg/io/pyarrow.py   | 11 ++++++---
 tests/catalog/test_sql.py | 62 +++++++++++++++++++++++++++++++++++++++++++++--
 tests/io/test_fsspec.py   | 23 ++++++++++++++++++
 tests/io/test_pyarrow.py  | 20 +++++++++++++++
 6 files changed, 114 insertions(+), 7 deletions(-)

diff --git a/pyiceberg/io/__init__.py b/pyiceberg/io/__init__.py
index 1b0bc71..f03048c 100644
--- a/pyiceberg/io/__init__.py
+++ b/pyiceberg/io/__init__.py
@@ -275,7 +275,7 @@ SCHEMA_TO_FILE_IO: Dict[str, List[str]] = {
     "s3a": [ARROW_FILE_IO, FSSPEC_FILE_IO],
     "s3n": [ARROW_FILE_IO, FSSPEC_FILE_IO],
     "gs": [ARROW_FILE_IO],
-    "file": [ARROW_FILE_IO],
+    "file": [ARROW_FILE_IO, FSSPEC_FILE_IO],
     "hdfs": [ARROW_FILE_IO],
     "abfs": [FSSPEC_FILE_IO],
     "abfss": [FSSPEC_FILE_IO],
diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py
index 6887007..957cac6 100644
--- a/pyiceberg/io/fsspec.py
+++ b/pyiceberg/io/fsspec.py
@@ -99,7 +99,7 @@ SIGNERS: Dict[str, Callable[[Properties, AWSRequest], AWSRequest]] = {"S3V4RestS
 
 
 def _file(_: Properties) -> LocalFileSystem:
-    return LocalFileSystem()
+    return LocalFileSystem(auto_mkdir=True)
 
 
 def _s3(properties: Properties) -> AbstractFileSystem:
@@ -173,6 +173,7 @@ def _adlfs(properties: Properties) -> AbstractFileSystem:
 
 
 SCHEME_TO_FS = {
+    "": _file,
     "file": _file,
     "s3": _s3,
     "s3a": _s3,
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index cbfb9f6..1d7dcbe 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -173,6 +173,13 @@ DOC = "doc"
 T = TypeVar("T")
 
 
+class PyArrowLocalFileSystem(pyarrow.fs.LocalFileSystem):
+    def open_output_stream(self, path: str, *args: Any, **kwargs: Any) -> pyarrow.NativeFile:
+        # In LocalFileSystem, parent directories must be first created before opening an output stream
+        self.create_dir(os.path.dirname(path), recursive=True)
+        return super().open_output_stream(path, *args, **kwargs)
+
+
 class PyArrowFile(InputFile, OutputFile):
     """A combined InputFile and OutputFile implementation that uses a pyarrow filesystem to generate pyarrow.lib.NativeFile instances.
 
@@ -379,9 +386,7 @@ class PyArrowFileIO(FileIO):
 
             return GcsFileSystem(**gcs_kwargs)
         elif scheme == "file":
-            from pyarrow.fs import LocalFileSystem
-
-            return LocalFileSystem()
+            return PyArrowLocalFileSystem()
         else:
             raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")
 
diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py
index 217ea8f..9dbcf8f 100644
--- a/tests/catalog/test_sql.py
+++ b/tests/catalog/test_sql.py
@@ -19,6 +19,7 @@ import os
 from pathlib import Path
 from typing import Generator, List
 
+import pyarrow as pa
 import pytest
 from pytest import TempPathFactory
 from pytest_lazyfixture import lazy_fixture
@@ -35,7 +36,10 @@ from pyiceberg.exceptions import (
     NoSuchTableError,
     TableAlreadyExistsError,
 )
+from pyiceberg.io import FSSPEC_FILE_IO, PY_IO_IMPL
+from pyiceberg.io.pyarrow import schema_to_pyarrow
 from pyiceberg.schema import Schema
+from pyiceberg.table.snapshots import Operation
 from pyiceberg.table.sorting import (
     NullOrder,
     SortDirection,
@@ -80,7 +84,7 @@ def catalog_memory(warehouse: Path) -> Generator[SqlCatalog, None, None]:
 @pytest.fixture(scope="module")
 def catalog_sqlite(warehouse: Path) -> Generator[SqlCatalog, None, None]:
     props = {
-        "uri": "sqlite:////tmp/sql-catalog.db",
+        "uri": f"sqlite:////{warehouse}/sql-catalog.db",
         "warehouse": f"file://{warehouse}",
     }
     catalog = SqlCatalog("test_sql_catalog", **props)
@@ -92,7 +96,7 @@ def catalog_sqlite(warehouse: Path) -> Generator[SqlCatalog, None, None]:
 @pytest.fixture(scope="module")
 def catalog_sqlite_without_rowcount(warehouse: Path) -> Generator[SqlCatalog, None, None]:
     props = {
-        "uri": "sqlite:////tmp/sql-catalog.db",
+        "uri": f"sqlite:////{warehouse}/sql-catalog.db",
         "warehouse": f"file://{warehouse}",
     }
     catalog = SqlCatalog("test_sql_catalog", **props)
@@ -102,6 +106,19 @@ def catalog_sqlite_without_rowcount(warehouse: Path) -> Generator[SqlCatalog, No
     catalog.destroy_tables()
 
 
+@pytest.fixture(scope="module")
+def catalog_sqlite_fsspec(warehouse: Path) -> Generator[SqlCatalog, None, None]:
+    props = {
+        "uri": f"sqlite:////{warehouse}/sql-catalog.db",
+        "warehouse": f"file://{warehouse}",
+        PY_IO_IMPL: FSSPEC_FILE_IO,
+    }
+    catalog = SqlCatalog("test_sql_catalog", **props)
+    catalog.create_tables()
+    yield catalog
+    catalog.destroy_tables()
+
+
 def test_creation_with_no_uri() -> None:
     with pytest.raises(NoSuchPropertyException):
         SqlCatalog("test_ddb_catalog", not_uri="unused")
@@ -722,6 +739,47 @@ def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema, random_i
     assert new_schema.find_field("b").field_type == IntegerType()
 
 
+@pytest.mark.parametrize(
+    'catalog',
+    [
+        lazy_fixture('catalog_memory'),
+        lazy_fixture('catalog_sqlite'),
+        lazy_fixture('catalog_sqlite_without_rowcount'),
+        lazy_fixture('catalog_sqlite_fsspec'),
+    ],
+)
+def test_append_table(catalog: SqlCatalog, table_schema_simple: Schema, random_identifier: Identifier) -> None:
+    database_name, _table_name = random_identifier
+    catalog.create_namespace(database_name)
+    table = catalog.create_table(random_identifier, table_schema_simple)
+
+    df = pa.Table.from_pydict(
+        {
+            "foo": ["a"],
+            "bar": [1],
+            "baz": [True],
+        },
+        schema=schema_to_pyarrow(table_schema_simple),
+    )
+
+    table.append(df)
+
+    # new snapshot is written in APPEND mode
+    assert len(table.metadata.snapshots) == 1
+    assert table.metadata.snapshots[0].snapshot_id == table.metadata.current_snapshot_id
+    assert table.metadata.snapshots[0].parent_snapshot_id is None
+    assert table.metadata.snapshots[0].sequence_number == 1
+    assert table.metadata.snapshots[0].summary is not None
+    assert table.metadata.snapshots[0].summary.operation == Operation.APPEND
+    assert table.metadata.snapshots[0].summary['added-data-files'] == '1'
+    assert table.metadata.snapshots[0].summary['added-records'] == '1'
+    assert table.metadata.snapshots[0].summary['total-data-files'] == '1'
+    assert table.metadata.snapshots[0].summary['total-records'] == '1'
+
+    # read back the data
+    assert df == table.scan().to_arrow()
+
+
 @pytest.mark.parametrize(
     'catalog',
     [
diff --git a/tests/io/test_fsspec.py b/tests/io/test_fsspec.py
index f83268b..9f04445 100644
--- a/tests/io/test_fsspec.py
+++ b/tests/io/test_fsspec.py
@@ -15,10 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import os
+import tempfile
 import uuid
 
 import pytest
 from botocore.awsrequest import AWSRequest
+from fsspec.implementations.local import LocalFileSystem
 from requests_mock import Mocker
 
 from pyiceberg.exceptions import SignError
@@ -27,6 +30,26 @@ from pyiceberg.io.fsspec import FsspecFileIO, s3v4_rest_signer
 from pyiceberg.io.pyarrow import PyArrowFileIO
 
 
+def test_fsspec_infer_local_fs_from_path(fsspec_fileio: FsspecFileIO) -> None:
+    """Test path with `file` scheme and no scheme both use LocalFileSystem"""
+    assert isinstance(fsspec_fileio.new_output("file://tmp/warehouse")._fs, LocalFileSystem)
+    assert isinstance(fsspec_fileio.new_output("/tmp/warehouse")._fs, LocalFileSystem)
+
+
+def test_fsspec_local_fs_can_create_path_without_parent_dir(fsspec_fileio: FsspecFileIO) -> None:
+    """Test LocalFileSystem can create path without first creating the parent directories"""
+    with tempfile.TemporaryDirectory() as tmpdirname:
+        file_path = f"{tmpdirname}/foo/bar/baz.txt"
+        output_file = fsspec_fileio.new_output(file_path)
+        parent_path = os.path.dirname(file_path)
+        assert output_file._fs.exists(parent_path) is False
+        try:
+            with output_file.create() as f:
+                f.write(b"foo")
+        except Exception:
+            pytest.fail("Failed to write to file without parent directory")
+
+
 @pytest.mark.s3
 def test_fsspec_new_input_file(fsspec_fileio: FsspecFileIO) -> None:
     """Test creating a new input file from a fsspec file-io"""
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index e6f4de3..0628ed4 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -92,6 +92,26 @@ from pyiceberg.types import (
 )
 
 
+def test_pyarrow_infer_local_fs_from_path() -> None:
+    """Test path with `file` scheme and no scheme both use LocalFileSystem"""
+    assert isinstance(PyArrowFileIO().new_output("file://tmp/warehouse")._filesystem, LocalFileSystem)
+    assert isinstance(PyArrowFileIO().new_output("/tmp/warehouse")._filesystem, LocalFileSystem)
+
+
+def test_pyarrow_local_fs_can_create_path_without_parent_dir() -> None:
+    """Test LocalFileSystem can create path without first creating the parent directories"""
+    with tempfile.TemporaryDirectory() as tmpdirname:
+        file_path = f"{tmpdirname}/foo/bar/baz.txt"
+        output_file = PyArrowFileIO().new_output(file_path)
+        parent_path = os.path.dirname(file_path)
+        assert output_file._filesystem.get_file_info(parent_path).type == FileType.NotFound
+        try:
+            with output_file.create() as f:
+                f.write(b"foo")
+        except Exception:
+            pytest.fail("Failed to write to file without parent directory")
+
+
 def test_pyarrow_input_file() -> None:
     """Test reading a file using PyArrowFile"""
 

Reply via email to