This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d290f0da2 [python] Use pypaimon with Google Cloud Storage (#7769)
5d290f0da2 is described below

commit 5d290f0da27a7bf7d4b4e761f8761e17e002c094
Author: Lars Skjærven <[email protected]>
AuthorDate: Sat May 23 17:00:58 2026 +0200

    [python] Use pypaimon with Google Cloud Storage (#7769)
    
    Use Google Cloud Storage (GCS) as warehouse / object storage for
    pypaimon (without java dependencies). This PR adds PyArrowFileIO
    handling for GCS.
---
 paimon-python/pypaimon/common/options/config.py    |  16 ++
 .../pypaimon/filesystem/pyarrow_file_io.py         |  29 ++++
 paimon-python/pypaimon/tests/file_io_test.py       |  57 +++++++
 paimon-python/pypaimon/tests/gcs_file_io_test.py   | 184 +++++++++++++++++++++
 4 files changed, 286 insertions(+)

diff --git a/paimon-python/pypaimon/common/options/config.py b/paimon-python/pypaimon/common/options/config.py
index 8112715239..465671725d 100644
--- a/paimon-python/pypaimon/common/options/config.py
+++ b/paimon-python/pypaimon/common/options/config.py
@@ -43,6 +43,22 @@ class S3Options:
     S3_REGION = ConfigOptions.key("fs.s3.region").string_type().no_default_value().with_description("S3 region")
 
 
+class GcsOptions:
+    GCS_ACCESS_TOKEN = (
+        ConfigOptions.key("gcs.access-token").string_type().no_default_value()
+        .with_description(
+            "GCS access token. If not set, ADC (Application Default Credentials) is used "
+            "automatically."))
+    GCS_ACCESS_TOKEN_EXPIRATION = (
+        ConfigOptions.key("gcs.access-token.expiration").string_type().no_default_value()
+        .with_description(
+            "ISO 8601 expiration datetime for the GCS access token. "
+            "Required when gcs.access-token is set."))
+    GCS_PROJECT_ID = (
+        ConfigOptions.key("gcs.project-id").string_type().no_default_value()
+        .with_description("GCP project ID for GCS requests."))
+
+
 class PVFSOptions:
     CACHE_ENABLED = ConfigOptions.key("cache-enabled").boolean_type().default_value("true").with_description(
         "Enable cache")
diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
index d81238c89f..4ee75f464d 100644
--- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
+++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
@@ -78,6 +78,8 @@ class PyArrowFileIO(FileIO):
             self.filesystem = self._initialize_s3_fs()
         elif scheme in {"hdfs", "viewfs"}:
             self.filesystem = self._initialize_hdfs_fs(scheme, netloc)
+        elif scheme == "gs":
+            self.filesystem = self._initialize_gcs_fs()
         else:
             raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")
 
@@ -301,6 +303,26 @@ class PyArrowFileIO(FileIO):
                 user=os.environ.get('HADOOP_USER_NAME', 'hadoop')
             )
 
+    def _initialize_gcs_fs(self) -> FileSystem:
+        access_token = self._get_property("gcs.access-token")
+        token_expiry = self._get_property("gcs.access-token.expiration")
+        project_id = self._get_property("gcs.project-id")
+
+        kwargs = {}
+        if access_token:
+            from datetime import datetime
+            kwargs["access_token"] = access_token
+            kwargs["credential_token_expiration"] = (
+                datetime.fromisoformat(token_expiry) if token_expiry
+                else datetime(9999, 12, 31)
+            )
+        if project_id:
+            kwargs["project_id"] = project_id
+
+        # With no kwargs, GcsFileSystem uses ADC automatically
+        # (GOOGLE_APPLICATION_CREDENTIALS or GCP metadata server / Workload Identity)
+        return pafs.GcsFileSystem(**kwargs)
+
     @staticmethod
     def _kerberos_login_from_keytab(principal: str, keytab: str):
         if not os.path.isfile(keytab):
@@ -706,6 +728,13 @@ class PyArrowFileIO(FileIO):
             else:
                 return str(path)
 
+        from pyarrow.fs import GcsFileSystem
+        if isinstance(self.filesystem, GcsFileSystem):
+            if parsed.scheme and parsed.netloc:
+                path_part = normalized_path.lstrip('/')
+                return f"{parsed.netloc}/{path_part}" if path_part else parsed.netloc
+            return str(path)
+
         if parsed.scheme:
             if not normalized_path:
                 return '.'
diff --git a/paimon-python/pypaimon/tests/file_io_test.py b/paimon-python/pypaimon/tests/file_io_test.py
index 24a3806395..2ca5dc6ec7 100644
--- a/paimon-python/pypaimon/tests/file_io_test.py
+++ b/paimon-python/pypaimon/tests/file_io_test.py
@@ -562,5 +562,62 @@ class HdfsFileIOTest(unittest.TestCase):
             self.assertIn('HADOOP_CONF_DIR', str(ctx.exception))
 
 
+class GCSFileIOPathTest(unittest.TestCase):
+    """Unit tests for PyArrowFileIO.to_filesystem_path with GCS (no credentials required)."""
+
+    def setUp(self):
+        self.file_io = PyArrowFileIO("gs://my-bucket/warehouse", Options({}))
+
+    def test_gcs_filesystem_type(self):
+        """GCS warehouse path should produce a GcsFileSystem."""
+        self.assertIsInstance(self.file_io.filesystem, pafs.GcsFileSystem)
+
+    def test_gcs_path_conversion(self):
+        """gs://bucket/key should map to bucket/key (no leading slash)."""
+        self.assertEqual(
+            self.file_io.to_filesystem_path("gs://my-bucket/path/to/file.parquet"),
+            "my-bucket/path/to/file.parquet"
+        )
+        self.assertEqual(
+            self.file_io.to_filesystem_path("gs://my-bucket/warehouse/db.db/tbl/data-0.parquet"),
+            "my-bucket/warehouse/db.db/tbl/data-0.parquet"
+        )
+
+    def test_gcs_path_bucket_only(self):
+        """gs://bucket with no path should return just the bucket name."""
+        self.assertEqual(
+            self.file_io.to_filesystem_path("gs://my-bucket"),
+            "my-bucket"
+        )
+
+    def test_gcs_path_normalization(self):
+        """Multiple consecutive slashes in the path should be collapsed."""
+        self.assertEqual(
+            self.file_io.to_filesystem_path("gs://my-bucket///path///to///file.parquet"),
+            "my-bucket/path/to/file.parquet"
+        )
+
+    def test_gcs_path_idempotency(self):
+        """Already-converted bucket/key paths should pass through unchanged."""
+        converted = "my-bucket/path/to/file.parquet"
+        self.assertEqual(self.file_io.to_filesystem_path(converted), converted)
+        parent = str(Path(converted).parent)
+        self.assertEqual(self.file_io.to_filesystem_path(parent), parent)
+
+    def test_gcs_path_no_leading_slash(self):
+        """to_filesystem_path must never return a path starting with '/'."""
+        cases = [
+            "gs://my-bucket/path/to/file.parquet",
+            "gs://my-bucket",
+            "gs://my-bucket///path",
+        ]
+        for uri in cases:
+            result = self.file_io.to_filesystem_path(uri)
+            self.assertFalse(
+                result.startswith("/"),
+                f"Path must not start with '/' for GcsFileSystem, got: {result!r} (input: {uri!r})"
+            )
+
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/paimon-python/pypaimon/tests/gcs_file_io_test.py b/paimon-python/pypaimon/tests/gcs_file_io_test.py
new file mode 100644
index 0000000000..672786cf57
--- /dev/null
+++ b/paimon-python/pypaimon/tests/gcs_file_io_test.py
@@ -0,0 +1,184 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+import unittest
+import uuid
+
+import pyarrow.fs as pafs
+
+from pypaimon.common.options import Options
+from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
+
+
+class GCSFileIOTest(unittest.TestCase):
+    """Integration tests for PyArrowFileIO with GCS.
+
+    Requires the following environment variable to be set:
+      GCS_TEST_BUCKET  — name of the GCS bucket to use (without gs:// prefix)
+
+    Credentials are picked up automatically via Application Default Credentials
+    (GOOGLE_APPLICATION_CREDENTIALS, GCP metadata server, or Workload Identity).
+    All tests are skipped when GCS_TEST_BUCKET is not configured.
+    """
+
+    def setUp(self):
+        self.bucket = os.environ.get("GCS_TEST_BUCKET")
+        if not self.bucket:
+            self.skipTest("GCS_TEST_BUCKET is not configured")
+            return
+
+        self.root_path = f"gs://{self.bucket}/"
+        self.file_io = PyArrowFileIO(self.root_path, Options({}))
+        self.test_prefix = f"test-{uuid.uuid4().hex[:8]}/"
+
+    def tearDown(self):
+        if not hasattr(self, 'file_io'):
+            return
+        test_dir = f"{self.root_path}{self.test_prefix}"
+        try:
+            if self.file_io.exists(test_dir):
+                self.file_io.delete(test_dir, recursive=True)
+        except Exception:
+            pass
+
+    def _path(self, name: str) -> str:
+        return f"{self.root_path}{self.test_prefix}{name}"
+
+    def test_gcs_filesystem_type(self):
+        """PyArrowFileIO with gs:// should use GcsFileSystem."""
+        self.assertIsInstance(self.file_io.filesystem, pafs.GcsFileSystem)
+
+    def test_exists(self):
+        """exists() returns False for non-existent paths."""
+        self.assertFalse(self.file_io.exists(self._path("nonexistent.txt")))
+        with self.assertRaises(FileNotFoundError):
+            self.file_io.get_file_status(self._path("nonexistent.txt"))
+
+    def test_write_and_read_file(self):
+        """write_file() and read_file_utf8() round-trip."""
+        test_file = self._path("write_read_test.txt")
+
+        self.file_io.write_file(test_file, "hello gcs")
+        self.assertTrue(self.file_io.exists(test_file))
+        self.assertEqual(self.file_io.read_file_utf8(test_file), "hello gcs")
+
+    def test_write_file_overwrite(self):
+        """write_file() respects the overwrite flag."""
+        test_file = self._path("overwrite_test.txt")
+
+        self.file_io.write_file(test_file, "first")
+        with self.assertRaises(FileExistsError):
+            self.file_io.write_file(test_file, "second", overwrite=False)
+        self.assertEqual(self.file_io.read_file_utf8(test_file), "first")
+
+        self.file_io.write_file(test_file, "overwritten", overwrite=True)
+        self.assertEqual(self.file_io.read_file_utf8(test_file), "overwritten")
+
+    def test_new_input_stream_read(self):
+        """new_output_stream() / new_input_stream() round-trip."""
+        test_data = b"Hello, GCS! This is a test file."
+        test_file = self._path("input_stream_test.bin")
+
+        with self.file_io.new_output_stream(test_file) as out:
+            out.write(test_data)
+
+        with self.file_io.new_input_stream(test_file) as inp:
+            self.assertEqual(inp.read(), test_data)
+
+        with self.assertRaises(FileNotFoundError):
+            self.file_io.new_input_stream(self._path("nonexistent.bin"))
+
+    def test_get_file_status_directory(self):
+        """get_file_status() returns Directory type for a directory."""
+        test_dir = self._path("test-dir/")
+        self.file_io.mkdirs(test_dir)
+        status = self.file_io.get_file_status(test_dir)
+        self.assertIsNotNone(status)
+        self.assertEqual(status.type, pafs.FileType.Directory)
+
+    def test_get_file_status_file(self):
+        """get_file_status() returns File type and non-None size for a file."""
+        test_file = self._path("status_test.txt")
+        self.file_io.write_file(test_file, "content")
+        status = self.file_io.get_file_status(test_file)
+        self.assertEqual(status.type, pafs.FileType.File)
+        self.assertIsNotNone(status.size)
+
+    def test_delete_returns_false_when_not_exists(self):
+        """delete() returns False when the path does not exist."""
+        self.assertFalse(self.file_io.delete(self._path("nonexistent.txt")))
+        self.assertFalse(self.file_io.delete(self._path("nonexistent_dir"), recursive=False))
+
+    def test_delete_non_empty_directory_raises_error(self):
+        """delete() without recursive=True raises OSError for non-empty directory."""
+        test_dir = self._path("nonempty-dir/")
+        test_file = self._path("nonempty-dir/file.txt")
+        self.file_io.mkdirs(test_dir)
+        with self.file_io.new_output_stream(test_file) as out:
+            out.write(b"data")
+
+        with self.assertRaises(OSError) as ctx:
+            self.file_io.delete(test_dir, recursive=False)
+        self.assertIn("is not empty", str(ctx.exception))
+
+    def test_rename_returns_false_when_dst_exists(self):
+        """rename() returns False when the destination already exists."""
+        src = self._path("src.txt")
+        dst = self._path("dst.txt")
+        with self.file_io.new_output_stream(src) as out:
+            out.write(b"src")
+        with self.file_io.new_output_stream(dst) as out:
+            out.write(b"dst")
+
+        self.assertFalse(self.file_io.rename(src, dst))
+
+    def test_copy_file(self):
+        """copy_file() copies content and respects the overwrite flag."""
+        src = self._path("copy_src.txt")
+        dst = self._path("copy_dst.txt")
+        with self.file_io.new_output_stream(src) as out:
+            out.write(b"source content")
+        with self.file_io.new_output_stream(dst) as out:
+            out.write(b"target content")
+
+        with self.assertRaises(FileExistsError) as ctx:
+            self.file_io.copy_file(src, dst, overwrite=False)
+        self.assertIn("already exists", str(ctx.exception))
+
+        self.file_io.copy_file(src, dst, overwrite=True)
+        with self.file_io.new_input_stream(dst) as inp:
+            self.assertEqual(inp.read(), b"source content")
+
+    def test_try_to_write_atomic(self):
+        """try_to_write_atomic() writes a file and returns True on success."""
+        normal_file = self._path("atomic_file.txt")
+        self.assertTrue(self.file_io.try_to_write_atomic(normal_file, "atomic content"))
+        self.assertEqual(self.file_io.read_file_utf8(normal_file), "atomic content")
+
+    def test_mkdirs_raises_error_when_path_is_file(self):
+        """mkdirs() raises FileExistsError when the path is an existing file."""
+        test_file = self._path("existing_file.txt")
+        self.file_io.write_file(test_file, "data")
+
+        with self.assertRaises(FileExistsError) as ctx:
+            self.file_io.mkdirs(test_file)
+        self.assertIn("is not a directory", str(ctx.exception))
+
+
+if __name__ == '__main__':
+    unittest.main()

Reply via email to