luoyuxia commented on code in PR #103:
URL: https://github.com/apache/fluss-rust/pull/103#discussion_r2637522774
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -211,125 +260,292 @@ impl LogFetcher {
         projected_fields: Option<Vec<usize>>,
     ) -> Result<Self> {
         let full_arrow_schema = to_arrow_schema(table_info.get_row_type());
-        let read_context = Self::create_read_context(full_arrow_schema, projected_fields.clone());
+        let read_context =
+            Self::create_read_context(full_arrow_schema.clone(), projected_fields.clone(), false);
+        let remote_read_context =
+            Self::create_read_context(full_arrow_schema, projected_fields.clone(), true);
         let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?;
         Ok(LogFetcher {
-            table_path: table_info.table_path.clone(),
-            conns,
-            table_info,
-            metadata,
+            conns: conns.clone(),
+            metadata: metadata.clone(),
             log_scanner_status,
             read_context,
-            remote_log_downloader: RemoteLogDownloader::new(tmp_dir)?,
-            credentials_cache: CredentialsCache::new(),
+            remote_read_context,
+            remote_log_downloader: Arc::new(RemoteLogDownloader::new(tmp_dir)?),
+            credentials_cache: Arc::new(CredentialsCache::new(conns.clone(), metadata.clone())),
+            log_fetch_buffer: Arc::new(LogFetchBuffer::new()),
+            nodes_with_pending_fetch_requests: Arc::new(Mutex::new(HashSet::new())),
         })
     }

     fn create_read_context(
         full_arrow_schema: SchemaRef,
         projected_fields: Option<Vec<usize>>,
+        is_from_remote: bool,
     ) -> ReadContext {
         match projected_fields {
-            None => ReadContext::new(full_arrow_schema),
-            Some(fields) => ReadContext::with_projection_pushdown(full_arrow_schema, fields),
+            None => ReadContext::new(full_arrow_schema, is_from_remote),
+            Some(fields) => {
+                ReadContext::with_projection_pushdown(full_arrow_schema, fields, is_from_remote)
+            }
         }
     }

-    async fn send_fetches_and_collect(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
+    /// Send fetch requests asynchronously without waiting for responses
+    async fn send_fetches(&self) -> Result<()> {
+        // todo: check update metadata like fluss-java in case leader changes
         let fetch_request = self.prepare_fetch_log_requests().await;
-        let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
+
         for (leader, fetch_request) in fetch_request {
-            let cluster = self.metadata.get_cluster();
-            let server_node = cluster
-                .get_tablet_server(leader)
-                .expect("todo: handle leader not exist.");
-            let con = self.conns.get_connection(server_node).await?;
-
-            let fetch_response = con
-                .request(crate::rpc::message::FetchLogRequest::new(fetch_request))
-                .await?;
-
-            for pb_fetch_log_resp in fetch_response.tables_resp {
-                let table_id = pb_fetch_log_resp.table_id;
-                let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
-
-                for fetch_log_for_bucket in fetch_log_for_buckets {
-                    let bucket: i32 = fetch_log_for_bucket.bucket_id;
-                    let table_bucket = TableBucket::new(table_id, bucket);
-
-                    // Check if this is a remote log fetch
-                    if let Some(ref remote_log_fetch_info) =
-                        fetch_log_for_bucket.remote_log_fetch_info
-                    {
-                        let remote_fs_props = self
-                            .credentials_cache
-                            .get_or_refresh(&self.conns, &self.metadata)
-                            .await?;
-                        self.remote_log_downloader
-                            .set_remote_fs_props(remote_fs_props);
-                        let remote_fetch_info = RemoteLogFetchInfo::from_proto(
-                            remote_log_fetch_info,
-                            table_bucket.clone(),
-                        )?;
-
-                        if let Some(fetch_offset) =
-                            self.log_scanner_status.get_bucket_offset(&table_bucket)
-                        {
-                            let high_watermark = fetch_log_for_bucket.high_watermark.unwrap_or(-1);
-                            // Download and process remote log segments
-                            let mut pos_in_log_segment = remote_fetch_info.first_start_pos;
-                            let mut current_fetch_offset = fetch_offset;
-                            // todo: make segment download in parallel
-                            for (i, segment) in
-                                remote_fetch_info.remote_log_segments.iter().enumerate()
-                            {
-                                if i > 0 {
-                                    pos_in_log_segment = 0;
-                                    current_fetch_offset = segment.start_offset;
-                                }
+            debug!("Adding pending request for node id {leader}");
+            // Check if we already have a pending request for this node
+            {
+                self.nodes_with_pending_fetch_requests.lock().insert(leader);
+            }
+
+            let cluster = self.metadata.get_cluster().clone();
+
+            let conns = Arc::clone(&self.conns);
+            let log_fetch_buffer = self.log_fetch_buffer.clone();
+            let log_scanner_status = self.log_scanner_status.clone();
+            let read_context = self.read_context.clone();
+            let remote_read_context = self.remote_read_context.clone();
+            let remote_log_downloader = Arc::clone(&self.remote_log_downloader);
+            let creds_cache = self.credentials_cache.clone();
+            let nodes_with_pending = self.nodes_with_pending_fetch_requests.clone();
+
+            // Spawn async task to handle the fetch request
+            tokio::spawn(async move {
+                // make sure it will always remove the leader from pending nodes
+                let _guard = scopeguard::guard((), |_| {
+                    nodes_with_pending.lock().remove(&leader);
+                });
+
+                let server_node = cluster
+                    .get_tablet_server(leader)
+                    .expect("todo: handle leader not exist.");
+
+                let con = match conns.get_connection(server_node).await {
+                    Ok(con) => con,
+                    Err(e) => {
+                        // todo: handle failed to get connection
+                        warn!("Failed to get connection to destination node: {e:?}");
+                        return;
+                    }
+                };
+
+                let fetch_response = match con
+                    .request(message::FetchLogRequest::new(fetch_request))
+                    .await
+                {
+                    Ok(resp) => resp,
+                    Err(e) => {
+                        // todo: handle fetch log from destination node
+                        warn!("Failed to fetch log from destination node {server_node:?}: {e:?}");
+                        return;
+                    }
+                };
+
+                if let Err(e) = Self::handle_fetch_response(
+                    fetch_response,
+                    &log_fetch_buffer,
+                    &log_scanner_status,
+                    &read_context,
+                    &remote_read_context,
+                    &remote_log_downloader,
+                    &creds_cache,
+                )
+                .await
+                {
+                    // todo: handle fail to handle fetch response
+                    error!("Fail to handle fetch response: {e:?}");
+                }
+            });
+        }
+
+        Ok(())
+    }
-                                let download_future =
-                                    self.remote_log_downloader.request_remote_log(
-                                        &remote_fetch_info.remote_log_tablet_dir,
-                                        segment,
-                                    )?;
-                                let pending_fetch = RemotePendingFetch::new(
-                                    segment.clone(),
-                                    download_future,
-                                    pos_in_log_segment,
-                                    current_fetch_offset,
-                                    high_watermark,
-                                    self.read_context.clone(),
-                                );
-                                let remote_records =
-                                    pending_fetch.convert_to_completed_fetch().await?;
-                                // Update offset and merge results
-                                for (tb, records) in remote_records {
-                                    if let Some(last_record) = records.last() {
-                                        self.log_scanner_status
-                                            .update_offset(&tb, last_record.offset() + 1);
-                                    }
-                                    result.entry(tb).or_default().extend(records);
+    /// Handle fetch response and add completed fetches to buffer
+    async fn handle_fetch_response(
+        fetch_response: crate::proto::FetchLogResponse,
+        log_fetch_buffer: &Arc<LogFetchBuffer>,
+        log_scanner_status: &Arc<LogScannerStatus>,
+        read_context: &ReadContext,
+        remote_read_context: &ReadContext,
+        remote_log_downloader: &Arc<RemoteLogDownloader>,
+        credentials_cache: &Arc<CredentialsCache>,
+    ) -> Result<()> {
+        for pb_fetch_log_resp in fetch_response.tables_resp {
+            let table_id = pb_fetch_log_resp.table_id;
+            let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
+
+            for fetch_log_for_bucket in fetch_log_for_buckets {
+                let bucket: i32 = fetch_log_for_bucket.bucket_id;
+                let table_bucket = TableBucket::new(table_id, bucket);
+
+                // todo: check fetch result code for per-bucket
+                let Some(fetch_offset) = log_scanner_status.get_bucket_offset(&table_bucket) else {
+                    debug!(
+                        "Ignoring fetch log response for bucket {table_bucket} because the bucket has been unsubscribed."
+                    );
+                    continue;
+                };
+
+                // Check if this is a remote log fetch
+                if let Some(ref remote_log_fetch_info) = fetch_log_for_bucket.remote_log_fetch_info
+                {
+                    // set remote fs props
+                    let remote_fs_props = credentials_cache.get_or_refresh().await?;
+                    remote_log_downloader.set_remote_fs_props(remote_fs_props);
+
+                    let remote_fetch_info =
+                        RemoteLogFetchInfo::from_proto(remote_log_fetch_info, table_bucket.clone());
+
+                    let high_watermark = fetch_log_for_bucket.high_watermark.unwrap_or(-1);
+                    Self::pending_remote_fetches(
+                        remote_log_downloader.clone(),
+                        log_fetch_buffer.clone(),
+                        remote_read_context.clone(),
+                        &table_bucket,
+                        remote_fetch_info,
+                        fetch_offset,
+                        high_watermark,
+                    );
+                } else if fetch_log_for_bucket.records.is_some() {
+                    // Handle regular in-memory records - create completed fetch directly
+                    let high_watermark = fetch_log_for_bucket.high_watermark.unwrap_or(-1);
+                    let records = fetch_log_for_bucket.records.unwrap_or(vec![]);
+                    let size_in_bytes = records.len();
+                    let log_record_batch = LogRecordsBatches::new(records);
+
+                    match DefaultCompletedFetch::new(
+                        table_bucket.clone(),
+                        log_record_batch,
+                        size_in_bytes,
+                        read_context.clone(),
+                        fetch_offset,
+                        high_watermark,
+                    ) {
+                        Ok(completed_fetch) => {
+                            log_fetch_buffer.add(Box::new(completed_fetch));
+                        }
+                        Err(e) => {
+                            // todo: handle error
+                            log::warn!("Failed to create completed fetch: {e:?}");
+                        }
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
+
+    fn pending_remote_fetches(
+        remote_log_downloader: Arc<RemoteLogDownloader>,
+        log_fetch_buffer: Arc<LogFetchBuffer>,
+        read_context: ReadContext,
+        table_bucket: &TableBucket,
+        remote_fetch_info: RemoteLogFetchInfo,
+        fetch_offset: i64,
+        high_watermark: i64,
+    ) {
+        // Download and process remote log segments
+        let mut pos_in_log_segment = remote_fetch_info.first_start_pos;
+        let mut current_fetch_offset = fetch_offset;
+        for (i, segment) in remote_fetch_info.remote_log_segments.iter().enumerate() {
+            if i > 0 {
+                pos_in_log_segment = 0;
+                current_fetch_offset = segment.start_offset;
+            }
+
+            // todo:
+            // 1: control the max threads to download remote segment
+            // 2: introduce a priority queue to give the earliest segment the highest priority
+            let download_future = remote_log_downloader
+                .request_remote_log(&remote_fetch_info.remote_log_tablet_dir, segment);
+
+            // Register callback to be called when download completes
+            // (similar to Java's downloadFuture.onComplete)
+            // This must be done before creating RemotePendingFetch to avoid move issues
+            let table_bucket = table_bucket.clone();
+            let log_fetch_buffer_clone = log_fetch_buffer.clone();
+            download_future.on_complete(move || {
+                log_fetch_buffer_clone.try_complete(&table_bucket);
+            });
+
+            let pending_fetch = RemotePendingFetch::new(
+                segment.clone(),
+                download_future,
+                pos_in_log_segment,
+                current_fetch_offset,
+                high_watermark,
+                read_context.clone(),
+            );
+            // Add to pending fetches in buffer (similar to Java's logFetchBuffer.pend)
+            log_fetch_buffer.pend(Box::new(pending_fetch));
+        }
+    }
+
+    /// Collect completed fetches from buffer
+    /// Reference: LogFetchCollector.collectFetch in Java
+    fn collect_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
+        const MAX_POLL_RECORDS: usize = 500; // Default max poll records
+        let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
+        let mut records_remaining = MAX_POLL_RECORDS;
+
+        while records_remaining > 0 {
+            // Get the next in line fetch, or get a new one from buffer
+            let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
Review Comment:
Good catch. I think you're right. It'll cause unconsumed records to be dropped.
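
For reference, here is a minimal sketch of the direction I'd take: once the poll budget runs out, a partially consumed fetch is parked as the next-in-line fetch instead of being discarded, so the leftover records survive to the next poll. The `FetchBuffer`/`PartialFetch` types and their methods below are illustrative stand-ins, not the actual fluss-rust API:

```rust
// Simplified model of next-in-line handling; all names here are hypothetical.
use std::collections::VecDeque;

struct PartialFetch {
    records: VecDeque<i64>, // stand-in for the ScanRecords of a completed fetch
}

struct FetchBuffer {
    next_in_line: Option<PartialFetch>,
    completed: VecDeque<PartialFetch>,
}

impl FetchBuffer {
    /// Drain at most `budget` records. A partially consumed fetch is kept as
    /// next-in-line instead of being dropped, so no records are lost.
    fn collect(&mut self, mut budget: usize) -> Vec<i64> {
        let mut out = Vec::new();
        while budget > 0 {
            // Prefer the parked next-in-line fetch, otherwise take a new completed one.
            let mut fetch = match self.next_in_line.take().or_else(|| self.completed.pop_front()) {
                Some(f) => f,
                None => break,
            };
            while budget > 0 {
                match fetch.records.pop_front() {
                    Some(r) => {
                        out.push(r);
                        budget -= 1;
                    }
                    None => break,
                }
            }
            if !fetch.records.is_empty() {
                // Not fully consumed: park it for the next poll.
                self.next_in_line = Some(fetch);
                break;
            }
        }
        out
    }
}

fn main() {
    let mut buf = FetchBuffer {
        next_in_line: None,
        completed: VecDeque::from([PartialFetch { records: (0..8).collect() }]),
    };
    assert_eq!(buf.collect(5), vec![0, 1, 2, 3, 4]);
    // The remaining records survive to the next call instead of being dropped.
    assert_eq!(buf.collect(5), vec![5, 6, 7]);
}
```

In `collect_fetches` this would presumably mean draining only up to `records_remaining` from the current fetch and keeping it in `log_fetch_buffer` as next-in-line whenever it isn't fully consumed.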