Copilot commented on code in PR #6404:
URL: https://github.com/apache/paimon/pull/6404#discussion_r2434732165


##########
paimon-python/pypaimon/common/uri_reader.py:
##########
@@ -0,0 +1,166 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import io
+from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import Any, Optional
+from urllib.parse import urlparse, ParseResult
+from urllib.request import urlopen
+
+from cachetools import LRUCache
+from readerwriterlock import rwlock
+
+from pypaimon.common.config import CatalogOptions
+
+
+class UriReader(ABC):
+    @classmethod
+    def from_http(cls) -> 'HttpUriReader':
+        return HttpUriReader()
+
+    @classmethod
+    def from_file(cls, file_io: Any) -> 'FileUriReader':
+        return FileUriReader(file_io)
+
+    @abstractmethod
+    def new_input_stream(self, uri: str) -> io.BytesIO:
+        pass
+
+
+class FileUriReader(UriReader):
+
+    def __init__(self, file_io: Any):
+        self._file_io = file_io
+
+    def new_input_stream(self, uri: str) -> io.BytesIO:
+        try:
+            parsed_uri = urlparse(uri)
+            if parsed_uri.scheme == 'file':
+                file_path = parsed_uri.path
+            else:
+                file_path = uri
+            path_obj = Path(file_path)
+            with self._file_io.new_input_stream(path_obj) as input_stream:
+                data = input_stream.read()
+                return io.BytesIO(data)
+        except Exception as e:
+            raise IOError(f"Failed to read file {uri}: {e}")
+
+
+class HttpUriReader(UriReader):
+
+    def new_input_stream(self, uri: str) -> io.BytesIO:
+        try:
+            with urlopen(uri) as response:
+                data = response.read()
+                return io.BytesIO(data)
+        except Exception as e:
+            raise IOError(f"Failed to read HTTP URI {uri}: {e}")
+
+
+class UriKey:
+
+    def __init__(self, scheme: Optional[str], authority: Optional[str]) -> 
None:
+        self._scheme = scheme
+        self._authority = authority
+        self._hash = hash((self._scheme, self._authority))
+
+    @property
+    def scheme(self) -> Optional[str]:
+        return self._scheme
+
+    @property
+    def authority(self) -> Optional[str]:
+        return self._authority
+
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, UriKey):
+            return False
+
+        return (self._scheme == other._scheme and
+                self._authority == other._authority)
+
+    def __hash__(self) -> int:
+        return self._hash
+
+    def __repr__(self) -> str:
+        return f"UriKey(scheme='{self._scheme}', 
authority='{self._authority}')"
+
+
+class UriReaderFactory:
+
+    def __init__(self, catalog_options: dict) -> None:
+        self.catalog_options = catalog_options
+        self._readers = 
LRUCache(CatalogOptions.BLOB_FILE_IO_DEFAULT_CACHE_SIZE)

Review Comment:
   This uses the very large default cache size directly, which can lead to 
unbounded memory growth. Read a bounded capacity from catalog_options with a 
sensible default, e.g., capacity = 
int(catalog_options.get('blob.file-io.cache-size', 256)).
   ```suggestion
           cache_size = int(self.catalog_options.get('blob.file-io.cache-size', 
CatalogOptions.BLOB_FILE_IO_DEFAULT_CACHE_SIZE))
           self._readers = LRUCache(cache_size)
   ```



##########
paimon-python/pypaimon/common/core_options.py:
##########
@@ -43,6 +43,7 @@ def __str__(self):
     FILE_COMPRESSION_PER_LEVEL = "file.compression.per.level"
     FILE_FORMAT_PER_LEVEL = "file.format.per.level"
     FILE_BLOCK_SIZE = "file.block-size"
+    FILE_BLOB_AS_DESCRIPTOR = "blob-as-descriptor"

Review Comment:
   [nitpick] Configuration key naming is inconsistent with other FILE_* options 
which use the 'file.*' namespace (e.g., 'file.block-size'). Consider renaming 
the key to 'file.blob-as-descriptor' and updating lookups accordingly to 
maintain consistent option naming.
   ```suggestion
       FILE_BLOB_AS_DESCRIPTOR = "file.blob-as-descriptor"
   ```



##########
paimon-python/pypaimon/common/uri_reader.py:
##########
@@ -0,0 +1,166 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import io
+from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import Any, Optional
+from urllib.parse import urlparse, ParseResult
+from urllib.request import urlopen
+
+from cachetools import LRUCache
+from readerwriterlock import rwlock
+
+from pypaimon.common.config import CatalogOptions
+
+
+class UriReader(ABC):
+    @classmethod
+    def from_http(cls) -> 'HttpUriReader':
+        return HttpUriReader()
+
+    @classmethod
+    def from_file(cls, file_io: Any) -> 'FileUriReader':
+        return FileUriReader(file_io)
+
+    @abstractmethod
+    def new_input_stream(self, uri: str) -> io.BytesIO:
+        pass
+
+
+class FileUriReader(UriReader):
+
+    def __init__(self, file_io: Any):
+        self._file_io = file_io
+
+    def new_input_stream(self, uri: str) -> io.BytesIO:
+        try:
+            parsed_uri = urlparse(uri)
+            if parsed_uri.scheme == 'file':
+                file_path = parsed_uri.path
+            else:
+                file_path = uri
+            path_obj = Path(file_path)
+            with self._file_io.new_input_stream(path_obj) as input_stream:
+                data = input_stream.read()
+                return io.BytesIO(data)
+        except Exception as e:
+            raise IOError(f"Failed to read file {uri}: {e}")

Review Comment:
   This reads the entire file into memory before returning a BytesIO, which is 
inefficient for large blobs; BlobRef then slices from this copy again. Prefer 
returning the underlying stream directly (e.g., return 
self._file_io.new_input_stream(path_obj)) so callers can seek/read only the 
requested range.



##########
paimon-python/pypaimon/table/row/blob.py:
##########
@@ -144,21 +146,31 @@ def from_data(data: bytes) -> 'Blob':
 
     @staticmethod
     def from_local(file: str) -> 'Blob':
-        return Blob.from_file(file)
+        # Import FileIO locally to avoid circular imports
+        from pypaimon.common.file_io import FileIO
+
+        # Create a minimal FileIO instance for local files
+        file_io = FileIO(f"file://{file}", {})
+
+        # Use -1 length to indicate "read entire file"
+        uri_reader = FileUriReader(file_io)
+        descriptor = BlobDescriptor(file, 0, -1)

Review Comment:
   If file already includes a 'file://' scheme, this will produce an invalid 
URI like 'file://file:///...'. Guard against double-prefixing by checking the 
scheme before prepending or normalize via urlparse.
   ```suggestion
           from urllib.parse import urlparse
   
           # Guard against double-prefixing 'file://'
           parsed = urlparse(file)
           if parsed.scheme == "file":
               file_uri = file
           else:
               file_uri = f"file://{file}"
   
           # Create a minimal FileIO instance for local files
           file_io = FileIO(file_uri, {})
   
           # Use -1 length to indicate "read entire file"
           uri_reader = FileUriReader(file_io)
           descriptor = BlobDescriptor(file_uri, 0, -1)
   ```



##########
paimon-python/pypaimon/common/uri_reader.py:
##########
@@ -0,0 +1,166 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import io
+from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import Any, Optional
+from urllib.parse import urlparse, ParseResult
+from urllib.request import urlopen
+
+from cachetools import LRUCache
+from readerwriterlock import rwlock
+
+from pypaimon.common.config import CatalogOptions
+
+
+class UriReader(ABC):
+    @classmethod
+    def from_http(cls) -> 'HttpUriReader':
+        return HttpUriReader()
+
+    @classmethod
+    def from_file(cls, file_io: Any) -> 'FileUriReader':
+        return FileUriReader(file_io)
+
+    @abstractmethod
+    def new_input_stream(self, uri: str) -> io.BytesIO:
+        pass
+
+
+class FileUriReader(UriReader):
+
+    def __init__(self, file_io: Any):
+        self._file_io = file_io
+
+    def new_input_stream(self, uri: str) -> io.BytesIO:
+        try:
+            parsed_uri = urlparse(uri)
+            if parsed_uri.scheme == 'file':
+                file_path = parsed_uri.path
+            else:
+                file_path = uri
+            path_obj = Path(file_path)
+            with self._file_io.new_input_stream(path_obj) as input_stream:
+                data = input_stream.read()
+                return io.BytesIO(data)
+        except Exception as e:
+            raise IOError(f"Failed to read file {uri}: {e}")
+
+
+class HttpUriReader(UriReader):
+
+    def new_input_stream(self, uri: str) -> io.BytesIO:
+        try:
+            with urlopen(uri) as response:
+                data = response.read()
+                return io.BytesIO(data)
+        except Exception as e:
+            raise IOError(f"Failed to read HTTP URI {uri}: {e}")

Review Comment:
   [nitpick] urlopen is called without a timeout, which can hang indefinitely 
on slow/unresponsive endpoints. Add a timeout (e.g., urlopen(uri, timeout=10)) 
or make it configurable through options.



##########
paimon-python/pypaimon/common/config.py:
##########
@@ -47,6 +47,7 @@ class CatalogOptions:
     DLF_TOKEN_ECS_METADATA_URL = "dlf.token-ecs-metadata-url"
     PREFIX = 'prefix'
     HTTP_USER_AGENT_HEADER = 'header.HTTP_USER_AGENT'
+    BLOB_FILE_IO_DEFAULT_CACHE_SIZE = 2**31 - 1

Review Comment:
   The default cache capacity for URI readers is set to 2**31 - 1, which is 
extremely large and risks excessive memory usage. Use a small, reasonable 
default (e.g., 128 or 256) and make it configurable via a string option so it 
can be tuned through catalog_options.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to