Copilot commented on code in PR #6404:
URL: https://github.com/apache/paimon/pull/6404#discussion_r2434789294
##########
paimon-python/pypaimon/common/uri_reader.py:
##########
@@ -0,0 +1,164 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import io
+from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import Any, Optional
+from urllib.parse import urlparse, ParseResult
+from urllib.request import urlopen
+
+from cachetools import LRUCache
+from readerwriterlock import rwlock
+
+from pypaimon.common.config import CatalogOptions
+
+
+class UriReader(ABC):
+    @classmethod
+    def from_http(cls) -> 'HttpUriReader':
+        return HttpUriReader()
+
+    @classmethod
+    def from_file(cls, file_io: Any) -> 'FileUriReader':
+        return FileUriReader(file_io)
+
+    @abstractmethod
+    def new_input_stream(self, uri: str):
+        pass
+
+
+class FileUriReader(UriReader):
+
+    def __init__(self, file_io: Any):
+        self._file_io = file_io
+
+    def new_input_stream(self, uri: str):
+        try:
+            parsed_uri = urlparse(uri)
+            if parsed_uri.scheme == 'file':
+                file_path = parsed_uri.path
+            else:
+                file_path = uri
+            path_obj = Path(file_path)
+            return self._file_io.new_input_stream(path_obj)
+        except Exception as e:
+            raise IOError(f"Failed to read file {uri}: {e}")
+
+
+class HttpUriReader(UriReader):
+
+    def new_input_stream(self, uri: str):
+        try:
+            with urlopen(uri) as response:
+                data = response.read()
+                return io.BytesIO(data)

Review Comment:
   urlopen is used without a timeout and always buffers the entire response in memory. Add a timeout (e.g., urlopen(uri, timeout=30)) and, if possible, return a streaming object to avoid loading large blobs entirely into memory.
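   Beyond the timeout fix in the suggestion that follows, here is a minimal sketch of a fully streaming variant that spools large responses to disk instead of RAM. The helper name, the 30-second timeout, and the 8 MiB spill threshold are illustrative assumptions, not part of the PR:

```python
# Illustrative sketch only; names and thresholds are assumptions, not part of the PR.
import shutil
import tempfile
from urllib.request import urlopen


def open_http_stream(uri: str, timeout: float = 30.0):
    """Fetch `uri` with a timeout; spill to disk past 8 MiB instead of buffering in memory."""
    buffer = tempfile.SpooledTemporaryFile(max_size=8 * 1024 * 1024)
    with urlopen(uri, timeout=timeout) as response:
        shutil.copyfileobj(response, buffer)
    buffer.seek(0)
    return buffer  # file-like object; caller is responsible for closing it
```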
```suggestion
            response = urlopen(uri, timeout=30)
            return response  # file-like object, caller should close
```

##########
paimon-python/pypaimon/read/reader/format_blob_reader.py:
##########
@@ -26,21 +26,23 @@
 from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
 from pypaimon.common.file_io import FileIO
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
-from pypaimon.schema.data_types import DataField, PyarrowFieldParser
-from pypaimon.table.row.blob import Blob, BlobDescriptor, BlobRef
+from pypaimon.schema.data_types import DataField, PyarrowFieldParser, AtomicType
+from pypaimon.table.row.blob import Blob
 from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.table.row.row_kind import RowKind


 class FormatBlobReader(RecordBatchReader):

     def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
-                 full_fields: List[DataField], push_down_predicate: Any):
+                 full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool):
         self._file_io = file_io
         self._file_path = file_path
         self._push_down_predicate = push_down_predicate
+        self._blob_as_descriptor = blob_as_descriptor

         # Get file size
-        self._file_size = file_io.get_file_size(file_path)
+        self._file_size = file_io.get_file_size(Path(file_path))

Review Comment:
   Constructing Path from a URI string (e.g., s3://bucket/key) can mangle the path. Resolve the correct filesystem path before calling get_file_size (for 'file' use parsed.path, for object stores use f"{parsed.netloc}{parsed.path}") or delegate URI-to-path conversion to a helper to avoid breaking non-local files.
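   One way to delegate that conversion is a small helper along these lines; the function name and its wiring into FormatBlobReader are hypothetical, not part of the PR:

```python
# Hypothetical helper; the name uri_to_fs_path and its use in FormatBlobReader are assumptions.
from pathlib import Path
from urllib.parse import urlparse


def uri_to_fs_path(uri: str) -> Path:
    """Return a filesystem path usable by FileIO, without mangling object-store URIs."""
    parsed = urlparse(uri)
    if parsed.scheme in ("", "file"):
        # Plain local path or file:// URI: the path component is the filesystem path.
        return Path(parsed.path or uri)
    # Object stores / HDFS (s3://bucket/key, oss://bucket/key, hdfs://nn/dir/file):
    # keep netloc + path so 's3://bucket/key' becomes 'bucket/key' rather than 's3:/bucket/key'.
    return Path(f"{parsed.netloc}{parsed.path}")


# In FormatBlobReader.__init__ this would replace the direct Path(file_path) call:
# self._file_size = file_io.get_file_size(uri_to_fs_path(file_path))
```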
##########
paimon-python/pypaimon/table/row/blob.py:
##########
@@ -144,21 +147,33 @@ def from_data(data: bytes) -> 'Blob':

     @staticmethod
     def from_local(file: str) -> 'Blob':
-        return Blob.from_file(file)
+        # Import FileIO locally to avoid circular imports
+        from pypaimon.common.file_io import FileIO
+
+        parsed = urlparse(file)
+        if parsed.scheme == "file":
+            file_uri = file
+        else:
+            file_uri = f"file://{file}"
+        file_io = FileIO(file_uri, {})
+        uri_reader = FileUriReader(file_io)
+        descriptor = BlobDescriptor(file, 0, -1)
+        return Blob.from_descriptor(uri_reader, descriptor)

     @staticmethod
     def from_http(uri: str) -> 'Blob':
         descriptor = BlobDescriptor(uri, 0, -1)
-        return BlobRef(descriptor)
+        return BlobRef(UriReader.from_http(), descriptor)

     @staticmethod
-    def from_file(file: str, offset: int = 0, length: int = -1) -> 'Blob':
-        descriptor = BlobDescriptor(file, offset, length)
-        return BlobRef(descriptor)
+    def from_file(file_io, file_path: str, offset: int, length: int) -> 'Blob':
+        uri_reader = FileUriReader(file_io)
+        descriptor = BlobDescriptor(file_path, offset, length)
+        return Blob.from_descriptor(uri_reader, descriptor)

Review Comment:
   [nitpick] Changing the signature of Blob.from_file introduces a breaking API change for existing callers that passed only (file_path, offset=0, length=-1). Consider providing a backward-compatible overload/dispatcher (e.g., accept both (file_io, file_path, offset, length) and the legacy (file_path, offset=0, length=-1) to construct a FileIO for local files) or add a new method name (e.g., from_file_with_io) while keeping the original.

```suggestion
    def from_file(*args, **kwargs) -> 'Blob':
        """
        Create a Blob from a file.

        Supports two signatures:
          - from_file(file_io, file_path, offset, length)
          - from_file(file_path, offset=0, length=-1)
        """
        # Import FileIO locally to avoid circular imports
        from pypaimon.common.file_io import FileIO

        if len(args) == 4:
            # New signature: (file_io, file_path, offset, length)
            file_io, file_path, offset, length = args
            uri_reader = FileUriReader(file_io)
            descriptor = BlobDescriptor(file_path, offset, length)
            return Blob.from_descriptor(uri_reader, descriptor)
        elif len(args) >= 1 and isinstance(args[0], str):
            # Legacy signature: (file_path, offset=0, length=-1)
            file_path = args[0]
            offset = args[1] if len(args) > 1 else 0
            length = args[2] if len(args) > 2 else -1
            parsed = urlparse(file_path)
            if parsed.scheme == "file":
                file_uri = file_path
            else:
                file_uri = f"file://{file_path}"
            file_io = FileIO(file_uri, {})
            uri_reader = FileUriReader(file_io)
            descriptor = BlobDescriptor(file_path, offset, length)
            return Blob.from_descriptor(uri_reader, descriptor)
        else:
            raise TypeError("from_file expects either (file_io, file_path, offset, length) or (file_path, offset=0, length=-1)")
```
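   If a dispatcher along those lines is adopted, both call forms keep working. A quick illustrative usage; the path is a placeholder and the FileIO construction mirrors the from_local code in the diff above:

```python
# Illustrative only; the path is a placeholder, FileIO(uri, {}) follows the pattern used in from_local.
from pypaimon.common.file_io import FileIO
from pypaimon.table.row.blob import Blob

file_io = FileIO("file:///tmp/blob.bin", {})
blob_new = Blob.from_file(file_io, "/tmp/blob.bin", 0, 1024)  # new (file_io, path, offset, length) form
blob_legacy = Blob.from_file("/tmp/blob.bin")                 # legacy (path, offset=0, length=-1) form
```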
##########
paimon-python/pypaimon/common/uri_reader.py:
##########
@@ -0,0 +1,164 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import io
+from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import Any, Optional
+from urllib.parse import urlparse, ParseResult
+from urllib.request import urlopen
+
+from cachetools import LRUCache
+from readerwriterlock import rwlock
+
+from pypaimon.common.config import CatalogOptions
+
+
+class UriReader(ABC):
+    @classmethod
+    def from_http(cls) -> 'HttpUriReader':
+        return HttpUriReader()
+
+    @classmethod
+    def from_file(cls, file_io: Any) -> 'FileUriReader':
+        return FileUriReader(file_io)
+
+    @abstractmethod
+    def new_input_stream(self, uri: str):
+        pass
+
+
+class FileUriReader(UriReader):
+
+    def __init__(self, file_io: Any):
+        self._file_io = file_io
+
+    def new_input_stream(self, uri: str):
+        try:
+            parsed_uri = urlparse(uri)
+            if parsed_uri.scheme == 'file':
+                file_path = parsed_uri.path
+            else:
+                file_path = uri
+            path_obj = Path(file_path)

Review Comment:
   For non-file URIs (e.g., s3://, oss://, hdfs://), wrapping the full URI string with Path collapses the scheme and netloc (e.g., 's3://bucket/key' becomes 's3:/bucket/key'), which breaks FileIO-backed filesystems. Build the filesystem path as netloc+path for non-file schemes before passing to FileIO, and only use Path(parsed_uri.path) for file URIs.

```suggestion
                path_obj = Path(parsed_uri.path)
            else:
                # For non-file URIs, build path as netloc + path
                path_obj = Path(parsed_uri.netloc + parsed_uri.path)
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
