larssk opened a new pull request, #7769:
URL: https://github.com/apache/paimon/pull/7769
### Purpose
Allow pypaimon to use Google Cloud Storage (GCS) as warehouse / object storage without any Java dependencies. This PR adds GCS support to `PyArrowFileIO`.
### Changes
#### 1. Scheme dispatch (`pyarrow_file_io.py`)
Added a `gs` branch in `__init__` alongside the existing `s3` and `hdfs` branches:
```python
elif scheme == "gs":
    self.filesystem = self._initialize_gcs_fs()
```
#### 2. `_initialize_gcs_fs()` method (`pyarrow_file_io.py`)
Uses `pyarrow.fs.GcsFileSystem`, which is already available via `pyarrow[gcs]`.
With no arguments, `GcsFileSystem` picks up credentials automatically via Application Default Credentials (ADC): `GOOGLE_APPLICATION_CREDENTIALS`, the GCP metadata server, or Workload Identity on GKE/GCE. Three optional properties allow passing an access token and a project ID explicitly:
| Property | Description |
|---|---|
| `gcs.access-token` | OAuth2 access token. If unset, ADC is used. |
| `gcs.access-token.expiration` | ISO 8601 expiry datetime for the token, parsed with `datetime.fromisoformat`. If unset, the token is treated as never expiring. |
| `gcs.project-id` | GCP project ID, passed to `GcsFileSystem` as `project_id`. |
```python
# Assumes module-level imports: `import pyarrow.fs as pafs` and `FileSystem`.
def _initialize_gcs_fs(self) -> FileSystem:
    access_token = self._get_property("gcs.access-token")
    token_expiry = self._get_property("gcs.access-token.expiration")
    project_id = self._get_property("gcs.project-id")

    kwargs = {}
    if access_token:
        from datetime import datetime
        kwargs["access_token"] = access_token
        # GcsFileSystem needs an expiration whenever an access token is given;
        # fall back to a far-future date if none is configured.
        kwargs["credential_token_expiration"] = (
            datetime.fromisoformat(token_expiry) if token_expiry
            else datetime(9999, 12, 31)
        )
    if project_id:
        kwargs["project_id"] = project_id
    # With no kwargs, GcsFileSystem uses ADC automatically
    return pafs.GcsFileSystem(**kwargs)
```
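`_initialize_gcs_fs()` parses `gcs.access-token.expiration` with `datetime.fromisoformat`, which before Python 3.11 rejects common ISO 8601 forms such as a trailing `Z` (e.g. `2030-01-02T03:04:05Z`). A minimal sketch of a more tolerant parser, assuming expiry values may arrive in that form; `parse_token_expiry` and `NO_EXPIRY` are hypothetical names, and treating naive timestamps as UTC is an assumption the PR does not specify:

```python
from datetime import datetime, timezone
from typing import Optional

# Hypothetical far-future fallback, mirroring the datetime(9999, 12, 31)
# default in _initialize_gcs_fs but made explicitly UTC-aware.
NO_EXPIRY = datetime(9999, 12, 31, tzinfo=timezone.utc)


def parse_token_expiry(value: Optional[str]) -> datetime:
    """Hypothetical helper: parse a gcs.access-token.expiration value."""
    if not value:
        return NO_EXPIRY
    # fromisoformat only accepts a trailing "Z" from Python 3.11 on,
    # so rewrite it as an explicit UTC offset first.
    if value.endswith(("Z", "z")):
        value = value[:-1] + "+00:00"
    parsed = datetime.fromisoformat(value)
    # Assumption: naive timestamps are interpreted as UTC.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
```

Normalizing everything to aware UTC datetimes keeps comparisons unambiguous; whether that is preferable to the naive default depends on how PyArrow interprets naive datetimes.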
#### 3. Path fix in `to_filesystem_path()` (`pyarrow_file_io.py`)
`to_filesystem_path()` had a catch-all for non-S3/non-HDFS schemes that returned only `uri.path`; for example, for `gs://my-bucket/data/table` it returned `/data/table`, stripping the bucket name. `GcsFileSystem` (like `S3FileSystem`) expects paths in `bucket/key` form without a leading slash.
Added a `GcsFileSystem` branch mirroring the existing S3 logic:
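```python
from pyarrow.fs import GcsFileSystem

# `parsed` (urlparse result) and `normalized_path` are computed earlier
# in to_filesystem_path(), as for the S3 branch.
if isinstance(self.filesystem, GcsFileSystem):
    if parsed.scheme and parsed.netloc:
        # "gs://bucket/key" -> "bucket/key"; "gs://bucket" -> "bucket"
        path_part = normalized_path.lstrip('/')
        return f"{parsed.netloc}/{path_part}" if path_part else parsed.netloc
    # No scheme: already in "bucket/key" form, pass through unchanged
    return str(path)
```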
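The GCS branch can be exercised in isolation with a standalone sketch that reproduces the cases covered by the unit tests. It assumes `normalized_path` is `parsed.path` with runs of `/` collapsed (as `test_gcs_path_normalization` implies); `gcs_to_filesystem_path` is a hypothetical name, and the real method also handles other schemes and filesystems:

```python
import re
from urllib.parse import urlparse


def gcs_to_filesystem_path(path: str) -> str:
    """Hypothetical standalone version of the GCS branch of to_filesystem_path."""
    parsed = urlparse(path)
    # Collapse runs of slashes, e.g. "///a///b" -> "/a/b".
    normalized_path = re.sub(r"/+", "/", parsed.path) if parsed.path else ""
    if parsed.scheme and parsed.netloc:
        # "gs://bucket/key" -> "bucket/key"; "gs://bucket" -> "bucket".
        path_part = normalized_path.lstrip("/")
        return f"{parsed.netloc}/{path_part}" if path_part else parsed.netloc
    # No scheme: already in "bucket/key" form, pass through unchanged.
    return str(path)


# The old catch-all returned only uri.path, which drops the bucket:
print(urlparse("gs://my-bucket/data/table").path)         # /data/table
print(gcs_to_filesystem_path("gs://my-bucket/data/table"))  # my-bucket/data/table
```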
#### 4. `GcsOptions` class (`config.py`)
Added a `GcsOptions` class alongside the existing `S3Options`, documenting the three new properties.
```python
class GcsOptions:
    GCS_ACCESS_TOKEN = (
        ConfigOptions.key("gcs.access-token")
        .string_type()
        .no_default_value()
        .with_description(
            "GCS access token. If not set, ADC (Application Default Credentials) "
            "is used automatically."
        )
    )
    ...
```
### Linked issue
https://github.com/apache/paimon/issues/7768
### Tests
#### Unit tests — `pypaimon/tests/file_io_test.py`
A new `GCSFileIOPathTest` class is added to the existing `file_io_test.py`, following the same pattern as the existing S3/OSS path conversion tests. These tests require no GCS credentials.
| Test | What it checks |
|---|---|
| `test_gcs_filesystem_type` | `PyArrowFileIO("gs://...")` produces a `pafs.GcsFileSystem` instance |
| `test_gcs_path_conversion` | `gs://bucket/key` maps to `bucket/key` (bucket prepended, no leading slash) |
| `test_gcs_path_bucket_only` | `gs://bucket` with no path component maps to `bucket` |
| `test_gcs_path_normalization` | Consecutive slashes in the path are collapsed (e.g. `gs://bucket///a///b` → `bucket/a/b`) |
| `test_gcs_path_idempotency` | Already-converted `bucket/key` paths pass through unchanged |
| `test_gcs_path_no_leading_slash` | `to_filesystem_path` never returns a path starting with `/` for any GCS URI |
```bash
python -m pytest pypaimon/tests/file_io_test.py::GCSFileIOPathTest -v
```
#### Integration tests — `pypaimon/tests/gcs_file_io_test.py`
A new `GCSFileIOTest` class is added following the pattern of `oss_file_io_test.py`. All tests are skipped automatically when `GCS_TEST_BUCKET` is not set. Credentials are picked up via ADC, so no explicit credential properties are required.
| Test | What it checks |
|---|---|
| `test_gcs_filesystem_type` | Live `PyArrowFileIO` with `gs://` uses `GcsFileSystem` |
| `test_exists` | `exists()` returns `False` for non-existent paths; `get_file_status()` raises `FileNotFoundError` |
| `test_write_and_read_file` | `write_file()` / `read_file_utf8()` round-trip |
| `test_write_file_overwrite` | `write_file(..., overwrite=False)` raises `FileExistsError`; `overwrite=True` replaces content |
| `test_new_input_stream_read` | `new_output_stream()` / `new_input_stream()` binary round-trip; `FileNotFoundError` for missing file |
| `test_get_file_status_directory` | `get_file_status()` returns `FileType.Directory` for a directory |
| `test_get_file_status_file` | `get_file_status()` returns `FileType.File` with non-None size |
| `test_delete_returns_false_when_not_exists` | `delete()` returns `False` for non-existent file/directory |
| `test_delete_non_empty_directory_raises_error` | `delete(..., recursive=False)` raises `OSError` for non-empty directory |
| `test_rename_returns_false_when_dst_exists` | `rename()` returns `False` when destination already exists |
| `test_copy_file` | `copy_file(..., overwrite=False)` raises `FileExistsError`; `overwrite=True` replaces content |
| `test_try_to_write_atomic` | `try_to_write_atomic()` writes content and returns `True` on success |
| `test_mkdirs_raises_error_when_path_is_file` | `mkdirs()` raises `FileExistsError` when path is an existing file |
```bash
# Skips all tests (no bucket configured)
python -m pytest pypaimon/tests/gcs_file_io_test.py -v
# Runs all tests against a real GCS bucket using ADC
GCS_TEST_BUCKET=my-bucket python -m pytest pypaimon/tests/gcs_file_io_test.py -v
```
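Skipping a whole test class when no bucket is configured can be done with `unittest.skipUnless`. A minimal sketch of that pattern, assuming the live tests are plain `unittest.TestCase` classes; the class name `GCSFileIOSkipSketch` and the `pypaimon-gcs-test` prefix are illustrative, and the real `gcs_file_io_test.py` may structure this differently:

```python
import os
import unittest
import uuid

# Bucket for live tests; when unset, every test in the class is skipped.
GCS_TEST_BUCKET = os.environ.get("GCS_TEST_BUCKET")


@unittest.skipUnless(GCS_TEST_BUCKET, "GCS_TEST_BUCKET is not set")
class GCSFileIOSkipSketch(unittest.TestCase):
    def setUp(self):
        # Hypothetical per-test prefix so concurrent runs do not collide.
        self.root = f"gs://{GCS_TEST_BUCKET}/pypaimon-gcs-test/{uuid.uuid4().hex}"

    def test_root_is_gcs_uri(self):
        self.assertTrue(self.root.startswith(f"gs://{GCS_TEST_BUCKET}/"))
```

Because the skip is declared on the class, `setUp` never runs without a bucket, so no GCS client or credentials are touched in that case.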