larssk opened a new pull request, #7769:
URL: https://github.com/apache/paimon/pull/7769

   ### Purpose
   
   Use Google Cloud Storage (GCS) as warehouse / object storage for pypaimon 
(without java dependencies). This PR adds PyArrowFileIO handling for GCS.
   
   ### Changes
   
   #### 1. Scheme dispatch (`pyarrow_file_io.py`)
   
   Added a `gs` branch in `__init__` alongside the existing `s3` and `hdfs` 
branches:
   
   ```python
   elif scheme == "gs":
       self.filesystem = self._initialize_gcs_fs()
   ```
   
   #### 2. `_initialize_gcs_fs()` method (`pyarrow_file_io.py`)
   
   Uses `pyarrow.fs.GcsFileSystem`, which is already available via 
`pyarrow[gcs]`.
   
   With no arguments, `GcsFileSystem` picks up credentials automatically via 
Application Default Credentials (ADC): `GOOGLE_APPLICATION_CREDENTIALS`, the 
GCP metadata server, or Workload Identity on GKE/GCE. Three optional properties 
allow explicit credential passing:
   
   | Property | Description |
   |---|---|
   | `gcs.access-token` | OAuth2 access token. If unset, ADC is used. |
   | `gcs.access-token.expiration` | ISO 8601 expiry datetime for the token. |
   | `gcs.project-id` | GCP project ID for billing/quota. |
   
   ```python
   def _initialize_gcs_fs(self) -> FileSystem:
       access_token = self._get_property("gcs.access-token")
       token_expiry = self._get_property("gcs.access-token.expiration")
       project_id = self._get_property("gcs.project-id")
   
       kwargs = {}
       if access_token:
           from datetime import datetime
           kwargs["access_token"] = access_token
           kwargs["credential_token_expiration"] = (
               datetime.fromisoformat(token_expiry) if token_expiry
               else datetime(9999, 12, 31)
           )
       if project_id:
           kwargs["project_id"] = project_id
   
       # With no kwargs, GcsFileSystem uses ADC automatically
       return pafs.GcsFileSystem(**kwargs)
   ```
   
   #### 3. Path fix in `to_filesystem_path()` (`pyarrow_file_io.py`)
   
   `to_filesystem_path()` had a catch-all for non-S3/non-HDFS schemes that 
returned only
   `uri.path` — e.g. for `gs://my-bucket/data/table` it returned `/data/table`, 
stripping the
   bucket name. `GcsFileSystem` (like `S3FileSystem`) expects paths in 
`bucket/key` form without a leading slash.
   
   Added a `GcsFileSystem` branch mirroring the existing S3 logic:
   
   ```python
   from pyarrow.fs import GcsFileSystem
   if isinstance(self.filesystem, GcsFileSystem):
       if parsed.scheme and parsed.netloc:
           path_part = normalized_path.lstrip('/')
           return f"{parsed.netloc}/{path_part}" if path_part else parsed.netloc
       return str(path)
   ```
   
   #### 4. `GcsOptions` class (`config.py`)
   
   Added a `GcsOptions` class alongside the existing `S3Options` documenting 
the three new
   properties.
   
   ```
   class GcsOptions:
       GCS_ACCESS_TOKEN = 
ConfigOptions.key("gcs.access-token").string_type().no_default_value().with_description(
           "GCS access token. If not set, ADC (Application Default Credentials) 
is used automatically.")
       ...
   ```
   
   ### Linked issue
   
   https://github.com/apache/paimon/issues/7768
   
   ### Tests
   
   #### Unit tests — `pypaimon/tests/file_io_test.py`
   
   A new `GCSFileIOPathTest` class is added to the existing `file_io_test.py`, 
following the same
   pattern as the existing S3/OSS path conversion tests. These tests require no 
GCS credentials.
   
   | Test | What it checks |
   |---|---|
   | `test_gcs_filesystem_type` | `PyArrowFileIO("gs://...")` produces a 
`pafs.GcsFileSystem` instance |
   | `test_gcs_path_conversion` | `gs://bucket/key` maps to `bucket/key` 
(bucket prepended, no leading slash) |
   | `test_gcs_path_bucket_only` | `gs://bucket` with no path component maps to 
`bucket` |
   | `test_gcs_path_normalization` | Consecutive slashes in the path are 
collapsed (e.g. `gs://bucket///a///b` → `bucket/a/b`) |
   | `test_gcs_path_idempotency` | Already-converted `bucket/key` paths pass 
through unchanged |
   | `test_gcs_path_no_leading_slash` | `to_filesystem_path` never returns a 
path starting with `/` for any GCS URI |
   
   ```bash
   python -m pytest pypaimon/tests/file_io_test.py::GCSFileIOPathTest -v
   ```
   
   #### Integration tests — `pypaimon/tests/gcs_file_io_test.py`
   
   A new `GCSFileIOTest` class is added following the pattern of 
`oss_file_io_test.py`. All tests
   are skipped automatically when `GCS_TEST_BUCKET` is not set. Credentials are 
picked up via ADC — no explicit credential properties are required.
   
   | Test | What it checks |
   |---|---|
   | `test_gcs_filesystem_type` | Live `PyArrowFileIO` with `gs://` uses 
`GcsFileSystem` |
   | `test_exists` | `exists()` returns `False` for non-existent paths; 
`get_file_status()` raises `FileNotFoundError` |
   | `test_write_and_read_file` | `write_file()` / `read_file_utf8()` 
round-trip |
   | `test_write_file_overwrite` | `write_file(..., overwrite=False)` raises 
`FileExistsError`; `overwrite=True` replaces content |
   | `test_new_input_stream_read` | `new_output_stream()` / 
`new_input_stream()` binary round-trip; `FileNotFoundError` for missing file |
   | `test_get_file_status_directory` | `get_file_status()` returns 
`FileType.Directory` for a directory |
   | `test_get_file_status_file` | `get_file_status()` returns `FileType.File` 
with non-None size |
   | `test_delete_returns_false_when_not_exists` | `delete()` returns `False` 
for non-existent file/directory |
   | `test_delete_non_empty_directory_raises_error` | `delete(..., 
recursive=False)` raises `OSError` for non-empty directory |
   | `test_rename_returns_false_when_dst_exists` | `rename()` returns `False` 
when destination already exists |
   | `test_copy_file` | `copy_file(..., overwrite=False)` raises 
`FileExistsError`; `overwrite=True` replaces content |
   | `test_try_to_write_atomic` | `try_to_write_atomic()` writes content and 
returns `True` on success |
   | `test_mkdirs_raises_error_when_path_is_file` | `mkdirs()` raises 
`FileExistsError` when path is an existing file |
   
   ```bash
   # Skips all tests (no bucket configured)
   python -m pytest pypaimon/tests/gcs_file_io_test.py -v
   
   # Runs all tests against a real GCS bucket using ADC
   GCS_TEST_BUCKET=my-bucket python -m pytest 
pypaimon/tests/gcs_file_io_test.py -v
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to