Copilot commented on code in PR #150:
URL: https://github.com/apache/fluss-rust/pull/150#discussion_r2771571891
##########
bindings/python/src/table.rs:
##########
@@ -1914,9 +1900,215 @@ impl LogScanner {
table_info,
projected_schema,
projected_row_type,
- start_timestamp: None,
- end_timestamp: None,
+ partition_name_cache: std::sync::RwLock::new(None),
+ }
+ }
+
+ /// Get partition_id -> partition_name mapping, using cache if available
+ fn get_partition_name_map(
+ &self,
+ py: Python,
+ table_path: &fcore::metadata::TablePath,
+ ) -> PyResult<HashMap<i64, String>> {
+ // Check cache first (read lock)
+ {
+ let cache = self.partition_name_cache.read().unwrap();
+ if let Some(map) = cache.as_ref() {
+ return Ok(map.clone());
+ }
+ }
+
+ // Fetch partition infos (releases GIL during async call)
+ let partition_infos: Vec<fcore::metadata::PartitionInfo> = py
+ .detach(|| {
+                TOKIO_RUNTIME.block_on(async { self.admin.list_partition_infos(table_path).await })
+            })
+            .map_err(|e| FlussError::new_err(format!("Failed to list partition infos: {e}")))?;
+
+ // Build and cache the mapping
+ let map: HashMap<i64, String> = partition_infos
+ .into_iter()
+ .map(|info| (info.get_partition_id(), info.get_partition_name()))
+ .collect();
+
+ // Store in cache (write lock)
+ {
+ let mut cache = self.partition_name_cache.write().unwrap();
Review Comment:
The `unwrap()` calls on `RwLock::read()` and `RwLock::write()` will panic if
the lock is poisoned (which happens when a thread panics while holding the
lock). While poisoning is rare in Python bindings due to controlled execution,
it is better to handle it gracefully. Consider using `expect()` with a
descriptive message, or mapping the error to a `PyErr` with `map_err()` instead
of `unwrap()`.
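For illustration, a minimal sketch of the `map_err()` option, written as a
drop-in replacement for the read-lock block above; it reuses the
`FlussError::new_err` constructor already present in this diff, and the exact
error message is a placeholder:

```rust
// Sketch only: surface a poisoned RwLock as a Python exception instead of
// panicking. FlussError::new_err is the same constructor used for the
// list_partition_infos error in this diff.
let cache = self
    .partition_name_cache
    .read()
    .map_err(|e| FlussError::new_err(format!("partition_name_cache lock poisoned: {e}")))?;
if let Some(map) = cache.as_ref() {
    return Ok(map.clone());
}
```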
##########
bindings/python/src/table.rs:
##########
@@ -1914,9 +1900,215 @@ impl LogScanner {
table_info,
projected_schema,
projected_row_type,
- start_timestamp: None,
- end_timestamp: None,
+ partition_name_cache: std::sync::RwLock::new(None),
+ }
+ }
+
+ /// Get partition_id -> partition_name mapping, using cache if available
+ fn get_partition_name_map(
+ &self,
+ py: Python,
+ table_path: &fcore::metadata::TablePath,
+ ) -> PyResult<HashMap<i64, String>> {
+ // Check cache first (read lock)
+ {
+ let cache = self.partition_name_cache.read().unwrap();
+ if let Some(map) = cache.as_ref() {
+ return Ok(map.clone());
+ }
+ }
+
+ // Fetch partition infos (releases GIL during async call)
+ let partition_infos: Vec<fcore::metadata::PartitionInfo> = py
+ .detach(|| {
+                TOKIO_RUNTIME.block_on(async { self.admin.list_partition_infos(table_path).await })
+            })
+            .map_err(|e| FlussError::new_err(format!("Failed to list partition infos: {e}")))?;
+
+ // Build and cache the mapping
+ let map: HashMap<i64, String> = partition_infos
+ .into_iter()
+ .map(|info| (info.get_partition_id(), info.get_partition_name()))
+ .collect();
+
+ // Store in cache (write lock)
+ {
+ let mut cache = self.partition_name_cache.write().unwrap();
+ *cache = Some(map.clone());
+ }
+
+ Ok(map)
Review Comment:
The partition name cache is never invalidated. If partitions are added or
dropped during the lifetime of a LogScanner, the cache becomes stale and may
cause incorrect behavior or errors. Consider one of the following:
1. Removing the cache if LogScanner instances are short-lived
2. Adding a cache invalidation mechanism if partitions can be dynamically
created/dropped
3. Adding a TTL or max-age for cached entries
Additionally, there is a potential race condition: if multiple threads call
this method concurrently before the cache is populated, they may all fetch
the partition infos simultaneously. Consider a synchronization primitive such
as `once_cell::sync::OnceCell`, or a double-checked locking pattern, as in the
sketch below.
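For the race, a hedged sketch of the `OnceCell` option, assuming the
`once_cell` crate is available (the struct and method names are illustrative):
`get_or_try_init` removes the `unwrap()` calls entirely and blocks concurrent
callers until the first initializer finishes, so the partition list is fetched
at most once. Note it does not address staleness, since a populated `OnceCell`
cannot be reset through a shared reference.

```rust
use std::collections::HashMap;

use once_cell::sync::OnceCell;

// Sketch: a once-initialized partition-name cache. Concurrent callers of
// get_or_try_init block until the winning initializer returns, so the
// fallible fetch closure runs at most once per LogScanner.
struct PartitionNameCache {
    inner: OnceCell<HashMap<i64, String>>,
}

impl PartitionNameCache {
    fn get_or_fetch<E>(
        &self,
        fetch: impl FnOnce() -> Result<HashMap<i64, String>, E>,
    ) -> Result<&HashMap<i64, String>, E> {
        // The closure only runs on the first (winning) call; an Err leaves
        // the cell empty, so a later call can retry the fetch.
        self.inner.get_or_try_init(fetch)
    }
}
```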
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]