This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2369501fe8 [python] Support row-level Blob access (#7891)
2369501fe8 is described below

commit 2369501fe85465a3445f1f0a55bdf53b2e22b244
Author: XiaoHongbo <[email protected]>
AuthorDate: Mon May 25 18:04:33 2026 +0800

    [python] Support row-level Blob access (#7891)
    
    Add `InternalRow.get_blob(pos)` to pypaimon, aligned with Java
    `InternalRow.getBlob`. Reads on BLOB columns return a `Blob` object
    (`BlobData` for inline storage, `BlobRef` for descriptor storage with
    lazy URI resolution).
    
    Also adds `Blob.from_bytes(data, file_io)` factory that auto-dispatches
    based on the BLOBDESC magic header (mirrors Java `Blob.fromBytes`).
---
 docs/docs/append-table/blob.mdx                    |   2 +
 docs/docs/pypaimon/blob.md                         | 155 +++++++++++++
 .../read/reader/blob_descriptor_convert_reader.py  |   2 +
 .../pypaimon/read/reader/concat_batch_reader.py    |   4 +-
 .../pypaimon/read/reader/data_file_batch_reader.py |  23 +-
 .../read/reader/filter_record_batch_reader.py      |   2 +
 .../read/reader/iface/record_batch_reader.py       |  13 +-
 .../read/reader/outer_projection_record_reader.py  |  20 +-
 .../read/reader/row_range_filter_record_reader.py  |   2 +
 paimon-python/pypaimon/read/split_read.py          |  20 +-
 paimon-python/pypaimon/table/row/binary_row.py     |   9 +
 paimon-python/pypaimon/table/row/blob.py           |  20 ++
 paimon-python/pypaimon/table/row/generic_row.py    |   9 +
 paimon-python/pypaimon/table/row/internal_row.py   |   6 +-
 paimon-python/pypaimon/table/row/offset_row.py     |  16 +-
 paimon-python/pypaimon/table/row/projected_row.py  |   6 +
 paimon-python/pypaimon/tests/blob_table_test.py    | 255 +++++++++++++++++++++
 paimon-python/pypaimon/tests/blob_test.py          |  32 +++
 18 files changed, 562 insertions(+), 34 deletions(-)

diff --git a/docs/docs/append-table/blob.mdx b/docs/docs/append-table/blob.mdx
index 4f0ce604ac..7f0afc964e 100644
--- a/docs/docs/append-table/blob.mdx
+++ b/docs/docs/append-table/blob.mdx
@@ -730,6 +730,8 @@ For these configured fields:
 - writes can still start from raw BLOB input
 - the field is treated as descriptor-based for operations such as `MERGE INTO`
 
+For the Python equivalent, see [Blob Storage in pypaimon](../pypaimon/blob).
+
 ## Limitations
 
 1. **Append Table Only**: Blob type is designed for append-only tables. 
Primary key tables are not supported.
diff --git a/docs/docs/pypaimon/blob.md b/docs/docs/pypaimon/blob.md
new file mode 100644
index 0000000000..916d893571
--- /dev/null
+++ b/docs/docs/pypaimon/blob.md
@@ -0,0 +1,155 @@
+---
+title: "Blob Storage"
+sidebar_position: 7
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Blob Storage in pypaimon
+
+For Paimon's Blob storage concepts (storage modes, table options, SQL usage,
+Java API), see [Blob Storage](../append-table/blob).
+
+This page covers the Python API for reading and writing BLOB columns.
+
+## Creating a Table
+
+A BLOB column maps to PyArrow `large_binary()`. The table must enable
+`row-tracking.enabled` and `data-evolution.enabled`.
+
+```python
+from pypaimon import CatalogFactory, Schema
+import pyarrow as pa
+
+catalog = CatalogFactory.create({'warehouse': '/tmp/paimon-warehouse'})
+catalog.create_database('my_db', True)
+
+pa_schema = pa.schema([
+    ('id', pa.int32()),
+    ('name', pa.string()),
+    ('image', pa.large_binary()),
+])
+schema = Schema.from_pyarrow_schema(
+    pa_schema,
+    options={
+        'row-tracking.enabled': 'true',
+        'data-evolution.enabled': 'true',
+    },
+)
+catalog.create_table('my_db.image_table', schema, True)
+```
+
+## Writing Blob Data
+
+Pass raw bytes for the blob column in a PyArrow Table; pypaimon writes them
+to dedicated `.blob` files automatically.
+
+```python
+table = catalog.get_table('my_db.image_table')
+write_builder = table.new_batch_write_builder()
+writer = write_builder.new_write()
+
+with open('cat.jpg', 'rb') as f1, open('dog.jpg', 'rb') as f2:
+    writer.write_arrow(pa.Table.from_pydict({
+        'id': [1, 2],
+        'name': ['cat', 'dog'],
+        'image': [f1.read(), f2.read()],
+    }, schema=pa_schema))
+
+write_builder.new_commit().commit(writer.prepare_commit())
+writer.close()
+```
+
+## Reading Blob Data
+
+Use `row.get_blob(pos)` to access blob columns. It returns a `Blob` object
+regardless of how the blob is stored.
+
+```python
+read_builder = table.new_read_builder()
+splits = read_builder.new_scan().plan().splits()
+read = read_builder.new_read()
+
+for row in read.to_iterator(splits):
+    blob = row.get_blob(2)
+    if blob is None:
+        continue
+    data = blob.to_data()
+```
+
+## Streaming for Large Blobs
+
+`blob.new_input_stream()` returns a file-like object. Whether it is
+genuinely lazy depends on how the table is configured:
+
+- Default mode (`blob-as-descriptor=false`): the read path materialises
+  the payload before it reaches `row.get_blob(pos)`. `Blob` is a
+  `BlobData` and `new_input_stream()` wraps the in-memory bytes — not
+  true streaming. For large blobs this can still OOM.
+- Descriptor mode (`blob-as-descriptor=true`): the read path preserves
+  the descriptor. `Blob` is a `BlobRef` and `new_input_stream()` opens
+  the underlying file on demand.
+
+This mirrors Java's `BlobFormatReader` semantics.
+
+For genuine on-demand streaming of large blobs (videos, model weights),
+configure `blob-as-descriptor=true` before reading:
+
+```python
+schema = Schema.from_pyarrow_schema(
+    pa_schema,
+    options={
+        'row-tracking.enabled': 'true',
+        'data-evolution.enabled': 'true',
+        'blob-as-descriptor': 'true',
+    },
+)
+# Reads of this table return BlobRef whose new_input_stream() is lazy.
+for row in read.to_iterator(splits):
+    with row.get_blob(2).new_input_stream() as stream:
+        chunk = stream.read(1024)
+```
+
+## Lower-level: `Blob.from_bytes`
+
+When you already have raw or descriptor bytes (for example from a custom
+source) and want to wrap them as a `Blob`, use the factory:
+
+```python
+from pypaimon.table.row.blob import Blob
+
+# Inline bytes → BlobData (no file_io required)
+blob = Blob.from_bytes(b'hello')
+
+# Descriptor bytes → BlobRef (lazy; requires file_io to resolve the URI)
+file_io = table.file_io
+blob = Blob.from_bytes(descriptor_bytes, file_io)
+
+data = blob.to_data()
+```
+
+The factory auto-dispatches based on the bytes content (BLOBDESC magic
+header). This mirrors Java's `Blob.fromBytes(...)`.
+
+## See Also
+
+- [Blob Storage](../append-table/blob) — concept, storage modes,
+  SQL/Java API
+- [Data Evolution](./data-evolution) — required for
+  blob tables
diff --git 
a/paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py 
b/paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py
index 9ee1a33c38..35fe046a03 100644
--- a/paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py
+++ b/paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py
@@ -28,6 +28,8 @@ class BlobDescriptorConvertReader(RecordBatchReader):
         self._inner = inner
         self._table = table
         self._descriptor_fields = 
CoreOptions.blob_descriptor_fields(table.options)
+        self.file_io = inner.file_io
+        self.blob_field_indices = inner.blob_field_indices
 
     def read_arrow_batch(self) -> Optional[RecordBatch]:
         import pyarrow
diff --git a/paimon-python/pypaimon/read/reader/concat_batch_reader.py 
b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
index 564f015d24..722c976a51 100644
--- a/paimon-python/pypaimon/read/reader/concat_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
@@ -29,9 +29,11 @@ _MIN_BATCH_SIZE_TO_REFILL = 1024
 
 class ConcatBatchReader(RecordBatchReader):
 
-    def __init__(self, reader_suppliers: List[Callable]):
+    def __init__(self, reader_suppliers: List[Callable], file_io=None, 
blob_field_indices=None):
         self.queue: collections.deque[Callable] = 
collections.deque(reader_suppliers)
         self.current_reader: Optional[RecordBatchReader] = None
+        self.file_io = file_io
+        self.blob_field_indices = blob_field_indices
 
     def read_arrow_batch(self) -> Optional[RecordBatch]:
         while True:
diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py 
b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
index f71a8ed9eb..64da0cc840 100644
--- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
@@ -25,7 +25,7 @@ from pypaimon.read.partition_info import PartitionInfo
 from pypaimon.read.reader.format_blob_reader import FormatBlobReader
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
 from pypaimon.schema.data_types import DataField, PyarrowFieldParser
-from pypaimon.table.row.blob import Blob, BlobDescriptor
+from pypaimon.table.row.blob import Blob
 from pypaimon.table.special_fields import SpecialFields
 
 
@@ -181,28 +181,9 @@ class DataFileBatchReader(RecordBatchReader):
         value = self._normalize_blob_cell(value)
         if value is None:
             return None
-
         if not isinstance(value, bytes):
             return value
-
-        descriptor = self._deserialize_descriptor_or_none(value)
-        if descriptor is None:
-            return value
-
-        try:
-            uri_reader = self.file_io.uri_reader_factory.create(descriptor.uri)
-            blob = Blob.from_descriptor(uri_reader, descriptor)
-            return blob.to_data()
-        except Exception as e:
-            raise RuntimeError(
-                "Failed to read blob bytes from descriptor URI while 
converting blob value."
-            ) from e
-
-    @staticmethod
-    def _deserialize_descriptor_or_none(raw: bytes):
-        if not BlobDescriptor.is_blob_descriptor(raw):
-            return None
-        return BlobDescriptor.deserialize(raw)
+        return Blob.from_bytes(value, self.file_io).to_data()
 
     def _assign_row_tracking(self, record_batch: RecordBatch) -> RecordBatch:
         """Assign row tracking meta fields (_ROW_ID and _SEQUENCE_NUMBER)."""
diff --git a/paimon-python/pypaimon/read/reader/filter_record_batch_reader.py 
b/paimon-python/pypaimon/read/reader/filter_record_batch_reader.py
index 518828e207..a6039c46a8 100644
--- a/paimon-python/pypaimon/read/reader/filter_record_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/filter_record_batch_reader.py
@@ -46,6 +46,8 @@ class FilterRecordBatchReader(RecordBatchReader):
         self.predicate = predicate
         self.field_names = field_names
         self.schema_fields = schema_fields
+        self.file_io = reader.file_io
+        self.blob_field_indices = reader.blob_field_indices
 
     def read_arrow_batch(self) -> Optional[pa.RecordBatch]:
         while True:
diff --git a/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py 
b/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py
index 3b29383c22..e869bd9c8d 100644
--- a/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py
@@ -34,6 +34,9 @@ class RecordBatchReader(RecordReader):
     The reader that reads the pyarrow batches of records.
     """
 
+    file_io = None
+    blob_field_indices = None
+
     @abstractmethod
     def read_arrow_batch(self) -> Optional[RecordBatch]:
         """
@@ -61,13 +64,17 @@ class RecordBatchReader(RecordReader):
         df = self.read_next_df()
         if df is None:
             return None
-        return InternalRowWrapperIterator(df.iter_rows(), df.width)
+        return InternalRowWrapperIterator(
+            df.iter_rows(), df.width, self.file_io, self.blob_field_indices)
 
 
 class InternalRowWrapperIterator(RecordIterator[InternalRow]):
-    def __init__(self, iterator: Iterator[tuple], width: int):
+    def __init__(self, iterator: Iterator[tuple], width: int,
+                 file_io=None, blob_field_indices=None):
         self._iterator = iterator
-        self._reused_row = OffsetRow(None, 0, width)
+        self._reused_row = OffsetRow(None, 0, width,
+                                     file_io=file_io,
+                                     blob_field_indices=blob_field_indices)
 
     def next(self) -> Optional[InternalRow]:
         row_tuple = next(self._iterator, None)
diff --git 
a/paimon-python/pypaimon/read/reader/outer_projection_record_reader.py 
b/paimon-python/pypaimon/read/reader/outer_projection_record_reader.py
index be17382a1b..23e67e0231 100644
--- a/paimon-python/pypaimon/read/reader/outer_projection_record_reader.py
+++ b/paimon-python/pypaimon/read/reader/outer_projection_record_reader.py
@@ -41,6 +41,8 @@ class OuterProjectionRecordReader(RecordReader[InternalRow]):
         inner: RecordReader[InternalRow],
         inner_top_names: List[str],
         name_paths: List[List[str]],
+        file_io=None,
+        blob_field_indices=None,
     ):
         if not name_paths:
             raise ValueError("name_paths must be non-empty")
@@ -58,12 +60,22 @@ class 
OuterProjectionRecordReader(RecordReader[InternalRow]):
             self._specs.append(_PathSpec(name_to_top_idx[top_name], 
list(path[1:])))
         self._inner = inner
         self._flat_arity = len(name_paths)
+        self._file_io = file_io
+        self._blob_field_indices = None
+        if blob_field_indices is not None:
+            self._blob_field_indices = {
+                proj_pos
+                for proj_pos, spec in enumerate(self._specs)
+                if not spec.sub_names and spec.top_idx in blob_field_indices
+            }
 
     def read_batch(self) -> Optional[RecordIterator[InternalRow]]:
         inner_batch = self._inner.read_batch()
         if inner_batch is None:
             return None
-        return _OuterProjectionIterator(inner_batch, self._specs, 
self._flat_arity)
+        return _OuterProjectionIterator(
+            inner_batch, self._specs, self._flat_arity, self._file_io,
+            self._blob_field_indices)
 
     def close(self) -> None:
         self._inner.close()
@@ -77,11 +89,15 @@ class _OuterProjectionIterator(RecordIterator[InternalRow]):
         inner: RecordIterator[InternalRow],
         specs: List["_PathSpec"],
         flat_arity: int,
+        file_io=None,
+        blob_field_indices=None,
     ):
         self._inner = inner
         self._specs = specs
         self._flat_arity = flat_arity
-        self._reused_row = OffsetRow(None, 0, flat_arity)
+        self._reused_row = OffsetRow(None, 0, flat_arity,
+                                     file_io=file_io,
+                                     blob_field_indices=blob_field_indices)
 
     def next(self) -> Optional[InternalRow]:
         inner_row = self._inner.next()
diff --git 
a/paimon-python/pypaimon/read/reader/row_range_filter_record_reader.py 
b/paimon-python/pypaimon/read/reader/row_range_filter_record_reader.py
index 8f747a7e71..5f97fb4ad5 100644
--- a/paimon-python/pypaimon/read/reader/row_range_filter_record_reader.py
+++ b/paimon-python/pypaimon/read/reader/row_range_filter_record_reader.py
@@ -32,6 +32,8 @@ class RowIdFilterRecordBatchReader(RecordBatchReader):
         self.reader = reader
         self.current_row_id = first_row_id
         self.row_id_ranges = row_id_ranges
+        self.file_io = reader.file_io
+        self.blob_field_indices = reader.blob_field_indices
 
     def read_arrow_batch(self) -> Optional[RecordBatch]:
         while True:
diff --git a/paimon-python/pypaimon/read/split_read.py 
b/paimon-python/pypaimon/read/split_read.py
index c90d406019..b6b81028c5 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -71,6 +71,11 @@ NULL_FIELD_INDEX = -1
 _COMPRESS_EXTENSIONS = frozenset(['gz', 'bz2', 'deflate', 'snappy', 'lz4', 
'zst'])
 
 
+def _blob_field_indices(fields: List[DataField]) -> set:
+    return {i for i, f in enumerate(fields)
+            if hasattr(f.type, 'type') and f.type.type == 'BLOB'}
+
+
 def format_identifier(file_name):
     idx = file_name.rfind('.')
     assert idx != -1, "%s is not a legal file name." % file_name
@@ -572,7 +577,9 @@ class RawFileSplitRead(SplitRead):
         if not data_readers:
             return EmptyFileRecordReader()
 
-        concat_reader = ConcatBatchReader(data_readers)
+        concat_reader = ConcatBatchReader(
+            data_readers, file_io=self.table.file_io,
+            blob_field_indices=_blob_field_indices(self.read_fields))
         # if the table is appendonly table, we don't need extra filter, all 
predicates has pushed down
         if self.table.is_primary_key_table and self.predicate_for_reader:
             return FilterRecordReader(concat_reader, self.predicate_for_reader)
@@ -673,9 +680,12 @@ class MergeFileSplitRead(SplitRead):
         if self.outer_extract_name_paths:
             from pypaimon.read.reader.outer_projection_record_reader import \
                 OuterProjectionRecordReader
-            inner_top_names = [f.name for f in 
self.read_fields[-self.value_arity:]]
+            inner_value_fields = self.read_fields[-self.value_arity:]
             reader = OuterProjectionRecordReader(
-                reader, inner_top_names, self.outer_extract_name_paths)
+                reader, [f.name for f in inner_value_fields],
+                self.outer_extract_name_paths,
+                file_io=self.table.file_io,
+                blob_field_indices=_blob_field_indices(inner_value_fields))
         if self.limit is not None:
             from pypaimon.read.reader.limited_record_reader import \
                 LimitedRecordReader
@@ -729,7 +739,9 @@ class DataEvolutionSplitRead(SplitRead):
                     lambda files=need_merge_files: 
self._create_union_reader(files)
                 )
 
-        merge_reader = ConcatBatchReader(suppliers)
+        merge_reader = ConcatBatchReader(
+            suppliers, file_io=self.table.file_io,
+            blob_field_indices=_blob_field_indices(self.read_fields))
         if self.predicate_for_reader is not None:
             reader = FilterRecordBatchReader(
                 merge_reader,
diff --git a/paimon-python/pypaimon/table/row/binary_row.py 
b/paimon-python/pypaimon/table/row/binary_row.py
index f6122bd368..8eaaf85257 100644
--- a/paimon-python/pypaimon/table/row/binary_row.py
+++ b/paimon-python/pypaimon/table/row/binary_row.py
@@ -51,6 +51,15 @@ class BinaryRow(InternalRow):
                                                             self.arity),
                                                         index, 
self.fields[index].type)
 
+    def get_blob(self, pos: int):
+        from pypaimon.table.row.blob import Blob
+        value = self.get_field(pos)
+        if value is None:
+            return None
+        if isinstance(value, Blob):
+            return value
+        raise TypeError(f"Cannot get Blob from {type(value)} at position 
{pos}")
+
     def get_row_kind(self) -> RowKind:
         return self.row_kind
 
diff --git a/paimon-python/pypaimon/table/row/blob.py 
b/paimon-python/pypaimon/table/row/blob.py
index b619b6a76a..19a932a9f9 100644
--- a/paimon-python/pypaimon/table/row/blob.py
+++ b/paimon-python/pypaimon/table/row/blob.py
@@ -276,6 +276,26 @@ class Blob(ABC):
     def from_descriptor(uri_reader: UriReader, descriptor: BlobDescriptor) -> 
'Blob':
         return BlobRef(uri_reader, descriptor)
 
+    @staticmethod
+    def from_bytes(data: Optional[bytes], file_io=None, allow_blob_data: bool 
= True) -> Optional['Blob']:
+        if data is None:
+            return None
+        if not isinstance(data, (bytes, bytearray)):
+            raise TypeError(f"Blob.from_bytes expects bytes, got {type(data)}")
+        data = bytes(data)
+        is_descriptor = BlobDescriptor.is_blob_descriptor(data)
+        if not allow_blob_data and not is_descriptor:
+            raise ValueError(
+                "Expected BlobDescriptor bytes, got raw bytes 
(allow_blob_data=False)"
+            )
+        if is_descriptor:
+            if file_io is None:
+                raise ValueError("file_io is required to resolve 
BlobDescriptor bytes")
+            descriptor = BlobDescriptor.deserialize(data)
+            uri_reader = file_io.uri_reader_factory.create(descriptor.uri)
+            return BlobRef(uri_reader, descriptor)
+        return BlobData(data)
+
 
 class BlobData(Blob):
 
diff --git a/paimon-python/pypaimon/table/row/generic_row.py 
b/paimon-python/pypaimon/table/row/generic_row.py
index 79f898d1f2..0c2458eff9 100644
--- a/paimon-python/pypaimon/table/row/generic_row.py
+++ b/paimon-python/pypaimon/table/row/generic_row.py
@@ -114,6 +114,15 @@ class GenericRow(InternalRow):
             raise IndexError(f"Position {pos} is out of bounds for row arity 
{len(self.values)}")
         return self.values[pos]
 
+    def get_blob(self, pos: int):
+        from pypaimon.table.row.blob import Blob
+        value = self.get_field(pos)
+        if value is None:
+            return None
+        if isinstance(value, Blob):
+            return value
+        raise TypeError(f"Cannot get Blob from {type(value)} at position 
{pos}")
+
     def get_row_kind(self) -> RowKind:
         return self.row_kind
 
diff --git a/paimon-python/pypaimon/table/row/internal_row.py 
b/paimon-python/pypaimon/table/row/internal_row.py
index ec89a74377..50d08811d7 100644
--- a/paimon-python/pypaimon/table/row/internal_row.py
+++ b/paimon-python/pypaimon/table/row/internal_row.py
@@ -16,7 +16,7 @@
 # under the License.
 
 from abc import ABC, abstractmethod
-from typing import Any
+from typing import Any, Optional
 
 from pypaimon.table.row.row_kind import RowKind
 
@@ -45,6 +45,10 @@ class InternalRow(ABC):
         The number does not include RowKind. It is kept separately.
         """
 
+    @abstractmethod
+    def get_blob(self, pos: int) -> Optional[Any]:
+        """Returns the Blob at the given position, or None if null."""
+
     def __str__(self) -> str:
         fields = []
         for pos in range(self.__len__()):
diff --git a/paimon-python/pypaimon/table/row/offset_row.py 
b/paimon-python/pypaimon/table/row/offset_row.py
index a9f02b1867..1705ae130b 100644
--- a/paimon-python/pypaimon/table/row/offset_row.py
+++ b/paimon-python/pypaimon/table/row/offset_row.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import Optional
+from typing import FrozenSet, Iterable, Optional
 
 from pypaimon.table.row.internal_row import InternalRow, RowKind
 
@@ -23,11 +23,16 @@ from pypaimon.table.row.internal_row import InternalRow, 
RowKind
 class OffsetRow(InternalRow):
     """A InternalRow to wrap row with offset."""
 
-    def __init__(self, row_tuple: Optional[tuple], offset: int, arity: int):
+    def __init__(self, row_tuple: Optional[tuple], offset: int, arity: int,
+                 file_io=None, blob_field_indices: Optional[Iterable[int]] = 
None):
         self.row_tuple = row_tuple
         self.offset = offset
         self.arity = arity
         self.row_kind_byte: int = 1
+        self._file_io = file_io
+        self._blob_field_indices: FrozenSet[int] = (
+            frozenset(blob_field_indices) if blob_field_indices is not None 
else frozenset()
+        )
 
     def replace(self, row_tuple: tuple) -> 'OffsetRow':
         self.row_tuple = row_tuple
@@ -46,6 +51,13 @@ class OffsetRow(InternalRow):
             raise IndexError(f"Position {pos} is out of bounds for row arity 
{self.arity}")
         return self.row_tuple[self.offset + pos]
 
+    def get_blob(self, pos: int):
+        from pypaimon.table.row.blob import Blob
+
+        if pos not in self._blob_field_indices:
+            raise TypeError(f"Field at position {pos} is not a BLOB field")
+        return Blob.from_bytes(self.get_field(pos), self._file_io)
+
     def get_row_kind(self) -> RowKind:
         return RowKind(self.row_kind_byte)
 
diff --git a/paimon-python/pypaimon/table/row/projected_row.py 
b/paimon-python/pypaimon/table/row/projected_row.py
index 0af44d9502..5d7cbd375e 100644
--- a/paimon-python/pypaimon/table/row/projected_row.py
+++ b/paimon-python/pypaimon/table/row/projected_row.py
@@ -48,6 +48,12 @@ class ProjectedRow(InternalRow):
             return None
         return self.row.get_field(self.index_mapping[pos])
 
+    def get_blob(self, pos: int):
+        """Returns the Blob at the projected position; delegates to the inner 
row."""
+        if self.index_mapping[pos] < 0:
+            return None
+        return self.row.get_blob(self.index_mapping[pos])
+
     def get_row_kind(self) -> RowKind:
         """Returns the kind of change that this row describes in a 
changelog."""
         return self.row.get_row_kind()
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py 
b/paimon-python/pypaimon/tests/blob_table_test.py
index 95f89ff8de..c4e5a4d1bd 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -3233,5 +3233,260 @@ class DataBlobWriterTest(unittest.TestCase):
         self.assertIn('Cannot rename BLOB column', str(ctx.exception))
 
 
+class GetBlobTest(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.temp_dir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.temp_dir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('test_db', False)
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('picture', pa.large_binary()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+        })
+        cls.catalog.create_table('test_db.get_blob_test', schema, False)
+        cls.table = cls.catalog.get_table('test_db.get_blob_test')
+
+        data = pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'name': ['a', 'b', 'c'],
+            'picture': [b'img_data_1', b'img_data_2', b'img_data_3'],
+        }, schema=pa_schema)
+
+        write_builder = cls.table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(data)
+        commit_messages = writer.prepare_commit()
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.temp_dir, ignore_errors=True)
+
+    def test_get_blob_access(self):
+        read_builder = self.table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        read = read_builder.new_read()
+
+        results = []
+        for row in read.to_iterator(splits):
+            blob = row.get_blob(2)
+            self.assertIsNotNone(blob)
+            results.append((row.get_field(0), blob.to_data()))
+
+        self.assertEqual(len(results), 3)
+        results.sort(key=lambda x: x[0])
+        self.assertEqual(results[0], (1, b'img_data_1'))
+        self.assertEqual(results[1], (2, b'img_data_2'))
+        self.assertEqual(results[2], (3, b'img_data_3'))
+
+    def test_get_blob_streaming(self):
+        read_builder = self.table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        read = read_builder.new_read()
+
+        results = []
+        for row in read.to_iterator(splits):
+            with row.get_blob(2).new_input_stream() as stream:
+                results.append((row.get_field(0), stream.read()))
+        self.assertEqual(len(results), 3)
+        results.sort(key=lambda x: x[0])
+        self.assertEqual(results[0], (1, b'img_data_1'))
+        self.assertEqual(results[1], (2, b'img_data_2'))
+        self.assertEqual(results[2], (3, b'img_data_3'))
+
+    def test_get_blob_non_blob_field_raises(self):
+        read_builder = self.table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        read = read_builder.new_read()
+
+        for row in read.to_iterator(splits):
+            with self.assertRaises(TypeError):
+                row.get_blob(0)
+            break
+
+    def test_to_iterator_yields_all_rows(self):
+        read_builder = self.table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        read = read_builder.new_read()
+
+        count = 0
+        for row in read.to_iterator(splits):
+            self.assertIsNotNone(row.get_field(0))
+            self.assertIsNotNone(row.get_field(1))
+            count += 1
+        self.assertEqual(count, 3)
+
+
+class GetBlobMultiColumnTest(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.temp_dir = tempfile.mkdtemp()
+        cls.catalog = CatalogFactory.create({'warehouse': 
os.path.join(cls.temp_dir, 'warehouse')})
+        cls.catalog.create_database('test_db', False)
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('cover', pa.large_binary()),
+            ('thumb', pa.large_binary()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+        })
+        cls.catalog.create_table('test_db.get_blob_multi', schema, False)
+        cls.table = cls.catalog.get_table('test_db.get_blob_multi')
+
+        write_builder = cls.table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(pa.Table.from_pydict({
+            'id': [1, 2],
+            'cover': [b'cover_1', b'cover_2'],
+            'thumb': [b'thumb_1', b'thumb_2'],
+        }, schema=pa_schema))
+        write_builder.new_commit().commit(writer.prepare_commit())
+        writer.close()
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.temp_dir, ignore_errors=True)
+
+    def test_get_blob_each_column_independent(self):
+        read_builder = self.table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        read = read_builder.new_read()
+
+        results = []
+        for row in read.to_iterator(splits):
+            results.append((row.get_field(0),
+                            row.get_blob(1).to_data(),
+                            row.get_blob(2).to_data()))
+        results.sort(key=lambda x: x[0])
+        self.assertEqual(results, [(1, b'cover_1', b'thumb_1'),
+                                   (2, b'cover_2', b'thumb_2')])
+
+
+class GetBlobThroughDescriptorConvertReaderTest(unittest.TestCase):
+    """Pin BlobDescriptorConvertReader's file_io / blob_field_indices
+    propagation. Configuring blob-descriptor-field puts the wrapper on the
+    read chain; reading via to_iterator() + row.get_blob() is the only path
+    that consumes those propagated attributes. Regressions on either line
+    would silently pass the to_arrow()-based blob_descriptor tests."""
+
+    @classmethod
+    def setUpClass(cls):
+        cls.temp_dir = tempfile.mkdtemp()
+        cls.catalog = CatalogFactory.create({'warehouse': 
os.path.join(cls.temp_dir, 'warehouse')})
+        cls.catalog.create_database('test_db', False)
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('pic1', pa.large_binary()),
+            ('pic2', pa.large_binary()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'blob-descriptor-field': 'pic2',
+        })
+        cls.catalog.create_table('test_db.get_blob_via_descriptor_convert', 
schema, False)
+        cls.table = 
cls.catalog.get_table('test_db.get_blob_via_descriptor_convert')
+
+        from pypaimon.table.row.blob import BlobDescriptor
+        cls.pic1_data = b'inline_pic1_payload'
+        cls.pic2_data = b'external_pic2_payload'
+        cls.pic2_path = os.path.join(cls.temp_dir, 'pic2_external_blob')
+        with open(cls.pic2_path, 'wb') as f:
+            f.write(cls.pic2_data)
+
+        write_builder = cls.table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(pa.Table.from_pydict({
+            'id': [1],
+            'pic1': [cls.pic1_data],
+            'pic2': [BlobDescriptor(cls.pic2_path, 0, 
len(cls.pic2_data)).serialize()],
+        }, schema=pa_schema))
+        write_builder.new_commit().commit(writer.prepare_commit())
+        writer.close()
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.temp_dir, ignore_errors=True)
+
+    def test_get_blob_resolves_through_descriptor_convert_reader(self):
+        read_builder = self.table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        read = read_builder.new_read()
+
+        for row in read.to_iterator(splits):
+            self.assertEqual(row.get_blob(1).to_data(), self.pic1_data)
+            self.assertEqual(row.get_blob(2).to_data(), self.pic2_data)
+            break
+
+
+class GetBlobNonBlobColumnSecurityTest(unittest.TestCase):
+    """SSRF defence: get_blob on a non-BLOB column must NOT resolve a
+    descriptor URI even when the cell starts with the BLOBDESC magic header."""
+
+    @classmethod
+    def setUpClass(cls):
+        cls.temp_dir = tempfile.mkdtemp()
+        cls.catalog = CatalogFactory.create({'warehouse': 
os.path.join(cls.temp_dir, 'warehouse')})
+        cls.catalog.create_database('test_db', False)
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('payload', pa.binary()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        cls.catalog.create_table('test_db.no_blob_col', schema, False)
+        cls.table = cls.catalog.get_table('test_db.no_blob_col')
+
+        from pypaimon.table.row.blob import BlobDescriptor
+        attacker_bytes = BlobDescriptor(
+            "/etc/should-never-be-read-by-paimon", 0, 32
+        ).serialize()
+
+        write_builder = cls.table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(pa.Table.from_pydict({
+            'id': [1],
+            'payload': [attacker_bytes],
+        }, schema=pa_schema))
+        write_builder.new_commit().commit(writer.prepare_commit())
+        writer.close()
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.temp_dir, ignore_errors=True)
+
+    def test_get_blob_on_non_blob_column_with_magic_bytes_raises(self):
+        from unittest.mock import patch
+
+        read_builder = self.table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        read = read_builder.new_read()
+
+        with patch.object(
+            self.table.file_io.uri_reader_factory, 'create',
+            side_effect=AssertionError("URI resolution must not happen on 
non-BLOB column")
+        ) as mock_create:
+            for row in read.to_iterator(splits):
+                with self.assertRaises(TypeError):
+                    row.get_blob(1)
+                break
+            mock_create.assert_not_called()
+
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/paimon-python/pypaimon/tests/blob_test.py 
b/paimon-python/pypaimon/tests/blob_test.py
index d9cf64210d..db0c639888 100644
--- a/paimon-python/pypaimon/tests/blob_test.py
+++ b/paimon-python/pypaimon/tests/blob_test.py
@@ -134,6 +134,38 @@ class BlobTest(unittest.TestCase):
         self.assertEqual(descriptor.offset, 0)
         self.assertEqual(descriptor.length, -1)
 
+    def test_from_bytes_with_raw_data(self):
+        raw = b"hello blob"
+        blob = Blob.from_bytes(raw)
+        self.assertIsInstance(blob, BlobData)
+        self.assertEqual(blob.to_data(), raw)
+
+    def test_from_bytes_with_none(self):
+        self.assertIsNone(Blob.from_bytes(None))
+
+    def test_from_bytes_with_descriptor(self):
+        from pypaimon.common.file_io import FileIO
+        data = b"actual blob content"
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            blob_path = os.path.join(tmp_dir, "blob.bin")
+            with open(blob_path, 'wb') as f:
+                f.write(data)
+            descriptor = BlobDescriptor(blob_path, 0, len(data))
+            file_io = FileIO.get(f"file://{tmp_dir}", {})
+            blob = Blob.from_bytes(descriptor.serialize(), file_io)
+            self.assertIsInstance(blob, BlobRef)
+            self.assertEqual(blob.to_data(), data)
+
+    def test_from_bytes_descriptor_without_file_io_raises(self):
+        descriptor = BlobDescriptor("/tmp/fake", 0, 10)
+        serialized = descriptor.serialize()
+        with self.assertRaises(ValueError):
+            Blob.from_bytes(serialized)
+
+    def test_from_bytes_invalid_type_raises(self):
+        with self.assertRaises(TypeError):
+            Blob.from_bytes(12345)
+
     def test_blob_data_interface_compliance(self):
         """Test that BlobData properly implements Blob interface."""
         test_data = b"interface test data"


Reply via email to