Copilot commented on code in PR #6404:
URL: https://github.com/apache/paimon/pull/6404#discussion_r2434695408


##########
paimon-python/pypaimon/common/config.py:
##########
@@ -47,6 +47,7 @@ class CatalogOptions:
     DLF_TOKEN_ECS_METADATA_URL = "dlf.token-ecs-metadata-url"
     PREFIX = 'prefix'
     HTTP_USER_AGENT_HEADER = 'header.HTTP_USER_AGENT'
+    BLOB_FILE_IO_DEFAULT_CACHE_SIZE = 2**31 - 1

Review Comment:
   The default LRU cache size (2**31 - 1) is so large that the cache is 
effectively unbounded, which can lead to unbounded memory growth in long-lived 
processes. Consider making this value configurable (via catalog options) with 
a conservative default (e.g., 256 or 1024).
   ```suggestion
       # Default LRU cache size for blob file IO. This value is configurable via catalog options.
       BLOB_FILE_IO_DEFAULT_CACHE_SIZE = 1024  # Conservative default to prevent unbounded memory usage
   ```



##########
paimon-python/pypaimon/common/uri_reader.py:
##########
@@ -0,0 +1,154 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import io
+from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import Any, Optional
+from urllib.parse import urlparse, ParseResult
+from urllib.request import urlopen
+
+from cachetools import LRUCache
+from readerwriterlock import rwlock
+
+from pypaimon.common.config import CatalogOptions
+
+
+class UriReader(ABC):
+    @classmethod
+    def from_http(cls) -> 'HttpUriReader':
+        return HttpUriReader()
+
+    @classmethod
+    def from_file(cls, file_io: Any) -> 'FileUriReader':
+        return FileUriReader(file_io)
+
+    @abstractmethod
+    def new_input_stream(self, uri: str) -> io.BytesIO:
+        pass
+
+
+class FileUriReader(UriReader):
+
+    def __init__(self, file_io: Any):
+        self._file_io = file_io
+
+    def new_input_stream(self, uri: str) -> io.BytesIO:
+        try:
+            path_obj = Path(uri)
+            with self._file_io.new_input_stream(path_obj) as input_stream:
+                data = input_stream.read()
+                return io.BytesIO(data)
+        except Exception as e:
+            raise IOError(f"Failed to read file {uri}: {e}")

Review Comment:
   FileUriReader.new_input_stream does not normalize file:// URIs; 
Path('file:///...') will not resolve to a valid local path. Parse the URI and 
use parsed.path when scheme == 'file', otherwise treat the input as a 
filesystem path.



##########
paimon-python/pypaimon/tests/blob_test.py:
##########
@@ -197,7 +188,8 @@ def test_blob_factory_methods_return_correct_types(self):
         self.assertIsInstance(blob_ref, Blob)
 
         # from_file should return BlobRef
-        blob_file = Blob.from_file(self.file)
+        file_io = FileIO(self.file if self.file.startswith('file://') else f"file://{self.file}", {})
+        blob_file = Blob.from_file(file_io, self.file, 0, len(self.file))

Review Comment:
   Length uses len(self.file), which is the length of the file path string, not 
the file's byte length. Use the actual file size, e.g., 
os.path.getsize(self.file) or read and len(content).
   ```suggestion
           blob_file = Blob.from_file(file_io, self.file, 0, os.path.getsize(self.file))
   ```



##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -370,7 +371,7 @@ def record_generator():
         with self.new_output_stream(path) as output_stream:
             fastavro.writer(output_stream, avro_schema, records, **kwargs)
 
-    def write_blob(self, path: Path, data: pyarrow.Table, **kwargs):
+    def write_blob(self, path: Path, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs):

Review Comment:
   Changing the method signature to require blob_as_descriptor is a breaking 
change for existing callers. To preserve backward compatibility, add a default 
value (e.g., blob_as_descriptor: bool = False) or accept it via kwargs/options.
   ```suggestion
       def write_blob(self, path: Path, data: pyarrow.Table, blob_as_descriptor: bool = False, **kwargs):
   ```



##########
paimon-python/pypaimon/common/uri_reader.py:
##########
@@ -0,0 +1,154 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import io
+from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import Any, Optional
+from urllib.parse import urlparse, ParseResult
+from urllib.request import urlopen
+
+from cachetools import LRUCache
+from readerwriterlock import rwlock
+
+from pypaimon.common.config import CatalogOptions
+
+
+class UriReader(ABC):
+    @classmethod
+    def from_http(cls) -> 'HttpUriReader':
+        return HttpUriReader()
+
+    @classmethod
+    def from_file(cls, file_io: Any) -> 'FileUriReader':
+        return FileUriReader(file_io)
+
+    @abstractmethod
+    def new_input_stream(self, uri: str) -> io.BytesIO:
+        pass
+
+
+class FileUriReader(UriReader):
+
+    def __init__(self, file_io: Any):
+        self._file_io = file_io
+
+    def new_input_stream(self, uri: str) -> io.BytesIO:
+        try:
+            path_obj = Path(uri)
+            with self._file_io.new_input_stream(path_obj) as input_stream:
+                data = input_stream.read()
+                return io.BytesIO(data)
+        except Exception as e:
+            raise IOError(f"Failed to read file {uri}: {e}")
+
+
+class HttpUriReader(UriReader):
+
+    def new_input_stream(self, uri: str) -> io.BytesIO:
+        try:
+            with urlopen(uri) as response:
+                data = response.read()
+                return io.BytesIO(data)
+        except Exception as e:
+            raise IOError(f"Failed to read HTTP URI {uri}: {e}")
+
+
+class UriKey:
+
+    def __init__(self, scheme: Optional[str], authority: Optional[str]) -> None:
+        self._scheme = scheme
+        self._authority = authority
+        self._hash = hash((self._scheme, self._authority))
+
+    @property
+    def scheme(self) -> Optional[str]:
+        return self._scheme
+
+    @property
+    def authority(self) -> Optional[str]:
+        return self._authority
+
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, UriKey):
+            return False
+
+        return (self._scheme == other._scheme and
+                self._authority == other._authority)
+
+    def __hash__(self) -> int:
+        return self._hash
+
+    def __repr__(self) -> str:
+        return f"UriKey(scheme='{self._scheme}', authority='{self._authority}')"
+
+
+class UriReaderFactory:
+
+    def __init__(self, catalog_options: dict) -> None:
+        self.catalog_options = catalog_options
+        self._readers = LRUCache(CatalogOptions.BLOB_FILE_IO_DEFAULT_CACHE_SIZE)
+        self._readers_lock = rwlock.RWLockFair()
+
+    def create(self, input_uri: str) -> UriReader:
+        try:
+            parsed_uri = urlparse(input_uri)
+        except Exception as e:
+            raise ValueError(f"Invalid URI: {input_uri}") from e
+
+        key = UriKey(parsed_uri.scheme, parsed_uri.netloc or None)
+        lock = self._readers_lock.gen_rlock()
+        try:
+            lock.acquire()
+            reader = self._readers.get(key)
+            if reader is not None:
+                return reader
+            else:
+                reader = self._new_reader(key, parsed_uri)
+                self._readers[key] = reader
+            return reader
+        finally:
+            lock.release()

Review Comment:
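   The factory mutates the LRUCache under a read lock (gen_rlock), which is not 
safe and may race under concurrency. Use a write lock when inserting into the 
cache, or use a double-checked locking pattern: read under read lock, if miss 
then acquire write lock, recheck, then insert. Note that an LRU cache lookup 
also updates the recency order on a hit, so lookups made under a shared read 
lock are mutations too; the simplest fully safe option is to hold the write 
lock for the whole lookup-and-insert.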
   ```suggestion
           rlock = self._readers_lock.gen_rlock()
           rlock.acquire()
           try:
               reader = self._readers.get(key)
               if reader is not None:
                   return reader
           finally:
               rlock.release()
   
           wlock = self._readers_lock.gen_wlock()
           wlock.acquire()
           try:
               reader = self._readers.get(key)
               if reader is not None:
                   return reader
               reader = self._new_reader(key, parsed_uri)
               self._readers[key] = reader
               return reader
           finally:
               wlock.release()
   ```



##########
paimon-python/pypaimon/tests/blob_test.py:
##########
@@ -505,7 +497,8 @@ def test_blob_descriptor_with_blob_ref(self):
         descriptor = BlobDescriptor(self.file, 0, -1)
 
         # Create BlobRef from descriptor
-        blob_ref = BlobRef(descriptor)
+        file_io = FileIO(self.file if self.file.startswith('file://') else f"file://{self.file}", {})
+        blob_ref = BlobRef(file_io, descriptor)

Review Comment:
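   BlobRef now expects a UriReader, not a FileIO. Construct a reader, e.g., 
`from pypaimon.common.uri_reader import FileUriReader` and use 
`BlobRef(FileUriReader(file_io), descriptor)`, or obtain one from a factory 
instance via `UriReaderFactory(catalog_options).create(descriptor.uri)` 
(`create` is an instance method, so the factory must be constructed first).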



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to