This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b7f6082e77 [python] support data-file.external-paths in pypaimon (#6674)
b7f6082e77 is described below

commit b7f6082e77a459dfba8573f65bb07a36354fb3b3
Author: XiaoHongbo <[email protected]>
AuthorDate: Wed Nov 26 21:38:09 2025 +0800

    [python] support data-file.external-paths in pypaimon (#6674)
---
 paimon-python/pypaimon/common/core_options.py      |  71 +++-
 .../pypaimon/common/external_path_provider.py      |  43 +++
 .../pypaimon/read/scanner/full_starting_scanner.py |   4 +-
 paimon-python/pypaimon/read/split_read.py          |   5 +-
 paimon-python/pypaimon/table/file_store_table.py   |  72 +++-
 .../pypaimon/tests/external_paths_test.py          | 427 +++++++++++++++++++++
 paimon-python/pypaimon/utils/__init__.py           |  17 +
 .../pypaimon/utils/file_store_path_factory.py      | 115 ++++++
 paimon-python/pypaimon/write/file_store_commit.py  |  16 +-
 paimon-python/pypaimon/write/writer/blob_writer.py |  18 +-
 .../pypaimon/write/writer/data_blob_writer.py      |  11 +-
 paimon-python/pypaimon/write/writer/data_writer.py |  63 +--
 12 files changed, 813 insertions(+), 49 deletions(-)
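
For readers skimming the diff, here is a minimal usage sketch of the new
data-file.external-paths options. It is not part of the commit; it mirrors the
tests added below, and the warehouse path, external directories, database and
table names are illustrative only:

    import os

    import pyarrow as pa

    from pypaimon import CatalogFactory, Schema
    from pypaimon.common.core_options import CoreOptions, ExternalPathStrategy

    # Local warehouse and external roots (paths are illustrative).
    os.makedirs("/tmp/ext_a", exist_ok=True)
    os.makedirs("/tmp/ext_b", exist_ok=True)
    catalog = CatalogFactory.create({"warehouse": "/tmp/warehouse"})
    catalog.create_database("demo_db", False)

    # Route new data files to external roots, chosen round-robin per file.
    # The other non-trivial strategy is "specific-fs", which additionally
    # needs data-file.external-paths.specific-fs set to a scheme such as "oss".
    options = {
        CoreOptions.DATA_FILE_EXTERNAL_PATHS: "file:///tmp/ext_a,file:///tmp/ext_b",
        CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: ExternalPathStrategy.ROUND_ROBIN,
    }
    pa_schema = pa.schema([("id", pa.int32()), ("name", pa.string())])
    schema = Schema.from_pyarrow_schema(pa_schema, options=options)
    catalog.create_table("demo_db.external_demo", schema, False)
    table = catalog.get_table("demo_db.external_demo")

    # Normal batch write path; the committed DataFileMeta entries now carry
    # external_path pointing under the external roots instead of None.
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    data = pa.Table.from_pydict({"id": [1, 2], "name": ["a", "b"]}, schema=pa_schema)
    table_write.write_arrow(data)
    table_commit.commit(table_write.prepare_commit())
    table_write.close()
    table_commit.close()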

diff --git a/paimon-python/pypaimon/common/core_options.py 
b/paimon-python/pypaimon/common/core_options.py
index 87b8e9034a..028f757b77 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -17,6 +17,7 @@
 
################################################################################
 
 from enum import Enum
+from typing import List, Optional
 
 from pypaimon.common.memory_size import MemorySize
 
@@ -58,13 +59,17 @@ class CoreOptions(str, Enum):
     COMMIT_USER_PREFIX = "commit.user-prefix"
     ROW_TRACKING_ENABLED = "row-tracking.enabled"
     DATA_EVOLUTION_ENABLED = "data-evolution.enabled"
+    # External paths options
+    DATA_FILE_EXTERNAL_PATHS = "data-file.external-paths"
+    DATA_FILE_EXTERNAL_PATHS_STRATEGY = "data-file.external-paths.strategy"
+    DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS = 
"data-file.external-paths.specific-fs"
 
     @staticmethod
-    def get_blob_as_descriptor(options: dict) -> bool:
+    def blob_as_descriptor(options: dict) -> bool:
         return options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, 
"false").lower() == 'true'
 
     @staticmethod
-    def get_split_target_size(options: dict) -> int:
+    def split_target_size(options: dict) -> int:
         """Get split target size from options, default to 128MB."""
         if CoreOptions.SOURCE_SPLIT_TARGET_SIZE in options:
             size_str = options[CoreOptions.SOURCE_SPLIT_TARGET_SIZE]
@@ -72,7 +77,7 @@ class CoreOptions(str, Enum):
         return MemorySize.of_mebi_bytes(128).get_bytes()
 
     @staticmethod
-    def get_split_open_file_cost(options: dict) -> int:
+    def split_open_file_cost(options: dict) -> int:
         """Get split open file cost from options, default to 4MB."""
         if CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST in options:
             cost_str = options[CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST]
@@ -80,7 +85,7 @@ class CoreOptions(str, Enum):
         return MemorySize.of_mebi_bytes(4).get_bytes()
 
     @staticmethod
-    def get_target_file_size(options: dict, has_primary_key: bool = False) -> 
int:
+    def target_file_size(options: dict, has_primary_key: bool = False) -> int:
         """Get target file size from options, default to 128MB for primary key 
table, 256MB for append-only table."""
         if CoreOptions.TARGET_FILE_SIZE in options:
             size_str = options[CoreOptions.TARGET_FILE_SIZE]
@@ -88,9 +93,63 @@ class CoreOptions(str, Enum):
         return MemorySize.of_mebi_bytes(128 if has_primary_key else 
256).get_bytes()
 
     @staticmethod
-    def get_blob_target_file_size(options: dict) -> int:
+    def blob_target_file_size(options: dict) -> int:
         """Get blob target file size from options, default to target-file-size 
(256MB for append-only table)."""
         if CoreOptions.BLOB_TARGET_FILE_SIZE in options:
             size_str = options[CoreOptions.BLOB_TARGET_FILE_SIZE]
             return MemorySize.parse(size_str).get_bytes()
-        return CoreOptions.get_target_file_size(options, has_primary_key=False)
+        return CoreOptions.target_file_size(options, has_primary_key=False)
+
+    @staticmethod
+    def data_file_external_paths(options: dict) -> Optional[List[str]]:
+        external_paths_str = options.get(CoreOptions.DATA_FILE_EXTERNAL_PATHS)
+        if not external_paths_str:
+            return None
+        return [path.strip() for path in external_paths_str.split(",") if 
path.strip()]
+
+    @staticmethod
+    def external_path_strategy(options: dict) -> 'ExternalPathStrategy':
+        strategy_value = 
options.get(CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY, "none")
+        if strategy_value is None:
+            strategy_value = "none"
+
+        strategy_str = strategy_value.lower() if isinstance(strategy_value, 
str) else str(strategy_value).lower()
+
+        try:
+            return ExternalPathStrategy(strategy_str)
+        except ValueError:
+            valid_values = [e.value for e in ExternalPathStrategy]
+            raise ValueError(
+                f"Could not parse value '{strategy_value}' for key 
'{CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY}'. "
+                f"Expected one of: {valid_values}"
+            )
+
+    @staticmethod
+    def external_specific_fs(options: dict) -> Optional[str]:
+        return options.get(CoreOptions.DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS)
+
+    @staticmethod
+    def file_compression(options: dict) -> str:
+        """Get file compression from options, default to 'zstd'."""
+        compression = options.get(CoreOptions.FILE_COMPRESSION, "zstd")
+        if compression is None:
+            compression = "zstd"
+        return compression
+
+    @staticmethod
+    def file_format(options: dict, default: Optional[str] = None) -> str:
+        if default is None:
+            default = CoreOptions.FILE_FORMAT_PARQUET
+        file_format = options.get(CoreOptions.FILE_FORMAT, default)
+        if file_format is None:
+            file_format = default
+        return file_format.lower() if file_format else file_format
+
+
+class ExternalPathStrategy(str, Enum):
+    """
+    Strategy for selecting external paths.
+    """
+    NONE = "none"
+    ROUND_ROBIN = "round-robin"
+    SPECIFIC_FS = "specific-fs"
diff --git a/paimon-python/pypaimon/common/external_path_provider.py 
b/paimon-python/pypaimon/common/external_path_provider.py
new file mode 100644
index 0000000000..a2989120b6
--- /dev/null
+++ b/paimon-python/pypaimon/common/external_path_provider.py
@@ -0,0 +1,43 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import random
+from typing import List
+
+
+class ExternalPathProvider:
+    def __init__(self, external_table_paths: List[str], relative_bucket_path: 
str):
+        self.external_table_paths = external_table_paths
+        self.relative_bucket_path = relative_bucket_path
+        self.position = random.randint(0, len(external_table_paths) - 1) if 
external_table_paths else 0
+
+    def get_next_external_data_path(self, file_name: str) -> str:
+        """
+        Get the next external data path using round-robin strategy.
+        """
+        if not self.external_table_paths:
+            raise ValueError("No external paths available")
+
+        self.position += 1
+        if self.position == len(self.external_table_paths):
+            self.position = 0
+
+        external_base = self.external_table_paths[self.position]
+        if self.relative_bucket_path:
+            return 
f"{external_base.rstrip('/')}/{self.relative_bucket_path.strip('/')}/{file_name}"
+        else:
+            return f"{external_base.rstrip('/')}/{file_name}"
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py 
b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index dfbe329f43..a3503cdd03 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -55,8 +55,8 @@ class FullStartingScanner(StartingScanner):
             self.predicate, self.table.field_names, self.table.partition_keys)
 
         # Get split target size and open file cost from table options
-        self.target_split_size = 
CoreOptions.get_split_target_size(self.table.options)
-        self.open_file_cost = 
CoreOptions.get_split_open_file_cost(self.table.options)
+        self.target_split_size = 
CoreOptions.split_target_size(self.table.options)
+        self.open_file_cost = 
CoreOptions.split_open_file_cost(self.table.options)
 
         self.idx_of_this_subtask = None
         self.number_of_para_subtasks = None
diff --git a/paimon-python/pypaimon/read/split_read.py 
b/paimon-python/pypaimon/read/split_read.py
index 5c90c2ddb6..92152db7ee 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -88,7 +88,8 @@ class SplitRead(ABC):
     def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, 
read_fields: List[str]):
         (read_file_fields, read_arrow_predicate) = 
self._get_fields_and_predicate(file.schema_id, read_fields)
 
-        file_path = file.file_path
+        # Use external_path if available, otherwise use file_path
+        file_path = file.external_path if file.external_path else 
file.file_path
         _, extension = os.path.splitext(file_path)
         file_format = extension[1:]
 
@@ -97,7 +98,7 @@ class SplitRead(ABC):
             format_reader = FormatAvroReader(self.table.file_io, file_path, 
read_file_fields,
                                              self.read_fields, 
read_arrow_predicate)
         elif file_format == CoreOptions.FILE_FORMAT_BLOB:
-            blob_as_descriptor = 
CoreOptions.get_blob_as_descriptor(self.table.options)
+            blob_as_descriptor = 
CoreOptions.blob_as_descriptor(self.table.options)
             format_reader = FormatBlobReader(self.table.file_io, file_path, 
read_file_fields,
                                              self.read_fields, 
read_arrow_predicate, blob_as_descriptor)
         elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == 
CoreOptions.FILE_FORMAT_ORC:
diff --git a/paimon-python/pypaimon/table/file_store_table.py 
b/paimon-python/pypaimon/table/file_store_table.py
index b8e08b6f52..382d3a59b0 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -16,7 +16,7 @@
 # limitations under the License.
 
################################################################################
 
-from typing import Optional
+from typing import List, Optional
 
 from pypaimon.catalog.catalog_environment import CatalogEnvironment
 from pypaimon.common.core_options import CoreOptions
@@ -70,6 +70,32 @@ class FileStoreTable(Table):
         from pypaimon.snapshot.snapshot_manager import SnapshotManager
         return SnapshotManager(self)
 
+    def path_factory(self) -> 'FileStorePathFactory':
+        from pypaimon.utils.file_store_path_factory import FileStorePathFactory
+
+        # Get external paths
+        external_paths = self._create_external_paths()
+
+        # Get format identifier
+        format_identifier = CoreOptions.file_format(self.options)
+
+        file_compression = CoreOptions.file_compression(self.options)
+
+        return FileStorePathFactory(
+            root=str(self.table_path),
+            partition_keys=self.partition_keys,
+            default_part_value="__DEFAULT_PARTITION__",
+            format_identifier=format_identifier,
+            data_file_prefix="data-",
+            changelog_file_prefix="changelog-",
+            legacy_partition_name=True,
+            file_suffix_include_compression=False,
+            file_compression=file_compression,
+            data_file_path_directory=None,
+            external_paths=external_paths,
+            index_file_in_data_file_dir=False,
+        )
+
     def new_snapshot_commit(self):
         """Create a new SnapshotCommit instance using the catalog 
environment."""
         return 
self.catalog_environment.snapshot_commit(self.snapshot_manager())
@@ -129,3 +155,47 @@ class FileStoreTable(Table):
     def add_options(self, options: dict):
         for key, value in options.items():
             self.options[key] = value
+
+    def _create_external_paths(self) -> List[str]:
+        from urllib.parse import urlparse
+        from pypaimon.common.core_options import ExternalPathStrategy
+
+        external_paths_str = CoreOptions.data_file_external_paths(self.options)
+        if not external_paths_str:
+            return []
+
+        strategy = CoreOptions.external_path_strategy(self.options)
+        if strategy == ExternalPathStrategy.NONE:
+            return []
+
+        specific_fs = CoreOptions.external_specific_fs(self.options)
+
+        paths = []
+        for path_string in external_paths_str:
+            if not path_string:
+                continue
+
+            # Parse and validate path
+            parsed = urlparse(path_string)
+            scheme = parsed.scheme
+            if not scheme:
+                raise ValueError(
+                    f"External path must have a scheme (e.g., oss://, s3://, 
file://): {path_string}"
+                )
+
+            # Filter by specific filesystem if strategy is specific-fs
+            if strategy == ExternalPathStrategy.SPECIFIC_FS:
+                if not specific_fs:
+                    raise ValueError(
+                        f"data-file.external-paths.specific-fs must be set 
when "
+                        f"strategy is {ExternalPathStrategy.SPECIFIC_FS}"
+                    )
+                if scheme.lower() != specific_fs.lower():
+                    continue  # Skip paths that don't match the specific 
filesystem
+
+            paths.append(path_string)
+
+        if not paths:
+            raise ValueError("No valid external paths found after filtering")
+
+        return paths
diff --git a/paimon-python/pypaimon/tests/external_paths_test.py 
b/paimon-python/pypaimon/tests/external_paths_test.py
new file mode 100644
index 0000000000..1e5a404075
--- /dev/null
+++ b/paimon-python/pypaimon/tests/external_paths_test.py
@@ -0,0 +1,427 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.catalog.catalog import Identifier
+from pypaimon.common.core_options import CoreOptions, ExternalPathStrategy
+from pypaimon.common.external_path_provider import ExternalPathProvider
+
+
+class ExternalPathProviderTest(unittest.TestCase):
+    """Test ExternalPathProvider functionality."""
+
+    def test_path_selection_and_structure(self):
+        """Test path selection (round-robin) and path structure with various 
scenarios."""
+        # Test multiple paths with round-robin
+        external_paths = [
+            "oss://bucket1/external",
+            "oss://bucket2/external",
+            "oss://bucket3/external",
+        ]
+        relative_path = "partition=value/bucket-0"
+        provider = ExternalPathProvider(external_paths, relative_path)
+
+        paths = [provider.get_next_external_data_path("file.parquet") for _ in 
range(6)]
+
+        # Verify all buckets are used (2 cycles = 2 times each)
+        bucket_counts = {f"bucket{i}": sum(1 for p in paths if f"bucket{i}" in 
p) for i in [1, 2, 3]}
+        self.assertEqual(bucket_counts["bucket1"], 2)
+        self.assertEqual(bucket_counts["bucket2"], 2)
+        self.assertEqual(bucket_counts["bucket3"], 2)
+
+        # Verify path structure
+        self.assertIn("partition=value", paths[0])
+        self.assertIn("bucket-0", paths[0])
+        self.assertIn("file.parquet", paths[0])
+
+        # Test single path
+        single_provider = ExternalPathProvider(["oss://bucket/external"], 
"bucket-0")
+        single_path = 
single_provider.get_next_external_data_path("data.parquet")
+        self.assertIn("bucket/external", single_path)
+        self.assertIn("bucket-0", single_path)
+        self.assertIn("data.parquet", single_path)
+
+        # Test empty relative path
+        empty_provider = ExternalPathProvider(["oss://bucket/external"], "")
+        empty_path = empty_provider.get_next_external_data_path("file.parquet")
+        self.assertIn("bucket/external", empty_path)
+        self.assertIn("file.parquet", empty_path)
+
+
+class ExternalPathsConfigTest(unittest.TestCase):
+    """Test external paths configuration parsing through 
FileStoreTable._create_external_paths()."""
+
+    @classmethod
+    def setUpClass(cls):
+        """Set up test environment."""
+        cls.temp_dir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.temp_dir, "warehouse")
+        cls.catalog = CatalogFactory.create({"warehouse": cls.warehouse})
+        cls.catalog.create_database("test_db", False)
+
+    @classmethod
+    def tearDownClass(cls):
+        """Clean up test environment."""
+        shutil.rmtree(cls.temp_dir, ignore_errors=True)
+
+    def _create_table_with_options(self, options: dict) -> 'FileStoreTable':
+        """Helper method to create a table with specific options."""
+        table_name = "test_db.config_test"
+        # Manually delete table directory if it exists to ensure clean test 
environment
+        # FileSystemCatalog doesn't have drop_table method, so we need to 
delete manually
+        try:
+            table_path = 
self.catalog.get_table_path(Identifier.from_string(table_name))
+            # file_io.exists and delete accept Union[Path, URL, str]
+            if self.catalog.file_io.exists(table_path):
+                self.catalog.file_io.delete(table_path, recursive=True)
+        except Exception:
+            pass  # Table may not exist, ignore
+        pa_schema = pa.schema([("id", pa.int32()), ("name", pa.string())])
+        schema = Schema.from_pyarrow_schema(pa_schema, options=options)
+        self.catalog.create_table(table_name, schema, True)
+        return self.catalog.get_table(table_name)
+
+    def test_external_paths_strategies(self):
+        """Test different external path strategies (round-robin, specific-fs, 
none)."""
+        # Test round-robin strategy
+        options = {
+            CoreOptions.DATA_FILE_EXTERNAL_PATHS: 
"oss://bucket1/path1,oss://bucket2/path2,oss://bucket3/path3",
+            CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: 
ExternalPathStrategy.ROUND_ROBIN,
+        }
+        table = self._create_table_with_options(options)
+        paths = table._create_external_paths()
+        self.assertEqual(len(paths), 3)
+        self.assertEqual(str(paths[0]), "oss://bucket1/path1")
+        self.assertEqual(str(paths[1]), "oss://bucket2/path2")
+        self.assertEqual(str(paths[2]), "oss://bucket3/path3")
+
+        # Test specific-fs strategy
+        options2 = {
+            CoreOptions.DATA_FILE_EXTERNAL_PATHS: 
"oss://bucket1/path1,s3://bucket2/path2,oss://bucket3/path3",
+            CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: 
ExternalPathStrategy.SPECIFIC_FS,
+            CoreOptions.DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS: "oss",
+        }
+        table2 = self._create_table_with_options(options2)
+        paths2 = table2._create_external_paths()
+        self.assertEqual(len(paths2), 2)
+        self.assertIn("oss://bucket1/path1", [str(p) for p in paths2])
+        self.assertIn("oss://bucket3/path3", [str(p) for p in paths2])
+        self.assertNotIn("s3://bucket2/path2", [str(p) for p in paths2])
+
+        # Test none strategy
+        options3 = {
+            CoreOptions.DATA_FILE_EXTERNAL_PATHS: "oss://bucket1/path1",
+            CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: 
ExternalPathStrategy.NONE,
+        }
+        table3 = self._create_table_with_options(options3)
+        paths3 = table3._create_external_paths()
+        self.assertEqual(len(paths3), 0)
+
+    def test_external_paths_edge_cases(self):
+        """Test edge cases: empty string, no config, invalid scheme."""
+        # Test empty string
+        options = {
+            CoreOptions.DATA_FILE_EXTERNAL_PATHS: "",
+            CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: 
ExternalPathStrategy.ROUND_ROBIN,
+        }
+        table = self._create_table_with_options(options)
+        self.assertEqual(len(table._create_external_paths()), 0)
+
+        # Test no external paths option
+        options2 = {CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: 
ExternalPathStrategy.ROUND_ROBIN}
+        table2 = self._create_table_with_options(options2)
+        self.assertEqual(len(table2._create_external_paths()), 0)
+
+        # Test invalid scheme (no scheme)
+        options3 = {
+            CoreOptions.DATA_FILE_EXTERNAL_PATHS: "/invalid/path",
+            CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: 
ExternalPathStrategy.ROUND_ROBIN,
+        }
+        table3 = self._create_table_with_options(options3)
+        with self.assertRaises(ValueError) as context:
+            table3._create_external_paths()
+        self.assertIn("scheme", str(context.exception))
+
+    def test_create_external_path_provider(self):
+        """Test creating ExternalPathProvider from path factory."""
+        table_name = "test_db.config_test"
+        # Drop table if exists to ensure clean test environment
+        try:
+            self.catalog.drop_table(table_name, True)
+        except Exception:
+            pass  # Table may not exist, ignore
+        options = {
+            CoreOptions.DATA_FILE_EXTERNAL_PATHS: 
"oss://bucket1/path1,oss://bucket2/path2",
+            CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: 
ExternalPathStrategy.ROUND_ROBIN,
+        }
+        pa_schema = pa.schema([
+            ("id", pa.int32()),
+            ("name", pa.string()),
+            ("dt", pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=["dt"], 
options=options)
+        self.catalog.create_table(table_name, schema, True)
+        table = self.catalog.get_table(table_name)
+        path_factory = table.path_factory()
+
+        # Test with external paths configured
+        provider = path_factory.create_external_path_provider(("value1",), 0)
+        self.assertIsNotNone(provider)
+        path = provider.get_next_external_data_path("file.parquet")
+        self.assertTrue("bucket1" in str(path) or "bucket2" in str(path))
+        self.assertIn("dt=value1", str(path))
+        self.assertIn("bucket-0", str(path))
+
+        # Test with none strategy (should return None)
+        # Use a different table name to avoid conflicts
+        options2 = {
+            CoreOptions.DATA_FILE_EXTERNAL_PATHS: "oss://bucket1/path1",
+            CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: 
ExternalPathStrategy.NONE,
+        }
+        pa_schema2 = pa.schema([("id", pa.int32()), ("name", pa.string())])
+        schema2 = Schema.from_pyarrow_schema(pa_schema2, options=options2)
+        table_name2 = "test_db.config_test_none"
+        try:
+            # Manually delete table directory if it exists
+            table_path2 = 
self.catalog.get_table_path(Identifier.from_string(table_name2))
+            # file_io.exists and delete accept Union[Path, URL, str]
+            if self.catalog.file_io.exists(table_path2):
+                self.catalog.file_io.delete(table_path2, recursive=True)
+        except Exception:
+            pass  # Table may not exist, ignore
+        self.catalog.create_table(table_name2, schema2, True)
+        table2 = self.catalog.get_table(table_name2)
+        provider2 = table2.path_factory().create_external_path_provider((), 0)
+        self.assertIsNone(provider2)
+
+        # Test without external paths config (should return None)
+        # Use a different table name to avoid conflicts
+        options3 = {}
+        pa_schema3 = pa.schema([("id", pa.int32()), ("name", pa.string())])
+        schema3 = Schema.from_pyarrow_schema(pa_schema3, options=options3)
+        table_name3 = "test_db.config_test_empty"
+        try:
+            # Manually delete table directory if it exists
+            table_path3 = 
self.catalog.get_table_path(Identifier.from_string(table_name3))
+            # file_io.exists and delete accept Union[Path, URL, str]
+            if self.catalog.file_io.exists(table_path3):
+                self.catalog.file_io.delete(table_path3, recursive=True)
+        except Exception:
+            pass  # Table may not exist, ignore
+        self.catalog.create_table(table_name3, schema3, True)
+        table3 = self.catalog.get_table(table_name3)
+        provider3 = table3.path_factory().create_external_path_provider((), 0)
+        self.assertIsNone(provider3)
+
+
+class ExternalPathsIntegrationTest(unittest.TestCase):
+    """Integration tests for external paths feature."""
+
+    @classmethod
+    def setUpClass(cls):
+        """Set up test environment."""
+        cls.temp_dir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.temp_dir, "warehouse")
+        cls.external_dir = os.path.join(cls.temp_dir, "external_data")
+
+        # Create external directory
+        os.makedirs(cls.external_dir, exist_ok=True)
+
+        cls.catalog = CatalogFactory.create({
+            "warehouse": cls.warehouse
+        })
+        cls.catalog.create_database("test_db", False)
+
+    @classmethod
+    def tearDownClass(cls):
+        """Clean up test environment."""
+        shutil.rmtree(cls.temp_dir, ignore_errors=True)
+
+    def test_write_with_external_paths(self):
+        """Test writing data with external paths configured."""
+        pa_schema = pa.schema([
+            ("id", pa.int32()),
+            ("name", pa.string()),
+            ("value", pa.float64()),
+        ])
+
+        # Create table with external paths
+        external_path = f"file://{self.external_dir}"
+        table_options = {
+            CoreOptions.DATA_FILE_EXTERNAL_PATHS: external_path,
+            CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: 
ExternalPathStrategy.ROUND_ROBIN,
+        }
+        schema = Schema.from_pyarrow_schema(pa_schema, options=table_options)
+
+        self.catalog.create_table("test_db.external_test", schema, False)
+        table = self.catalog.get_table("test_db.external_test")
+
+        # Write data (use explicit schema to match table schema)
+        data = pa.Table.from_pydict({
+            "id": [1, 2, 3],
+            "name": ["Alice", "Bob", "Charlie"],
+            "value": [10.5, 20.3, 30.7],
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_arrow(data)
+        commit_messages = table_write.prepare_commit()
+
+        # Verify external_path is set in file metadata
+        self.assertGreater(len(commit_messages), 0)
+        for commit_msg in commit_messages:
+            self.assertGreater(len(commit_msg.new_files), 0)
+            for file_meta in commit_msg.new_files:
+                # External path should be set
+                self.assertIsNotNone(file_meta.external_path)
+                self.assertTrue(file_meta.external_path.startswith("file://"))
+                self.assertIn(self.external_dir, file_meta.external_path)
+
+        table_commit.commit(commit_messages)
+        table_write.close()
+        table_commit.close()
+
+    def test_read_with_external_paths(self):
+        """Test reading data with external paths."""
+        pa_schema = pa.schema([
+            ("id", pa.int32()),
+            ("name", pa.string()),
+        ])
+
+        # Create table with external paths
+        external_path = f"file://{self.external_dir}"
+        table_options = {
+            CoreOptions.DATA_FILE_EXTERNAL_PATHS: external_path,
+            CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: 
ExternalPathStrategy.ROUND_ROBIN,
+        }
+        schema = Schema.from_pyarrow_schema(pa_schema, options=table_options)
+
+        self.catalog.create_table("test_db.external_read_test", schema, False)
+        table = self.catalog.get_table("test_db.external_read_test")
+
+        # Write data (use explicit schema to match table schema)
+        write_data = pa.Table.from_pydict({
+            "id": [1, 2, 3, 4, 5],
+            "name": ["A", "B", "C", "D", "E"],
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_arrow(write_data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # Read data back
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        result = table_read.to_arrow(table_scan.plan().splits())
+
+        # Verify data
+        self.assertEqual(result.num_rows, 5)
+        self.assertEqual(result.num_columns, 2)
+        self.assertListEqual(result.column("id").to_pylist(), [1, 2, 3, 4, 5])
+        self.assertListEqual(result.column("name").to_pylist(), ["A", "B", 
"C", "D", "E"])
+
+    def test_write_without_external_paths(self):
+        """Test that writing without external paths still works."""
+        pa_schema = pa.schema([
+            ("id", pa.int32()),
+            ("name", pa.string()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.catalog.create_table("test_db.normal_test", schema, False)
+        table = self.catalog.get_table("test_db.normal_test")
+
+        # Write data (use explicit schema to match table schema)
+        data = pa.Table.from_pydict({
+            "id": [1, 2, 3],
+            "name": ["X", "Y", "Z"],
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_arrow(data)
+        commit_messages = table_write.prepare_commit()
+
+        # Verify external_path is None (not configured)
+        for commit_msg in commit_messages:
+            for file_meta in commit_msg.new_files:
+                self.assertIsNone(file_meta.external_path)
+
+        table_commit.commit(commit_messages)
+        table_write.close()
+        table_commit.close()
+
+    def test_external_paths_with_partition(self):
+        """Test external paths with partitioned table."""
+        pa_schema = pa.schema([
+            ("id", pa.int32()),
+            ("name", pa.string()),
+            ("dt", pa.string()),
+        ])
+
+        external_path = f"file://{self.external_dir}"
+        table_options = {
+            CoreOptions.DATA_FILE_EXTERNAL_PATHS: external_path,
+            CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: 
ExternalPathStrategy.ROUND_ROBIN,
+        }
+        schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=["dt"], 
options=table_options)
+
+        self.catalog.create_table("test_db.partitioned_external", schema, 
False)
+        table = self.catalog.get_table("test_db.partitioned_external")
+
+        # Write data with partition (use explicit schema to match table schema)
+        data = pa.Table.from_pydict({
+            "id": [1, 2, 3],
+            "name": ["A", "B", "C"],
+            "dt": ["2024-01-01", "2024-01-01", "2024-01-02"],
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_arrow(data)
+        commit_messages = table_write.prepare_commit()
+
+        # Verify external paths include partition info
+        for commit_msg in commit_messages:
+            for file_meta in commit_msg.new_files:
+                self.assertIsNotNone(file_meta.external_path)
+                # Should contain partition path
+                self.assertIn("dt=", file_meta.external_path)
+
+        table_commit.commit(commit_messages)
+        table_write.close()
+        table_commit.close()
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/paimon-python/pypaimon/utils/__init__.py 
b/paimon-python/pypaimon/utils/__init__.py
new file mode 100644
index 0000000000..cefe81778c
--- /dev/null
+++ b/paimon-python/pypaimon/utils/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
diff --git a/paimon-python/pypaimon/utils/file_store_path_factory.py 
b/paimon-python/pypaimon/utils/file_store_path_factory.py
new file mode 100644
index 0000000000..b99d9d5f29
--- /dev/null
+++ b/paimon-python/pypaimon/utils/file_store_path_factory.py
@@ -0,0 +1,115 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+from typing import List, Optional, Tuple
+
+from pypaimon.common.external_path_provider import ExternalPathProvider
+from pypaimon.table.bucket_mode import BucketMode
+
+
+class FileStorePathFactory:
+    MANIFEST_PATH = "manifest"
+    MANIFEST_PREFIX = "manifest-"
+    MANIFEST_LIST_PREFIX = "manifest-list-"
+    INDEX_MANIFEST_PREFIX = "index-manifest-"
+
+    INDEX_PATH = "index"
+    INDEX_PREFIX = "index-"
+
+    STATISTICS_PATH = "statistics"
+    STATISTICS_PREFIX = "stat-"
+
+    BUCKET_PATH_PREFIX = "bucket-"
+
+    def __init__(
+        self,
+        root: str,
+        partition_keys: List[str],
+        default_part_value: str,
+        format_identifier: str,
+        data_file_prefix: str,
+        changelog_file_prefix: str,
+        legacy_partition_name: bool,
+        file_suffix_include_compression: bool,
+        file_compression: str,
+        data_file_path_directory: Optional[str] = None,
+        external_paths: Optional[List[str]] = None,
+        index_file_in_data_file_dir: bool = False,
+    ):
+        self._root = root.rstrip('/')
+        self.partition_keys = partition_keys
+        self.default_part_value = default_part_value
+        self.format_identifier = format_identifier
+        self.data_file_prefix = data_file_prefix
+        self.changelog_file_prefix = changelog_file_prefix
+        self.file_suffix_include_compression = file_suffix_include_compression
+        self.file_compression = file_compression
+        self.data_file_path_directory = data_file_path_directory
+        self.external_paths = external_paths or []
+        self.index_file_in_data_file_dir = index_file_in_data_file_dir
+        self.legacy_partition_name = legacy_partition_name
+
+    def root(self) -> str:
+        return self._root
+
+    def manifest_path(self) -> str:
+        return f"{self._root}/{self.MANIFEST_PATH}"
+
+    def index_path(self) -> str:
+        return f"{self._root}/{self.INDEX_PATH}"
+
+    def statistics_path(self) -> str:
+        return f"{self._root}/{self.STATISTICS_PATH}"
+
+    def data_file_path(self) -> str:
+        if self.data_file_path_directory:
+            return f"{self._root}/{self.data_file_path_directory}"
+        return self._root
+
+    def relative_bucket_path(self, partition: Tuple, bucket: int) -> str:
+        bucket_name = str(bucket)
+        if bucket == BucketMode.POSTPONE_BUCKET.value:
+            bucket_name = "postpone"
+
+        relative_parts = [f"{self.BUCKET_PATH_PREFIX}{bucket_name}"]
+
+        # Add partition path
+        if partition:
+            partition_parts = []
+            for i, field_name in enumerate(self.partition_keys):
+                partition_parts.append(f"{field_name}={partition[i]}")
+            if partition_parts:
+                relative_parts = partition_parts + relative_parts
+
+        # Add data file path directory if specified
+        if self.data_file_path_directory:
+            relative_parts = [self.data_file_path_directory] + relative_parts
+
+        return "/".join(relative_parts)
+
+    def bucket_path(self, partition: Tuple, bucket: int) -> str:
+        relative_path = self.relative_bucket_path(partition, bucket)
+        return f"{self._root}/{relative_path}"
+
+    def create_external_path_provider(
+        self, partition: Tuple, bucket: int
+    ) -> Optional[ExternalPathProvider]:
+        if not self.external_paths:
+            return None
+
+        relative_bucket_path = self.relative_bucket_path(partition, bucket)
+        return ExternalPathProvider(self.external_paths, relative_bucket_path)
diff --git a/paimon-python/pypaimon/write/file_store_commit.py 
b/paimon-python/pypaimon/write/file_store_commit.py
index ba4cee9e90..014e8fbf6e 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -18,7 +18,6 @@
 
 import time
 import uuid
-from pathlib import Path
 from typing import List
 
 from pypaimon.common.core_options import CoreOptions
@@ -158,14 +157,12 @@ class FileStoreCommit:
                 delta_record_count -= entry.file.row_count
         self.manifest_file_manager.write(new_manifest_file, commit_entries)
         # TODO: implement noConflictsOrFail logic
-
         partition_columns = list(zip(*(entry.partition.values for entry in 
commit_entries)))
         partition_min_stats = [min(col) for col in partition_columns]
         partition_max_stats = [max(col) for col in partition_columns]
         partition_null_counts = [sum(value == 0 for value in col) for col in 
partition_columns]
         if not all(count == 0 for count in partition_null_counts):
             raise RuntimeError("Partition value should not be null")
-
         manifest_file_path = 
f"{self.manifest_file_manager.manifest_path}/{new_manifest_file}"
         new_manifest_list = ManifestFileMeta(
             file_name=new_manifest_file,
@@ -227,14 +224,19 @@ class FileStoreCommit:
                 raise RuntimeError(f"Failed to commit snapshot 
{new_snapshot_id}")
 
     def abort(self, commit_messages: List[CommitMessage]):
+        """Abort commit and delete files. Uses external_path if available to 
ensure proper scheme handling."""
         for message in commit_messages:
             for file in message.new_files:
                 try:
-                    file_path_obj = Path(file.file_path)
-                    if file_path_obj.exists():
-                        file_path_obj.unlink()
+                    path_to_delete = file.external_path if file.external_path 
else file.file_path
+                    if path_to_delete:
+                        path_str = str(path_to_delete)
+                        self.table.file_io.delete_quietly(path_str)
                 except Exception as e:
-                    print(f"Warning: Failed to clean up file {file.file_path}: 
{e}")
+                    import logging
+                    logger = logging.getLogger(__name__)
+                    path_to_delete = file.external_path if file.external_path 
else file.file_path
+                    logger.warning(f"Failed to clean up file {path_to_delete} 
during abort: {e}")
 
     def close(self):
         """Close the FileStoreCommit and release resources."""
diff --git a/paimon-python/pypaimon/write/writer/blob_writer.py 
b/paimon-python/pypaimon/write/writer/blob_writer.py
index f22577deac..92c7e6ea1d 100644
--- a/paimon-python/pypaimon/write/writer/blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/blob_writer.py
@@ -42,7 +42,7 @@ class BlobWriter(AppendOnlyDataWriter):
         self.blob_column = blob_column
 
         options = self.table.options
-        self.blob_target_file_size = 
CoreOptions.get_blob_target_file_size(options)
+        self.blob_target_file_size = CoreOptions.blob_target_file_size(options)
 
         self.current_writer: Optional[BlobFileWriter] = None
         self.current_file_path: Optional[str] = None
@@ -117,7 +117,11 @@ class BlobWriter(AppendOnlyDataWriter):
         file_name = self.current_file_path.split('/')[-1]
         row_count = self.current_writer.row_count
 
-        self._add_file_metadata(file_name, self.current_file_path, row_count, 
file_size)
+        # Determine if this is an external path
+        is_external_path = self.external_path_provider is not None
+        external_path_str = self.current_file_path if is_external_path else 
None
+
+        self._add_file_metadata(file_name, self.current_file_path, row_count, 
file_size, external_path_str)
 
         self.current_writer = None
         self.current_file_path = None
@@ -143,10 +147,14 @@ class BlobWriter(AppendOnlyDataWriter):
 
         file_size = self.file_io.get_file_size(file_path)
 
+        is_external_path = self.external_path_provider is not None
+        external_path_str = file_path if is_external_path else None
+
         # Reuse _add_file_metadata for consistency (blob table is append-only, 
no primary keys)
-        self._add_file_metadata(file_name, file_path, data, file_size)
+        self._add_file_metadata(file_name, file_path, data, file_size, 
external_path_str)
 
-    def _add_file_metadata(self, file_name: str, file_path: str, 
data_or_row_count, file_size: int):
+    def _add_file_metadata(self, file_name: str, file_path: str, 
data_or_row_count, file_size: int,
+                           external_path: Optional[str] = None):
         """Add file metadata to committed_files."""
         from datetime import datetime
         from pypaimon.manifest.schema.data_file_meta import DataFileMeta
@@ -200,7 +208,7 @@ class BlobWriter(AppendOnlyDataWriter):
             delete_row_count=0,
             file_source=0,  # FileSource.APPEND = 0
             value_stats_cols=None,
-            external_path=None,
+            external_path=external_path,
             first_row_id=None,
             write_cols=self.write_cols,
             file_path=file_path,
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py 
b/paimon-python/pypaimon/write/writer/data_blob_writer.py
index b3bcc9219c..26ba1bee7b 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -258,10 +258,14 @@ class DataBlobWriter(DataWriter):
         else:
             raise ValueError(f"Unsupported file format: {self.file_format}")
 
-        # Generate metadata
-        return self._create_data_file_meta(file_name, file_path, data)
+        # Determine if this is an external path
+        is_external_path = self.external_path_provider is not None
+        external_path_str = file_path if is_external_path else None
 
-    def _create_data_file_meta(self, file_name: str, file_path: str, data: 
pa.Table) -> DataFileMeta:
+        return self._create_data_file_meta(file_name, file_path, data, 
external_path_str)
+
+    def _create_data_file_meta(self, file_name: str, file_path: str, data: 
pa.Table,
+                               external_path: Optional[str] = None) -> 
DataFileMeta:
         # Column stats (only for normal columns)
         column_stats = {
             field.name: self._get_column_stats(data, field.name)
@@ -302,6 +306,7 @@ class DataBlobWriter(DataWriter):
             delete_row_count=0,
             file_source=0,
             value_stats_cols=self.normal_column_names,
+            external_path=external_path,
             file_path=file_path,
             write_cols=self.write_cols)
 
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py 
b/paimon-python/pypaimon/write/writer/data_writer.py
index fdc74d6f64..56487094ed 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -23,6 +23,7 @@ from datetime import datetime
 from typing import Dict, List, Optional, Tuple
 
 from pypaimon.common.core_options import CoreOptions
+from pypaimon.common.external_path_provider import ExternalPathProvider
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.simple_stats import SimpleStats
 from pypaimon.schema.data_types import PyarrowFieldParser
@@ -46,18 +47,28 @@ class DataWriter(ABC):
         self.trimmed_primary_keys = self.table.trimmed_primary_keys
 
         options = self.table.options
-        self.target_file_size = CoreOptions.get_target_file_size(options, 
self.table.is_primary_key_table)
-        self.file_format = options.get(CoreOptions.FILE_FORMAT,
-                                       CoreOptions.FILE_FORMAT_PARQUET
-                                       if self.bucket != 
BucketMode.POSTPONE_BUCKET.value
-                                       else CoreOptions.FILE_FORMAT_AVRO)
-        self.compression = options.get(CoreOptions.FILE_COMPRESSION, "zstd")
+        self.target_file_size = CoreOptions.target_file_size(options, 
self.table.is_primary_key_table)
+        # POSTPONE_BUCKET uses AVRO format, otherwise default to PARQUET
+        default_format = (
+            CoreOptions.FILE_FORMAT_AVRO
+            if self.bucket == BucketMode.POSTPONE_BUCKET.value
+            else CoreOptions.FILE_FORMAT_PARQUET
+        )
+        self.file_format = CoreOptions.file_format(options, 
default=default_format)
+        self.compression = CoreOptions.file_compression(options)
         self.sequence_generator = SequenceGenerator(max_seq_number)
 
         self.pending_data: Optional[pa.Table] = None
         self.committed_files: List[DataFileMeta] = []
         self.write_cols = write_cols
-        self.blob_as_descriptor = CoreOptions.get_blob_as_descriptor(options)
+        self.blob_as_descriptor = CoreOptions.blob_as_descriptor(options)
+
+        self.path_factory = self.table.path_factory()
+        self.external_path_provider: Optional[ExternalPathProvider] = 
self.path_factory.create_external_path_provider(
+            self.partition, self.bucket
+        )
+        # Store the current generated external path to preserve scheme in 
metadata
+        self._current_external_path: Optional[str] = None
 
     def write(self, data: pa.RecordBatch):
         try:
@@ -105,13 +116,17 @@ class DataWriter(ABC):
         # Delete any files that were written
         for file_meta in self.committed_files:
             try:
-                if file_meta.file_path:
-                    self.file_io.delete_quietly(file_meta.file_path)
+                # Use external_path if available (contains full URL scheme), 
otherwise use file_path
+                path_to_delete = file_meta.external_path if 
file_meta.external_path else file_meta.file_path
+                if path_to_delete:
+                    path_str = str(path_to_delete)
+                    self.file_io.delete_quietly(path_str)
             except Exception as e:
                 # Log but don't raise - we want to clean up as much as possible
                 import logging
                 logger = logging.getLogger(__name__)
-                logger.warning(f"Failed to delete file {file_meta.file_path} 
during abort: {e}")
+                path_to_delete = file_meta.external_path if 
file_meta.external_path else file_meta.file_path
+                logger.warning(f"Failed to delete file {path_to_delete} during 
abort: {e}")
 
         # Clean up resources
         self.pending_data = None
@@ -145,6 +160,14 @@ class DataWriter(ABC):
             return
         file_name = f"data-{uuid.uuid4()}-0.{self.file_format}"
         file_path = self._generate_file_path(file_name)
+
+        is_external_path = self.external_path_provider is not None
+        if is_external_path:
+            # Use the stored external path from _generate_file_path to 
preserve scheme
+            external_path_str = self._current_external_path if 
self._current_external_path else None
+        else:
+            external_path_str = None
+
         if self.file_format == CoreOptions.FILE_FORMAT_PARQUET:
             self.file_io.write_parquet(file_path, data, 
compression=self.compression)
         elif self.file_format == CoreOptions.FILE_FORMAT_ORC:
@@ -211,7 +234,7 @@ class DataWriter(ABC):
             delete_row_count=0,
             file_source=0,
             value_stats_cols=None,  # None means all columns in the data have 
statistics
-            external_path=None,
+            external_path=external_path_str,  # Set external path if using 
external paths
             first_row_id=None,
             write_cols=self.write_cols,
             # None means all columns in the table have been written
@@ -219,19 +242,13 @@ class DataWriter(ABC):
         ))
 
     def _generate_file_path(self, file_name: str) -> str:
-        path_builder = str(self.table.table_path)
-
-        for i, field_name in enumerate(self.table.partition_keys):
-            path_builder = 
f"{path_builder.rstrip('/')}/{field_name}={str(self.partition[i])}"
-
-        if self.bucket == BucketMode.POSTPONE_BUCKET.value:
-            bucket_name = "postpone"
-        else:
-            bucket_name = str(self.bucket)
-
-        path_builder = 
f"{path_builder.rstrip('/')}/bucket-{bucket_name}/{file_name}"
+        if self.external_path_provider:
+            external_path = 
self.external_path_provider.get_next_external_data_path(file_name)
+            self._current_external_path = external_path
+            return external_path
 
-        return path_builder
+        bucket_path = self.path_factory.bucket_path(self.partition, 
self.bucket)
+        return f"{bucket_path.rstrip('/')}/{file_name}"
 
     @staticmethod
     def _find_optimal_split_point(data: pa.RecordBatch, target_size: int) -> 
int:
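
To close, a standalone sketch of the path layout produced by the new
ExternalPathProvider. The roots, partition value and file names are made up,
and the first root picked is random because the provider seeds its starting
position randomly:

    from pypaimon.common.external_path_provider import ExternalPathProvider

    # One provider per (partition, bucket); the relative bucket path is what
    # FileStorePathFactory.relative_bucket_path() computes for the writer,
    # e.g. "dt=2024-01-01/bucket-0".
    provider = ExternalPathProvider(
        ["oss://bucket-a/external", "oss://bucket-b/external"],
        "dt=2024-01-01/bucket-0",
    )

    for i in range(4):
        # Alternates between the two roots, e.g.
        # oss://bucket-a/external/dt=2024-01-01/bucket-0/data-0.parquet
        print(provider.get_next_external_data_path(f"data-{i}.parquet"))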
