This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit fe68d21324de6bb9ef018dbfa311c21a99a5afda Author: jerry <[email protected]> AuthorDate: Fri Oct 17 10:11:40 2025 +0800 [core]Python: fix blob write when blob_as_descriptor is true (#6404) --- paimon-python/pypaimon/common/config.py | 1 + paimon-python/pypaimon/common/core_options.py | 1 + paimon-python/pypaimon/common/file_io.py | 13 +- paimon-python/pypaimon/common/uri_reader.py | 171 ++++++++++++++++ .../pypaimon/read/reader/format_blob_reader.py | 48 ++--- paimon-python/pypaimon/read/split_read.py | 3 +- paimon-python/pypaimon/table/row/blob.py | 61 +++--- paimon-python/pypaimon/tests/blob_test.py | 153 ++++++++++---- .../pypaimon/tests/uri_reader_factory_test.py | 227 +++++++++++++++++++++ paimon-python/pypaimon/write/writer/data_writer.py | 3 +- 10 files changed, 582 insertions(+), 99 deletions(-) diff --git a/paimon-python/pypaimon/common/config.py b/paimon-python/pypaimon/common/config.py index 0478c207bb..81b05b8f84 100644 --- a/paimon-python/pypaimon/common/config.py +++ b/paimon-python/pypaimon/common/config.py @@ -47,6 +47,7 @@ class CatalogOptions: DLF_TOKEN_ECS_METADATA_URL = "dlf.token-ecs-metadata-url" PREFIX = 'prefix' HTTP_USER_AGENT_HEADER = 'header.HTTP_USER_AGENT' + BLOB_FILE_IO_DEFAULT_CACHE_SIZE = 2**31 - 1 class PVFSOptions: diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py index 82b788438e..8ab26fe062 100644 --- a/paimon-python/pypaimon/common/core_options.py +++ b/paimon-python/pypaimon/common/core_options.py @@ -43,6 +43,7 @@ class CoreOptions(str, Enum): FILE_COMPRESSION_PER_LEVEL = "file.compression.per.level" FILE_FORMAT_PER_LEVEL = "file.format.per.level" FILE_BLOCK_SIZE = "file.block-size" + FILE_BLOB_AS_DESCRIPTOR = "blob-as-descriptor" # Scan options SCAN_FALLBACK_BRANCH = "scan.fallback-branch" INCREMENTAL_BETWEEN_TIMESTAMP = "incremental-between-timestamp" diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py index f6371d2d80..eb32ebb755 100644 --- a/paimon-python/pypaimon/common/file_io.py +++ b/paimon-python/pypaimon/common/file_io.py @@ -15,7 +15,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
################################################################################ - import logging import os import subprocess @@ -28,8 +27,9 @@ from packaging.version import parse from pyarrow._fs import FileSystem from pypaimon.common.config import OssOptions, S3Options +from pypaimon.common.uri_reader import UriReaderFactory from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser -from pypaimon.table.row.blob import BlobData +from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob from pypaimon.table.row.generic_row import GenericRow from pypaimon.table.row.row_kind import RowKind from pypaimon.write.blob_format_writer import BlobFormatWriter @@ -40,6 +40,7 @@ class FileIO: self.properties = catalog_options self.logger = logging.getLogger(__name__) scheme, netloc, _ = self.parse_location(path) + self.uri_reader_factory = UriReaderFactory(catalog_options) if scheme in {"oss"}: self.filesystem = self._initialize_oss_fs(path) elif scheme in {"s3", "s3a", "s3n"}: @@ -370,7 +371,7 @@ class FileIO: with self.new_output_stream(path) as output_stream: fastavro.writer(output_stream, avro_schema, records, **kwargs) - def write_blob(self, path: Path, data: pyarrow.Table, **kwargs): + def write_blob(self, path: Path, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs): try: # Validate input constraints if data.num_columns != 1: @@ -399,7 +400,11 @@ class FileIO: col_data = records_dict[field_name][i] # Convert to appropriate type based on field type if hasattr(fields[0].type, 'type') and fields[0].type.type == "BLOB": - if isinstance(col_data, bytes): + if blob_as_descriptor: + blob_descriptor = BlobDescriptor.deserialize(col_data) + uri_reader = self.uri_reader_factory.create(blob_descriptor.uri) + blob_data = Blob.from_descriptor(uri_reader, blob_descriptor) + elif isinstance(col_data, bytes): blob_data = BlobData(col_data) else: # Convert to bytes if needed diff --git a/paimon-python/pypaimon/common/uri_reader.py b/paimon-python/pypaimon/common/uri_reader.py new file mode 100644 index 0000000000..823020caaf --- /dev/null +++ b/paimon-python/pypaimon/common/uri_reader.py @@ -0,0 +1,171 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +import io +from abc import ABC, abstractmethod +from pathlib import Path +from typing import Any, Optional +from urllib.parse import urlparse, ParseResult + +import requests +from cachetools import LRUCache +from readerwriterlock import rwlock + +from pypaimon.common.config import CatalogOptions + + +class UriReader(ABC): + @classmethod + def from_http(cls) -> 'HttpUriReader': + return HttpUriReader() + + @classmethod + def from_file(cls, file_io: Any) -> 'FileUriReader': + return FileUriReader(file_io) + + @classmethod + def get_file_path(cls, uri: str): + parsed_uri = urlparse(uri) + if parsed_uri.scheme == 'file': + path = Path(parsed_uri.path) + elif parsed_uri.scheme and parsed_uri.scheme != '': + path = Path(parsed_uri.netloc + parsed_uri.path) + else: + path = Path(uri) + return path + + @abstractmethod + def new_input_stream(self, uri: str): + pass + + +class FileUriReader(UriReader): + + def __init__(self, file_io: Any): + self._file_io = file_io + + def new_input_stream(self, uri: str): + try: + path = self.get_file_path(uri) + return self._file_io.new_input_stream(path) + except Exception as e: + raise IOError(f"Failed to read file {uri}: {e}") + + +class HttpUriReader(UriReader): + + def new_input_stream(self, uri: str): + try: + response = requests.get(uri) + if response.status_code != 200: + raise RuntimeError(f"Failed to read HTTP URI {uri} status code {response.status_code}") + return io.BytesIO(response.content) + except Exception as e: + raise RuntimeError(f"Failed to read HTTP URI {uri}: {e}") + + +class UriKey: + + def __init__(self, scheme: Optional[str], authority: Optional[str]) -> None: + self._scheme = scheme + self._authority = authority + self._hash = hash((self._scheme, self._authority)) + + @property + def scheme(self) -> Optional[str]: + return self._scheme + + @property + def authority(self) -> Optional[str]: + return self._authority + + def __eq__(self, other: object) -> bool: + if not isinstance(other, UriKey): + return False + + return (self._scheme == other._scheme and + self._authority == other._authority) + + def __hash__(self) -> int: + return self._hash + + def __repr__(self) -> str: + return f"UriKey(scheme='{self._scheme}', authority='{self._authority}')" + + +class UriReaderFactory: + + def __init__(self, catalog_options: dict) -> None: + self.catalog_options = catalog_options + self._readers = LRUCache(CatalogOptions.BLOB_FILE_IO_DEFAULT_CACHE_SIZE) + self._readers_lock = rwlock.RWLockFair() + + def create(self, input_uri: str) -> UriReader: + try: + parsed_uri = urlparse(input_uri) + except Exception as e: + raise ValueError(f"Invalid URI: {input_uri}") from e + + key = UriKey(parsed_uri.scheme, parsed_uri.netloc or None) + rlock = self._readers_lock.gen_rlock() + rlock.acquire() + try: + reader = self._readers.get(key) + if reader is not None: + return reader + finally: + rlock.release() + wlock = self._readers_lock.gen_wlock() + wlock.acquire() + try: + reader = self._readers.get(key) + if reader is not None: + return reader + reader = self._new_reader(key, parsed_uri) + self._readers[key] = reader + return reader + finally: + wlock.release() + + def _new_reader(self, key: UriKey, parsed_uri: ParseResult) -> UriReader: + scheme = key.scheme + if scheme in ('http', 'https'): + return UriReader.from_http() + try: + # Import FileIO here to avoid circular imports + from pypaimon.common.file_io import FileIO + uri_string = parsed_uri.geturl() + file_io = 
FileIO(uri_string, self.catalog_options) + return UriReader.from_file(file_io) + except Exception as e: + raise RuntimeError(f"Failed to create reader for URI {parsed_uri.geturl()}") from e + + def clear_cache(self) -> None: + self._readers.clear() + + def get_cache_size(self) -> int: + return len(self._readers) + + def __getstate__(self): + state = self.__dict__.copy() + del state['_readers_lock'] + return state + + def __setstate__(self, state): + self.__dict__.update(state) + self._readers_lock = rwlock.RWLockFair() diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py b/paimon-python/pypaimon/read/reader/format_blob_reader.py index 709a0c27a3..e6846dc568 100644 --- a/paimon-python/pypaimon/read/reader/format_blob_reader.py +++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py @@ -26,21 +26,23 @@ from pyarrow import RecordBatch from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor from pypaimon.common.file_io import FileIO from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader -from pypaimon.schema.data_types import DataField, PyarrowFieldParser -from pypaimon.table.row.blob import Blob, BlobDescriptor, BlobRef +from pypaimon.schema.data_types import DataField, PyarrowFieldParser, AtomicType +from pypaimon.table.row.blob import Blob from pypaimon.table.row.generic_row import GenericRow +from pypaimon.table.row.row_kind import RowKind class FormatBlobReader(RecordBatchReader): def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str], - full_fields: List[DataField], push_down_predicate: Any): + full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool): self._file_io = file_io self._file_path = file_path self._push_down_predicate = push_down_predicate + self._blob_as_descriptor = blob_as_descriptor # Get file size - self._file_size = file_io.get_file_size(file_path) + self._file_size = file_io.get_file_size(Path(file_path)) # Initialize the low-level blob format reader self.file_path = file_path @@ -66,7 +68,10 @@ class FormatBlobReader(RecordBatchReader): if self.returned: return None self.returned = True - batch_iterator = BlobRecordIterator(self.file_path, self.blob_lengths, self.blob_offsets, self._fields[0]) + batch_iterator = BlobRecordIterator( + self._file_io, self.file_path, self.blob_lengths, + self.blob_offsets, self._fields[0] + ) self._blob_iterator = iter(batch_iterator) # Collect records for this batch @@ -75,22 +80,16 @@ class FormatBlobReader(RecordBatchReader): try: while True: - # Get next blob record blob_row = next(self._blob_iterator) - # Check if first read returns None, stop immediately if blob_row is None: break - - # Extract blob data from the row - blob = blob_row.values[0] # Blob files have single blob field - - # Convert blob to appropriate format for each requested field + blob = blob_row.values[0] for field_name in self._fields: - # For blob files, all fields should contain blob data - if isinstance(blob, Blob): - blob_data = blob.to_data() + blob_descriptor = blob.to_descriptor() + if self._blob_as_descriptor: + blob_data = blob_descriptor.serialize() else: - blob_data = bytes(blob) if blob is not None else None + blob_data = blob.to_data() pydict_data[field_name].append(blob_data) records_in_batch += 1 @@ -162,7 +161,9 @@ class BlobRecordIterator: MAGIC_NUMBER_SIZE = 4 METADATA_OVERHEAD = 16 - def __init__(self, file_path: str, blob_lengths: List[int], blob_offsets: List[int], field_name: str): + def __init__(self, file_io: FileIO, file_path: str, 
blob_lengths: List[int], + blob_offsets: List[int], field_name: str): + self.file_io = file_io self.file_path = file_path self.field_name = field_name self.blob_lengths = blob_lengths @@ -175,25 +176,14 @@ class BlobRecordIterator: def __next__(self) -> GenericRow: if self.current_position >= len(self.blob_lengths): raise StopIteration - # Create blob reference for the current blob # Skip magic number (4 bytes) and exclude length (8 bytes) + CRC (4 bytes) = 12 bytes blob_offset = self.blob_offsets[self.current_position] + self.MAGIC_NUMBER_SIZE # Skip magic number blob_length = self.blob_lengths[self.current_position] - self.METADATA_OVERHEAD - - # Create BlobDescriptor for this blob - descriptor = BlobDescriptor(self.file_path, blob_offset, blob_length) - blob = BlobRef(descriptor) - + blob = Blob.from_file(self.file_io, self.file_path, blob_offset, blob_length) self.current_position += 1 - - # Return as GenericRow with single blob field - from pypaimon.schema.data_types import DataField, AtomicType - from pypaimon.table.row.row_kind import RowKind - fields = [DataField(0, self.field_name, AtomicType("BLOB"))] return GenericRow([blob], fields, RowKind.INSERT) def returned_position(self) -> int: - """Get current position in the iterator.""" return self.current_position diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index 81ebdd86f8..372679ed63 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -82,8 +82,9 @@ class SplitRead(ABC): format_reader = FormatAvroReader(self.table.file_io, file_path, self._get_final_read_data_fields(), self.read_fields, self.push_down_predicate) elif file_format == CoreOptions.FILE_FORMAT_BLOB: + blob_as_descriptor = self.table.options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False) format_reader = FormatBlobReader(self.table.file_io, file_path, self._get_final_read_data_fields(), - self.read_fields, self.push_down_predicate) + self.read_fields, self.push_down_predicate, blob_as_descriptor) elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC: format_reader = FormatPyArrowReader(self.table.file_io, file_format, file_path, self._get_final_read_data_fields(), self.push_down_predicate) diff --git a/paimon-python/pypaimon/table/row/blob.py b/paimon-python/pypaimon/table/row/blob.py index b9d8cc0e38..b92ddb3e82 100644 --- a/paimon-python/pypaimon/table/row/blob.py +++ b/paimon-python/pypaimon/table/row/blob.py @@ -19,6 +19,9 @@ import io from abc import ABC, abstractmethod from typing import Optional, Union +from urllib.parse import urlparse + +from pypaimon.common.uri_reader import UriReader, FileUriReader class BlobDescriptor: @@ -144,21 +147,33 @@ class Blob(ABC): @staticmethod def from_local(file: str) -> 'Blob': - return Blob.from_file(file) + # Import FileIO locally to avoid circular imports + from pypaimon.common.file_io import FileIO + + parsed = urlparse(file) + if parsed.scheme == "file": + file_uri = file + else: + file_uri = f"file://{file}" + file_io = FileIO(file_uri, {}) + uri_reader = FileUriReader(file_io) + descriptor = BlobDescriptor(file, 0, -1) + return Blob.from_descriptor(uri_reader, descriptor) @staticmethod def from_http(uri: str) -> 'Blob': descriptor = BlobDescriptor(uri, 0, -1) - return BlobRef(descriptor) + return BlobRef(UriReader.from_http(), descriptor) @staticmethod - def from_file(file: str, offset: int = 0, length: int = -1) -> 'Blob': - descriptor = BlobDescriptor(file, offset, 
length) - return BlobRef(descriptor) + def from_file(file_io, file_path: str, offset: int, length: int) -> 'Blob': + uri_reader = FileUriReader(file_io) + descriptor = BlobDescriptor(file_path, offset, length) + return Blob.from_descriptor(uri_reader, descriptor) @staticmethod - def from_descriptor(descriptor: BlobDescriptor) -> 'Blob': - return BlobRef(descriptor) + def from_descriptor(uri_reader: UriReader, descriptor: BlobDescriptor) -> 'Blob': + return BlobRef(uri_reader, descriptor) class BlobData(Blob): @@ -199,7 +214,8 @@ class BlobData(Blob): class BlobRef(Blob): - def __init__(self, descriptor: BlobDescriptor): + def __init__(self, uri_reader: UriReader, descriptor: BlobDescriptor): + self._uri_reader = uri_reader self._descriptor = descriptor def to_data(self) -> bytes: @@ -216,27 +232,14 @@ class BlobRef(Blob): uri = self._descriptor.uri offset = self._descriptor.offset length = self._descriptor.length - - if uri.startswith('http://') or uri.startswith('https://'): - raise NotImplementedError("HTTP blob reading not implemented yet") - elif uri.startswith('file://') or '://' not in uri: - file_path = uri.replace('file://', '') if uri.startswith('file://') else uri - - try: - with open(file_path, 'rb') as f: - if offset > 0: - f.seek(offset) - - if length == -1: - data = f.read() - else: - data = f.read(length) - - return io.BytesIO(data) - except Exception as e: - raise IOError(f"Failed to read file {file_path}: {e}") - else: - raise ValueError(f"Unsupported URI scheme: {uri}") + with self._uri_reader.new_input_stream(uri) as input_stream: + if offset > 0: + input_stream.seek(offset) + if length == -1: + data = input_stream.read() + else: + data = input_stream.read(length) + return io.BytesIO(data) def __eq__(self, other) -> bool: if not isinstance(other, BlobRef): diff --git a/paimon-python/pypaimon/tests/blob_test.py b/paimon-python/pypaimon/tests/blob_test.py index b8ed65e5bb..b66c1dec9a 100644 --- a/paimon-python/pypaimon/tests/blob_test.py +++ b/paimon-python/pypaimon/tests/blob_test.py @@ -92,7 +92,8 @@ class BlobTest(unittest.TestCase): def test_from_file_with_offset_and_length(self): """Test Blob.from_file() method with offset and length.""" - blob = Blob.from_file(self.file, 0, 4) + file_io = FileIO(self.file if self.file.startswith('file://') else f"file://{self.file}", {}) + blob = Blob.from_file(file_io, self.file, 0, 4) # Verify it returns a BlobRef instance self.assertIsInstance(blob, BlobRef) @@ -100,16 +101,6 @@ class BlobTest(unittest.TestCase): # Verify the data matches (first 4 bytes: "test") self.assertEqual(blob.to_data(), b"test") - def test_from_file_full(self): - """Test Blob.from_file() method without offset and length.""" - blob = Blob.from_file(self.file) - - # Verify it returns a BlobRef instance - self.assertIsInstance(blob, BlobRef) - - # Verify the data matches - self.assertEqual(blob.to_data(), b"test data") - def test_from_http(self): """Test Blob.from_http() method.""" uri = "http://example.com/file.txt" @@ -197,7 +188,8 @@ class BlobTest(unittest.TestCase): self.assertIsInstance(blob_ref, Blob) # from_file should return BlobRef - blob_file = Blob.from_file(self.file) + file_io = FileIO(self.file if self.file.startswith('file://') else f"file://{self.file}", {}) + blob_file = Blob.from_file(file_io, self.file, 0, os.path.getsize(self.file)) self.assertIsInstance(blob_file, BlobRef) self.assertIsInstance(blob_file, Blob) @@ -505,7 +497,7 @@ class BlobTest(unittest.TestCase): descriptor = BlobDescriptor(self.file, 0, -1) # Create BlobRef from 
descriptor - blob_ref = BlobRef(descriptor) + blob_ref = Blob.from_local(self.file) # Verify descriptor is preserved returned_descriptor = blob_ref.to_descriptor() @@ -554,11 +546,6 @@ class BlobEndToEndTest(unittest.TestCase): pass def test_blob_end_to_end(self): - from pypaimon.schema.data_types import DataField, AtomicType - from pypaimon.table.row.blob import BlobData - from pypaimon.common.file_io import FileIO - from pathlib import Path - # Set up file I/O file_io = FileIO(self.temp_dir, {}) @@ -576,7 +563,7 @@ class BlobEndToEndTest(unittest.TestCase): schema = pa.schema([pa.field(blob_field_name, pa.large_binary())]) table = pa.table([blob_data], schema=schema) blob_files[blob_field_name] = Path(self.temp_dir) / (blob_field_name + ".blob") - file_io.write_blob(blob_files[blob_field_name], table) + file_io.write_blob(blob_files[blob_field_name], table, False) self.assertTrue(file_io.exists(blob_files[blob_field_name])) # ========== Step 3: Read Data and Check Data ========== @@ -587,7 +574,8 @@ class BlobEndToEndTest(unittest.TestCase): file_path=str(file_path), read_fields=[field_name], full_fields=read_fields, - push_down_predicate=None + push_down_predicate=None, + blob_as_descriptor=False ) # Read data @@ -693,7 +681,7 @@ class BlobEndToEndTest(unittest.TestCase): # Should throw RuntimeError for multiple columns with self.assertRaises(RuntimeError) as context: - file_io.write_blob(multi_column_file, multi_column_table) + file_io.write_blob(multi_column_file, multi_column_table, False) self.assertIn("single column", str(context.exception)) # Test that FileIO.write_blob rejects null values @@ -704,7 +692,7 @@ class BlobEndToEndTest(unittest.TestCase): # Should throw RuntimeError for null values with self.assertRaises(RuntimeError) as context: - file_io.write_blob(null_file, null_table) + file_io.write_blob(null_file, null_table, False) self.assertIn("null values", str(context.exception)) # ========== Test FormatBlobReader with complex type schema ========== @@ -714,7 +702,7 @@ class BlobEndToEndTest(unittest.TestCase): valid_table = pa.table([valid_blob_data], schema=valid_schema) valid_blob_file = Path(self.temp_dir) / "valid_blob.blob" - file_io.write_blob(valid_blob_file, valid_table) + file_io.write_blob(valid_blob_file, valid_table, False) # Try to read with complex type field definition - this should fail # because FormatBlobReader tries to create PyArrow schema with complex types @@ -728,7 +716,8 @@ class BlobEndToEndTest(unittest.TestCase): file_path=str(valid_blob_file), read_fields=["valid_blob"], full_fields=complex_read_fields, - push_down_predicate=None + push_down_predicate=None, + blob_as_descriptor=False ) # Reading should fail because the schema expects complex type but data is atomic @@ -761,7 +750,7 @@ class BlobEndToEndTest(unittest.TestCase): valid_table = pa.table([valid_blob_data], schema=valid_schema) header_test_file = Path(self.temp_dir) / "header_test.blob" - file_io.write_blob(header_test_file, valid_table) + file_io.write_blob(header_test_file, valid_table, False) # Read the file and corrupt the header (last 5 bytes: index_length + version) with open(header_test_file, 'rb') as f: @@ -785,7 +774,8 @@ class BlobEndToEndTest(unittest.TestCase): file_path=str(corrupted_header_file), read_fields=["test_blob"], full_fields=fields, - push_down_predicate=None + push_down_predicate=None, + blob_as_descriptor=False ) self.assertIn("Unsupported blob file version", str(context.exception)) @@ -798,7 +788,7 @@ class BlobEndToEndTest(unittest.TestCase): large_table = 
pa.table([large_blob_data], schema=large_schema) full_blob_file = Path(self.temp_dir) / "full_blob.blob" - file_io.write_blob(full_blob_file, large_table) + file_io.write_blob(full_blob_file, large_table, False) # Read the full file and truncate it in the middle with open(full_blob_file, 'rb') as f: @@ -819,7 +809,8 @@ class BlobEndToEndTest(unittest.TestCase): file_path=str(truncated_file), read_fields=["large_blob"], full_fields=fields, - push_down_predicate=None + push_down_predicate=None, + blob_as_descriptor=False ) # Should detect truncation/incomplete data (either invalid header or invalid version) self.assertTrue( @@ -835,7 +826,7 @@ class BlobEndToEndTest(unittest.TestCase): zero_table = pa.table([zero_blob_data], schema=zero_schema) zero_blob_file = Path(self.temp_dir) / "zero_length.blob" - file_io.write_blob(zero_blob_file, zero_table) + file_io.write_blob(zero_blob_file, zero_table, False) # Verify file was created self.assertTrue(file_io.exists(zero_blob_file)) @@ -849,7 +840,8 @@ class BlobEndToEndTest(unittest.TestCase): file_path=str(zero_blob_file), read_fields=["zero_blob"], full_fields=zero_fields, - push_down_predicate=None + push_down_predicate=None, + blob_as_descriptor=False ) zero_batch = zero_reader.read_arrow_batch() @@ -876,7 +868,7 @@ class BlobEndToEndTest(unittest.TestCase): large_sim_table = pa.table([simulated_large_data], schema=large_sim_schema) large_sim_file = Path(self.temp_dir) / "large_simulation.blob" - file_io.write_blob(large_sim_file, large_sim_table) + file_io.write_blob(large_sim_file, large_sim_table, False) # Verify large file was written large_sim_size = file_io.get_file_size(large_sim_file) @@ -889,7 +881,8 @@ class BlobEndToEndTest(unittest.TestCase): file_path=str(large_sim_file), read_fields=["large_sim_blob"], full_fields=large_sim_fields, - push_down_predicate=None + push_down_predicate=None, + blob_as_descriptor=False ) large_sim_batch = large_sim_reader.read_arrow_batch() @@ -947,7 +940,7 @@ class BlobEndToEndTest(unittest.TestCase): # Should reject multi-field table with self.assertRaises(RuntimeError) as context: - file_io.write_blob(multi_field_file, multi_field_table) + file_io.write_blob(multi_field_file, multi_field_table, False) self.assertIn("single column", str(context.exception)) # Test that blob format rejects non-binary field types @@ -958,7 +951,7 @@ class BlobEndToEndTest(unittest.TestCase): # Should reject non-binary field with self.assertRaises(RuntimeError) as context: - file_io.write_blob(non_binary_file, non_binary_table) + file_io.write_blob(non_binary_file, non_binary_table, False) # Should fail due to type conversion issues (non-binary field can't be converted to BLOB) self.assertTrue( "large_binary" in str(context.exception) or @@ -975,9 +968,99 @@ class BlobEndToEndTest(unittest.TestCase): # Should reject null values with self.assertRaises(RuntimeError) as context: - file_io.write_blob(null_file, null_table) + file_io.write_blob(null_file, null_table, False) self.assertIn("null values", str(context.exception)) + def test_blob_end_to_end_with_descriptor(self): + # Set up file I/O + file_io = FileIO(self.temp_dir, {}) + + # ========== Step 1: Write data to local file ========== + # Create test data and write it to a local file + test_content = b'This is test blob content stored in an external file for descriptor testing.' 
+ # Write the test content to a local file + local_data_file = Path(self.temp_dir) / "external_blob" + with open(local_data_file, 'wb') as f: + f.write(test_content) + # Verify the file was created and has the correct content + self.assertTrue(local_data_file.exists()) + with open(local_data_file, 'rb') as f: + written_content = f.read() + self.assertEqual(written_content, test_content) + + # ========== Step 2: Use this file as blob descriptor ========== + # Create a BlobDescriptor pointing to the local file + blob_descriptor = BlobDescriptor( + uri=str(local_data_file), + offset=0, + length=len(test_content) + ) + # Serialize the descriptor to bytes (this is what would be stored in the blob column) + descriptor_bytes = blob_descriptor.serialize() + self.assertIsInstance(descriptor_bytes, bytes) + self.assertGreater(len(descriptor_bytes), 0) + + # Create PyArrow table with the serialized descriptor + blob_field_name = "blob_descriptor_field" + schema = pa.schema([pa.field(blob_field_name, pa.large_binary())]) + table = pa.table([[descriptor_bytes]], schema=schema) + + # Write the blob file with blob_as_descriptor=True + blob_file_path = Path(self.temp_dir) / "descriptor_blob.blob" + file_io.write_blob(blob_file_path, table, blob_as_descriptor=True) + # Verify the blob file was created + self.assertTrue(file_io.exists(blob_file_path)) + file_size = file_io.get_file_size(blob_file_path) + self.assertGreater(file_size, 0) + + # ========== Step 3: Read data and check ========== + # Define schema for reading + read_fields = [DataField(0, blob_field_name, AtomicType("BLOB"))] + reader = FormatBlobReader( + file_io=file_io, + file_path=str(blob_file_path), + read_fields=[blob_field_name], + full_fields=read_fields, + push_down_predicate=None, + blob_as_descriptor=True + ) + + # Read the data with blob_as_descriptor=True (should return a descriptor) + batch = reader.read_arrow_batch() + self.assertIsNotNone(batch) + self.assertEqual(batch.num_rows, 1) + self.assertEqual(batch.num_columns, 1) + + read_blob_bytes = batch.column(0)[0].as_py() + self.assertIsInstance(read_blob_bytes, bytes) + + # Deserialize the returned descriptor + returned_descriptor = BlobDescriptor.deserialize(read_blob_bytes) + + # The returned descriptor should point to the blob file (simplified implementation) + # because the current implementation creates a descriptor pointing to the blob file location + self.assertEqual(returned_descriptor.uri, str(blob_file_path)) + self.assertGreater(returned_descriptor.offset, 0) # Should have some offset in the blob file + + reader.close() + + reader_content = FormatBlobReader( + file_io=file_io, + file_path=str(blob_file_path), + read_fields=[blob_field_name], + full_fields=read_fields, + push_down_predicate=None, + blob_as_descriptor=False + ) + batch_content = reader_content.read_arrow_batch() + self.assertIsNotNone(batch_content) + self.assertEqual(batch_content.num_rows, 1) + read_content_bytes = batch_content.column(0)[0].as_py() + self.assertIsInstance(read_content_bytes, bytes) + # When blob_as_descriptor=False, we should get the actual file content + self.assertEqual(read_content_bytes, test_content) + reader_content.close() + if __name__ == '__main__': unittest.main() diff --git a/paimon-python/pypaimon/tests/uri_reader_factory_test.py b/paimon-python/pypaimon/tests/uri_reader_factory_test.py new file mode 100644 index 0000000000..12088d746c --- /dev/null +++ b/paimon-python/pypaimon/tests/uri_reader_factory_test.py @@ -0,0 +1,227 @@ +""" +Licensed to the Apache Software 
Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import os +import tempfile +import unittest +from pathlib import Path +from pypaimon.common.file_io import FileIO +from pypaimon.common.uri_reader import UriReaderFactory, HttpUriReader, FileUriReader, UriReader + + +class MockFileIO: + """Mock FileIO for testing.""" + + def __init__(self, file_io: FileIO): + self._file_io = file_io + + def get_file_size(self, path: str) -> int: + """Get file size.""" + return self._file_io.get_file_size(Path(path)) + + def new_input_stream(self, path: Path): + """Create new input stream for reading.""" + return self._file_io.new_input_stream(path) + + +class UriReaderFactoryTest(unittest.TestCase): + + def setUp(self): + self.factory = UriReaderFactory({}) + self.temp_dir = tempfile.mkdtemp() + self.temp_file = os.path.join(self.temp_dir, "test.txt") + with open(self.temp_file, 'w') as f: + f.write("test content") + + def tearDown(self): + """Clean up temporary files.""" + try: + if os.path.exists(self.temp_file): + os.remove(self.temp_file) + os.rmdir(self.temp_dir) + except OSError: + pass # Ignore cleanup errors + + def test_create_http_uri_reader(self): + """Test creating HTTP URI reader.""" + reader = self.factory.create("http://example.com/file.txt") + self.assertIsInstance(reader, HttpUriReader) + + def test_create_https_uri_reader(self): + """Test creating HTTPS URI reader.""" + reader = self.factory.create("https://example.com/file.txt") + self.assertIsInstance(reader, HttpUriReader) + + def test_create_file_uri_reader(self): + """Test creating file URI reader.""" + reader = self.factory.create(f"file://{self.temp_file}") + self.assertIsInstance(reader, FileUriReader) + + def test_create_uri_reader_with_authority(self): + """Test creating URI readers with different authorities.""" + reader1 = self.factory.create("http://my_bucket1/path/to/file.txt") + reader2 = self.factory.create("http://my_bucket2/path/to/file.txt") + + # Different authorities should create different readers + self.assertNotEqual(reader1, reader2) + self.assertIsNot(reader1, reader2) + + def test_cached_readers_with_same_scheme_and_authority(self): + """Test that readers with same scheme and authority are cached.""" + reader1 = self.factory.create("http://my_bucket/path/to/file1.txt") + reader2 = self.factory.create("http://my_bucket/path/to/file2.txt") + + # Same scheme and authority should return the same cached reader + self.assertIs(reader1, reader2) + + def test_cached_readers_with_null_authority(self): + """Test that readers with null authority are cached.""" + reader1 = self.factory.create(f"file://{self.temp_file}") + reader2 = self.factory.create(f"file://{self.temp_dir}/another_file.txt") + + # Same scheme with null authority should return the same cached reader + self.assertIs(reader1, reader2) + + def 
test_create_uri_reader_with_local_path(self): + """Test creating URI reader with local path (no scheme).""" + reader = self.factory.create(self.temp_file) + self.assertIsInstance(reader, FileUriReader) + + def test_cache_size_tracking(self): + """Test that cache size is tracked correctly.""" + initial_size = self.factory.get_cache_size() + + # Create readers with different schemes/authorities + self.factory.create("http://example.com/file.txt") + self.assertEqual(self.factory.get_cache_size(), initial_size + 1) + + self.factory.create("https://example.com/file.txt") + self.assertEqual(self.factory.get_cache_size(), initial_size + 2) + + self.factory.create(f"file://{self.temp_file}") + self.assertEqual(self.factory.get_cache_size(), initial_size + 3) + + # Same scheme/authority should not increase cache size + self.factory.create("http://example.com/another_file.txt") + self.assertEqual(self.factory.get_cache_size(), initial_size + 3) + + def test_uri_reader_functionality(self): + """Test that created URI readers actually work.""" + # Test file URI reader + reader = self.factory.create(f"file://{self.temp_file}") + stream = reader.new_input_stream(self.temp_file) + content = stream.read().decode('utf-8') + self.assertEqual(content, "test content") + stream.close() + + def test_invalid_uri_handling(self): + """Test handling of invalid URIs.""" + # This should not raise an exception as urlparse is quite permissive + # But we can test edge cases + reader = self.factory.create("") + self.assertIsInstance(reader, (HttpUriReader, FileUriReader)) + + def test_uri_key_equality(self): + """Test UriKey equality and hashing behavior.""" + from pypaimon.common.uri_reader import UriKey + + key1 = UriKey("http", "example.com") + key2 = UriKey("http", "example.com") + key3 = UriKey("https", "example.com") + key4 = UriKey("http", "other.com") + + # Same scheme and authority should be equal + self.assertEqual(key1, key2) + self.assertEqual(hash(key1), hash(key2)) + + # Different scheme or authority should not be equal + self.assertNotEqual(key1, key3) + self.assertNotEqual(key1, key4) + + # Test with None values + key_none1 = UriKey(None, None) + key_none2 = UriKey(None, None) + self.assertEqual(key_none1, key_none2) + + def test_uri_key_string_representation(self): + """Test UriKey string representation.""" + from pypaimon.common.uri_reader import UriKey + + key = UriKey("http", "example.com") + repr_str = repr(key) + self.assertIn("http", repr_str) + self.assertIn("example.com", repr_str) + + def test_thread_safety_simulation(self): + """Test thread safety by creating multiple readers concurrently.""" + import threading + import time + + results = [] + + def create_reader(): + reader = self.factory.create("http://example.com/file.txt") + results.append(reader) + time.sleep(0.01) # Small delay to increase chance of race conditions + + # Create multiple threads + threads = [] + for _ in range(10): + thread = threading.Thread(target=create_reader) + threads.append(thread) + thread.start() + + # Wait for all threads to complete + for thread in threads: + thread.join() + + # All results should be the same cached reader + first_reader = results[0] + for reader in results[1:]: + self.assertIs(reader, first_reader) + + def test_different_file_schemes(self): + """Test different file-based schemes.""" + # Test absolute path without scheme + reader1 = self.factory.create(os.path.abspath(self.temp_file)) + self.assertIsInstance(reader1, FileUriReader) + + # Test file:// scheme + reader2 = 
self.factory.create(f"file://{self.temp_file}") + self.assertIsInstance(reader2, FileUriReader) + + # Different schemes (empty vs "file") should create different cache entries + self.assertIsNot(reader1, reader2) + + # But same scheme should be cached + reader3 = self.factory.create(f"file://{self.temp_dir}/another_file.txt") + self.assertIs(reader2, reader3) # Same file:// scheme + + def test_get_file_path_with_file_uri(self): + file_uri = f"file://{self.temp_file}" + path = UriReader.get_file_path(file_uri) + self.assertEqual(str(path), self.temp_file) + oss_file_path = "bucket/tmp/another_file.txt" + file_uri = f"oss://{oss_file_path}" + path = UriReader.get_file_path(file_uri) + self.assertEqual(str(path), oss_file_path) + path = UriReader.get_file_path(self.temp_file) + self.assertEqual(str(path), self.temp_file) + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index ad6e327c89..0a063fb2f4 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -58,6 +58,7 @@ class DataWriter(ABC): self.pending_data: Optional[pa.Table] = None self.committed_files: List[DataFileMeta] = [] self.write_cols = write_cols + self.blob_as_descriptor = options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False) def write(self, data: pa.RecordBatch): processed_data = self._process_data(data) @@ -115,7 +116,7 @@ class DataWriter(ABC): elif self.file_format == CoreOptions.FILE_FORMAT_AVRO: self.file_io.write_avro(file_path, data) elif self.file_format == CoreOptions.FILE_FORMAT_BLOB: - self.file_io.write_blob(file_path, data) + self.file_io.write_blob(file_path, data, self.blob_as_descriptor) else: raise ValueError(f"Unsupported file format: {self.file_format}")
