Copilot commented on code in PR #103:
URL: https://github.com/apache/fluss-rust/pull/103#discussion_r2637114720
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -350,25 +643,44 @@ impl LogFetcher {
let offset = match
self.log_scanner_status.get_bucket_offset(&bucket) {
Some(offset) => offset,
None => {
- // todo: debug
+ debug!(
+ "Skipping fetch request for bucket {bucket} because
the bucket has been unsubscribed."
+ );
continue;
}
};
- if let Some(leader) = self.get_table_bucket_leader(&bucket) {
- let fetch_log_req_for_bucket = PbFetchLogReqForBucket {
- partition_id: None,
- bucket_id: bucket.bucket_id(),
- fetch_offset: offset,
- // 1M
- max_fetch_bytes: 1024 * 1024,
- };
-
- fetch_log_req_for_buckets
- .entry(leader)
- .or_insert_with(Vec::new)
- .push(fetch_log_req_for_bucket);
- ready_for_fetch_count += 1;
+ match self.get_table_bucket_leader(&bucket) {
+ None => {
+ log::trace!(
+ "Skipping fetch request for bucket {bucket} because
leader is not available."
+ )
+ }
+ Some(leader) => {
+ if self
+ .nodes_with_pending_fetch_requests
+ .lock()
+ .contains(&leader)
+ {
+ log::trace!(
+ "Skipping fetch request for bucket {bucket}
because previous request to server {leader} has not been processed."
+ )
+ } else {
+ let fetch_log_req_for_bucket = PbFetchLogReqForBucket {
+ partition_id: None,
+ bucket_id: bucket.bucket_id(),
+ fetch_offset: offset,
+ // 1M
+ max_fetch_bytes: 1024 * 1024,
+ };
+
+ fetch_log_req_for_buckets
+ .entry(leader)
+ .or_insert_with(Vec::new)
+ .push(fetch_log_req_for_bucket);
+ ready_for_fetch_count += 1;
+ }
+ }
Review Comment:
The fetch request preparation logic now skips buckets with pending requests
to a leader (lines 660-667), but this could lead to starvation if a particular
leader becomes slow. If one tablet server is slow to respond, all buckets on
that server will be blocked from fetching even if other buckets on different
servers are ready. Consider adding a timeout for pending requests or allowing
concurrent requests per bucket rather than per leader.
##########
crates/fluss/src/client/table/remote_log.rs:
##########
@@ -286,32 +351,54 @@ impl RemotePendingFetch {
read_context,
}
}
+}
+
+impl PendingFetch for RemotePendingFetch {
+ fn table_bucket(&self) -> &TableBucket {
+ &self.segment.table_bucket
+ }
- /// Convert to completed fetch by reading the downloaded file
- pub async fn convert_to_completed_fetch(
- mut self,
- ) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
- let file_path = self.download_future.get_file_path().await?;
- let file_data = tokio::fs::read(&file_path).await?;
+ fn is_completed(&self) -> bool {
+ self.download_future.is_done()
+ }
+
+ fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> {
+ // Get the file path (this should only be called when is_completed()
returns true)
+ let file_path = self.download_future.get_file_path()?;
+ // Read the file data synchronously (we're in a sync context)
+ // Note: This is a limitation - we need to use blocking I/O here
+ let mut file_data = std::fs::read(&file_path).map_err(|e|
Error::IoUnexpectedError {
+ message: format!("Failed to read downloaded file: {file_path:?}."),
+ source: e,
+ })?;
Review Comment:
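The blocking I/O operation std::fs::read() is called in a potentially async
context (to_completed_fetch is a synchronous method that can be called from
async code paths). This could block the tokio runtime thread. Because the method
is not async, tokio::fs::read() cannot simply be awaited here; consider wrapping
the read in tokio::task::block_in_place (which panics on a current_thread
runtime), moving the read into tokio::task::spawn_blocking before this method is
called, or documenting that this method should only be called from blocking
contexts.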
```suggestion
// Read the file data using a Tokio-aware blocking section. This
method may be
// called from async code paths, so we use `block_in_place` to avoid
stalling
// the runtime worker thread while performing blocking I/O.
let mut file_data = tokio::task::block_in_place(||
std::fs::read(&file_path))
.map_err(|e| Error::IoUnexpectedError {
message: format!("Failed to read downloaded file:
{file_path:?}."),
source: e,
})?;
```
##########
crates/fluss/src/client/table/remote_log.rs:
##########
@@ -70,45 +72,110 @@ pub struct RemoteLogFetchInfo {
}
impl RemoteLogFetchInfo {
- pub fn from_proto(info: &PbRemoteLogFetchInfo, table_bucket: TableBucket)
-> Result<Self> {
+ pub fn from_proto(info: &PbRemoteLogFetchInfo, table_bucket: TableBucket)
-> Self {
let segments = info
.remote_log_segments
.iter()
.map(|s| RemoteLogSegment::from_proto(s, table_bucket.clone()))
.collect();
- Ok(Self {
+ Self {
remote_log_tablet_dir: info.remote_log_tablet_dir.clone(),
partition_name: info.partition_name.clone(),
remote_log_segments: segments,
first_start_pos: info.first_start_pos.unwrap_or(0),
- })
+ }
}
}
+type CompletionCallback = Box<dyn Fn() + Send + Sync>;
+
/// Future for a remote log download request
pub struct RemoteLogDownloadFuture {
- receiver: Option<oneshot::Receiver<Result<PathBuf>>>,
+ result: Arc<Mutex<Option<Result<PathBuf>>>>,
+ completion_callbacks: Arc<Mutex<Vec<CompletionCallback>>>,
+ // todo: add recycleCallback
}
impl RemoteLogDownloadFuture {
pub fn new(receiver: oneshot::Receiver<Result<PathBuf>>) -> Self {
+ let result = Arc::new(Mutex::new(None));
+ let result_clone = Arc::clone(&result);
+ let completion_callbacks: Arc<Mutex<Vec<CompletionCallback>>> =
+ Arc::new(Mutex::new(Vec::new()));
+ let callbacks_clone = Arc::clone(&completion_callbacks);
+
+ // Spawn a task to wait for the download and update result, then call
callbacks
+ tokio::spawn(async move {
+ let download_result = match receiver.await {
+ Ok(Ok(path)) => Ok(path),
+ Ok(Err(e)) => Err(e),
+ Err(e) => Err(Error::UnexpectedError {
+ message: format!("Download future cancelled: {e:?}"),
+ source: None,
+ }),
+ };
+ *result_clone.lock() = Some(download_result);
+
+ // Call all registered callbacks
+ // We need to take the callbacks to avoid holding the lock while
calling them
+ // This also ensures that any callbacks registered after this
point will be called immediately
+ let callbacks: Vec<CompletionCallback> = {
+ let mut callbacks_guard = callbacks_clone.lock();
+ std::mem::take(&mut *callbacks_guard)
+ };
+ for callback in callbacks {
+ callback();
+ }
+
+ // After calling callbacks, any new callbacks registered will see
is_done() == true
+ // and will be called immediately in on_complete()
+ });
+
Self {
- receiver: Some(receiver),
+ result,
+ completion_callbacks,
}
}
- /// Get the downloaded file path
- pub async fn get_file_path(&mut self) -> Result<PathBuf> {
- let receiver = self.receiver.take().ok_or_else(||
Error::UnexpectedError {
- message: "Downloaded file already consumed".to_string(),
- source: None,
- })?;
+ /// Register a callback to be called when download completes (similar to
Java's onComplete)
+ pub fn on_complete<F>(&self, callback: F)
+ where
+ F: Fn() + Send + Sync + 'static,
+ {
+ // Check if already completed - need to check while holding the lock
to avoid race condition
+ let mut callbacks_guard = self.completion_callbacks.lock();
+ let is_done = self.is_done();
+
+ if is_done {
+ // If already completed, call immediately (drop lock first to
avoid deadlock)
+ drop(callbacks_guard);
+ callback();
+ } else {
+ // Otherwise, register the callback
+ callbacks_guard.push(Box::new(callback));
+ }
+ }
Review Comment:
The on_complete method has a race condition in checking completion status.
The callbacks_guard lock is acquired on line 147, but is_done() on line 148
acquires a separate lock (self.result) to check completion. Between releasing
the result lock and checking the boolean, the background task could complete,
execute all callbacks, and clear the callback list. This could result in the
callback being added to an already-cleared list on line 156, causing it to
never be executed. The two locks (callbacks and result) should be checked
atomically, or the completion check should be done while holding the
callbacks_guard.
##########
crates/fluss/src/record/arrow.rs:
##########
@@ -378,14 +379,17 @@ impl<'a> LogRecordsBatchs<'a> {
}
}
-impl<'a> Iterator for &'a mut LogRecordsBatchs<'a> {
- type Item = LogRecordBatch<'a>;
+impl Iterator for LogRecordsBatches {
+ type Item = LogRecordBatch;
fn next(&mut self) -> Option<Self::Item> {
match self.next_batch_size() {
Some(batch_size) => {
- let data_slice = &self.data[self.current_pos..self.current_pos
+ batch_size];
- let record_batch = LogRecordBatch::new(data_slice);
+ let start = self.current_pos;
+ let end = start + batch_size;
+ // Since LogRecordsBatchs owns the Vec<u8>, the slice is valid
Review Comment:
The struct name was changed from `LogRecordsBatchs` to `LogRecordsBatches`,
but there's a typo in the comment on line 390. The comment refers to
"LogRecordsBatchs" (the old, misspelled name) instead of the new correct name
"LogRecordsBatches".
```suggestion
// Since LogRecordsBatches owns the Vec<u8>, the slice is
valid
```
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -211,125 +260,292 @@ impl LogFetcher {
projected_fields: Option<Vec<usize>>,
) -> Result<Self> {
let full_arrow_schema = to_arrow_schema(table_info.get_row_type());
- let read_context = Self::create_read_context(full_arrow_schema,
projected_fields.clone());
+ let read_context =
+ Self::create_read_context(full_arrow_schema.clone(),
projected_fields.clone(), false);
+ let remote_read_context =
+ Self::create_read_context(full_arrow_schema,
projected_fields.clone(), true);
let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?;
Ok(LogFetcher {
- table_path: table_info.table_path.clone(),
- conns,
- table_info,
- metadata,
+ conns: conns.clone(),
+ metadata: metadata.clone(),
log_scanner_status,
read_context,
- remote_log_downloader: RemoteLogDownloader::new(tmp_dir)?,
- credentials_cache: CredentialsCache::new(),
+ remote_read_context,
+ remote_log_downloader:
Arc::new(RemoteLogDownloader::new(tmp_dir)?),
+ credentials_cache: Arc::new(CredentialsCache::new(conns.clone(),
metadata.clone())),
+ log_fetch_buffer: Arc::new(LogFetchBuffer::new()),
+ nodes_with_pending_fetch_requests:
Arc::new(Mutex::new(HashSet::new())),
})
}
fn create_read_context(
full_arrow_schema: SchemaRef,
projected_fields: Option<Vec<usize>>,
+ is_from_remote: bool,
) -> ReadContext {
match projected_fields {
- None => ReadContext::new(full_arrow_schema),
- Some(fields) =>
ReadContext::with_projection_pushdown(full_arrow_schema, fields),
+ None => ReadContext::new(full_arrow_schema, is_from_remote),
+ Some(fields) => {
+ ReadContext::with_projection_pushdown(full_arrow_schema,
fields, is_from_remote)
+ }
}
}
- async fn send_fetches_and_collect(&self) -> Result<HashMap<TableBucket,
Vec<ScanRecord>>> {
+ /// Send fetch requests asynchronously without waiting for responses
+ async fn send_fetches(&self) -> Result<()> {
+ // todo: check update metadata like fluss-java in case leader changes
let fetch_request = self.prepare_fetch_log_requests().await;
- let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
+
for (leader, fetch_request) in fetch_request {
- let cluster = self.metadata.get_cluster();
- let server_node = cluster
- .get_tablet_server(leader)
- .expect("todo: handle leader not exist.");
- let con = self.conns.get_connection(server_node).await?;
-
- let fetch_response = con
-
.request(crate::rpc::message::FetchLogRequest::new(fetch_request))
- .await?;
-
- for pb_fetch_log_resp in fetch_response.tables_resp {
- let table_id = pb_fetch_log_resp.table_id;
- let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
-
- for fetch_log_for_bucket in fetch_log_for_buckets {
- let bucket: i32 = fetch_log_for_bucket.bucket_id;
- let table_bucket = TableBucket::new(table_id, bucket);
-
- // Check if this is a remote log fetch
- if let Some(ref remote_log_fetch_info) =
- fetch_log_for_bucket.remote_log_fetch_info
- {
- let remote_fs_props = self
- .credentials_cache
- .get_or_refresh(&self.conns, &self.metadata)
- .await?;
- self.remote_log_downloader
- .set_remote_fs_props(remote_fs_props);
- let remote_fetch_info = RemoteLogFetchInfo::from_proto(
- remote_log_fetch_info,
- table_bucket.clone(),
- )?;
-
- if let Some(fetch_offset) =
-
self.log_scanner_status.get_bucket_offset(&table_bucket)
- {
- let high_watermark =
fetch_log_for_bucket.high_watermark.unwrap_or(-1);
- // Download and process remote log segments
- let mut pos_in_log_segment =
remote_fetch_info.first_start_pos;
- let mut current_fetch_offset = fetch_offset;
- // todo: make segment download in parallel
- for (i, segment) in
-
remote_fetch_info.remote_log_segments.iter().enumerate()
- {
- if i > 0 {
- pos_in_log_segment = 0;
- current_fetch_offset =
segment.start_offset;
- }
+ debug!("Adding pending request for node id {leader}");
+ // Check if we already have a pending request for this node
+ {
+ self.nodes_with_pending_fetch_requests.lock().insert(leader);
+ }
+
+ let cluster = self.metadata.get_cluster().clone();
+
+ let conns = Arc::clone(&self.conns);
+ let log_fetch_buffer = self.log_fetch_buffer.clone();
+ let log_scanner_status = self.log_scanner_status.clone();
+ let read_context = self.read_context.clone();
+ let remote_read_context = self.remote_read_context.clone();
+ let remote_log_downloader =
Arc::clone(&self.remote_log_downloader);
+ let creds_cache = self.credentials_cache.clone();
+ let nodes_with_pending =
self.nodes_with_pending_fetch_requests.clone();
+
+ // Spawn async task to handle the fetch request
+ tokio::spawn(async move {
+ // make sure it will always remote leader from pending nodes
Review Comment:
The code comment contains a typo: "remote" should be "remove". The guard
ensures that the leader is always removed from the pending nodes set.
```suggestion
// make sure it will always remove leader from pending nodes
```
##########
crates/fluss/src/client/table/log_fetch_buffer.rs:
##########
@@ -0,0 +1,367 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Result;
+use crate::metadata::TableBucket;
+use crate::record::{
+ LogRecordBatch, LogRecordIterator, LogRecordsBatches, ReadContext,
ScanRecord,
+};
+use parking_lot::Mutex;
+use std::collections::{HashMap, VecDeque};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+use tokio::sync::Notify;
+
+/// Represents a completed fetch that can be consumed
+pub trait CompletedFetch: Send + Sync {
+ fn table_bucket(&self) -> &TableBucket;
+ fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>>;
+ fn is_consumed(&self) -> bool;
+ fn drain(&mut self);
+ fn size_in_bytes(&self) -> usize;
+ fn high_watermark(&self) -> i64;
+ fn is_initialized(&self) -> bool;
+ fn set_initialized(&mut self);
+ fn next_fetch_offset(&self) -> i64;
+}
+
+/// Represents a pending fetch that is waiting to be completed
+pub trait PendingFetch: Send + Sync {
+ fn table_bucket(&self) -> &TableBucket;
+ fn is_completed(&self) -> bool;
+ fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>>;
+}
+
+/// Thread-safe buffer for completed fetches
+pub struct LogFetchBuffer {
+ completed_fetches: Mutex<VecDeque<Box<dyn CompletedFetch>>>,
+ pending_fetches: Mutex<HashMap<TableBucket, VecDeque<Box<dyn
PendingFetch>>>>,
+ next_in_line_fetch: Mutex<Option<Box<dyn CompletedFetch>>>,
+ not_empty_notify: Notify,
+ woken_up: Arc<AtomicBool>,
+}
+
+impl LogFetchBuffer {
+ pub fn new() -> Self {
+ Self {
+ completed_fetches: Mutex::new(VecDeque::new()),
+ pending_fetches: Mutex::new(HashMap::new()),
+ next_in_line_fetch: Mutex::new(None),
+ not_empty_notify: Notify::new(),
+ woken_up: Arc::new(AtomicBool::new(false)),
+ }
+ }
+
+ /// Check if the buffer is empty
+ pub fn is_empty(&self) -> bool {
+ self.completed_fetches.lock().is_empty()
+ }
+
+ /// Wait for the buffer to become non-empty, with timeout
+ /// Returns true if data became available, false if timeout
+ pub async fn await_not_empty(&self, timeout: Duration) -> bool {
+ let deadline = std::time::Instant::now() + timeout;
+
+ loop {
+ // Check if buffer is not empty
+ if !self.is_empty() {
+ return true;
+ }
+
+ // Check if woken up
+ if self.woken_up.swap(false, Ordering::Acquire) {
+ return true;
+ }
+
+ // Check if timeout
+ let now = std::time::Instant::now();
+ if now >= deadline {
+ return false;
+ }
+
+ // Wait for notification with remaining time
+ let remaining = deadline - now;
+ let notified = self.not_empty_notify.notified();
+ tokio::select! {
+ _ = tokio::time::sleep(remaining) => {
+ return false; // Timeout
+ }
+ _ = notified => {
+ // Got notification, check again
+ continue;
+ }
+ }
+ }
+ }
Review Comment:
The await_not_empty method uses a potentially infinite loop with a select!
inside. If the notification system fails or if there's a logic bug, this could
result in a tight loop. Additionally, the woken_up atomic is checked with swap
but isn't coordinated with the notification, which could lead to missed wakeups
if wakeup() runs after the swap check but before the notified() future is
created: notify_waiters() only wakes waiters that already exist and stores no
permit, so the caller would then sleep until the timeout.
##########
crates/fluss/src/record/arrow.rs:
##########
@@ -723,24 +728,35 @@ struct Projection {
}
impl ReadContext {
- pub fn new(arrow_schema: SchemaRef) -> ReadContext {
+ pub fn new(arrow_schema: SchemaRef, is_from_remote: bool) -> ReadContext {
ReadContext {
target_schema: arrow_schema.clone(),
full_schema: arrow_schema,
projection: None,
+ is_from_remote,
}
}
pub fn with_projection_pushdown(
arrow_schema: SchemaRef,
projected_fields: Vec<usize>,
+ is_from_remote: bool,
) -> ReadContext {
let target_schema = Self::project_schema(arrow_schema.clone(),
projected_fields.as_slice());
- let mut sorted_fields = projected_fields.clone();
- sorted_fields.sort_unstable();
+ let (need_do_reorder, sorted_fields) = {
+ // currently, for remote read, arrow log doesn't support
projection pushdown,
+ // so, only need to do reordering when is not from remote
+ if !is_from_remote {
+ let mut sorted_fields = projected_fields.clone();
+ sorted_fields.sort_unstable();
+ (!sorted_fields.eq(&projected_fields), sorted_fields)
+ } else {
+ (false, vec![])
+ }
+ };
Review Comment:
When is_from_remote is true, sorted_fields is set to an empty vector on line
754. However, this empty vector is then used to create the Projection structure
on lines 780-790 as ordered_fields. This seems inconsistent - if remote reads
don't support projection pushdown, consider either returning None for
projection or documenting why this structure with empty ordered_fields is
necessary.
##########
crates/fluss/src/client/table/log_fetch_buffer.rs:
##########
@@ -0,0 +1,367 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Result;
+use crate::metadata::TableBucket;
+use crate::record::{
+ LogRecordBatch, LogRecordIterator, LogRecordsBatches, ReadContext,
ScanRecord,
+};
+use parking_lot::Mutex;
+use std::collections::{HashMap, VecDeque};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+use tokio::sync::Notify;
+
+/// Represents a completed fetch that can be consumed
+pub trait CompletedFetch: Send + Sync {
+ fn table_bucket(&self) -> &TableBucket;
+ fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>>;
+ fn is_consumed(&self) -> bool;
+ fn drain(&mut self);
+ fn size_in_bytes(&self) -> usize;
+ fn high_watermark(&self) -> i64;
+ fn is_initialized(&self) -> bool;
+ fn set_initialized(&mut self);
+ fn next_fetch_offset(&self) -> i64;
+}
+
+/// Represents a pending fetch that is waiting to be completed
+pub trait PendingFetch: Send + Sync {
+ fn table_bucket(&self) -> &TableBucket;
+ fn is_completed(&self) -> bool;
+ fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>>;
+}
+
+/// Thread-safe buffer for completed fetches
+pub struct LogFetchBuffer {
+ completed_fetches: Mutex<VecDeque<Box<dyn CompletedFetch>>>,
+ pending_fetches: Mutex<HashMap<TableBucket, VecDeque<Box<dyn
PendingFetch>>>>,
+ next_in_line_fetch: Mutex<Option<Box<dyn CompletedFetch>>>,
+ not_empty_notify: Notify,
+ woken_up: Arc<AtomicBool>,
+}
+
+impl LogFetchBuffer {
+ pub fn new() -> Self {
+ Self {
+ completed_fetches: Mutex::new(VecDeque::new()),
+ pending_fetches: Mutex::new(HashMap::new()),
+ next_in_line_fetch: Mutex::new(None),
+ not_empty_notify: Notify::new(),
+ woken_up: Arc::new(AtomicBool::new(false)),
+ }
+ }
+
+ /// Check if the buffer is empty
+ pub fn is_empty(&self) -> bool {
+ self.completed_fetches.lock().is_empty()
+ }
+
+ /// Wait for the buffer to become non-empty, with timeout
+ /// Returns true if data became available, false if timeout
+ pub async fn await_not_empty(&self, timeout: Duration) -> bool {
+ let deadline = std::time::Instant::now() + timeout;
+
+ loop {
+ // Check if buffer is not empty
+ if !self.is_empty() {
+ return true;
+ }
+
+ // Check if woken up
+ if self.woken_up.swap(false, Ordering::Acquire) {
+ return true;
+ }
+
+ // Check if timeout
+ let now = std::time::Instant::now();
+ if now >= deadline {
+ return false;
+ }
+
+ // Wait for notification with remaining time
+ let remaining = deadline - now;
+ let notified = self.not_empty_notify.notified();
+ tokio::select! {
+ _ = tokio::time::sleep(remaining) => {
+ return false; // Timeout
+ }
+ _ = notified => {
+ // Got notification, check again
+ continue;
+ }
+ }
+ }
+ }
+
+ #[allow(dead_code)]
+ /// Wake up any waiting threads
+ pub fn wakeup(&self) {
+ self.woken_up.store(true, Ordering::Release);
+ self.not_empty_notify.notify_waiters();
+ }
+
+ /// Add a pending fetch to the buffer
+ pub fn pend(&self, pending_fetch: Box<dyn PendingFetch>) {
+ let table_bucket = pending_fetch.table_bucket().clone();
+ self.pending_fetches
+ .lock()
+ .entry(table_bucket)
+ .or_default()
+ .push_back(pending_fetch);
+ }
+
+ /// Try to complete pending fetches in order, converting them to completed
fetches
+ pub fn try_complete(&self, table_bucket: &TableBucket) {
+ let mut pending_map = self.pending_fetches.lock();
+ if let Some(pendings) = pending_map.get_mut(table_bucket) {
+ let mut has_completed = false;
+ while let Some(front) = pendings.front() {
+ if front.is_completed() {
+ let pending = pendings.pop_front().unwrap();
+ match pending.to_completed_fetch() {
+ Ok(completed) => {
+ self.completed_fetches.lock().push_back(completed);
+ // Signal that buffer is not empty
+ self.not_empty_notify.notify_waiters();
+ has_completed = true;
+ }
+ Err(e) => {
+ // todo: handle exception?
+ log::error!("Error when completing: {e}");
+ }
+ }
+ } else {
+ break;
+ }
+ }
+
+ if has_completed {
+ self.not_empty_notify.notify_waiters();
+ if pendings.is_empty() {
+ pending_map.remove(table_bucket);
+ }
+ }
+ }
Review Comment:
The try_complete method modifies the completed_fetches queue while holding
the pending_fetches lock. If the completed_fetches lock is held by another
thread, this could cause lock contention. Consider restructuring to minimize
the time locks are held or use a single lock if these operations need to be
atomic.
```suggestion
// Collect completed fetches while holding the pending_fetches lock,
// then push them to completed_fetches after releasing it to avoid
// holding both locks simultaneously.
let mut completed_to_push: Vec<Box<dyn CompletedFetch>> = Vec::new();
let mut has_completed = false;
{
let mut pending_map = self.pending_fetches.lock();
if let Some(pendings) = pending_map.get_mut(table_bucket) {
while let Some(front) = pendings.front() {
if front.is_completed() {
let pending = pendings.pop_front().unwrap();
match pending.to_completed_fetch() {
Ok(completed) => {
completed_to_push.push(completed);
has_completed = true;
}
Err(e) => {
// todo: handle exception?
log::error!("Error when completing: {e}");
}
}
} else {
break;
}
}
if has_completed && pendings.is_empty() {
pending_map.remove(table_bucket);
}
}
}
if !completed_to_push.is_empty() {
let mut completed_queue = self.completed_fetches.lock();
for completed in completed_to_push {
completed_queue.push_back(completed);
}
}
if has_completed {
// Signal that buffer is not empty
self.not_empty_notify.notify_waiters();
}
```
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -211,125 +260,292 @@ impl LogFetcher {
projected_fields: Option<Vec<usize>>,
) -> Result<Self> {
let full_arrow_schema = to_arrow_schema(table_info.get_row_type());
- let read_context = Self::create_read_context(full_arrow_schema,
projected_fields.clone());
+ let read_context =
+ Self::create_read_context(full_arrow_schema.clone(),
projected_fields.clone(), false);
+ let remote_read_context =
+ Self::create_read_context(full_arrow_schema,
projected_fields.clone(), true);
let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?;
Ok(LogFetcher {
- table_path: table_info.table_path.clone(),
- conns,
- table_info,
- metadata,
+ conns: conns.clone(),
+ metadata: metadata.clone(),
log_scanner_status,
read_context,
- remote_log_downloader: RemoteLogDownloader::new(tmp_dir)?,
- credentials_cache: CredentialsCache::new(),
+ remote_read_context,
+ remote_log_downloader:
Arc::new(RemoteLogDownloader::new(tmp_dir)?),
+ credentials_cache: Arc::new(CredentialsCache::new(conns.clone(),
metadata.clone())),
+ log_fetch_buffer: Arc::new(LogFetchBuffer::new()),
+ nodes_with_pending_fetch_requests:
Arc::new(Mutex::new(HashSet::new())),
})
}
fn create_read_context(
full_arrow_schema: SchemaRef,
projected_fields: Option<Vec<usize>>,
+ is_from_remote: bool,
) -> ReadContext {
match projected_fields {
- None => ReadContext::new(full_arrow_schema),
- Some(fields) =>
ReadContext::with_projection_pushdown(full_arrow_schema, fields),
+ None => ReadContext::new(full_arrow_schema, is_from_remote),
+ Some(fields) => {
+ ReadContext::with_projection_pushdown(full_arrow_schema,
fields, is_from_remote)
+ }
}
}
- async fn send_fetches_and_collect(&self) -> Result<HashMap<TableBucket,
Vec<ScanRecord>>> {
+ /// Send fetch requests asynchronously without waiting for responses
+ async fn send_fetches(&self) -> Result<()> {
+ // todo: check update metadata like fluss-java in case leader changes
let fetch_request = self.prepare_fetch_log_requests().await;
- let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
+
for (leader, fetch_request) in fetch_request {
- let cluster = self.metadata.get_cluster();
- let server_node = cluster
- .get_tablet_server(leader)
- .expect("todo: handle leader not exist.");
- let con = self.conns.get_connection(server_node).await?;
-
- let fetch_response = con
-
.request(crate::rpc::message::FetchLogRequest::new(fetch_request))
- .await?;
-
- for pb_fetch_log_resp in fetch_response.tables_resp {
- let table_id = pb_fetch_log_resp.table_id;
- let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
-
- for fetch_log_for_bucket in fetch_log_for_buckets {
- let bucket: i32 = fetch_log_for_bucket.bucket_id;
- let table_bucket = TableBucket::new(table_id, bucket);
-
- // Check if this is a remote log fetch
- if let Some(ref remote_log_fetch_info) =
- fetch_log_for_bucket.remote_log_fetch_info
- {
- let remote_fs_props = self
- .credentials_cache
- .get_or_refresh(&self.conns, &self.metadata)
- .await?;
- self.remote_log_downloader
- .set_remote_fs_props(remote_fs_props);
- let remote_fetch_info = RemoteLogFetchInfo::from_proto(
- remote_log_fetch_info,
- table_bucket.clone(),
- )?;
-
- if let Some(fetch_offset) =
-
self.log_scanner_status.get_bucket_offset(&table_bucket)
- {
- let high_watermark =
fetch_log_for_bucket.high_watermark.unwrap_or(-1);
- // Download and process remote log segments
- let mut pos_in_log_segment =
remote_fetch_info.first_start_pos;
- let mut current_fetch_offset = fetch_offset;
- // todo: make segment download in parallel
- for (i, segment) in
-
remote_fetch_info.remote_log_segments.iter().enumerate()
- {
- if i > 0 {
- pos_in_log_segment = 0;
- current_fetch_offset =
segment.start_offset;
- }
+ debug!("Adding pending request for node id {leader}");
+ // Check if we already have a pending request for this node
+ {
+ self.nodes_with_pending_fetch_requests.lock().insert(leader);
+ }
+
+ let cluster = self.metadata.get_cluster().clone();
+
+ let conns = Arc::clone(&self.conns);
+ let log_fetch_buffer = self.log_fetch_buffer.clone();
+ let log_scanner_status = self.log_scanner_status.clone();
+ let read_context = self.read_context.clone();
+ let remote_read_context = self.remote_read_context.clone();
+ let remote_log_downloader =
Arc::clone(&self.remote_log_downloader);
+ let creds_cache = self.credentials_cache.clone();
+ let nodes_with_pending =
self.nodes_with_pending_fetch_requests.clone();
+
+ // Spawn async task to handle the fetch request
+ tokio::spawn(async move {
+ // make sure it will always remote leader from pending nodes
+ let _guard = scopeguard::guard((), |_| {
+ nodes_with_pending.lock().remove(&leader);
+ });
+
+ let server_node = cluster
+ .get_tablet_server(leader)
+ .expect("todo: handle leader not exist.");
+
+ let con = match conns.get_connection(server_node).await {
+ Ok(con) => con,
+ Err(e) => {
+ // todo: handle failed to get connection
+ warn!("Failed to get connection to destination node:
{e:?}");
+ return;
+ }
+ };
+
+ let fetch_response = match con
+ .request(message::FetchLogRequest::new(fetch_request))
+ .await
+ {
+ Ok(resp) => resp,
+ Err(e) => {
+ // todo: handle fetch log from destination node
+ warn!("Failed to fetch log from destination node
{server_node:?}: {e:?}");
+ return;
+ }
+ };
+
+ if let Err(e) = Self::handle_fetch_response(
+ fetch_response,
+ &log_fetch_buffer,
+ &log_scanner_status,
+ &read_context,
+ &remote_read_context,
+ &remote_log_downloader,
+ &creds_cache,
+ )
+ .await
+ {
+ // todo: handle fail to handle fetch response
+ error!("Fail to handle fetch response: {e:?}");
+ }
+ });
+ }
+
+ Ok(())
+ }
- let download_future =
-
self.remote_log_downloader.request_remote_log(
-
&remote_fetch_info.remote_log_tablet_dir,
- segment,
- )?;
- let pending_fetch = RemotePendingFetch::new(
- segment.clone(),
- download_future,
- pos_in_log_segment,
- current_fetch_offset,
- high_watermark,
- self.read_context.clone(),
- );
- let remote_records =
-
pending_fetch.convert_to_completed_fetch().await?;
- // Update offset and merge results
- for (tb, records) in remote_records {
- if let Some(last_record) = records.last() {
- self.log_scanner_status
- .update_offset(&tb,
last_record.offset() + 1);
- }
-
result.entry(tb).or_default().extend(records);
+ /// Handle fetch response and add completed fetches to buffer
+ async fn handle_fetch_response(
+ fetch_response: crate::proto::FetchLogResponse,
+ log_fetch_buffer: &Arc<LogFetchBuffer>,
+ log_scanner_status: &Arc<LogScannerStatus>,
+ read_context: &ReadContext,
+ remote_read_context: &ReadContext,
+ remote_log_downloader: &Arc<RemoteLogDownloader>,
+ credentials_cache: &Arc<CredentialsCache>,
+ ) -> Result<()> {
+ for pb_fetch_log_resp in fetch_response.tables_resp {
+ let table_id = pb_fetch_log_resp.table_id;
+ let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
+
+ for fetch_log_for_bucket in fetch_log_for_buckets {
+ let bucket: i32 = fetch_log_for_bucket.bucket_id;
+ let table_bucket = TableBucket::new(table_id, bucket);
+
+ // todo: check fetch result code for per-bucket
+ let Some(fetch_offset) =
log_scanner_status.get_bucket_offset(&table_bucket) else {
+ debug!(
+ "Ignoring fetch log response for bucket {table_bucket}
because the bucket has been unsubscribed."
+ );
+ continue;
+ };
+
+ // Check if this is a remote log fetch
+ if let Some(ref remote_log_fetch_info) =
fetch_log_for_bucket.remote_log_fetch_info
+ {
+ // set remote fs props
+ let remote_fs_props =
credentials_cache.get_or_refresh().await?;
+ remote_log_downloader.set_remote_fs_props(remote_fs_props);
+
+ let remote_fetch_info =
+ RemoteLogFetchInfo::from_proto(remote_log_fetch_info,
table_bucket.clone());
+
+ let high_watermark =
fetch_log_for_bucket.high_watermark.unwrap_or(-1);
+ Self::pending_remote_fetches(
+ remote_log_downloader.clone(),
+ log_fetch_buffer.clone(),
+ remote_read_context.clone(),
+ &table_bucket,
+ remote_fetch_info,
+ fetch_offset,
+ high_watermark,
+ );
+ } else if fetch_log_for_bucket.records.is_some() {
+ // Handle regular in-memory records - create completed
fetch directly
+ let high_watermark =
fetch_log_for_bucket.high_watermark.unwrap_or(-1);
+ let records =
fetch_log_for_bucket.records.unwrap_or(vec![]);
+ let size_in_bytes = records.len();
+ let log_record_batch = LogRecordsBatches::new(records);
+
+ match DefaultCompletedFetch::new(
+ table_bucket.clone(),
+ log_record_batch,
+ size_in_bytes,
+ read_context.clone(),
+ fetch_offset,
+ high_watermark,
+ ) {
+ Ok(completed_fetch) => {
+ log_fetch_buffer.add(Box::new(completed_fetch));
+ }
+ Err(e) => {
+ // todo: handle error
+ log::warn!("Failed to create completed fetch:
{e:?}");
+ }
+ }
+ }
+ }
+ }
+ Ok(())
+ }
+
+ fn pending_remote_fetches(
+ remote_log_downloader: Arc<RemoteLogDownloader>,
+ log_fetch_buffer: Arc<LogFetchBuffer>,
+ read_context: ReadContext,
+ table_bucket: &TableBucket,
+ remote_fetch_info: RemoteLogFetchInfo,
+ fetch_offset: i64,
+ high_watermark: i64,
+ ) {
+ // Download and process remote log segments
+ let mut pos_in_log_segment = remote_fetch_info.first_start_pos;
+ let mut current_fetch_offset = fetch_offset;
+ for (i, segment) in
remote_fetch_info.remote_log_segments.iter().enumerate() {
+ if i > 0 {
+ pos_in_log_segment = 0;
+ current_fetch_offset = segment.start_offset;
+ }
+
+ // todo:
+ // 1: control the max threads to download remote segment
+ // 2: introduce priority queue to priority highest for earliest
segment
+ let download_future = remote_log_downloader
+ .request_remote_log(&remote_fetch_info.remote_log_tablet_dir,
segment);
+
+ // Register callback to be called when download completes
+ // (similar to Java's downloadFuture.onComplete)
+ // This must be done before creating RemotePendingFetch to avoid
move issues
+ let table_bucket = table_bucket.clone();
+ let log_fetch_buffer_clone = log_fetch_buffer.clone();
+ download_future.on_complete(move || {
+ log_fetch_buffer_clone.try_complete(&table_bucket);
+ });
+
+ let pending_fetch = RemotePendingFetch::new(
+ segment.clone(),
+ download_future,
+ pos_in_log_segment,
+ current_fetch_offset,
+ high_watermark,
+ read_context.clone(),
+ );
+ // Add to pending fetches in buffer (similar to Java's
logFetchBuffer.pend)
+ log_fetch_buffer.pend(Box::new(pending_fetch));
+ }
+ }
+
+ /// Collect completed fetches from buffer
+ /// Reference: LogFetchCollector.collectFetch in Java
+ fn collect_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>>
{
+ const MAX_POLL_RECORDS: usize = 500; // Default max poll records
Review Comment:
The constant MAX_POLL_RECORDS is set to 500 with a comment "Default max poll
records", but there's no mechanism to configure this value. If users need to
tune this for performance or memory reasons, they cannot. Consider making this
configurable through the LogScanner or LogFetcher API.
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -211,125 +260,292 @@ impl LogFetcher {
projected_fields: Option<Vec<usize>>,
) -> Result<Self> {
let full_arrow_schema = to_arrow_schema(table_info.get_row_type());
- let read_context = Self::create_read_context(full_arrow_schema,
projected_fields.clone());
+ let read_context =
+ Self::create_read_context(full_arrow_schema.clone(),
projected_fields.clone(), false);
+ let remote_read_context =
+ Self::create_read_context(full_arrow_schema,
projected_fields.clone(), true);
let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?;
Ok(LogFetcher {
- table_path: table_info.table_path.clone(),
- conns,
- table_info,
- metadata,
+ conns: conns.clone(),
+ metadata: metadata.clone(),
log_scanner_status,
read_context,
- remote_log_downloader: RemoteLogDownloader::new(tmp_dir)?,
- credentials_cache: CredentialsCache::new(),
+ remote_read_context,
+ remote_log_downloader:
Arc::new(RemoteLogDownloader::new(tmp_dir)?),
+ credentials_cache: Arc::new(CredentialsCache::new(conns.clone(),
metadata.clone())),
+ log_fetch_buffer: Arc::new(LogFetchBuffer::new()),
+ nodes_with_pending_fetch_requests:
Arc::new(Mutex::new(HashSet::new())),
})
}
fn create_read_context(
full_arrow_schema: SchemaRef,
projected_fields: Option<Vec<usize>>,
+ is_from_remote: bool,
) -> ReadContext {
match projected_fields {
- None => ReadContext::new(full_arrow_schema),
- Some(fields) =>
ReadContext::with_projection_pushdown(full_arrow_schema, fields),
+ None => ReadContext::new(full_arrow_schema, is_from_remote),
+ Some(fields) => {
+ ReadContext::with_projection_pushdown(full_arrow_schema,
fields, is_from_remote)
+ }
}
}
- async fn send_fetches_and_collect(&self) -> Result<HashMap<TableBucket,
Vec<ScanRecord>>> {
+ /// Send fetch requests asynchronously without waiting for responses
+ async fn send_fetches(&self) -> Result<()> {
+ // todo: check update metadata like fluss-java in case leader changes
let fetch_request = self.prepare_fetch_log_requests().await;
- let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
+
for (leader, fetch_request) in fetch_request {
- let cluster = self.metadata.get_cluster();
- let server_node = cluster
- .get_tablet_server(leader)
- .expect("todo: handle leader not exist.");
- let con = self.conns.get_connection(server_node).await?;
-
- let fetch_response = con
-
.request(crate::rpc::message::FetchLogRequest::new(fetch_request))
- .await?;
-
- for pb_fetch_log_resp in fetch_response.tables_resp {
- let table_id = pb_fetch_log_resp.table_id;
- let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
-
- for fetch_log_for_bucket in fetch_log_for_buckets {
- let bucket: i32 = fetch_log_for_bucket.bucket_id;
- let table_bucket = TableBucket::new(table_id, bucket);
-
- // Check if this is a remote log fetch
- if let Some(ref remote_log_fetch_info) =
- fetch_log_for_bucket.remote_log_fetch_info
- {
- let remote_fs_props = self
- .credentials_cache
- .get_or_refresh(&self.conns, &self.metadata)
- .await?;
- self.remote_log_downloader
- .set_remote_fs_props(remote_fs_props);
- let remote_fetch_info = RemoteLogFetchInfo::from_proto(
- remote_log_fetch_info,
- table_bucket.clone(),
- )?;
-
- if let Some(fetch_offset) =
-
self.log_scanner_status.get_bucket_offset(&table_bucket)
- {
- let high_watermark =
fetch_log_for_bucket.high_watermark.unwrap_or(-1);
- // Download and process remote log segments
- let mut pos_in_log_segment =
remote_fetch_info.first_start_pos;
- let mut current_fetch_offset = fetch_offset;
- // todo: make segment download in parallel
- for (i, segment) in
-
remote_fetch_info.remote_log_segments.iter().enumerate()
- {
- if i > 0 {
- pos_in_log_segment = 0;
- current_fetch_offset =
segment.start_offset;
- }
+ debug!("Adding pending request for node id {leader}");
+ // Check if we already have a pending request for this node
+ {
+ self.nodes_with_pending_fetch_requests.lock().insert(leader);
+ }
+
+ let cluster = self.metadata.get_cluster().clone();
+
+ let conns = Arc::clone(&self.conns);
+ let log_fetch_buffer = self.log_fetch_buffer.clone();
+ let log_scanner_status = self.log_scanner_status.clone();
+ let read_context = self.read_context.clone();
+ let remote_read_context = self.remote_read_context.clone();
+ let remote_log_downloader =
Arc::clone(&self.remote_log_downloader);
+ let creds_cache = self.credentials_cache.clone();
+ let nodes_with_pending =
self.nodes_with_pending_fetch_requests.clone();
+
+ // Spawn async task to handle the fetch request
+ tokio::spawn(async move {
+ // make sure it will always remote leader from pending nodes
+ let _guard = scopeguard::guard((), |_| {
+ nodes_with_pending.lock().remove(&leader);
+ });
+
+ let server_node = cluster
+ .get_tablet_server(leader)
+ .expect("todo: handle leader not exist.");
+
+ let con = match conns.get_connection(server_node).await {
+ Ok(con) => con,
+ Err(e) => {
+ // todo: handle failed to get connection
+ warn!("Failed to get connection to destination node:
{e:?}");
+ return;
+ }
+ };
+
+ let fetch_response = match con
+ .request(message::FetchLogRequest::new(fetch_request))
+ .await
+ {
+ Ok(resp) => resp,
+ Err(e) => {
+ // todo: handle fetch log from destination node
+ warn!("Failed to fetch log from destination node
{server_node:?}: {e:?}");
+ return;
+ }
+ };
+
+ if let Err(e) = Self::handle_fetch_response(
+ fetch_response,
+ &log_fetch_buffer,
+ &log_scanner_status,
+ &read_context,
+ &remote_read_context,
+ &remote_log_downloader,
+ &creds_cache,
+ )
+ .await
+ {
+ // todo: handle fail to handle fetch response
+ error!("Fail to handle fetch response: {e:?}");
+ }
+ });
Review Comment:
There's an inconsistency in error handling for the spawned async task. The
task logs warnings or errors but doesn't propagate failures back to the caller.
If fetch requests consistently fail (e.g., due to network issues), the poll()
method could time out without the user knowing why. Consider adding a mechanism
to track and report persistent failures, such as maintaining error counters or
surfacing errors through a dedicated channel.
##########
crates/fluss/src/client/table/log_fetch_buffer.rs:
##########
@@ -0,0 +1,367 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Result;
+use crate::metadata::TableBucket;
+use crate::record::{
+ LogRecordBatch, LogRecordIterator, LogRecordsBatches, ReadContext,
ScanRecord,
+};
+use parking_lot::Mutex;
+use std::collections::{HashMap, VecDeque};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+use tokio::sync::Notify;
+
+/// Represents a completed fetch that can be consumed
+pub trait CompletedFetch: Send + Sync {
+ fn table_bucket(&self) -> &TableBucket;
+ fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>>;
+ fn is_consumed(&self) -> bool;
+ fn drain(&mut self);
+ fn size_in_bytes(&self) -> usize;
+ fn high_watermark(&self) -> i64;
+ fn is_initialized(&self) -> bool;
+ fn set_initialized(&mut self);
+ fn next_fetch_offset(&self) -> i64;
+}
+
+/// Represents a pending fetch that is waiting to be completed
+pub trait PendingFetch: Send + Sync {
+ fn table_bucket(&self) -> &TableBucket;
+ fn is_completed(&self) -> bool;
+ fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>>;
+}
+
+/// Thread-safe buffer for completed fetches
+pub struct LogFetchBuffer {
+ completed_fetches: Mutex<VecDeque<Box<dyn CompletedFetch>>>,
+ pending_fetches: Mutex<HashMap<TableBucket, VecDeque<Box<dyn
PendingFetch>>>>,
+ next_in_line_fetch: Mutex<Option<Box<dyn CompletedFetch>>>,
+ not_empty_notify: Notify,
+ woken_up: Arc<AtomicBool>,
+}
+
+impl LogFetchBuffer {
+ pub fn new() -> Self {
+ Self {
+ completed_fetches: Mutex::new(VecDeque::new()),
+ pending_fetches: Mutex::new(HashMap::new()),
+ next_in_line_fetch: Mutex::new(None),
+ not_empty_notify: Notify::new(),
+ woken_up: Arc::new(AtomicBool::new(false)),
+ }
+ }
+
+ /// Check if the buffer is empty
+ pub fn is_empty(&self) -> bool {
+ self.completed_fetches.lock().is_empty()
+ }
+
+ /// Wait for the buffer to become non-empty, with timeout
+ /// Returns true if data became available, false if timeout
+ pub async fn await_not_empty(&self, timeout: Duration) -> bool {
+ let deadline = std::time::Instant::now() + timeout;
+
+ loop {
+ // Check if buffer is not empty
+ if !self.is_empty() {
+ return true;
+ }
+
+ // Check if woken up
+ if self.woken_up.swap(false, Ordering::Acquire) {
+ return true;
+ }
+
+ // Check if timeout
+ let now = std::time::Instant::now();
+ if now >= deadline {
+ return false;
+ }
+
+ // Wait for notification with remaining time
+ let remaining = deadline - now;
+ let notified = self.not_empty_notify.notified();
+ tokio::select! {
+ _ = tokio::time::sleep(remaining) => {
+ return false; // Timeout
+ }
+ _ = notified => {
+ // Got notification, check again
+ continue;
+ }
+ }
+ }
+ }
+
+ #[allow(dead_code)]
+ /// Wake up any waiting threads
+ pub fn wakeup(&self) {
+ self.woken_up.store(true, Ordering::Release);
+ self.not_empty_notify.notify_waiters();
+ }
+
+ /// Add a pending fetch to the buffer
+ pub fn pend(&self, pending_fetch: Box<dyn PendingFetch>) {
+ let table_bucket = pending_fetch.table_bucket().clone();
+ self.pending_fetches
+ .lock()
+ .entry(table_bucket)
+ .or_default()
+ .push_back(pending_fetch);
+ }
+
+ /// Try to complete pending fetches in order, converting them to completed
fetches
+ pub fn try_complete(&self, table_bucket: &TableBucket) {
+ let mut pending_map = self.pending_fetches.lock();
+ if let Some(pendings) = pending_map.get_mut(table_bucket) {
+ let mut has_completed = false;
+ while let Some(front) = pendings.front() {
+ if front.is_completed() {
+ let pending = pendings.pop_front().unwrap();
+ match pending.to_completed_fetch() {
+ Ok(completed) => {
+ self.completed_fetches.lock().push_back(completed);
+ // Signal that buffer is not empty
+ self.not_empty_notify.notify_waiters();
+ has_completed = true;
+ }
+ Err(e) => {
+ // todo: handle exception?
+ log::error!("Error when completing: {e}");
+ }
+ }
+ } else {
+ break;
+ }
+ }
+
+ if has_completed {
+ self.not_empty_notify.notify_waiters();
+ if pendings.is_empty() {
+ pending_map.remove(table_bucket);
+ }
+ }
+ }
+ }
+
+ /// Add a completed fetch to the buffer
+ pub fn add(&self, completed_fetch: Box<dyn CompletedFetch>) {
+ let table_bucket = completed_fetch.table_bucket();
+ let mut pending_map = self.pending_fetches.lock();
+
+ if let Some(pendings) = pending_map.get_mut(table_bucket) {
+ if pendings.is_empty() {
+ self.completed_fetches.lock().push_back(completed_fetch);
+ self.not_empty_notify.notify_waiters();
+ } else {
+
pendings.push_back(Box::new(CompletedPendingFetch::new(completed_fetch)));
+ }
+ } else {
+ // If there's no pending fetch for this table_bucket,
+ // directly add to completed_fetches
+ self.completed_fetches.lock().push_back(completed_fetch);
+ self.not_empty_notify.notify_waiters();
+ }
+ }
Review Comment:
The add method has duplicate notification calls. Lines 171 and 179 both call
not_empty_notify.notify_waiters() in different branches, but this is redundant.
Consider consolidating the notification logic or documenting why both are
necessary.
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -171,8 +174,43 @@ impl LogScanner {
})
}
- pub async fn poll(&self, _timeout: Duration) -> Result<ScanRecords> {
- Ok(ScanRecords::new(self.poll_for_fetches().await?))
+ pub async fn poll(&self, timeout: Duration) -> Result<ScanRecords> {
+ let start = std::time::Instant::now();
+ let deadline = start + timeout;
+
+ loop {
+ // Try to collect fetches
+ let fetch_result = self.poll_for_fetches().await?;
+
+ if !fetch_result.is_empty() {
+ // We have data, send next round of fetches and return
+ // This enables pipelining while user processes the data
+ self.log_fetcher.send_fetches().await?;
+ return Ok(ScanRecords::new(fetch_result));
+ }
+
+ // No data available, check if we should wait
+ let now = std::time::Instant::now();
+ if now >= deadline {
+ // Timeout reached, return empty result
+ return Ok(ScanRecords::new(HashMap::new()));
+ }
+
+ // Wait for buffer to become non-empty with remaining time
+ let remaining = deadline - now;
+ let has_data = self
+ .log_fetcher
+ .log_fetch_buffer
+ .await_not_empty(remaining)
+ .await;
+
+ if !has_data {
+ // Timeout while waiting
+ return Ok(ScanRecords::new(HashMap::new()));
+ }
+
+ // Buffer became non-empty, try again
+ }
Review Comment:
The poll loop could busy-wait when the buffer is empty but the timeout
hasn't been reached. While await_not_empty() waits with the remaining timeout,
if it returns true but the subsequent poll_for_fetches() finds no data (race
condition), the loop continues immediately without any backoff. Consider adding
a small sleep or exponential backoff between retries to avoid excessive CPU
usage in edge cases.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]