Copilot commented on code in PR #143:
URL: https://github.com/apache/fluss-rust/pull/143#discussion_r2679335201


##########
crates/fluss/src/client/table/log_fetch_buffer.rs:
##########
@@ -75,34 +78,36 @@ impl LogFetchBuffer {
         self.completed_fetches.lock().is_empty()
     }
 
-    /// Wait for the buffer to become non-empty, with timeout
-    /// Returns true if data became available, false if timeout
-    pub async fn await_not_empty(&self, timeout: Duration) -> bool {
+    /// Wait for the buffer to become non-empty, with timeout.
+    /// Returns true if data became available, false if timeout.
+    pub async fn await_not_empty(&self, timeout: Duration) -> Result<bool> {
         let deadline = std::time::Instant::now() + timeout;
 
         loop {
             // Check if buffer is not empty
             if !self.is_empty() {
-                return true;
+                return Ok(true);
             }
 
             // Check if woken up
             if self.woken_up.swap(false, Ordering::Acquire) {
-                return true;
+                return Err(Error::WakeupError {
+                    message: "The await is wakeup.".to_string(),

Review Comment:
   The error message "The await is wakeup" is grammatically incorrect. It 
should be "The await was woken up" or "The await operation was interrupted by 
wakeup" for proper grammar and clarity.
   ```suggestion
                       message: "The await operation was interrupted by 
wakeup.".to_string(),
   ```



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -661,6 +804,70 @@ impl LogFetcher {
                     continue;
                 };
 
+                let error_code = fetch_log_for_bucket
+                    .error_code
+                    .unwrap_or(FlussError::None.code());
+                if error_code != FlussError::None.code() {
+                    let error = FlussError::for_code(error_code);
+                    let error_message = fetch_log_for_bucket
+                        .error_message
+                        .clone()
+                        .unwrap_or_else(|| error.message().to_string());
+
+                    
log_scanner_status.move_bucket_to_end(table_bucket.clone());
+                    log_fetch_buffer.add_api_error(
+                        table_bucket.clone(),
+                        ApiError {
+                            code: error_code,
+                            message: error_message.clone(),
+                        },
+                        fetch_offset,
+                    );
+
+                    match error {
+                        FlussError::NotLeaderOrFollower
+                        | FlussError::LogStorageException
+                        | FlussError::KvStorageException
+                        | FlussError::StorageException
+                        | FlussError::FencedLeaderEpochException => {
+                            debug!(
+                                "Error in fetch for bucket {table_bucket}: 
{error:?}: {error_message}"
+                            );
+                        }
+                        FlussError::UnknownTableOrBucketException => {
+                            warn!(
+                                "Received unknown table or bucket error in 
fetch for bucket {table_bucket}"
+                            );

Review Comment:
   In the handle_fetch_response function, errors like NotLeaderOrFollower, 
LogStorageException, KvStorageException, StorageException, 
FencedLeaderEpochException, and UnknownTableOrBucketException are logged but do 
not trigger metadata refresh. However, in the initialize_fetch function (lines 
1071-1085), these same errors do trigger metadata refresh via 
schedule_metadata_update. This inconsistency could lead to delayed recovery 
from transient errors. Consider adding metadata refresh scheduling for these 
error cases in handle_fetch_response as well.
   ```suggestion
                               );
                               self.schedule_metadata_update();
                           }
                           FlussError::UnknownTableOrBucketException => {
                               warn!(
                                   "Received unknown table or bucket error in 
fetch for bucket {table_bucket}"
                               );
                               self.schedule_metadata_update();
   ```



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -445,6 +449,100 @@ impl RecordBatchLogScanner {
     }
 }
 
+type RefreshFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
+type RefreshFn = Arc<dyn Fn() -> RefreshFuture + Send + Sync>;
+
+struct MetadataRefreshScheduler {
+    refresh: RefreshFn,
+    min_interval: Duration,
+    table_path: TablePath,
+    state: Arc<Mutex<RefreshState>>,
+}
+
+struct RefreshState {
+    running: bool,
+    pending: bool,
+    last_refresh: Option<Instant>,
+    last_error: Option<FlussError>,
+}
+
+impl MetadataRefreshScheduler {
+    fn new(table_path: TablePath, refresh: RefreshFn, min_interval: Duration) 
-> Self {
+        Self {
+            refresh,
+            min_interval,
+            table_path,
+            state: Arc::new(Mutex::new(RefreshState {
+                running: false,
+                pending: false,
+                last_refresh: None,
+                last_error: None,
+            })),
+        }
+    }
+
+    fn schedule(&self, error: FlussError) {
+        let state = Arc::clone(&self.state);
+        let refresh = self.refresh.clone();
+        let table_path = self.table_path.clone();
+        let min_interval = self.min_interval;
+
+        {
+            let mut guard = state.lock();
+            guard.pending = true;
+            guard.last_error = Some(error);
+            if guard.running {
+                return;
+            }
+            guard.running = true;
+        }
+
+        tokio::spawn(async move {
+            loop {
+                let (delay, error_for_log) = {
+                    let mut guard = state.lock();
+                    if !guard.pending {
+                        guard.running = false;
+                        return;
+                    }
+                    guard.pending = false;
+
+                    let now = Instant::now();
+                    let delay = match guard.last_refresh {
+                        Some(last) => {
+                            let earliest = last + min_interval;
+                            if now < earliest {
+                                earliest - now
+                            } else {
+                                Duration::from_millis(0)
+                            }
+                        }
+                        None => Duration::from_millis(0),
+                    };
+                    (delay, guard.last_error.take())
+                };
+
+                if !delay.is_zero() {
+                    sleep(delay).await;
+                }
+
+                if let Err(e) = (refresh)().await {
+                    if let Some(error) = error_for_log {
+                        warn!(
+                            "Failed to update metadata for {table_path} after 
fetch error {error:?}: {e:?}"
+                        );
+                    } else {
+                        warn!("Failed to update metadata for {table_path}: 
{e:?}");
+                    }
+                }
+
+                let mut guard = state.lock();
+                guard.last_refresh = Some(Instant::now());
+            }
+        });
+    }

Review Comment:
The MetadataRefreshScheduler spawns a background tokio task (line 500) that 
is not explicitly tracked or canceled. When the LogFetcher (and its 
metadata_refresh field) is dropped, this background task will continue running 
until it naturally exits, potentially holding references to shared state longer 
than necessary. Consider using a JoinHandle to track the spawned task and 
implementing a cancellation mechanism (e.g., a CancellationToken from 
tokio_util) to ensure clean shutdown when the LogFetcher is dropped.



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -833,12 +1055,78 @@ impl LogFetcher {
         &self,
         mut completed_fetch: Box<dyn CompletedFetch>,
     ) -> Result<Option<Box<dyn CompletedFetch>>> {
-        // todo: handle error in initialize fetch
-        let table_bucket = completed_fetch.table_bucket();
+        if let Some(error) = completed_fetch.take_error() {
+            return Err(error);
+        }
+
+        let table_bucket = completed_fetch.table_bucket().clone();
         let fetch_offset = completed_fetch.next_fetch_offset();
 
+        if let Some(api_error) = completed_fetch.api_error() {
+            let error = FlussError::for_code(api_error.code);
+            let error_message = api_error.message.as_str();
+            self.log_scanner_status
+                .move_bucket_to_end(table_bucket.clone());
+            match error {
+                FlussError::NotLeaderOrFollower
+                | FlussError::LogStorageException
+                | FlussError::KvStorageException
+                | FlussError::StorageException
+                | FlussError::FencedLeaderEpochException => {
+                    debug!("Error in fetch for bucket {table_bucket}: 
{error:?}: {error_message}");
+                    self.schedule_metadata_update(error);
+                    return Ok(None);
+                }
+                FlussError::UnknownTableOrBucketException => {
+                    warn!(
+                        "Received unknown table or bucket error in fetch for 
bucket {table_bucket}"
+                    );
+                    self.schedule_metadata_update(error);
+                    return Ok(None);
+                }
+                FlussError::LogOffsetOutOfRangeException => {
+                    return Err(Error::UnexpectedError {
+                        message: format!(
+                            "The fetching offset {fetch_offset} is out of 
range: {error_message}"
+                        ),
+                        source: None,
+                    });
+                }
+                FlussError::AuthorizationException => {
+                    return Err(Error::FlussAPIError {
+                        api_error: ApiError {
+                            code: api_error.code,
+                            message: api_error.message.to_string(),
+                        },
+                    });
+                }
+                FlussError::UnknownServerError => {
+                    warn!(
+                        "Unknown server error while fetching offset 
{fetch_offset} for bucket {table_bucket}: {error_message}"
+                    );
+                    return Ok(None);
+                }
+                FlussError::CorruptMessage => {
+                    return Err(Error::UnexpectedError {
+                        message: format!(
+                            "Encountered corrupt message when fetching offset 
{fetch_offset} for bucket {table_bucket}: {error_message}"
+                        ),
+                        source: None,
+                    });
+                }
+                _ => {
+                    return Err(Error::UnexpectedError {
+                        message: format!(
+                            "Unexpected error code {error:?} while fetching at 
offset {fetch_offset} from bucket {table_bucket}: {error_message}"
+                        ),
+                        source: None,
+                    });
+                }
+            }

Review Comment:
   There is significant code duplication between the error handling logic in 
this match statement (lines 1070-1125) and the nearly identical match statement 
in the handle_fetch_response function (lines 827-867). This duplication makes 
maintenance harder and increases the risk of inconsistencies. Consider 
extracting this error handling logic into a shared helper function that both 
code paths can call.



##########
crates/fluss/src/client/table/log_fetch_buffer.rs:
##########
@@ -322,6 +478,19 @@ impl DefaultCompletedFetch {
         }
     }
 
+    fn fetch_error(&self) -> Error {
+        let mut message = format!(
+            "Received exception when fetching the next record from 
{table_bucket}. If needed, please back to past the record to continue 
scanning.",

Review Comment:
   The phrase "back to past the record" is grammatically incorrect. It should 
be "go back past the record" or "skip past the record" to be clear and correct.
   ```suggestion
               "Received exception when fetching the next record from 
{table_bucket}. If needed, please skip past the record to continue scanning.",
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to