This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit fe68d21324de6bb9ef018dbfa311c21a99a5afda
Author: jerry <[email protected]>
AuthorDate: Fri Oct 17 10:11:40 2025 +0800

    [core] Python: fix blob write when blob_as_descriptor is true (#6404)
---
 paimon-python/pypaimon/common/config.py            |   1 +
 paimon-python/pypaimon/common/core_options.py      |   1 +
 paimon-python/pypaimon/common/file_io.py           |  13 +-
 paimon-python/pypaimon/common/uri_reader.py        | 171 ++++++++++++++++
 .../pypaimon/read/reader/format_blob_reader.py     |  48 ++---
 paimon-python/pypaimon/read/split_read.py          |   3 +-
 paimon-python/pypaimon/table/row/blob.py           |  61 +++---
 paimon-python/pypaimon/tests/blob_test.py          | 153 ++++++++++----
 .../pypaimon/tests/uri_reader_factory_test.py      | 227 +++++++++++++++++++++
 paimon-python/pypaimon/write/writer/data_writer.py |   3 +-
 10 files changed, 582 insertions(+), 99 deletions(-)
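
For orientation before the patch itself: the hunks below thread a new blob_as_descriptor flag from DataWriter and SplitRead through FileIO.write_blob and FormatBlobReader, backed by the new UriReaderFactory. A minimal usage sketch follows, mirroring the test_blob_end_to_end_with_descriptor test added below; the working directory, column name, and file names are illustrative only and not part of the patch.

    import os
    import tempfile
    from pathlib import Path

    import pyarrow as pa

    from pypaimon.common.file_io import FileIO
    from pypaimon.read.reader.format_blob_reader import FormatBlobReader
    from pypaimon.schema.data_types import AtomicType, DataField
    from pypaimon.table.row.blob import BlobDescriptor

    work_dir = tempfile.mkdtemp()
    file_io = FileIO(work_dir, {})

    # With blob_as_descriptor, the blob column stores a serialized
    # BlobDescriptor pointing at an external file instead of the raw bytes.
    payload = b"blob payload kept outside the table"
    external_path = os.path.join(work_dir, "external_blob")
    with open(external_path, "wb") as f:
        f.write(payload)
    descriptor = BlobDescriptor(uri=external_path, offset=0, length=len(payload))

    schema = pa.schema([pa.field("blob_col", pa.large_binary())])
    table = pa.table([[descriptor.serialize()]], schema=schema)

    # write_blob now takes a third argument; True means the column holds
    # serialized descriptors that are resolved via UriReaderFactory.
    blob_file = Path(work_dir) / "demo.blob"
    file_io.write_blob(blob_file, table, blob_as_descriptor=True)

    # Reading with blob_as_descriptor=True returns serialized descriptors;
    # with False the reader resolves them and returns the blob bytes.
    reader = FormatBlobReader(
        file_io=file_io,
        file_path=str(blob_file),
        read_fields=["blob_col"],
        full_fields=[DataField(0, "blob_col", AtomicType("BLOB"))],
        push_down_predicate=None,
        blob_as_descriptor=True,
    )
    batch = reader.read_arrow_batch()
    round_tripped = BlobDescriptor.deserialize(batch.column(0)[0].as_py())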

diff --git a/paimon-python/pypaimon/common/config.py b/paimon-python/pypaimon/common/config.py
index 0478c207bb..81b05b8f84 100644
--- a/paimon-python/pypaimon/common/config.py
+++ b/paimon-python/pypaimon/common/config.py
@@ -47,6 +47,7 @@ class CatalogOptions:
     DLF_TOKEN_ECS_METADATA_URL = "dlf.token-ecs-metadata-url"
     PREFIX = 'prefix'
     HTTP_USER_AGENT_HEADER = 'header.HTTP_USER_AGENT'
+    BLOB_FILE_IO_DEFAULT_CACHE_SIZE = 2**31 - 1
 
 
 class PVFSOptions:
diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py
index 82b788438e..8ab26fe062 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -43,6 +43,7 @@ class CoreOptions(str, Enum):
     FILE_COMPRESSION_PER_LEVEL = "file.compression.per.level"
     FILE_FORMAT_PER_LEVEL = "file.format.per.level"
     FILE_BLOCK_SIZE = "file.block-size"
+    FILE_BLOB_AS_DESCRIPTOR = "blob-as-descriptor"
     # Scan options
     SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
     INCREMENTAL_BETWEEN_TIMESTAMP = "incremental-between-timestamp"
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index f6371d2d80..eb32ebb755 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -15,7 +15,6 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-
 import logging
 import os
 import subprocess
@@ -28,8 +27,9 @@ from packaging.version import parse
 from pyarrow._fs import FileSystem
 
 from pypaimon.common.config import OssOptions, S3Options
+from pypaimon.common.uri_reader import UriReaderFactory
 from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser
-from pypaimon.table.row.blob import BlobData
+from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob
 from pypaimon.table.row.generic_row import GenericRow
 from pypaimon.table.row.row_kind import RowKind
 from pypaimon.write.blob_format_writer import BlobFormatWriter
@@ -40,6 +40,7 @@ class FileIO:
         self.properties = catalog_options
         self.logger = logging.getLogger(__name__)
         scheme, netloc, _ = self.parse_location(path)
+        self.uri_reader_factory = UriReaderFactory(catalog_options)
         if scheme in {"oss"}:
             self.filesystem = self._initialize_oss_fs(path)
         elif scheme in {"s3", "s3a", "s3n"}:
@@ -370,7 +371,7 @@ class FileIO:
         with self.new_output_stream(path) as output_stream:
             fastavro.writer(output_stream, avro_schema, records, **kwargs)
 
-    def write_blob(self, path: Path, data: pyarrow.Table, **kwargs):
+    def write_blob(self, path: Path, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs):
         try:
             # Validate input constraints
             if data.num_columns != 1:
@@ -399,7 +400,11 @@ class FileIO:
                     col_data = records_dict[field_name][i]
                     # Convert to appropriate type based on field type
                     if hasattr(fields[0].type, 'type') and fields[0].type.type == "BLOB":
-                        if isinstance(col_data, bytes):
+                        if blob_as_descriptor:
+                            blob_descriptor = BlobDescriptor.deserialize(col_data)
+                            uri_reader = self.uri_reader_factory.create(blob_descriptor.uri)
+                            blob_data = Blob.from_descriptor(uri_reader, blob_descriptor)
+                        elif isinstance(col_data, bytes):
                             blob_data = BlobData(col_data)
                         else:
                             # Convert to bytes if needed
diff --git a/paimon-python/pypaimon/common/uri_reader.py b/paimon-python/pypaimon/common/uri_reader.py
new file mode 100644
index 0000000000..823020caaf
--- /dev/null
+++ b/paimon-python/pypaimon/common/uri_reader.py
@@ -0,0 +1,171 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import io
+from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import Any, Optional
+from urllib.parse import urlparse, ParseResult
+
+import requests
+from cachetools import LRUCache
+from readerwriterlock import rwlock
+
+from pypaimon.common.config import CatalogOptions
+
+
+class UriReader(ABC):
+    @classmethod
+    def from_http(cls) -> 'HttpUriReader':
+        return HttpUriReader()
+
+    @classmethod
+    def from_file(cls, file_io: Any) -> 'FileUriReader':
+        return FileUriReader(file_io)
+
+    @classmethod
+    def get_file_path(cls, uri: str):
+        parsed_uri = urlparse(uri)
+        if parsed_uri.scheme == 'file':
+            path = Path(parsed_uri.path)
+        elif parsed_uri.scheme and parsed_uri.scheme != '':
+            path = Path(parsed_uri.netloc + parsed_uri.path)
+        else:
+            path = Path(uri)
+        return path
+
+    @abstractmethod
+    def new_input_stream(self, uri: str):
+        pass
+
+
+class FileUriReader(UriReader):
+
+    def __init__(self, file_io: Any):
+        self._file_io = file_io
+
+    def new_input_stream(self, uri: str):
+        try:
+            path = self.get_file_path(uri)
+            return self._file_io.new_input_stream(path)
+        except Exception as e:
+            raise IOError(f"Failed to read file {uri}: {e}")
+
+
+class HttpUriReader(UriReader):
+
+    def new_input_stream(self, uri: str):
+        try:
+            response = requests.get(uri)
+            if response.status_code != 200:
+                raise RuntimeError(f"Failed to read HTTP URI {uri} status code 
{response.status_code}")
+            return io.BytesIO(response.content)
+        except Exception as e:
+            raise RuntimeError(f"Failed to read HTTP URI {uri}: {e}")
+
+
+class UriKey:
+
+    def __init__(self, scheme: Optional[str], authority: Optional[str]) -> None:
+        self._scheme = scheme
+        self._authority = authority
+        self._hash = hash((self._scheme, self._authority))
+
+    @property
+    def scheme(self) -> Optional[str]:
+        return self._scheme
+
+    @property
+    def authority(self) -> Optional[str]:
+        return self._authority
+
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, UriKey):
+            return False
+
+        return (self._scheme == other._scheme and
+                self._authority == other._authority)
+
+    def __hash__(self) -> int:
+        return self._hash
+
+    def __repr__(self) -> str:
+        return f"UriKey(scheme='{self._scheme}', 
authority='{self._authority}')"
+
+
+class UriReaderFactory:
+
+    def __init__(self, catalog_options: dict) -> None:
+        self.catalog_options = catalog_options
+        self._readers = LRUCache(CatalogOptions.BLOB_FILE_IO_DEFAULT_CACHE_SIZE)
+        self._readers_lock = rwlock.RWLockFair()
+
+    def create(self, input_uri: str) -> UriReader:
+        try:
+            parsed_uri = urlparse(input_uri)
+        except Exception as e:
+            raise ValueError(f"Invalid URI: {input_uri}") from e
+
+        key = UriKey(parsed_uri.scheme, parsed_uri.netloc or None)
+        rlock = self._readers_lock.gen_rlock()
+        rlock.acquire()
+        try:
+            reader = self._readers.get(key)
+            if reader is not None:
+                return reader
+        finally:
+            rlock.release()
+        wlock = self._readers_lock.gen_wlock()
+        wlock.acquire()
+        try:
+            reader = self._readers.get(key)
+            if reader is not None:
+                return reader
+            reader = self._new_reader(key, parsed_uri)
+            self._readers[key] = reader
+            return reader
+        finally:
+            wlock.release()
+
+    def _new_reader(self, key: UriKey, parsed_uri: ParseResult) -> UriReader:
+        scheme = key.scheme
+        if scheme in ('http', 'https'):
+            return UriReader.from_http()
+        try:
+            # Import FileIO here to avoid circular imports
+            from pypaimon.common.file_io import FileIO
+            uri_string = parsed_uri.geturl()
+            file_io = FileIO(uri_string, self.catalog_options)
+            return UriReader.from_file(file_io)
+        except Exception as e:
+            raise RuntimeError(f"Failed to create reader for URI 
{parsed_uri.geturl()}") from e
+
+    def clear_cache(self) -> None:
+        self._readers.clear()
+
+    def get_cache_size(self) -> int:
+        return len(self._readers)
+
+    def __getstate__(self):
+        state = self.__dict__.copy()
+        del state['_readers_lock']
+        return state
+
+    def __setstate__(self, state):
+        self.__dict__.update(state)
+        self._readers_lock = rwlock.RWLockFair()
diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py b/paimon-python/pypaimon/read/reader/format_blob_reader.py
index 709a0c27a3..e6846dc568 100644
--- a/paimon-python/pypaimon/read/reader/format_blob_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py
@@ -26,21 +26,23 @@ from pyarrow import RecordBatch
 from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
 from pypaimon.common.file_io import FileIO
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
-from pypaimon.schema.data_types import DataField, PyarrowFieldParser
-from pypaimon.table.row.blob import Blob, BlobDescriptor, BlobRef
+from pypaimon.schema.data_types import DataField, PyarrowFieldParser, AtomicType
+from pypaimon.table.row.blob import Blob
 from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.table.row.row_kind import RowKind
 
 
 class FormatBlobReader(RecordBatchReader):
 
     def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
-                 full_fields: List[DataField], push_down_predicate: Any):
+                 full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool):
         self._file_io = file_io
         self._file_path = file_path
         self._push_down_predicate = push_down_predicate
+        self._blob_as_descriptor = blob_as_descriptor
 
         # Get file size
-        self._file_size = file_io.get_file_size(file_path)
+        self._file_size = file_io.get_file_size(Path(file_path))
 
         # Initialize the low-level blob format reader
         self.file_path = file_path
@@ -66,7 +68,10 @@ class FormatBlobReader(RecordBatchReader):
             if self.returned:
                 return None
             self.returned = True
-            batch_iterator = BlobRecordIterator(self.file_path, self.blob_lengths, self.blob_offsets, self._fields[0])
+            batch_iterator = BlobRecordIterator(
+                self._file_io, self.file_path, self.blob_lengths,
+                self.blob_offsets, self._fields[0]
+            )
             self._blob_iterator = iter(batch_iterator)
 
         # Collect records for this batch
@@ -75,22 +80,16 @@ class FormatBlobReader(RecordBatchReader):
 
         try:
             while True:
-                # Get next blob record
                 blob_row = next(self._blob_iterator)
-                # Check if first read returns None, stop immediately
                 if blob_row is None:
                     break
-
-                # Extract blob data from the row
-                blob = blob_row.values[0]  # Blob files have single blob field
-
-                # Convert blob to appropriate format for each requested field
+                blob = blob_row.values[0]
                 for field_name in self._fields:
-                    # For blob files, all fields should contain blob data
-                    if isinstance(blob, Blob):
-                        blob_data = blob.to_data()
+                    blob_descriptor = blob.to_descriptor()
+                    if self._blob_as_descriptor:
+                        blob_data = blob_descriptor.serialize()
                     else:
-                        blob_data = bytes(blob) if blob is not None else None
+                        blob_data = blob.to_data()
                     pydict_data[field_name].append(blob_data)
 
                 records_in_batch += 1
@@ -162,7 +161,9 @@ class BlobRecordIterator:
     MAGIC_NUMBER_SIZE = 4
     METADATA_OVERHEAD = 16
 
-    def __init__(self, file_path: str, blob_lengths: List[int], blob_offsets: List[int], field_name: str):
+    def __init__(self, file_io: FileIO, file_path: str, blob_lengths: List[int],
+                 blob_offsets: List[int], field_name: str):
+        self.file_io = file_io
         self.file_path = file_path
         self.field_name = field_name
         self.blob_lengths = blob_lengths
@@ -175,25 +176,14 @@ class BlobRecordIterator:
     def __next__(self) -> GenericRow:
         if self.current_position >= len(self.blob_lengths):
             raise StopIteration
-
         # Create blob reference for the current blob
         # Skip magic number (4 bytes) and exclude length (8 bytes) + CRC (4 bytes) = 12 bytes
         blob_offset = self.blob_offsets[self.current_position] + self.MAGIC_NUMBER_SIZE  # Skip magic number
         blob_length = self.blob_lengths[self.current_position] - self.METADATA_OVERHEAD
-
-        # Create BlobDescriptor for this blob
-        descriptor = BlobDescriptor(self.file_path, blob_offset, blob_length)
-        blob = BlobRef(descriptor)
-
+        blob = Blob.from_file(self.file_io, self.file_path, blob_offset, blob_length)
         self.current_position += 1
-
-        # Return as GenericRow with single blob field
-        from pypaimon.schema.data_types import DataField, AtomicType
-        from pypaimon.table.row.row_kind import RowKind
-
         fields = [DataField(0, self.field_name, AtomicType("BLOB"))]
         return GenericRow([blob], fields, RowKind.INSERT)
 
     def returned_position(self) -> int:
-        """Get current position in the iterator."""
         return self.current_position
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 81ebdd86f8..372679ed63 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -82,8 +82,9 @@ class SplitRead(ABC):
             format_reader = FormatAvroReader(self.table.file_io, file_path, self._get_final_read_data_fields(),
                                              self.read_fields, self.push_down_predicate)
         elif file_format == CoreOptions.FILE_FORMAT_BLOB:
+            blob_as_descriptor = self.table.options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False)
             format_reader = FormatBlobReader(self.table.file_io, file_path, self._get_final_read_data_fields(),
-                                             self.read_fields, self.push_down_predicate)
+                                             self.read_fields, self.push_down_predicate, blob_as_descriptor)
         elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC:
             format_reader = FormatPyArrowReader(self.table.file_io, file_format, file_path,
                                                 self._get_final_read_data_fields(), self.push_down_predicate)
diff --git a/paimon-python/pypaimon/table/row/blob.py b/paimon-python/pypaimon/table/row/blob.py
index b9d8cc0e38..b92ddb3e82 100644
--- a/paimon-python/pypaimon/table/row/blob.py
+++ b/paimon-python/pypaimon/table/row/blob.py
@@ -19,6 +19,9 @@
 import io
 from abc import ABC, abstractmethod
 from typing import Optional, Union
+from urllib.parse import urlparse
+
+from pypaimon.common.uri_reader import UriReader, FileUriReader
 
 
 class BlobDescriptor:
@@ -144,21 +147,33 @@ class Blob(ABC):
 
     @staticmethod
     def from_local(file: str) -> 'Blob':
-        return Blob.from_file(file)
+        # Import FileIO locally to avoid circular imports
+        from pypaimon.common.file_io import FileIO
+
+        parsed = urlparse(file)
+        if parsed.scheme == "file":
+            file_uri = file
+        else:
+            file_uri = f"file://{file}"
+        file_io = FileIO(file_uri, {})
+        uri_reader = FileUriReader(file_io)
+        descriptor = BlobDescriptor(file, 0, -1)
+        return Blob.from_descriptor(uri_reader, descriptor)
 
     @staticmethod
     def from_http(uri: str) -> 'Blob':
         descriptor = BlobDescriptor(uri, 0, -1)
-        return BlobRef(descriptor)
+        return BlobRef(UriReader.from_http(), descriptor)
 
     @staticmethod
-    def from_file(file: str, offset: int = 0, length: int = -1) -> 'Blob':
-        descriptor = BlobDescriptor(file, offset, length)
-        return BlobRef(descriptor)
+    def from_file(file_io, file_path: str, offset: int, length: int) -> 'Blob':
+        uri_reader = FileUriReader(file_io)
+        descriptor = BlobDescriptor(file_path, offset, length)
+        return Blob.from_descriptor(uri_reader, descriptor)
 
     @staticmethod
-    def from_descriptor(descriptor: BlobDescriptor) -> 'Blob':
-        return BlobRef(descriptor)
+    def from_descriptor(uri_reader: UriReader, descriptor: BlobDescriptor) -> 'Blob':
+        return BlobRef(uri_reader, descriptor)
 
 
 class BlobData(Blob):
@@ -199,7 +214,8 @@ class BlobData(Blob):
 
 class BlobRef(Blob):
 
-    def __init__(self, descriptor: BlobDescriptor):
+    def __init__(self, uri_reader: UriReader, descriptor: BlobDescriptor):
+        self._uri_reader = uri_reader
         self._descriptor = descriptor
 
     def to_data(self) -> bytes:
@@ -216,27 +232,14 @@ class BlobRef(Blob):
         uri = self._descriptor.uri
         offset = self._descriptor.offset
         length = self._descriptor.length
-
-        if uri.startswith('http://') or uri.startswith('https://'):
-            raise NotImplementedError("HTTP blob reading not implemented yet")
-        elif uri.startswith('file://') or '://' not in uri:
-            file_path = uri.replace('file://', '') if uri.startswith('file://') else uri
-
-            try:
-                with open(file_path, 'rb') as f:
-                    if offset > 0:
-                        f.seek(offset)
-
-                    if length == -1:
-                        data = f.read()
-                    else:
-                        data = f.read(length)
-
-                    return io.BytesIO(data)
-            except Exception as e:
-                raise IOError(f"Failed to read file {file_path}: {e}")
-        else:
-            raise ValueError(f"Unsupported URI scheme: {uri}")
+        with self._uri_reader.new_input_stream(uri) as input_stream:
+            if offset > 0:
+                input_stream.seek(offset)
+            if length == -1:
+                data = input_stream.read()
+            else:
+                data = input_stream.read(length)
+            return io.BytesIO(data)
 
     def __eq__(self, other) -> bool:
         if not isinstance(other, BlobRef):
diff --git a/paimon-python/pypaimon/tests/blob_test.py b/paimon-python/pypaimon/tests/blob_test.py
index b8ed65e5bb..b66c1dec9a 100644
--- a/paimon-python/pypaimon/tests/blob_test.py
+++ b/paimon-python/pypaimon/tests/blob_test.py
@@ -92,7 +92,8 @@ class BlobTest(unittest.TestCase):
 
     def test_from_file_with_offset_and_length(self):
         """Test Blob.from_file() method with offset and length."""
-        blob = Blob.from_file(self.file, 0, 4)
+        file_io = FileIO(self.file if self.file.startswith('file://') else f"file://{self.file}", {})
+        blob = Blob.from_file(file_io, self.file, 0, 4)
 
         # Verify it returns a BlobRef instance
         self.assertIsInstance(blob, BlobRef)
@@ -100,16 +101,6 @@ class BlobTest(unittest.TestCase):
         # Verify the data matches (first 4 bytes: "test")
         self.assertEqual(blob.to_data(), b"test")
 
-    def test_from_file_full(self):
-        """Test Blob.from_file() method without offset and length."""
-        blob = Blob.from_file(self.file)
-
-        # Verify it returns a BlobRef instance
-        self.assertIsInstance(blob, BlobRef)
-
-        # Verify the data matches
-        self.assertEqual(blob.to_data(), b"test data")
-
     def test_from_http(self):
         """Test Blob.from_http() method."""
         uri = "http://example.com/file.txt";
@@ -197,7 +188,8 @@ class BlobTest(unittest.TestCase):
         self.assertIsInstance(blob_ref, Blob)
 
         # from_file should return BlobRef
-        blob_file = Blob.from_file(self.file)
+        file_io = FileIO(self.file if self.file.startswith('file://') else f"file://{self.file}", {})
+        blob_file = Blob.from_file(file_io, self.file, 0, os.path.getsize(self.file))
         self.assertIsInstance(blob_file, BlobRef)
         self.assertIsInstance(blob_file, Blob)
 
@@ -505,7 +497,7 @@ class BlobTest(unittest.TestCase):
         descriptor = BlobDescriptor(self.file, 0, -1)
 
         # Create BlobRef from descriptor
-        blob_ref = BlobRef(descriptor)
+        blob_ref = Blob.from_local(self.file)
 
         # Verify descriptor is preserved
         returned_descriptor = blob_ref.to_descriptor()
@@ -554,11 +546,6 @@ class BlobEndToEndTest(unittest.TestCase):
             pass
 
     def test_blob_end_to_end(self):
-        from pypaimon.schema.data_types import DataField, AtomicType
-        from pypaimon.table.row.blob import BlobData
-        from pypaimon.common.file_io import FileIO
-        from pathlib import Path
-
         # Set up file I/O
         file_io = FileIO(self.temp_dir, {})
 
@@ -576,7 +563,7 @@ class BlobEndToEndTest(unittest.TestCase):
         schema = pa.schema([pa.field(blob_field_name, pa.large_binary())])
         table = pa.table([blob_data], schema=schema)
         blob_files[blob_field_name] = Path(self.temp_dir) / (blob_field_name + ".blob")
-        file_io.write_blob(blob_files[blob_field_name], table)
+        file_io.write_blob(blob_files[blob_field_name], table, False)
         self.assertTrue(file_io.exists(blob_files[blob_field_name]))
 
         # ========== Step 3: Read Data and Check Data ==========
@@ -587,7 +574,8 @@ class BlobEndToEndTest(unittest.TestCase):
                 file_path=str(file_path),
                 read_fields=[field_name],
                 full_fields=read_fields,
-                push_down_predicate=None
+                push_down_predicate=None,
+                blob_as_descriptor=False
             )
 
             # Read data
@@ -693,7 +681,7 @@ class BlobEndToEndTest(unittest.TestCase):
 
         # Should throw RuntimeError for multiple columns
         with self.assertRaises(RuntimeError) as context:
-            file_io.write_blob(multi_column_file, multi_column_table)
+            file_io.write_blob(multi_column_file, multi_column_table, False)
         self.assertIn("single column", str(context.exception))
 
         # Test that FileIO.write_blob rejects null values
@@ -704,7 +692,7 @@ class BlobEndToEndTest(unittest.TestCase):
 
         # Should throw RuntimeError for null values
         with self.assertRaises(RuntimeError) as context:
-            file_io.write_blob(null_file, null_table)
+            file_io.write_blob(null_file, null_table, False)
         self.assertIn("null values", str(context.exception))
 
         # ========== Test FormatBlobReader with complex type schema ==========
@@ -714,7 +702,7 @@ class BlobEndToEndTest(unittest.TestCase):
         valid_table = pa.table([valid_blob_data], schema=valid_schema)
 
         valid_blob_file = Path(self.temp_dir) / "valid_blob.blob"
-        file_io.write_blob(valid_blob_file, valid_table)
+        file_io.write_blob(valid_blob_file, valid_table, False)
 
         # Try to read with complex type field definition - this should fail
         # because FormatBlobReader tries to create PyArrow schema with complex types
@@ -728,7 +716,8 @@ class BlobEndToEndTest(unittest.TestCase):
             file_path=str(valid_blob_file),
             read_fields=["valid_blob"],
             full_fields=complex_read_fields,
-            push_down_predicate=None
+            push_down_predicate=None,
+            blob_as_descriptor=False
         )
 
         # Reading should fail because the schema expects complex type but data is atomic
@@ -761,7 +750,7 @@ class BlobEndToEndTest(unittest.TestCase):
         valid_table = pa.table([valid_blob_data], schema=valid_schema)
 
         header_test_file = Path(self.temp_dir) / "header_test.blob"
-        file_io.write_blob(header_test_file, valid_table)
+        file_io.write_blob(header_test_file, valid_table, False)
 
         # Read the file and corrupt the header (last 5 bytes: index_length + version)
         with open(header_test_file, 'rb') as f:
@@ -785,7 +774,8 @@ class BlobEndToEndTest(unittest.TestCase):
                 file_path=str(corrupted_header_file),
                 read_fields=["test_blob"],
                 full_fields=fields,
-                push_down_predicate=None
+                push_down_predicate=None,
+                blob_as_descriptor=False
             )
         self.assertIn("Unsupported blob file version", str(context.exception))
 
@@ -798,7 +788,7 @@ class BlobEndToEndTest(unittest.TestCase):
         large_table = pa.table([large_blob_data], schema=large_schema)
 
         full_blob_file = Path(self.temp_dir) / "full_blob.blob"
-        file_io.write_blob(full_blob_file, large_table)
+        file_io.write_blob(full_blob_file, large_table, False)
 
         # Read the full file and truncate it in the middle
         with open(full_blob_file, 'rb') as f:
@@ -819,7 +809,8 @@ class BlobEndToEndTest(unittest.TestCase):
                 file_path=str(truncated_file),
                 read_fields=["large_blob"],
                 full_fields=fields,
-                push_down_predicate=None
+                push_down_predicate=None,
+                blob_as_descriptor=False
             )
         # Should detect truncation/incomplete data (either invalid header or invalid version)
         self.assertTrue(
@@ -835,7 +826,7 @@ class BlobEndToEndTest(unittest.TestCase):
         zero_table = pa.table([zero_blob_data], schema=zero_schema)
 
         zero_blob_file = Path(self.temp_dir) / "zero_length.blob"
-        file_io.write_blob(zero_blob_file, zero_table)
+        file_io.write_blob(zero_blob_file, zero_table, False)
 
         # Verify file was created
         self.assertTrue(file_io.exists(zero_blob_file))
@@ -849,7 +840,8 @@ class BlobEndToEndTest(unittest.TestCase):
             file_path=str(zero_blob_file),
             read_fields=["zero_blob"],
             full_fields=zero_fields,
-            push_down_predicate=None
+            push_down_predicate=None,
+            blob_as_descriptor=False
         )
 
         zero_batch = zero_reader.read_arrow_batch()
@@ -876,7 +868,7 @@ class BlobEndToEndTest(unittest.TestCase):
         large_sim_table = pa.table([simulated_large_data], schema=large_sim_schema)
 
         large_sim_file = Path(self.temp_dir) / "large_simulation.blob"
-        file_io.write_blob(large_sim_file, large_sim_table)
+        file_io.write_blob(large_sim_file, large_sim_table, False)
 
         # Verify large file was written
         large_sim_size = file_io.get_file_size(large_sim_file)
@@ -889,7 +881,8 @@ class BlobEndToEndTest(unittest.TestCase):
             file_path=str(large_sim_file),
             read_fields=["large_sim_blob"],
             full_fields=large_sim_fields,
-            push_down_predicate=None
+            push_down_predicate=None,
+            blob_as_descriptor=False
         )
 
         large_sim_batch = large_sim_reader.read_arrow_batch()
@@ -947,7 +940,7 @@ class BlobEndToEndTest(unittest.TestCase):
 
         # Should reject multi-field table
         with self.assertRaises(RuntimeError) as context:
-            file_io.write_blob(multi_field_file, multi_field_table)
+            file_io.write_blob(multi_field_file, multi_field_table, False)
         self.assertIn("single column", str(context.exception))
 
         # Test that blob format rejects non-binary field types
@@ -958,7 +951,7 @@ class BlobEndToEndTest(unittest.TestCase):
 
         # Should reject non-binary field
         with self.assertRaises(RuntimeError) as context:
-            file_io.write_blob(non_binary_file, non_binary_table)
+            file_io.write_blob(non_binary_file, non_binary_table, False)
         # Should fail due to type conversion issues (non-binary field can't be converted to BLOB)
         self.assertTrue(
             "large_binary" in str(context.exception) or
@@ -975,9 +968,99 @@ class BlobEndToEndTest(unittest.TestCase):
 
         # Should reject null values
         with self.assertRaises(RuntimeError) as context:
-            file_io.write_blob(null_file, null_table)
+            file_io.write_blob(null_file, null_table, False)
         self.assertIn("null values", str(context.exception))
 
+    def test_blob_end_to_end_with_descriptor(self):
+        # Set up file I/O
+        file_io = FileIO(self.temp_dir, {})
+
+        # ========== Step 1: Write data to local file ==========
+        # Create test data and write it to a local file
+        test_content = b'This is test blob content stored in an external file for descriptor testing.'
+        # Write the test content to a local file
+        local_data_file = Path(self.temp_dir) / "external_blob"
+        with open(local_data_file, 'wb') as f:
+            f.write(test_content)
+        # Verify the file was created and has the correct content
+        self.assertTrue(local_data_file.exists())
+        with open(local_data_file, 'rb') as f:
+            written_content = f.read()
+        self.assertEqual(written_content, test_content)
+
+        # ========== Step 2: Use this file as blob descriptor ==========
+        # Create a BlobDescriptor pointing to the local file
+        blob_descriptor = BlobDescriptor(
+            uri=str(local_data_file),
+            offset=0,
+            length=len(test_content)
+        )
+        # Serialize the descriptor to bytes (this is what would be stored in the blob column)
+        descriptor_bytes = blob_descriptor.serialize()
+        self.assertIsInstance(descriptor_bytes, bytes)
+        self.assertGreater(len(descriptor_bytes), 0)
+
+        # Create PyArrow table with the serialized descriptor
+        blob_field_name = "blob_descriptor_field"
+        schema = pa.schema([pa.field(blob_field_name, pa.large_binary())])
+        table = pa.table([[descriptor_bytes]], schema=schema)
+
+        # Write the blob file with blob_as_descriptor=True
+        blob_file_path = Path(self.temp_dir) / "descriptor_blob.blob"
+        file_io.write_blob(blob_file_path, table, blob_as_descriptor=True)
+        # Verify the blob file was created
+        self.assertTrue(file_io.exists(blob_file_path))
+        file_size = file_io.get_file_size(blob_file_path)
+        self.assertGreater(file_size, 0)
+
+        # ========== Step 3: Read data and check ==========
+        # Define schema for reading
+        read_fields = [DataField(0, blob_field_name, AtomicType("BLOB"))]
+        reader = FormatBlobReader(
+            file_io=file_io,
+            file_path=str(blob_file_path),
+            read_fields=[blob_field_name],
+            full_fields=read_fields,
+            push_down_predicate=None,
+            blob_as_descriptor=True
+        )
+
+        # Read the data with blob_as_descriptor=True (should return a descriptor)
+        batch = reader.read_arrow_batch()
+        self.assertIsNotNone(batch)
+        self.assertEqual(batch.num_rows, 1)
+        self.assertEqual(batch.num_columns, 1)
+
+        read_blob_bytes = batch.column(0)[0].as_py()
+        self.assertIsInstance(read_blob_bytes, bytes)
+
+        # Deserialize the returned descriptor
+        returned_descriptor = BlobDescriptor.deserialize(read_blob_bytes)
+
+        # The returned descriptor should point to the blob file (simplified implementation)
+        # because the current implementation creates a descriptor pointing to the blob file location
+        self.assertEqual(returned_descriptor.uri, str(blob_file_path))
+        self.assertGreater(returned_descriptor.offset, 0)  # Should have some offset in the blob file
+
+        reader.close()
+
+        reader_content = FormatBlobReader(
+            file_io=file_io,
+            file_path=str(blob_file_path),
+            read_fields=[blob_field_name],
+            full_fields=read_fields,
+            push_down_predicate=None,
+            blob_as_descriptor=False
+        )
+        batch_content = reader_content.read_arrow_batch()
+        self.assertIsNotNone(batch_content)
+        self.assertEqual(batch_content.num_rows, 1)
+        read_content_bytes = batch_content.column(0)[0].as_py()
+        self.assertIsInstance(read_content_bytes, bytes)
+        # When blob_as_descriptor=False, we should get the actual file content
+        self.assertEqual(read_content_bytes, test_content)
+        reader_content.close()
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/paimon-python/pypaimon/tests/uri_reader_factory_test.py b/paimon-python/pypaimon/tests/uri_reader_factory_test.py
new file mode 100644
index 0000000000..12088d746c
--- /dev/null
+++ b/paimon-python/pypaimon/tests/uri_reader_factory_test.py
@@ -0,0 +1,227 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import os
+import tempfile
+import unittest
+from pathlib import Path
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.uri_reader import UriReaderFactory, HttpUriReader, FileUriReader, UriReader
+
+
+class MockFileIO:
+    """Mock FileIO for testing."""
+
+    def __init__(self, file_io: FileIO):
+        self._file_io = file_io
+
+    def get_file_size(self, path: str) -> int:
+        """Get file size."""
+        return self._file_io.get_file_size(Path(path))
+
+    def new_input_stream(self, path: Path):
+        """Create new input stream for reading."""
+        return self._file_io.new_input_stream(path)
+
+
+class UriReaderFactoryTest(unittest.TestCase):
+
+    def setUp(self):
+        self.factory = UriReaderFactory({})
+        self.temp_dir = tempfile.mkdtemp()
+        self.temp_file = os.path.join(self.temp_dir, "test.txt")
+        with open(self.temp_file, 'w') as f:
+            f.write("test content")
+
+    def tearDown(self):
+        """Clean up temporary files."""
+        try:
+            if os.path.exists(self.temp_file):
+                os.remove(self.temp_file)
+            os.rmdir(self.temp_dir)
+        except OSError:
+            pass  # Ignore cleanup errors
+
+    def test_create_http_uri_reader(self):
+        """Test creating HTTP URI reader."""
+        reader = self.factory.create("http://example.com/file.txt";)
+        self.assertIsInstance(reader, HttpUriReader)
+
+    def test_create_https_uri_reader(self):
+        """Test creating HTTPS URI reader."""
+        reader = self.factory.create("https://example.com/file.txt";)
+        self.assertIsInstance(reader, HttpUriReader)
+
+    def test_create_file_uri_reader(self):
+        """Test creating file URI reader."""
+        reader = self.factory.create(f"file://{self.temp_file}")
+        self.assertIsInstance(reader, FileUriReader)
+
+    def test_create_uri_reader_with_authority(self):
+        """Test creating URI readers with different authorities."""
+        reader1 = self.factory.create("http://my_bucket1/path/to/file.txt";)
+        reader2 = self.factory.create("http://my_bucket2/path/to/file.txt";)
+
+        # Different authorities should create different readers
+        self.assertNotEqual(reader1, reader2)
+        self.assertIsNot(reader1, reader2)
+
+    def test_cached_readers_with_same_scheme_and_authority(self):
+        """Test that readers with same scheme and authority are cached."""
+        reader1 = self.factory.create("http://my_bucket/path/to/file1.txt";)
+        reader2 = self.factory.create("http://my_bucket/path/to/file2.txt";)
+
+        # Same scheme and authority should return the same cached reader
+        self.assertIs(reader1, reader2)
+
+    def test_cached_readers_with_null_authority(self):
+        """Test that readers with null authority are cached."""
+        reader1 = self.factory.create(f"file://{self.temp_file}")
+        reader2 = self.factory.create(f"file://{self.temp_dir}/another_file.txt")
+
+        # Same scheme with null authority should return the same cached reader
+        self.assertIs(reader1, reader2)
+
+    def test_create_uri_reader_with_local_path(self):
+        """Test creating URI reader with local path (no scheme)."""
+        reader = self.factory.create(self.temp_file)
+        self.assertIsInstance(reader, FileUriReader)
+
+    def test_cache_size_tracking(self):
+        """Test that cache size is tracked correctly."""
+        initial_size = self.factory.get_cache_size()
+
+        # Create readers with different schemes/authorities
+        self.factory.create("http://example.com/file.txt";)
+        self.assertEqual(self.factory.get_cache_size(), initial_size + 1)
+
+        self.factory.create("https://example.com/file.txt";)
+        self.assertEqual(self.factory.get_cache_size(), initial_size + 2)
+
+        self.factory.create(f"file://{self.temp_file}")
+        self.assertEqual(self.factory.get_cache_size(), initial_size + 3)
+
+        # Same scheme/authority should not increase cache size
+        self.factory.create("http://example.com/another_file.txt";)
+        self.assertEqual(self.factory.get_cache_size(), initial_size + 3)
+
+    def test_uri_reader_functionality(self):
+        """Test that created URI readers actually work."""
+        # Test file URI reader
+        reader = self.factory.create(f"file://{self.temp_file}")
+        stream = reader.new_input_stream(self.temp_file)
+        content = stream.read().decode('utf-8')
+        self.assertEqual(content, "test content")
+        stream.close()
+
+    def test_invalid_uri_handling(self):
+        """Test handling of invalid URIs."""
+        # This should not raise an exception as urlparse is quite permissive
+        # But we can test edge cases
+        reader = self.factory.create("")
+        self.assertIsInstance(reader, (HttpUriReader, FileUriReader))
+
+    def test_uri_key_equality(self):
+        """Test UriKey equality and hashing behavior."""
+        from pypaimon.common.uri_reader import UriKey
+
+        key1 = UriKey("http", "example.com")
+        key2 = UriKey("http", "example.com")
+        key3 = UriKey("https", "example.com")
+        key4 = UriKey("http", "other.com")
+
+        # Same scheme and authority should be equal
+        self.assertEqual(key1, key2)
+        self.assertEqual(hash(key1), hash(key2))
+
+        # Different scheme or authority should not be equal
+        self.assertNotEqual(key1, key3)
+        self.assertNotEqual(key1, key4)
+
+        # Test with None values
+        key_none1 = UriKey(None, None)
+        key_none2 = UriKey(None, None)
+        self.assertEqual(key_none1, key_none2)
+
+    def test_uri_key_string_representation(self):
+        """Test UriKey string representation."""
+        from pypaimon.common.uri_reader import UriKey
+
+        key = UriKey("http", "example.com")
+        repr_str = repr(key)
+        self.assertIn("http", repr_str)
+        self.assertIn("example.com", repr_str)
+
+    def test_thread_safety_simulation(self):
+        """Test thread safety by creating multiple readers concurrently."""
+        import threading
+        import time
+
+        results = []
+
+        def create_reader():
+            reader = self.factory.create("http://example.com/file.txt";)
+            results.append(reader)
+            time.sleep(0.01)  # Small delay to increase chance of race conditions
+
+        # Create multiple threads
+        threads = []
+        for _ in range(10):
+            thread = threading.Thread(target=create_reader)
+            threads.append(thread)
+            thread.start()
+
+        # Wait for all threads to complete
+        for thread in threads:
+            thread.join()
+
+        # All results should be the same cached reader
+        first_reader = results[0]
+        for reader in results[1:]:
+            self.assertIs(reader, first_reader)
+
+    def test_different_file_schemes(self):
+        """Test different file-based schemes."""
+        # Test absolute path without scheme
+        reader1 = self.factory.create(os.path.abspath(self.temp_file))
+        self.assertIsInstance(reader1, FileUriReader)
+
+        # Test file:// scheme
+        reader2 = self.factory.create(f"file://{self.temp_file}")
+        self.assertIsInstance(reader2, FileUriReader)
+
+        # Different schemes (empty vs "file") should create different cache 
entries
+        self.assertIsNot(reader1, reader2)
+
+        # But same scheme should be cached
+        reader3 = self.factory.create(f"file://{self.temp_dir}/another_file.txt")
+        self.assertIs(reader2, reader3)  # Same file:// scheme
+
+    def test_get_file_path_with_file_uri(self):
+        file_uri = f"file://{self.temp_file}"
+        path = UriReader.get_file_path(file_uri)
+        self.assertEqual(str(path), self.temp_file)
+        oss_file_path = "bucket/tmp/another_file.txt"
+        file_uri = f"oss://{oss_file_path}"
+        path = UriReader.get_file_path(file_uri)
+        self.assertEqual(str(path), oss_file_path)
+        path = UriReader.get_file_path(self.temp_file)
+        self.assertEqual(str(path), self.temp_file)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index ad6e327c89..0a063fb2f4 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -58,6 +58,7 @@ class DataWriter(ABC):
         self.pending_data: Optional[pa.Table] = None
         self.committed_files: List[DataFileMeta] = []
         self.write_cols = write_cols
+        self.blob_as_descriptor = options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False)
 
     def write(self, data: pa.RecordBatch):
         processed_data = self._process_data(data)
@@ -115,7 +116,7 @@ class DataWriter(ABC):
         elif self.file_format == CoreOptions.FILE_FORMAT_AVRO:
             self.file_io.write_avro(file_path, data)
         elif self.file_format == CoreOptions.FILE_FORMAT_BLOB:
-            self.file_io.write_blob(file_path, data)
+            self.file_io.write_blob(file_path, data, self.blob_as_descriptor)
         else:
             raise ValueError(f"Unsupported file format: {self.file_format}")
 

