Copilot commented on code in PR #6404: URL: https://github.com/apache/paimon/pull/6404#discussion_r2434732165
########## paimon-python/pypaimon/common/uri_reader.py: ########## @@ -0,0 +1,166 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import io +from abc import ABC, abstractmethod +from pathlib import Path +from typing import Any, Optional +from urllib.parse import urlparse, ParseResult +from urllib.request import urlopen + +from cachetools import LRUCache +from readerwriterlock import rwlock + +from pypaimon.common.config import CatalogOptions + + +class UriReader(ABC): + @classmethod + def from_http(cls) -> 'HttpUriReader': + return HttpUriReader() + + @classmethod + def from_file(cls, file_io: Any) -> 'FileUriReader': + return FileUriReader(file_io) + + @abstractmethod + def new_input_stream(self, uri: str) -> io.BytesIO: + pass + + +class FileUriReader(UriReader): + + def __init__(self, file_io: Any): + self._file_io = file_io + + def new_input_stream(self, uri: str) -> io.BytesIO: + try: + parsed_uri = urlparse(uri) + if parsed_uri.scheme == 'file': + file_path = parsed_uri.path + else: + file_path = uri + path_obj = Path(file_path) + with self._file_io.new_input_stream(path_obj) as input_stream: + data = input_stream.read() + return io.BytesIO(data) + except Exception as e: + raise IOError(f"Failed to read file {uri}: {e}") + + +class HttpUriReader(UriReader): + + def new_input_stream(self, uri: str) -> io.BytesIO: + try: + with urlopen(uri) as response: + data = response.read() + return io.BytesIO(data) + except Exception as e: + raise IOError(f"Failed to read HTTP URI {uri}: {e}") + + +class UriKey: + + def __init__(self, scheme: Optional[str], authority: Optional[str]) -> None: + self._scheme = scheme + self._authority = authority + self._hash = hash((self._scheme, self._authority)) + + @property + def scheme(self) -> Optional[str]: + return self._scheme + + @property + def authority(self) -> Optional[str]: + return self._authority + + def __eq__(self, other: object) -> bool: + if not isinstance(other, UriKey): + return False + + return (self._scheme == other._scheme and + self._authority == other._authority) + + def __hash__(self) -> int: + return self._hash + + def __repr__(self) -> str: + return f"UriKey(scheme='{self._scheme}', authority='{self._authority}')" + + +class UriReaderFactory: + + def __init__(self, catalog_options: dict) -> None: + self.catalog_options = catalog_options + self._readers = LRUCache(CatalogOptions.BLOB_FILE_IO_DEFAULT_CACHE_SIZE) Review Comment: This uses the very large default cache size directly, which can lead to unbounded memory growth. Read a bounded capacity from catalog_options with a sensible default, e.g., capacity = int(catalog_options.get('blob.file-io.cache-size', 256)). ```suggestion cache_size = int(self.catalog_options.get('blob.file-io.cache-size', CatalogOptions.BLOB_FILE_IO_DEFAULT_CACHE_SIZE)) self._readers = LRUCache(cache_size) ``` ########## paimon-python/pypaimon/common/core_options.py: ########## @@ -43,6 +43,7 @@ def __str__(self): FILE_COMPRESSION_PER_LEVEL = "file.compression.per.level" FILE_FORMAT_PER_LEVEL = "file.format.per.level" FILE_BLOCK_SIZE = "file.block-size" + FILE_BLOB_AS_DESCRIPTOR = "blob-as-descriptor" Review Comment: [nitpick] Configuration key naming is inconsistent with other FILE_* options which use the 'file.*' namespace (e.g., 'file.block-size'). Consider renaming the key to 'file.blob-as-descriptor' and updating lookups accordingly to maintain consistent option naming. ```suggestion FILE_BLOB_AS_DESCRIPTOR = "file.blob-as-descriptor" ``` ########## paimon-python/pypaimon/common/uri_reader.py: ########## @@ -0,0 +1,166 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import io +from abc import ABC, abstractmethod +from pathlib import Path +from typing import Any, Optional +from urllib.parse import urlparse, ParseResult +from urllib.request import urlopen + +from cachetools import LRUCache +from readerwriterlock import rwlock + +from pypaimon.common.config import CatalogOptions + + +class UriReader(ABC): + @classmethod + def from_http(cls) -> 'HttpUriReader': + return HttpUriReader() + + @classmethod + def from_file(cls, file_io: Any) -> 'FileUriReader': + return FileUriReader(file_io) + + @abstractmethod + def new_input_stream(self, uri: str) -> io.BytesIO: + pass + + +class FileUriReader(UriReader): + + def __init__(self, file_io: Any): + self._file_io = file_io + + def new_input_stream(self, uri: str) -> io.BytesIO: + try: + parsed_uri = urlparse(uri) + if parsed_uri.scheme == 'file': + file_path = parsed_uri.path + else: + file_path = uri + path_obj = Path(file_path) + with self._file_io.new_input_stream(path_obj) as input_stream: + data = input_stream.read() + return io.BytesIO(data) + except Exception as e: + raise IOError(f"Failed to read file {uri}: {e}") Review Comment: This reads the entire file into memory before returning a BytesIO, which is inefficient for large blobs; BlobRef then slices from this copy again. Prefer returning the underlying stream directly (e.g., return self._file_io.new_input_stream(path_obj)) so callers can seek/read only the requested range. ########## paimon-python/pypaimon/table/row/blob.py: ########## @@ -144,21 +146,31 @@ def from_data(data: bytes) -> 'Blob': @staticmethod def from_local(file: str) -> 'Blob': - return Blob.from_file(file) + # Import FileIO locally to avoid circular imports + from pypaimon.common.file_io import FileIO + + # Create a minimal FileIO instance for local files + file_io = FileIO(f"file://{file}", {}) + + # Use -1 length to indicate "read entire file" + uri_reader = FileUriReader(file_io) + descriptor = BlobDescriptor(file, 0, -1) Review Comment: If file already includes a 'file://' scheme, this will produce an invalid URI like 'file://file:///...'. Guard against double-prefixing by checking the scheme before prepending or normalize via urlparse. ```suggestion from urllib.parse import urlparse # Guard against double-prefixing 'file://' parsed = urlparse(file) if parsed.scheme == "file": file_uri = file else: file_uri = f"file://{file}" # Create a minimal FileIO instance for local files file_io = FileIO(file_uri, {}) # Use -1 length to indicate "read entire file" uri_reader = FileUriReader(file_io) descriptor = BlobDescriptor(file_uri, 0, -1) ``` ########## paimon-python/pypaimon/common/uri_reader.py: ########## @@ -0,0 +1,166 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import io +from abc import ABC, abstractmethod +from pathlib import Path +from typing import Any, Optional +from urllib.parse import urlparse, ParseResult +from urllib.request import urlopen + +from cachetools import LRUCache +from readerwriterlock import rwlock + +from pypaimon.common.config import CatalogOptions + + +class UriReader(ABC): + @classmethod + def from_http(cls) -> 'HttpUriReader': + return HttpUriReader() + + @classmethod + def from_file(cls, file_io: Any) -> 'FileUriReader': + return FileUriReader(file_io) + + @abstractmethod + def new_input_stream(self, uri: str) -> io.BytesIO: + pass + + +class FileUriReader(UriReader): + + def __init__(self, file_io: Any): + self._file_io = file_io + + def new_input_stream(self, uri: str) -> io.BytesIO: + try: + parsed_uri = urlparse(uri) + if parsed_uri.scheme == 'file': + file_path = parsed_uri.path + else: + file_path = uri + path_obj = Path(file_path) + with self._file_io.new_input_stream(path_obj) as input_stream: + data = input_stream.read() + return io.BytesIO(data) + except Exception as e: + raise IOError(f"Failed to read file {uri}: {e}") + + +class HttpUriReader(UriReader): + + def new_input_stream(self, uri: str) -> io.BytesIO: + try: + with urlopen(uri) as response: + data = response.read() + return io.BytesIO(data) + except Exception as e: + raise IOError(f"Failed to read HTTP URI {uri}: {e}") Review Comment: [nitpick] urlopen is called without a timeout, which can hang indefinitely on slow/unresponsive endpoints. Add a timeout (e.g., urlopen(uri, timeout=10)) or make it configurable through options. ########## paimon-python/pypaimon/common/config.py: ########## @@ -47,6 +47,7 @@ class CatalogOptions: DLF_TOKEN_ECS_METADATA_URL = "dlf.token-ecs-metadata-url" PREFIX = 'prefix' HTTP_USER_AGENT_HEADER = 'header.HTTP_USER_AGENT' + BLOB_FILE_IO_DEFAULT_CACHE_SIZE = 2**31 - 1 Review Comment: The default cache capacity for URI readers is set to 2**31 - 1, which is extremely large and risks excessive memory usage. Use a small, reasonable default (e.g., 128 or 256) and make it configurable via a string option so it can be tuned through catalog_options. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
