This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5d290f0da2 [python] Use pypaimon with Google Cloud Storage (#7769)
5d290f0da2 is described below
commit 5d290f0da27a7bf7d4b4e761f8761e17e002c094
Author: Lars Skjærven <[email protected]>
AuthorDate: Sat May 23 17:00:58 2026 +0200
[python] Use pypaimon with Google Cloud Storage (#7769)
Use Google Cloud Storage (GCS) as warehouse / object storage for
pypaimon (without java dependencies). This PR adds PyArrowFileIO
handling for GCS.
---
paimon-python/pypaimon/common/options/config.py | 16 ++
.../pypaimon/filesystem/pyarrow_file_io.py | 29 ++++
paimon-python/pypaimon/tests/file_io_test.py | 57 +++++++
paimon-python/pypaimon/tests/gcs_file_io_test.py | 184 +++++++++++++++++++++
4 files changed, 286 insertions(+)
diff --git a/paimon-python/pypaimon/common/options/config.py b/paimon-python/pypaimon/common/options/config.py
index 8112715239..465671725d 100644
--- a/paimon-python/pypaimon/common/options/config.py
+++ b/paimon-python/pypaimon/common/options/config.py
@@ -43,6 +43,22 @@ class S3Options:
S3_REGION = ConfigOptions.key("fs.s3.region").string_type().no_default_value().with_description("S3 region")
+class GcsOptions:
+ GCS_ACCESS_TOKEN = (
+ ConfigOptions.key("gcs.access-token").string_type().no_default_value()
+ .with_description(
+ "GCS access token. If not set, ADC (Application Default Credentials) is used "
+ "automatically."))
+ GCS_ACCESS_TOKEN_EXPIRATION = (
+ ConfigOptions.key("gcs.access-token.expiration").string_type().no_default_value()
+ .with_description(
+ "ISO 8601 expiration datetime for the GCS access token. "
+ "Required when gcs.access-token is set."))
+ GCS_PROJECT_ID = (
+ ConfigOptions.key("gcs.project-id").string_type().no_default_value()
+ .with_description("GCP project ID for GCS requests."))
+
+
class PVFSOptions:
CACHE_ENABLED = ConfigOptions.key("cache-enabled").boolean_type().default_value("true").with_description(
"Enable cache")
diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
index d81238c89f..4ee75f464d 100644
--- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
+++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
@@ -78,6 +78,8 @@ class PyArrowFileIO(FileIO):
self.filesystem = self._initialize_s3_fs()
elif scheme in {"hdfs", "viewfs"}:
self.filesystem = self._initialize_hdfs_fs(scheme, netloc)
+ elif scheme == "gs":
+ self.filesystem = self._initialize_gcs_fs()
else:
raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")
@@ -301,6 +303,26 @@ class PyArrowFileIO(FileIO):
user=os.environ.get('HADOOP_USER_NAME', 'hadoop')
)
+ def _initialize_gcs_fs(self) -> FileSystem:
+ access_token = self._get_property("gcs.access-token")
+ token_expiry = self._get_property("gcs.access-token.expiration")
+ project_id = self._get_property("gcs.project-id")
+
+ kwargs = {}
+ if access_token:
+ from datetime import datetime
+ kwargs["access_token"] = access_token
+ kwargs["credential_token_expiration"] = (
+ datetime.fromisoformat(token_expiry) if token_expiry
+ else datetime(9999, 12, 31)
+ )
+ if project_id:
+ kwargs["project_id"] = project_id
+
+ # With no kwargs, GcsFileSystem uses ADC automatically
+ # (GOOGLE_APPLICATION_CREDENTIALS or GCP metadata server / Workload Identity)
+ return pafs.GcsFileSystem(**kwargs)
+
@staticmethod
def _kerberos_login_from_keytab(principal: str, keytab: str):
if not os.path.isfile(keytab):
@@ -706,6 +728,13 @@ class PyArrowFileIO(FileIO):
else:
return str(path)
+ from pyarrow.fs import GcsFileSystem
+ if isinstance(self.filesystem, GcsFileSystem):
+ if parsed.scheme and parsed.netloc:
+ path_part = normalized_path.lstrip('/')
+ return f"{parsed.netloc}/{path_part}" if path_part else parsed.netloc
+ return str(path)
+
if parsed.scheme:
if not normalized_path:
return '.'
diff --git a/paimon-python/pypaimon/tests/file_io_test.py b/paimon-python/pypaimon/tests/file_io_test.py
index 24a3806395..2ca5dc6ec7 100644
--- a/paimon-python/pypaimon/tests/file_io_test.py
+++ b/paimon-python/pypaimon/tests/file_io_test.py
@@ -562,5 +562,62 @@ class HdfsFileIOTest(unittest.TestCase):
self.assertIn('HADOOP_CONF_DIR', str(ctx.exception))
+class GCSFileIOPathTest(unittest.TestCase):
+ """Unit tests for PyArrowFileIO.to_filesystem_path with GCS (no credentials required)."""
+
+ def setUp(self):
+ self.file_io = PyArrowFileIO("gs://my-bucket/warehouse", Options({}))
+
+ def test_gcs_filesystem_type(self):
+ """GCS warehouse path should produce a GcsFileSystem."""
+ self.assertIsInstance(self.file_io.filesystem, pafs.GcsFileSystem)
+
+ def test_gcs_path_conversion(self):
+ """gs://bucket/key should map to bucket/key (no leading slash)."""
+ self.assertEqual(
+ self.file_io.to_filesystem_path("gs://my-bucket/path/to/file.parquet"),
+ "my-bucket/path/to/file.parquet"
+ )
+ self.assertEqual(
+ self.file_io.to_filesystem_path("gs://my-bucket/warehouse/db.db/tbl/data-0.parquet"),
+ "my-bucket/warehouse/db.db/tbl/data-0.parquet"
+ )
+
+ def test_gcs_path_bucket_only(self):
+ """gs://bucket with no path should return just the bucket name."""
+ self.assertEqual(
+ self.file_io.to_filesystem_path("gs://my-bucket"),
+ "my-bucket"
+ )
+
+ def test_gcs_path_normalization(self):
+ """Multiple consecutive slashes in the path should be collapsed."""
+ self.assertEqual(
+ self.file_io.to_filesystem_path("gs://my-bucket///path///to///file.parquet"),
+ "my-bucket/path/to/file.parquet"
+ )
+
+ def test_gcs_path_idempotency(self):
+ """Already-converted bucket/key paths should pass through unchanged."""
+ converted = "my-bucket/path/to/file.parquet"
+ self.assertEqual(self.file_io.to_filesystem_path(converted), converted)
+ parent = str(Path(converted).parent)
+ self.assertEqual(self.file_io.to_filesystem_path(parent), parent)
+
+ def test_gcs_path_no_leading_slash(self):
+ """to_filesystem_path must never return a path starting with '/'."""
+ cases = [
+ "gs://my-bucket/path/to/file.parquet",
+ "gs://my-bucket",
+ "gs://my-bucket///path",
+ ]
+ for uri in cases:
+ result = self.file_io.to_filesystem_path(uri)
+ self.assertFalse(
+ result.startswith("/"),
+ f"Path must not start with '/' for GcsFileSystem, got: {result!r} (input: {uri!r})"
+ )
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/paimon-python/pypaimon/tests/gcs_file_io_test.py b/paimon-python/pypaimon/tests/gcs_file_io_test.py
new file mode 100644
index 0000000000..672786cf57
--- /dev/null
+++ b/paimon-python/pypaimon/tests/gcs_file_io_test.py
@@ -0,0 +1,184 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+import unittest
+import uuid
+
+import pyarrow.fs as pafs
+
+from pypaimon.common.options import Options
+from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
+
+
+class GCSFileIOTest(unittest.TestCase):
+ """Integration tests for PyArrowFileIO with GCS.
+
+ Requires the following environment variable to be set:
+ GCS_TEST_BUCKET — name of the GCS bucket to use (without gs:// prefix)
+
+ Credentials are picked up automatically via Application Default Credentials
+ (GOOGLE_APPLICATION_CREDENTIALS, GCP metadata server, or Workload Identity).
+ All tests are skipped when GCS_TEST_BUCKET is not configured.
+ """
+
+ def setUp(self):
+ self.bucket = os.environ.get("GCS_TEST_BUCKET")
+ if not self.bucket:
+ self.skipTest("GCS_TEST_BUCKET is not configured")
+ return
+
+ self.root_path = f"gs://{self.bucket}/"
+ self.file_io = PyArrowFileIO(self.root_path, Options({}))
+ self.test_prefix = f"test-{uuid.uuid4().hex[:8]}/"
+
+ def tearDown(self):
+ if not hasattr(self, 'file_io'):
+ return
+ test_dir = f"{self.root_path}{self.test_prefix}"
+ try:
+ if self.file_io.exists(test_dir):
+ self.file_io.delete(test_dir, recursive=True)
+ except Exception:
+ pass
+
+ def _path(self, name: str) -> str:
+ return f"{self.root_path}{self.test_prefix}{name}"
+
+ def test_gcs_filesystem_type(self):
+ """PyArrowFileIO with gs:// should use GcsFileSystem."""
+ self.assertIsInstance(self.file_io.filesystem, pafs.GcsFileSystem)
+
+ def test_exists(self):
+ """exists() returns False for non-existent paths."""
+ self.assertFalse(self.file_io.exists(self._path("nonexistent.txt")))
+ with self.assertRaises(FileNotFoundError):
+ self.file_io.get_file_status(self._path("nonexistent.txt"))
+
+ def test_write_and_read_file(self):
+ """write_file() and read_file_utf8() round-trip."""
+ test_file = self._path("write_read_test.txt")
+
+ self.file_io.write_file(test_file, "hello gcs")
+ self.assertTrue(self.file_io.exists(test_file))
+ self.assertEqual(self.file_io.read_file_utf8(test_file), "hello gcs")
+
+ def test_write_file_overwrite(self):
+ """write_file() respects the overwrite flag."""
+ test_file = self._path("overwrite_test.txt")
+
+ self.file_io.write_file(test_file, "first")
+ with self.assertRaises(FileExistsError):
+ self.file_io.write_file(test_file, "second", overwrite=False)
+ self.assertEqual(self.file_io.read_file_utf8(test_file), "first")
+
+ self.file_io.write_file(test_file, "overwritten", overwrite=True)
+ self.assertEqual(self.file_io.read_file_utf8(test_file), "overwritten")
+
+ def test_new_input_stream_read(self):
+ """new_output_stream() / new_input_stream() round-trip."""
+ test_data = b"Hello, GCS! This is a test file."
+ test_file = self._path("input_stream_test.bin")
+
+ with self.file_io.new_output_stream(test_file) as out:
+ out.write(test_data)
+
+ with self.file_io.new_input_stream(test_file) as inp:
+ self.assertEqual(inp.read(), test_data)
+
+ with self.assertRaises(FileNotFoundError):
+ self.file_io.new_input_stream(self._path("nonexistent.bin"))
+
+ def test_get_file_status_directory(self):
+ """get_file_status() returns Directory type for a directory."""
+ test_dir = self._path("test-dir/")
+ self.file_io.mkdirs(test_dir)
+ status = self.file_io.get_file_status(test_dir)
+ self.assertIsNotNone(status)
+ self.assertEqual(status.type, pafs.FileType.Directory)
+
+ def test_get_file_status_file(self):
+ """get_file_status() returns File type and non-None size for a file."""
+ test_file = self._path("status_test.txt")
+ self.file_io.write_file(test_file, "content")
+ status = self.file_io.get_file_status(test_file)
+ self.assertEqual(status.type, pafs.FileType.File)
+ self.assertIsNotNone(status.size)
+
+ def test_delete_returns_false_when_not_exists(self):
+ """delete() returns False when the path does not exist."""
+ self.assertFalse(self.file_io.delete(self._path("nonexistent.txt")))
+ self.assertFalse(self.file_io.delete(self._path("nonexistent_dir"), recursive=False))
+
+ def test_delete_non_empty_directory_raises_error(self):
+ """delete() without recursive=True raises OSError for non-empty directory."""
+ test_dir = self._path("nonempty-dir/")
+ test_file = self._path("nonempty-dir/file.txt")
+ self.file_io.mkdirs(test_dir)
+ with self.file_io.new_output_stream(test_file) as out:
+ out.write(b"data")
+
+ with self.assertRaises(OSError) as ctx:
+ self.file_io.delete(test_dir, recursive=False)
+ self.assertIn("is not empty", str(ctx.exception))
+
+ def test_rename_returns_false_when_dst_exists(self):
+ """rename() returns False when the destination already exists."""
+ src = self._path("src.txt")
+ dst = self._path("dst.txt")
+ with self.file_io.new_output_stream(src) as out:
+ out.write(b"src")
+ with self.file_io.new_output_stream(dst) as out:
+ out.write(b"dst")
+
+ self.assertFalse(self.file_io.rename(src, dst))
+
+ def test_copy_file(self):
+ """copy_file() copies content and respects the overwrite flag."""
+ src = self._path("copy_src.txt")
+ dst = self._path("copy_dst.txt")
+ with self.file_io.new_output_stream(src) as out:
+ out.write(b"source content")
+ with self.file_io.new_output_stream(dst) as out:
+ out.write(b"target content")
+
+ with self.assertRaises(FileExistsError) as ctx:
+ self.file_io.copy_file(src, dst, overwrite=False)
+ self.assertIn("already exists", str(ctx.exception))
+
+ self.file_io.copy_file(src, dst, overwrite=True)
+ with self.file_io.new_input_stream(dst) as inp:
+ self.assertEqual(inp.read(), b"source content")
+
+ def test_try_to_write_atomic(self):
+ """try_to_write_atomic() writes a file and returns True on success."""
+ normal_file = self._path("atomic_file.txt")
+ self.assertTrue(self.file_io.try_to_write_atomic(normal_file, "atomic content"))
+ self.assertEqual(self.file_io.read_file_utf8(normal_file), "atomic content")
+
+ def test_mkdirs_raises_error_when_path_is_file(self):
+ """mkdirs() raises FileExistsError when the path is an existing file."""
+ test_file = self._path("existing_file.txt")
+ self.file_io.write_file(test_file, "data")
+
+ with self.assertRaises(FileExistsError) as ctx:
+ self.file_io.mkdirs(test_file)
+ self.assertIn("is not a directory", str(ctx.exception))
+
+
+if __name__ == '__main__':
+ unittest.main()