zhaohaidao commented on code in PR #143:
URL: https://github.com/apache/fluss-rust/pull/143#discussion_r2700639358
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -485,62 +485,89 @@ impl LogFetcher {
remote_read_context,
remote_log_downloader:
Arc::new(RemoteLogDownloader::new(tmp_dir)?),
credentials_cache: Arc::new(CredentialsCache::new(conns.clone(),
metadata.clone())),
- log_fetch_buffer: Arc::new(LogFetchBuffer::new()),
+ log_fetch_buffer,
nodes_with_pending_fetch_requests:
Arc::new(Mutex::new(HashSet::new())),
- table_path: table_info.table_path.clone(),
- is_partitioned: table_info.is_partitioned(),
})
}
fn create_read_context(
full_arrow_schema: SchemaRef,
projected_fields: Option<Vec<usize>>,
is_from_remote: bool,
- ) -> ReadContext {
+ ) -> Result<ReadContext> {
match projected_fields {
- None => ReadContext::new(full_arrow_schema, is_from_remote),
+ None => Ok(ReadContext::new(full_arrow_schema, is_from_remote)),
Some(fields) => {
ReadContext::with_projection_pushdown(full_arrow_schema,
fields, is_from_remote)
}
}
}
- async fn check_and_update_metadata(&self) -> Result<()> {
- if self.is_partitioned {
- // TODO: Implement partition-aware metadata refresh for buckets
whose leaders are unknown.
- // The implementation will likely need to collect partition IDs
for such buckets and
- // perform targeted metadata updates. Until then, we avoid
computing unused partition_ids.
- return Ok(());
- }
-
- let need_update = self
- .fetchable_buckets()
- .iter()
- .any(|bucket| self.get_table_bucket_leader(bucket).is_none());
-
- if !need_update {
- return Ok(());
+ fn describe_fetch_error(
+ error: FlussError,
+ table_bucket: &TableBucket,
+ fetch_offset: i64,
+ error_message: &str,
+ ) -> FetchErrorContext {
+ match error {
+ FlussError::NotLeaderOrFollower
+ | FlussError::LogStorageException
+ | FlussError::KvStorageException
+ | FlussError::StorageException
+ | FlussError::FencedLeaderEpochException => FetchErrorContext {
+ action: FetchErrorAction::Ignore,
+ log_level: FetchErrorLogLevel::Debug,
+ log_message: format!(
+ "Error in fetch for bucket {table_bucket}: {error:?}:
{error_message}"
+ ),
+ },
+ FlussError::UnknownTableOrBucketException => FetchErrorContext {
+ action: FetchErrorAction::Ignore,
+ log_level: FetchErrorLogLevel::Warn,
+ log_message: format!(
+ "Received unknown table or bucket error in fetch for
bucket {table_bucket}"
+ ),
+ },
+ FlussError::LogOffsetOutOfRangeException => FetchErrorContext {
+ action: FetchErrorAction::LogOffsetOutOfRange,
+ log_level: FetchErrorLogLevel::Debug,
+ log_message: format!(
+ "The fetching offset {fetch_offset} is out of range for
bucket {table_bucket}: {error_message}"
+ ),
+ },
+ FlussError::AuthorizationException => FetchErrorContext {
+ action: FetchErrorAction::Authorization,
+ log_level: FetchErrorLogLevel::Debug,
+ log_message: format!(
+ "Authorization error while fetching offset {fetch_offset}
for bucket {table_bucket}: {error_message}"
+ ),
+ },
+ FlussError::UnknownServerError => FetchErrorContext {
+ action: FetchErrorAction::Ignore,
+ log_level: FetchErrorLogLevel::Warn,
+ log_message: format!(
+ "Unknown server error while fetching offset {fetch_offset}
for bucket {table_bucket}: {error_message}"
+ ),
+ },
+ FlussError::CorruptMessage => FetchErrorContext {
+ action: FetchErrorAction::CorruptMessage,
+ log_level: FetchErrorLogLevel::Debug,
+ log_message: format!(
+ "Encountered corrupt message when fetching offset
{fetch_offset} for bucket {table_bucket}: {error_message}"
+ ),
+ },
+ _ => FetchErrorContext {
+ action: FetchErrorAction::Unexpected,
+ log_level: FetchErrorLogLevel::Debug,
+ log_message: format!(
+ "Unexpected error code {error:?} while fetching at offset
{fetch_offset} from bucket {table_bucket}: {error_message}"
+ ),
+ },
}
-
- // TODO: Handle PartitionNotExist error
- self.metadata
- .update_tables_metadata(&HashSet::from([&self.table_path]))
- .await
- .or_else(|e| {
- if let Error::RpcError { source, .. } = &e
- && matches!(source, RpcError::ConnectionError(_) |
RpcError::Poisoned(_))
- {
- warn!("Retrying after encountering error while updating
table metadata: {e}");
- Ok(())
- } else {
- Err(e)
- }
- })
}
/// Send fetch requests asynchronously without waiting for responses
async fn send_fetches(&self) -> Result<()> {
- self.check_and_update_metadata().await?;
Review Comment:
Good catch. The changes made to MetadataRefreshScheduler were not completely
reverted.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]