Copilot commented on code in PR #143:
URL: https://github.com/apache/fluss-rust/pull/143#discussion_r2679335201
##########
crates/fluss/src/client/table/log_fetch_buffer.rs:
##########
@@ -75,34 +78,36 @@ impl LogFetchBuffer {
self.completed_fetches.lock().is_empty()
}
- /// Wait for the buffer to become non-empty, with timeout
- /// Returns true if data became available, false if timeout
- pub async fn await_not_empty(&self, timeout: Duration) -> bool {
+ /// Wait for the buffer to become non-empty, with timeout.
+ /// Returns true if data became available, false if timeout.
+ pub async fn await_not_empty(&self, timeout: Duration) -> Result<bool> {
let deadline = std::time::Instant::now() + timeout;
loop {
// Check if buffer is not empty
if !self.is_empty() {
- return true;
+ return Ok(true);
}
// Check if woken up
if self.woken_up.swap(false, Ordering::Acquire) {
- return true;
+ return Err(Error::WakeupError {
+ message: "The await is wakeup.".to_string(),
Review Comment:
The error message "The await is wakeup" is grammatically incorrect. It
should be "The await was woken up" or "The await operation was interrupted by
wakeup" for proper grammar and clarity.
```suggestion
message: "The await operation was interrupted by
wakeup.".to_string(),
```
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -661,6 +804,70 @@ impl LogFetcher {
continue;
};
+ let error_code = fetch_log_for_bucket
+ .error_code
+ .unwrap_or(FlussError::None.code());
+ if error_code != FlussError::None.code() {
+ let error = FlussError::for_code(error_code);
+ let error_message = fetch_log_for_bucket
+ .error_message
+ .clone()
+ .unwrap_or_else(|| error.message().to_string());
+
+
log_scanner_status.move_bucket_to_end(table_bucket.clone());
+ log_fetch_buffer.add_api_error(
+ table_bucket.clone(),
+ ApiError {
+ code: error_code,
+ message: error_message.clone(),
+ },
+ fetch_offset,
+ );
+
+ match error {
+ FlussError::NotLeaderOrFollower
+ | FlussError::LogStorageException
+ | FlussError::KvStorageException
+ | FlussError::StorageException
+ | FlussError::FencedLeaderEpochException => {
+ debug!(
+ "Error in fetch for bucket {table_bucket}:
{error:?}: {error_message}"
+ );
+ }
+ FlussError::UnknownTableOrBucketException => {
+ warn!(
+ "Received unknown table or bucket error in
fetch for bucket {table_bucket}"
+ );
Review Comment:
In the handle_fetch_response function, errors like NotLeaderOrFollower,
LogStorageException, KvStorageException, StorageException,
FencedLeaderEpochException, and UnknownTableOrBucketException are logged but do
not trigger metadata refresh. However, in the initialize_fetch function (lines
1071-1085), these same errors do trigger metadata refresh via
schedule_metadata_update. This inconsistency could lead to delayed recovery
from transient errors. Consider adding metadata refresh scheduling for these
error cases in handle_fetch_response as well.
```suggestion
);
self.schedule_metadata_update();
}
FlussError::UnknownTableOrBucketException => {
warn!(
"Received unknown table or bucket error in
fetch for bucket {table_bucket}"
);
self.schedule_metadata_update();
```
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -445,6 +449,100 @@ impl RecordBatchLogScanner {
}
}
+type RefreshFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
+type RefreshFn = Arc<dyn Fn() -> RefreshFuture + Send + Sync>;
+
+struct MetadataRefreshScheduler {
+ refresh: RefreshFn,
+ min_interval: Duration,
+ table_path: TablePath,
+ state: Arc<Mutex<RefreshState>>,
+}
+
+struct RefreshState {
+ running: bool,
+ pending: bool,
+ last_refresh: Option<Instant>,
+ last_error: Option<FlussError>,
+}
+
+impl MetadataRefreshScheduler {
+ fn new(table_path: TablePath, refresh: RefreshFn, min_interval: Duration)
-> Self {
+ Self {
+ refresh,
+ min_interval,
+ table_path,
+ state: Arc::new(Mutex::new(RefreshState {
+ running: false,
+ pending: false,
+ last_refresh: None,
+ last_error: None,
+ })),
+ }
+ }
+
+ fn schedule(&self, error: FlussError) {
+ let state = Arc::clone(&self.state);
+ let refresh = self.refresh.clone();
+ let table_path = self.table_path.clone();
+ let min_interval = self.min_interval;
+
+ {
+ let mut guard = state.lock();
+ guard.pending = true;
+ guard.last_error = Some(error);
+ if guard.running {
+ return;
+ }
+ guard.running = true;
+ }
+
+ tokio::spawn(async move {
+ loop {
+ let (delay, error_for_log) = {
+ let mut guard = state.lock();
+ if !guard.pending {
+ guard.running = false;
+ return;
+ }
+ guard.pending = false;
+
+ let now = Instant::now();
+ let delay = match guard.last_refresh {
+ Some(last) => {
+ let earliest = last + min_interval;
+ if now < earliest {
+ earliest - now
+ } else {
+ Duration::from_millis(0)
+ }
+ }
+ None => Duration::from_millis(0),
+ };
+ (delay, guard.last_error.take())
+ };
+
+ if !delay.is_zero() {
+ sleep(delay).await;
+ }
+
+ if let Err(e) = (refresh)().await {
+ if let Some(error) = error_for_log {
+ warn!(
+ "Failed to update metadata for {table_path} after
fetch error {error:?}: {e:?}"
+ );
+ } else {
+ warn!("Failed to update metadata for {table_path}:
{e:?}");
+ }
+ }
+
+ let mut guard = state.lock();
+ guard.last_refresh = Some(Instant::now());
+ }
+ });
+ }
Review Comment:
The MetadataRefreshScheduler spawns a background tokio task (line 500) that
is not explicitly tracked or canceled. When the LogFetcher (and its
metadata_refresh field) is dropped, this background task will continue running
until it naturally exits, potentially holding references to shared state longer
than necessary. Consider using a JoinHandle to track the spawned task and
implementing a cancellation mechanism (e.g., via CancellationToken from
tokio_util) to ensure clean shutdown when the LogFetcher is dropped.
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -833,12 +1055,78 @@ impl LogFetcher {
&self,
mut completed_fetch: Box<dyn CompletedFetch>,
) -> Result<Option<Box<dyn CompletedFetch>>> {
- // todo: handle error in initialize fetch
- let table_bucket = completed_fetch.table_bucket();
+ if let Some(error) = completed_fetch.take_error() {
+ return Err(error);
+ }
+
+ let table_bucket = completed_fetch.table_bucket().clone();
let fetch_offset = completed_fetch.next_fetch_offset();
+ if let Some(api_error) = completed_fetch.api_error() {
+ let error = FlussError::for_code(api_error.code);
+ let error_message = api_error.message.as_str();
+ self.log_scanner_status
+ .move_bucket_to_end(table_bucket.clone());
+ match error {
+ FlussError::NotLeaderOrFollower
+ | FlussError::LogStorageException
+ | FlussError::KvStorageException
+ | FlussError::StorageException
+ | FlussError::FencedLeaderEpochException => {
+ debug!("Error in fetch for bucket {table_bucket}:
{error:?}: {error_message}");
+ self.schedule_metadata_update(error);
+ return Ok(None);
+ }
+ FlussError::UnknownTableOrBucketException => {
+ warn!(
+ "Received unknown table or bucket error in fetch for
bucket {table_bucket}"
+ );
+ self.schedule_metadata_update(error);
+ return Ok(None);
+ }
+ FlussError::LogOffsetOutOfRangeException => {
+ return Err(Error::UnexpectedError {
+ message: format!(
+ "The fetching offset {fetch_offset} is out of
range: {error_message}"
+ ),
+ source: None,
+ });
+ }
+ FlussError::AuthorizationException => {
+ return Err(Error::FlussAPIError {
+ api_error: ApiError {
+ code: api_error.code,
+ message: api_error.message.to_string(),
+ },
+ });
+ }
+ FlussError::UnknownServerError => {
+ warn!(
+ "Unknown server error while fetching offset
{fetch_offset} for bucket {table_bucket}: {error_message}"
+ );
+ return Ok(None);
+ }
+ FlussError::CorruptMessage => {
+ return Err(Error::UnexpectedError {
+ message: format!(
+ "Encountered corrupt message when fetching offset
{fetch_offset} for bucket {table_bucket}: {error_message}"
+ ),
+ source: None,
+ });
+ }
+ _ => {
+ return Err(Error::UnexpectedError {
+ message: format!(
+ "Unexpected error code {error:?} while fetching at
offset {fetch_offset} from bucket {table_bucket}: {error_message}"
+ ),
+ source: None,
+ });
+ }
+ }
Review Comment:
There is significant code duplication between the error handling logic in
this match statement (lines 1070-1125) and the nearly identical match statement
in the handle_fetch_response function (lines 827-867). This duplication makes
maintenance harder and increases the risk of inconsistencies. Consider
extracting this error handling logic into a shared helper function that both
code paths can call.
##########
crates/fluss/src/client/table/log_fetch_buffer.rs:
##########
@@ -322,6 +478,19 @@ impl DefaultCompletedFetch {
}
}
+ fn fetch_error(&self) -> Error {
+ let mut message = format!(
+ "Received exception when fetching the next record from
{table_bucket}. If needed, please back to past the record to continue
scanning.",
Review Comment:
The phrase "back to past the record" is grammatically incorrect. It should
be "go back past the record" or "skip past the record" to be clear and correct.
```suggestion
"Received exception when fetching the next record from
{table_bucket}. If needed, please skip past the record to continue scanning.",
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]