zhaohaidao commented on code in PR #143:
URL: https://github.com/apache/fluss-rust/pull/143#discussion_r2686196501
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -833,12 +1055,78 @@ impl LogFetcher {
&self,
mut completed_fetch: Box<dyn CompletedFetch>,
) -> Result<Option<Box<dyn CompletedFetch>>> {
- // todo: handle error in initialize fetch
- let table_bucket = completed_fetch.table_bucket();
+ if let Some(error) = completed_fetch.take_error() {
+ return Err(error);
+ }
+
+ let table_bucket = completed_fetch.table_bucket().clone();
let fetch_offset = completed_fetch.next_fetch_offset();
+ if let Some(api_error) = completed_fetch.api_error() {
+ let error = FlussError::for_code(api_error.code);
+ let error_message = api_error.message.as_str();
+ self.log_scanner_status
+ .move_bucket_to_end(table_bucket.clone());
+ match error {
+ FlussError::NotLeaderOrFollower
+ | FlussError::LogStorageException
+ | FlussError::KvStorageException
+ | FlussError::StorageException
+ | FlussError::FencedLeaderEpochException => {
+ debug!("Error in fetch for bucket {table_bucket}:
{error:?}: {error_message}");
+ self.schedule_metadata_update(error);
+ return Ok(None);
+ }
+ FlussError::UnknownTableOrBucketException => {
+ warn!(
+ "Received unknown table or bucket error in fetch for
bucket {table_bucket}"
+ );
+ self.schedule_metadata_update(error);
+ return Ok(None);
+ }
+ FlussError::LogOffsetOutOfRangeException => {
+ return Err(Error::UnexpectedError {
+ message: format!(
+ "The fetching offset {fetch_offset} is out of
range: {error_message}"
+ ),
+ source: None,
+ });
+ }
+ FlussError::AuthorizationException => {
+ return Err(Error::FlussAPIError {
+ api_error: ApiError {
+ code: api_error.code,
+ message: api_error.message.to_string(),
+ },
+ });
+ }
+ FlussError::UnknownServerError => {
+ warn!(
+ "Unknown server error while fetching offset
{fetch_offset} for bucket {table_bucket}: {error_message}"
+ );
+ return Ok(None);
+ }
+ FlussError::CorruptMessage => {
+ return Err(Error::UnexpectedError {
+ message: format!(
+ "Encountered corrupt message when fetching offset
{fetch_offset} for bucket {table_bucket}: {error_message}"
+ ),
+ source: None,
+ });
+ }
+ _ => {
+ return Err(Error::UnexpectedError {
+ message: format!(
+ "Unexpected error code {error:?} while fetching at
offset {fetch_offset} from bucket {table_bucket}: {error_message}"
+ ),
+ source: None,
+ });
+ }
+ }
Review Comment:
Makes sense.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]