This is an automated email from the ASF dual-hosted git repository. yuxia pushed a commit to branch improve-log-poll in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
commit f664daa74103b2530727c711bef321ab0f3dfe52 Author: luoyuxia <[email protected]> AuthorDate: Sat Dec 20 19:38:49 2025 +0800 improve fetch logic --- crates/fluss/Cargo.toml | 1 + crates/fluss/src/client/credentials.rs | 30 +- crates/fluss/src/client/table/log_fetch_buffer.rs | 238 +++++-------- crates/fluss/src/client/table/remote_log.rs | 87 +++-- crates/fluss/src/client/table/scanner.rs | 395 ++++++++++++--------- crates/fluss/src/record/arrow.rs | 109 ++++-- crates/fluss/tests/integration/table.rs | 4 +- .../fluss/tests/integration/table_remote_scan.rs | 8 +- 8 files changed, 460 insertions(+), 412 deletions(-) diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml index cdba9de..27604ee 100644 --- a/crates/fluss/Cargo.toml +++ b/crates/fluss/Cargo.toml @@ -58,6 +58,7 @@ url = "2.5.7" uuid = { version = "1.10", features = ["v4"] } tempfile = "3.23.0" snafu = "0.8.3" +scopeguard = "1.2.0" [target.'cfg(target_arch = "wasm32")'.dependencies] jiff = { workspace = true, features = ["js"] } diff --git a/crates/fluss/src/client/credentials.rs b/crates/fluss/src/client/credentials.rs index 6b07d08..8adfe48 100644 --- a/crates/fluss/src/client/credentials.rs +++ b/crates/fluss/src/client/credentials.rs @@ -90,20 +90,20 @@ fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> { pub struct CredentialsCache { inner: RwLock<Option<CachedToken>>, + rpc_client: Arc<RpcClient>, + metadata: Arc<Metadata>, } impl CredentialsCache { - pub fn new() -> Self { + pub fn new(rpc_client: Arc<RpcClient>, metadata: Arc<Metadata>) -> Self { Self { inner: RwLock::new(None), + rpc_client, + metadata, } } - pub async fn get_or_refresh( - &self, - rpc_client: &Arc<RpcClient>, - metadata: &Arc<Metadata>, - ) -> Result<HashMap<String, String>> { + pub async fn get_or_refresh(&self) -> Result<HashMap<String, String>> { { let guard = self.inner.read(); if let Some(cached) = guard.as_ref() { @@ -113,17 +113,13 @@ impl CredentialsCache { } } - self.refresh_from_server(rpc_client, metadata).await + self.refresh_from_server().await } - async fn refresh_from_server( - &self, - rpc_client: &Arc<RpcClient>, - metadata: &Arc<Metadata>, - ) -> Result<HashMap<String, String>> { - let cluster = metadata.get_cluster(); + async fn refresh_from_server(&self) -> Result<HashMap<String, String>> { + let cluster = self.metadata.get_cluster(); let server_node = cluster.get_one_available_server(); - let conn = rpc_client.get_connection(server_node).await?; + let conn = self.rpc_client.get_connection(server_node).await?; let request = GetSecurityTokenRequest::new(); let response = conn.request(request).await?; @@ -158,9 +154,3 @@ impl CredentialsCache { Ok(props) } } - -impl Default for CredentialsCache { - fn default() -> Self { - Self::new() - } -} diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs index eca4de9..5b70dcd 100644 --- a/crates/fluss/src/client/table/log_fetch_buffer.rs +++ b/crates/fluss/src/client/table/log_fetch_buffer.rs @@ -15,10 +15,11 @@ // specific language governing permissions and limitations // under the License. 
-use crate::error::{Error, Result}; +use crate::error::Result; use crate::metadata::TableBucket; -use crate::proto::PbFetchLogRespForBucket; -use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord}; +use crate::record::{ + LogRecordBatch, LogRecordIterator, LogRecordsBatches, ReadContext, ScanRecord, +}; use parking_lot::Mutex; use std::collections::{HashMap, VecDeque}; use std::sync::Arc; @@ -30,14 +31,13 @@ use tokio::sync::Notify; pub trait CompletedFetch: Send + Sync { fn table_bucket(&self) -> &TableBucket; fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>>; - fn next_fetch_offset(&self) -> i64; fn is_consumed(&self) -> bool; fn drain(&mut self); fn size_in_bytes(&self) -> usize; fn high_watermark(&self) -> i64; fn is_initialized(&self) -> bool; fn set_initialized(&mut self); - fn fetch_offset(&self) -> i64; + fn next_fetch_offset(&self) -> i64; } /// Represents a pending fetch that is waiting to be completed @@ -66,34 +66,34 @@ impl LogFetchBuffer { woken_up: Arc::new(AtomicBool::new(false)), } } - + /// Check if the buffer is empty pub fn is_empty(&self) -> bool { self.completed_fetches.lock().is_empty() } - + /// Wait for the buffer to become non-empty, with timeout /// Returns true if data became available, false if timeout pub async fn await_not_empty(&self, timeout: Duration) -> bool { let deadline = std::time::Instant::now() + timeout; - + loop { // Check if buffer is not empty if !self.is_empty() { return true; } - + // Check if woken up if self.woken_up.swap(false, Ordering::Acquire) { return true; } - + // Check if timeout let now = std::time::Instant::now(); if now >= deadline { return false; } - + // Wait for notification with remaining time let remaining = deadline - now; let notified = self.not_empty_notify.notified(); @@ -108,7 +108,8 @@ impl LogFetchBuffer { } } } - + + #[allow(dead_code)] /// Wake up any waiting threads pub fn wakeup(&self) { self.woken_up.store(true, Ordering::Release); @@ -121,7 +122,7 @@ impl LogFetchBuffer { self.pending_fetches .lock() .entry(table_bucket) - .or_insert_with(VecDeque::new) + .or_default() .push_back(pending_fetch); } @@ -132,50 +133,49 @@ impl LogFetchBuffer { let mut has_completed = false; while let Some(front) = pendings.front() { if front.is_completed() { - if let Some(pending) = pendings.pop_front() { - match pending.to_completed_fetch() { - Ok(completed) => { - self.completed_fetches.lock().push_back(completed); - // Signal that buffer is not empty - self.not_empty_notify.notify_waiters(); - has_completed = true; - } - Err(_) => { - // Skip failed fetches - } + let pending = pendings.pop_front().unwrap(); + match pending.to_completed_fetch() { + Ok(completed) => { + self.completed_fetches.lock().push_back(completed); + // Signal that buffer is not empty + self.not_empty_notify.notify_waiters(); + has_completed = true; + } + Err(e) => { + // todo: handle exception? 
+ log::error!("Error when completing: {e}"); } } } else { break; } } - if pendings.is_empty() { - pending_map.remove(table_bucket); + + if has_completed { + self.not_empty_notify.notify_waiters(); + if pendings.is_empty() { + pending_map.remove(table_bucket); + } } } } /// Add a completed fetch to the buffer pub fn add(&self, completed_fetch: Box<dyn CompletedFetch>) { - let table_bucket = completed_fetch.table_bucket().clone(); + let table_bucket = completed_fetch.table_bucket(); let mut pending_map = self.pending_fetches.lock(); - let should_notify = if let Some(pendings) = pending_map.get_mut(&table_bucket) { + + if let Some(pendings) = pending_map.get_mut(table_bucket) { if pendings.is_empty() { self.completed_fetches.lock().push_back(completed_fetch); - true + self.not_empty_notify.notify_waiters(); } else { - // Convert to pending fetch wrapper - let completed_pending = CompletedPendingFetch::new(completed_fetch); - pendings.push_back(Box::new(completed_pending)); - false + pendings.push_back(Box::new(CompletedPendingFetch::new(completed_fetch))); } } else { + // If there's no pending fetch for this table_bucket, + // directly add to completed_fetches self.completed_fetches.lock().push_back(completed_fetch); - true - }; - - // Signal that buffer is not empty if we added to completed_fetches - if should_notify { self.not_empty_notify.notify_waiters(); } } @@ -198,6 +198,14 @@ impl LogFetchBuffer { /// Get the set of buckets that have buffered data pub fn buffered_buckets(&self) -> Vec<TableBucket> { let mut buckets = Vec::new(); + + let next_in_line_fetch = self.next_in_line_fetch.lock(); + if let Some(complete_fetch) = next_in_line_fetch.as_ref() { + if !complete_fetch.is_consumed() { + buckets.push(complete_fetch.table_bucket().clone()); + } + } + let completed = self.completed_fetches.lock(); for fetch in completed.iter() { buckets.push(fetch.table_bucket().clone()); @@ -214,58 +222,6 @@ impl Default for LogFetchBuffer { } } -/// Pending fetch that waits for fetch log response -pub struct FetchPendingFetch { - table_bucket: TableBucket, - response: Arc<Mutex<Option<Result<PbFetchLogRespForBucket>>>>, - read_context: ReadContext, - fetch_offset: i64, -} - -impl FetchPendingFetch { - pub fn new( - table_bucket: TableBucket, - read_context: ReadContext, - fetch_offset: i64, - ) -> (Self, Arc<Mutex<Option<Result<PbFetchLogRespForBucket>>>>) { - let response = Arc::new(Mutex::new(None)); - let pending = Self { - table_bucket, - response: Arc::clone(&response), - read_context, - fetch_offset, - }; - (pending, response) - } - - pub fn set_response(&self, response: Result<PbFetchLogRespForBucket>) { - *self.response.lock() = Some(response); - } -} - -impl PendingFetch for FetchPendingFetch { - fn table_bucket(&self) -> &TableBucket { - &self.table_bucket - } - - fn is_completed(&self) -> bool { - self.response.lock().is_some() - } - - fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> { - let response = self.response.lock().take().ok_or_else(|| { - Error::Io(std::io::Error::other("Fetch response not available")) - })??; - let completed = DefaultCompletedFetch::new( - self.table_bucket, - &response, - self.read_context, - self.fetch_offset, - )?; - Ok(Box::new(completed)) - } -} - /// A wrapper that makes a completed fetch look like a pending fetch struct CompletedPendingFetch { completed_fetch: Box<dyn CompletedFetch>, @@ -294,64 +250,65 @@ impl PendingFetch for CompletedPendingFetch { /// Default implementation of CompletedFetch for in-memory log records pub struct 
DefaultCompletedFetch { table_bucket: TableBucket, - data: Vec<u8>, + log_record_batch: LogRecordsBatches, read_context: ReadContext, - fetch_offset: i64, // The offset at which this fetch started next_fetch_offset: i64, high_watermark: i64, size_in_bytes: usize, consumed: bool, initialized: bool, - // Pre-parsed records for efficient access - records: Vec<ScanRecord>, - current_index: usize, + records_read: usize, + current_record_iterator: Option<LogRecordIterator>, + current_record_batch: Option<LogRecordBatch>, } impl DefaultCompletedFetch { pub fn new( table_bucket: TableBucket, - fetch_response: &PbFetchLogRespForBucket, + log_record_batch: LogRecordsBatches, + size_in_bytes: usize, read_context: ReadContext, fetch_offset: i64, + high_watermark: i64, ) -> Result<Self> { - let data = fetch_response.records.clone().unwrap_or_default(); - let size_in_bytes = data.len(); - let high_watermark = fetch_response.high_watermark.unwrap_or(-1); - - // Parse all records upfront - let mut records = Vec::new(); - for log_record in &mut LogRecordsBatchs::new(&data) { - let last_offset = log_record.last_log_offset(); - let batch_records = log_record.records(&read_context)?; - for record in batch_records { - records.push(record); - } - // Update next_fetch_offset based on the last batch - let next_offset = last_offset + 1; - // We'll update this when we actually consume records - } - - // Set next_fetch_offset based on the last record if available - let next_fetch_offset = if let Some(last_record) = records.last() { - last_record.offset() + 1 - } else { - fetch_offset - }; - Ok(Self { table_bucket, - data, + log_record_batch, read_context, - fetch_offset, - next_fetch_offset, + next_fetch_offset: fetch_offset, high_watermark, size_in_bytes, consumed: false, initialized: false, - records, - current_index: 0, + records_read: 0, + current_record_iterator: None, + current_record_batch: None, }) } + + /// Get the next fetched record, handling batch iteration and record skipping + fn next_fetched_record(&mut self) -> Result<Option<ScanRecord>> { + loop { + if let Some(record) = self + .current_record_iterator + .as_mut() + .and_then(Iterator::next) + { + if record.offset() >= self.next_fetch_offset { + return Ok(Some(record)); + } + } else if let Some(batch) = self.log_record_batch.next() { + self.current_record_iterator = Some(batch.records(&self.read_context)?); + self.current_record_batch = Some(batch); + } else { + if let Some(batch) = self.current_record_batch.take() { + self.next_fetch_offset = batch.next_log_offset(); + } + self.drain(); + return Ok(None); + } + } + } } impl CompletedFetch for DefaultCompletedFetch { @@ -360,30 +317,24 @@ impl CompletedFetch for DefaultCompletedFetch { } fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>> { + // todo: handle corrupt_last_record if self.consumed { return Ok(Vec::new()); } - let end_index = std::cmp::min(self.current_index + max_records, self.records.len()); - let records = self.records[self.current_index..end_index].to_vec(); - self.current_index = end_index; + let mut scan_records = Vec::new(); - if self.current_index >= self.records.len() { - self.consumed = true; - // Update next_fetch_offset based on the last record - if let Some(last_record) = self.records.last() { - self.next_fetch_offset = last_record.offset() + 1; + for _ in 0..max_records { + if let Some(record) = self.next_fetched_record()? 
{ + self.next_fetch_offset = record.offset() + 1; + self.records_read += 1; + scan_records.push(record); + } else { + break; } - } else if let Some(last_record) = records.last() { - // Update next_fetch_offset as we consume records - self.next_fetch_offset = last_record.offset() + 1; } - Ok(records) - } - - fn next_fetch_offset(&self) -> i64 { - self.next_fetch_offset + Ok(scan_records) } fn is_consumed(&self) -> bool { @@ -392,7 +343,6 @@ impl CompletedFetch for DefaultCompletedFetch { fn drain(&mut self) { self.consumed = true; - self.current_index = self.records.len(); } fn size_in_bytes(&self) -> usize { @@ -411,7 +361,7 @@ impl CompletedFetch for DefaultCompletedFetch { self.initialized = true; } - fn fetch_offset(&self) -> i64 { - self.fetch_offset + fn next_fetch_offset(&self) -> i64 { + self.next_fetch_offset } } diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs index 4c835bb..4cc5ae1 100644 --- a/crates/fluss/src/client/table/remote_log.rs +++ b/crates/fluss/src/client/table/remote_log.rs @@ -18,11 +18,11 @@ use crate::client::table::log_fetch_buffer::{CompletedFetch, DefaultCompletedFet use crate::error::{Error, Result}; use crate::io::{FileIO, Storage}; use crate::metadata::TableBucket; -use crate::proto::PbFetchLogRespForBucket; use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment}; -use crate::record::ReadContext; +use crate::record::{LogRecordsBatches, ReadContext}; use crate::util::delete_file; -use parking_lot::Mutex; +use parking_lot::{Mutex, RwLock}; +use std::collections::HashMap; use std::io; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -72,19 +72,19 @@ pub struct RemoteLogFetchInfo { } impl RemoteLogFetchInfo { - pub fn from_proto(info: &PbRemoteLogFetchInfo, table_bucket: TableBucket) -> Result<Self> { + pub fn from_proto(info: &PbRemoteLogFetchInfo, table_bucket: TableBucket) -> Self { let segments = info .remote_log_segments .iter() .map(|s| RemoteLogSegment::from_proto(s, table_bucket.clone())) .collect(); - Ok(Self { + Self { remote_log_tablet_dir: info.remote_log_tablet_dir.clone(), partition_name: info.partition_name.clone(), remote_log_segments: segments, first_start_pos: info.first_start_pos.unwrap_or(0), - }) + } } } @@ -94,13 +94,15 @@ type CompletionCallback = Box<dyn Fn() + Send + Sync>; pub struct RemoteLogDownloadFuture { result: Arc<Mutex<Option<Result<PathBuf>>>>, completion_callbacks: Arc<Mutex<Vec<CompletionCallback>>>, + // todo: add recycleCallback } impl RemoteLogDownloadFuture { pub fn new(receiver: oneshot::Receiver<Result<PathBuf>>) -> Self { let result = Arc::new(Mutex::new(None)); let result_clone = Arc::clone(&result); - let completion_callbacks: Arc<Mutex<Vec<CompletionCallback>>> = Arc::new(Mutex::new(Vec::new())); + let completion_callbacks: Arc<Mutex<Vec<CompletionCallback>>> = + Arc::new(Mutex::new(Vec::new())); let callbacks_clone = Arc::clone(&completion_callbacks); // Spawn a task to wait for the download and update result, then call callbacks @@ -108,7 +110,10 @@ impl RemoteLogDownloadFuture { let download_result = match receiver.await { Ok(Ok(path)) => Ok(path), Ok(Err(e)) => Err(e), - Err(e) => Err(Error::Io(io::Error::other(format!("Download future cancelled: {e:?}")))), + Err(e) => Err(Error::UnexpectedError { + message: format!("Download future cancelled: {e:?}"), + source: None, + }), }; *result_clone.lock() = Some(download_result); @@ -152,17 +157,24 @@ impl RemoteLogDownloadFuture { } } - /// Get the downloaded file path - pub async fn get_file_path(&mut 
self) -> Result<PathBuf> { - let receiver = self.receiver.take().ok_or_else(|| Error::UnexpectedError { - message: "Downloaded file already consumed".to_string(), - source: None, - })?; + pub fn is_done(&self) -> bool { + self.result.lock().is_some() + } - receiver.await.map_err(|e| Error::UnexpectedError { - message: format!("Download future cancelled: {e:?}"), - source: None, - })? + /// Get the downloaded file path (synchronous, only works after is_done() returns true) + pub fn get_file_path(&self) -> Result<PathBuf> { + let guard = self.result.lock(); + match guard.as_ref() { + Some(Ok(path)) => Ok(path.clone()), + Some(Err(e)) => Err(Error::IoUnexpectedError { + message: format!("Download failed: {e}"), + source: io::Error::other(format!("{e:?}")), + }), + None => Err(Error::IoUnexpectedError { + message: "Download not completed yet".to_string(), + source: io::Error::other("Download not completed yet"), + }), + } } } @@ -198,8 +210,13 @@ impl RemoteLogDownloader { let remote_fs_props = self.remote_fs_props.read().clone(); // Spawn async download task tokio::spawn(async move { - let result = - Self::download_file(&remote_log_tablet_dir, &remote_path, &local_file_path).await; + let result = Self::download_file( + &remote_log_tablet_dir, + &remote_path, + &local_file_path, + &remote_fs_props, + ) + .await; let _ = sender.send(result); }); RemoteLogDownloadFuture::new(receiver) @@ -305,7 +322,6 @@ impl RemoteLogDownloader { } } - /// Pending fetch that waits for remote log file to be downloaded pub struct RemotePendingFetch { segment: RemoteLogSegment, @@ -348,39 +364,32 @@ impl PendingFetch for RemotePendingFetch { fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> { // Get the file path (this should only be called when is_completed() returns true) let file_path = self.download_future.get_file_path()?; - // Read the file data synchronously (we're in a sync context) // Note: This is a limitation - we need to use blocking I/O here - let file_data = std::fs::read(&file_path).map_err(|e| { - Error::Io(io::Error::other(format!("Failed to read downloaded file: {e:?}"))) + let mut file_data = std::fs::read(&file_path).map_err(|e| Error::IoUnexpectedError { + message: format!("Failed to read downloaded file: {file_path:?}."), + source: e, })?; // Slice the data if needed let data = if self.pos_in_log_segment > 0 { - &file_data[self.pos_in_log_segment as usize..] 
+ file_data.split_off(self.pos_in_log_segment as usize) } else { - &file_data + file_data }; - // Create a mock PbFetchLogRespForBucket for DefaultCompletedFetch - // We'll use the data we read from the file - let fetch_response = PbFetchLogRespForBucket { - bucket_id: self.segment.table_bucket.bucket_id(), - partition_id: None, - error_code: None, - error_message: None, - high_watermark: Some(self.high_watermark), - log_start_offset: None, - remote_log_fetch_info: None, - records: Some(data.to_vec()), - }; + let size_in_bytes = data.len(); + + let log_record_batch = LogRecordsBatches::new(data); // Create DefaultCompletedFetch from the data let completed_fetch = DefaultCompletedFetch::new( self.segment.table_bucket, - &fetch_response, + log_record_batch, + size_in_bytes, self.read_context, self.fetch_offset, + self.high_watermark, )?; // Delete the downloaded local file to free disk (async, but we'll do it in background) diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 5053865..26d37d7 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -18,26 +18,26 @@ use crate::client::connection::FlussConnection; use crate::client::credentials::CredentialsCache; use crate::client::metadata::Metadata; +use crate::client::table::log_fetch_buffer::{ + CompletedFetch, DefaultCompletedFetch, LogFetchBuffer, +}; +use crate::client::table::remote_log::{ + RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch, +}; use crate::error::{Error, Result}; use crate::metadata::{TableBucket, TableInfo, TablePath}; use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable}; -use crate::record::{to_arrow_schema, ReadContext, ScanRecord, ScanRecords}; -use crate::rpc::{message, RpcClient}; +use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, to_arrow_schema}; +use crate::rpc::{RpcClient, message}; use crate::util::FairBucketStatusMap; use arrow_schema::SchemaRef; +use log::{debug, error, warn}; use parking_lot::{Mutex, RwLock}; use std::collections::{HashMap, HashSet}; use std::slice::from_ref; use std::sync::Arc; use std::time::Duration; use tempfile::TempDir; -use log::warn; -use crate::client::table::log_fetch_buffer::{ - CompletedFetch, DefaultCompletedFetch, LogFetchBuffer, -}; -use crate::client::table::remote_log::{ - RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch, -}; const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024; #[allow(dead_code)] @@ -198,7 +198,11 @@ impl LogScanner { // Wait for buffer to become non-empty with remaining time let remaining = deadline - now; - let has_data = self.log_fetcher.log_fetch_buffer.await_not_empty(remaining).await; + let has_data = self + .log_fetcher + .log_fetch_buffer + .await_not_empty(remaining) + .await; if !has_data { // Timeout while waiting @@ -220,7 +224,12 @@ impl LogScanner { } async fn poll_for_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> { - // Send fetch requests (non-blocking) + let result = self.log_fetcher.collect_fetches()?; + if !result.is_empty() { + return Ok(result); + } + + // send any new fetches (won't resend pending fetches). 
self.log_fetcher.send_fetches().await?; // Collect completed fetches from buffer @@ -229,14 +238,15 @@ } struct LogFetcher { - table_path: TablePath, conns: Arc<RpcClient>, - table_info: TableInfo, metadata: Arc<Metadata>, log_scanner_status: Arc<LogScannerStatus>, read_context: ReadContext, + remote_read_context: ReadContext, remote_log_downloader: Arc<RemoteLogDownloader>, - credentials_cache: CredentialsCache, + // todo: consider scheduling a background thread to update the + // token instead of updating it during the fetch phase + credentials_cache: Arc<CredentialsCache>, log_fetch_buffer: Arc<LogFetchBuffer>, nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>, } @@ -250,19 +260,21 @@ impl LogFetcher { projected_fields: Option<Vec<usize>>, ) -> Result<Self> { let full_arrow_schema = to_arrow_schema(table_info.get_row_type()); - let read_context = Self::create_read_context(full_arrow_schema, projected_fields.clone()); + let read_context = + Self::create_read_context(full_arrow_schema.clone(), projected_fields.clone(), false); + let remote_read_context = + Self::create_read_context(full_arrow_schema, projected_fields.clone(), true); let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?; Ok(LogFetcher { - table_path: table_info.table_path.clone(), - conns, - table_info, - metadata, + conns: conns.clone(), + metadata: metadata.clone(), log_scanner_status, read_context, + remote_read_context, remote_log_downloader: Arc::new(RemoteLogDownloader::new(tmp_dir)?), - credentials_cache: CredentialsCache::new(), + credentials_cache: Arc::new(CredentialsCache::new(conns.clone(), metadata.clone())), log_fetch_buffer: Arc::new(LogFetchBuffer::new()), nodes_with_pending_fetch_requests: Arc::new(Mutex::new(HashSet::new())), }) @@ -271,22 +283,26 @@ impl LogFetcher { fn create_read_context( full_arrow_schema: SchemaRef, projected_fields: Option<Vec<usize>>, + is_from_remote: bool, ) -> ReadContext { match projected_fields { - None => ReadContext::new(full_arrow_schema), - Some(fields) => ReadContext::with_projection_pushdown(full_arrow_schema, fields), + None => ReadContext::new(full_arrow_schema, is_from_remote), + Some(fields) => { + ReadContext::with_projection_pushdown(full_arrow_schema, fields, is_from_remote) + } } } /// Send fetch requests asynchronously without waiting for responses async fn send_fetches(&self) -> Result<()> { + // todo: check/update metadata like fluss-java in case the leader changes let fetch_request = self.prepare_fetch_log_requests().await; for (leader, fetch_request) in fetch_request { + debug!("Adding pending request for node id {leader}"); // Check if we already have a pending request for this node { - self.nodes_with_pending_fetch_requests.lock() - .insert(leader); + self.nodes_with_pending_fetch_requests.lock().insert(leader); } let cluster = self.metadata.get_cluster().clone(); @@ -295,45 +311,57 @@ impl LogFetcher { let log_fetch_buffer = self.log_fetch_buffer.clone(); let log_scanner_status = self.log_scanner_status.clone(); let read_context = self.read_context.clone(); + let remote_read_context = self.remote_read_context.clone(); let remote_log_downloader = Arc::clone(&self.remote_log_downloader); + let creds_cache = self.credentials_cache.clone(); let nodes_with_pending = self.nodes_with_pending_fetch_requests.clone(); // Spawn async task to handle the fetch request tokio::spawn(async move { + // make sure the leader is always removed from the pending nodes + let _guard = scopeguard::guard((), |_| { + nodes_with_pending.lock().remove(&leader); + }); + let
server_node = cluster .get_tablet_server(leader) - .expect("todo: handle leader not exist.").clone(); - let result = conns.get_connection(&server_node).await; - match result { - Ok(con) => { - let fetch_result = con - .request(message::FetchLogRequest::new(fetch_request)) - .await; - - match fetch_result { - Ok(fetch_response) => { - Self::handle_fetch_response( - fetch_response, - &log_fetch_buffer, - &log_scanner_status, - &read_context, - &remote_log_downloader, - ).await; - } - Err(e) => { - warn!("Failed to fetch log from destination node {:?}: {:?}", - server_node, - e); - } - } + .expect("todo: handle leader not exist."); + + let con = match conns.get_connection(server_node).await { + Ok(con) => con, + Err(e) => { + // todo: handle failure to get a connection + warn!("Failed to get connection to destination node: {e:?}"); + return; } + }; + + let fetch_response = match con + .request(message::FetchLogRequest::new(fetch_request)) + .await + { + Ok(resp) => resp, Err(e) => { - warn!("Failed to get connection to destination node: {:?}", e); + // todo: handle failure to fetch log from the destination node + warn!("Failed to fetch log from destination node {server_node:?}: {e:?}"); + return; } - } + }; - // Remove from pending set - nodes_with_pending.lock().remove(&leader); + if let Err(e) = Self::handle_fetch_response( + fetch_response, + &log_fetch_buffer, + &log_scanner_status, + &read_context, + &remote_read_context, + &remote_log_downloader, + &creds_cache, + ) + .await + { + // todo: handle failure to process the fetch response + error!("Failed to handle fetch response: {e:?}"); + } }); } @@ -346,8 +374,10 @@ impl LogFetcher { log_fetch_buffer: &Arc<LogFetchBuffer>, log_scanner_status: &Arc<LogScannerStatus>, read_context: &ReadContext, + remote_read_context: &ReadContext, remote_log_downloader: &Arc<RemoteLogDownloader>, - ) { + credentials_cache: &Arc<CredentialsCache>, + ) -> Result<()> { for pb_fetch_log_resp in fetch_response.tables_resp { let table_id = pb_fetch_log_resp.table_id; let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp; @@ -356,90 +386,107 @@ let bucket: i32 = fetch_log_for_bucket.bucket_id; let table_bucket = TableBucket::new(table_id, bucket); + // todo: check the per-bucket fetch result code + let Some(fetch_offset) = log_scanner_status.get_bucket_offset(&table_bucket) else { + debug!( + "Ignoring fetch log response for bucket {table_bucket} because the bucket has been unsubscribed."
+ ); + continue; + }; + // Check if this is a remote log fetch - if let Some(ref remote_log_fetch_info) = - fetch_log_for_bucket.remote_log_fetch_info + if let Some(ref remote_log_fetch_info) = fetch_log_for_bucket.remote_log_fetch_info { + // set remote fs props + let remote_fs_props = credentials_cache.get_or_refresh().await?; + remote_log_downloader.set_remote_fs_props(remote_fs_props); + + let remote_fetch_info = + RemoteLogFetchInfo::from_proto(remote_log_fetch_info, table_bucket.clone()); + + let high_watermark = fetch_log_for_bucket.high_watermark.unwrap_or(-1); + Self::pending_remote_fetches( + remote_log_downloader.clone(), + log_fetch_buffer.clone(), + remote_read_context.clone(), + &table_bucket, + remote_fetch_info, + fetch_offset, + high_watermark, + ); + } else if fetch_log_for_bucket.records.is_some() { + // Handle regular in-memory records - create completed fetch directly + let high_watermark = fetch_log_for_bucket.high_watermark.unwrap_or(-1); + let records = fetch_log_for_bucket.records.unwrap_or(vec![]); + let size_in_bytes = records.len(); + let log_record_batch = LogRecordsBatches::new(records); - let remote_fetch_info = match RemoteLogFetchInfo::from_proto( - remote_log_fetch_info, + match DefaultCompletedFetch::new( table_bucket.clone(), + log_record_batch, + size_in_bytes, + read_context.clone(), + fetch_offset, + high_watermark, ) { - Ok(info) => info, - Err(e) => { - eprintln!("Failed to parse remote log fetch info: {:?}", e); - continue; - } - }; - - if let Some(fetch_offset) = - log_scanner_status.get_bucket_offset(&table_bucket) - { - let high_watermark = fetch_log_for_bucket.high_watermark.unwrap_or(-1); - // Download and process remote log segments - let mut pos_in_log_segment = remote_fetch_info.first_start_pos; - let mut current_fetch_offset = fetch_offset; - // todo: make segment download parallelly - for (i, segment) in - remote_fetch_info.remote_log_segments.iter().enumerate() - { - if i > 0 { - pos_in_log_segment = 0; - current_fetch_offset = segment.start_offset; - } - - let download_future = remote_log_downloader.request_remote_log( - &remote_fetch_info.remote_log_tablet_dir, - segment, - ); - - let table_bucket_clone = table_bucket.clone(); - - // Register callback to be called when download completes - // (similar to Java's downloadFuture.onComplete) - // This must be done before creating RemotePendingFetch to avoid move issues - let log_fetch_buffer_clone = Arc::clone(log_fetch_buffer); - download_future.on_complete(move || { - log_fetch_buffer_clone.try_complete(&table_bucket_clone); - }); - - let pending_fetch = RemotePendingFetch::new( - segment.clone(), - download_future, - pos_in_log_segment, - current_fetch_offset, - high_watermark, - read_context.clone(), - ); - // Add to pending fetches in buffer (similar to Java's logFetchBuffer.pend) - log_fetch_buffer.pend(Box::new(pending_fetch)); + Ok(completed_fetch) => { + log_fetch_buffer.add(Box::new(completed_fetch)); } - } else { - // if the offset is null, it means the bucket has been unsubscribed, - // skip processing and continue to the next bucket. 
- continue; - } - } else if fetch_log_for_bucket.records.is_some() { - // Handle regular in-memory records - create completed fetch directly - if let Some(fetch_offset) = log_scanner_status.get_bucket_offset(&table_bucket) { - match DefaultCompletedFetch::new( - table_bucket.clone(), - &fetch_log_for_bucket, - read_context.clone(), - fetch_offset, - ) { - Ok(completed_fetch) => { - log_fetch_buffer.add(Box::new(completed_fetch)); - } - Err(e) => { - // todo: handle error - eprintln!("Failed to create completed fetch: {:?}", e); - } + Err(e) => { + // todo: handle error + log::warn!("Failed to create completed fetch: {e:?}"); } } } } } + Ok(()) + } + + fn pending_remote_fetches( + remote_log_downloader: Arc<RemoteLogDownloader>, + log_fetch_buffer: Arc<LogFetchBuffer>, + read_context: ReadContext, + table_bucket: &TableBucket, + remote_fetch_info: RemoteLogFetchInfo, + fetch_offset: i64, + high_watermark: i64, + ) { + // Download and process remote log segments + let mut pos_in_log_segment = remote_fetch_info.first_start_pos; + let mut current_fetch_offset = fetch_offset; + for (i, segment) in remote_fetch_info.remote_log_segments.iter().enumerate() { + if i > 0 { + pos_in_log_segment = 0; + current_fetch_offset = segment.start_offset; + } + + // todo: + // 1: limit the max number of threads used to download remote segments + // 2: introduce a priority queue that gives the earliest segment the highest priority + let download_future = remote_log_downloader + .request_remote_log(&remote_fetch_info.remote_log_tablet_dir, segment); + + // Register callback to be called when download completes + // (similar to Java's downloadFuture.onComplete) + // This must be done before creating RemotePendingFetch to avoid move issues + let table_bucket = table_bucket.clone(); + let log_fetch_buffer_clone = log_fetch_buffer.clone(); + download_future.on_complete(move || { + log_fetch_buffer_clone.try_complete(&table_bucket); + }); + + let pending_fetch = RemotePendingFetch::new( + segment.clone(), + download_future, + pos_in_log_segment, + current_fetch_offset, + high_watermark, + read_context.clone(), + ); + // Add to pending fetches in buffer (similar to Java's logFetchBuffer.pend) + log_fetch_buffer.pend(Box::new(pending_fetch)); + } } /// Collect completed fetches from buffer @@ -470,14 +517,15 @@ // (2) there are no fetched records with actual content preceding this // exception. if result.is_empty() && size_in_bytes == 0 { - // todo: consider it? + // todo: do we need to handle this like Java does?
// self.log_fetch_buffer.poll(); } return Err(e); } } } else { - self.log_fetch_buffer.set_next_in_line_fetch(Some(completed_fetch)); + self.log_fetch_buffer + .set_next_in_line_fetch(Some(completed_fetch)); } // Note: peek() already removed the fetch from buffer, so no need to call poll() } else { @@ -487,7 +535,8 @@ impl LogFetcher { } else { // Fetch records from next_in_line if let Some(mut next_fetch) = next_in_line { - let records = self.fetch_records_from_fetch(&mut next_fetch, records_remaining)?; + let records = + self.fetch_records_from_fetch(&mut next_fetch, records_remaining)?; if !records.is_empty() { let table_bucket = next_fetch.table_bucket().clone(); @@ -510,29 +559,22 @@ impl LogFetcher { &self, mut completed_fetch: Box<dyn CompletedFetch>, ) -> Result<Option<Box<dyn CompletedFetch>>> { - - // todo: handle initialize failure - - let table_bucket = completed_fetch.table_bucket().clone(); - let fetch_offset = completed_fetch.fetch_offset(); + // todo: handle error in initialize fetch + let table_bucket = completed_fetch.table_bucket(); + let fetch_offset = completed_fetch.next_fetch_offset(); // Check if bucket is still subscribed - let current_offset = self.log_scanner_status.get_bucket_offset(&table_bucket); - if current_offset.is_none() { + let Some(current_offset) = self.log_scanner_status.get_bucket_offset(table_bucket) else { warn!( - "Discarding stale fetch response for bucket {:?} since the bucket has been unsubscribed", - table_bucket + "Discarding stale fetch response for bucket {table_bucket:?} since the bucket has been unsubscribed" ); return Ok(None); - } - - let current_offset = current_offset.unwrap(); + }; // Check if offset matches if fetch_offset != current_offset { warn!( - "Discarding stale fetch response for bucket {:?} since its offset {} does not match the expected offset {}", - table_bucket, fetch_offset, current_offset + "Discarding stale fetch response for bucket {table_bucket:?} since its offset {fetch_offset} does not match the expected offset {current_offset}" ); return Ok(None); } @@ -540,7 +582,8 @@ impl LogFetcher { // Update high watermark let high_watermark = completed_fetch.high_watermark(); if high_watermark >= 0 { - self.log_scanner_status.update_high_watermark(&table_bucket, high_watermark); + self.log_scanner_status + .update_high_watermark(table_bucket, high_watermark); } completed_fetch.set_initialized(); @@ -558,15 +601,14 @@ impl LogFetcher { if current_offset.is_none() { warn!( - "Ignoring fetched records for {:?} since the bucket has been unsubscribed", - table_bucket + "Ignoring fetched records for {table_bucket:?} since the bucket has been unsubscribed" ); next_in_line_fetch.drain(); return Ok(Vec::new()); } let current_offset = current_offset.unwrap(); - let fetch_offset = next_in_line_fetch.fetch_offset(); + let fetch_offset = next_in_line_fetch.next_fetch_offset(); // Check if this fetch is next in line if fetch_offset == current_offset { @@ -574,15 +616,15 @@ impl LogFetcher { let next_fetch_offset = next_in_line_fetch.next_fetch_offset(); if next_fetch_offset > current_offset { - self.log_scanner_status.update_offset(&table_bucket, next_fetch_offset); + self.log_scanner_status + .update_offset(&table_bucket, next_fetch_offset); } Ok(records) } else { // These records aren't next in line, ignore them warn!( - "Ignoring fetched records for {:?} at offset {} since the current offset is {}", - table_bucket, fetch_offset, current_offset + "Ignoring fetched records for {table_bucket:?} at offset {fetch_offset} since the current 
offset is {current_offset}" ); next_in_line_fetch.drain(); Ok(Vec::new()) } } @@ -601,27 +643,43 @@ let offset = match self.log_scanner_status.get_bucket_offset(&bucket) { Some(offset) => offset, None => { - // todo: debug + debug!( + "Skipping fetch request for bucket {bucket} because the bucket has been unsubscribed." + ); continue; } }; - if let Some(leader) = self.get_table_bucket_leader(&bucket) { - if !self.nodes_with_pending_fetch_requests.lock() - .contains(&leader) { - let fetch_log_req_for_bucket = PbFetchLogReqForBucket { - partition_id: None, - bucket_id: bucket.bucket_id(), - fetch_offset: offset, - // 1M - max_fetch_bytes: 1024 * 1024, - }; - - fetch_log_req_for_buckets - .entry(leader) - .or_insert_with(Vec::new) - .push(fetch_log_req_for_bucket); - ready_for_fetch_count += 1; + match self.get_table_bucket_leader(&bucket) { + None => { + log::trace!( + "Skipping fetch request for bucket {bucket} because leader is not available." + ) + } + Some(leader) => { + if self + .nodes_with_pending_fetch_requests + .lock() + .contains(&leader) + { + log::trace!( + "Skipping fetch request for bucket {bucket} because previous request to server {leader} has not been processed." + ) + } else { + let fetch_log_req_for_bucket = PbFetchLogReqForBucket { + partition_id: None, + bucket_id: bucket.bucket_id(), + fetch_offset: offset, + // 1M + max_fetch_bytes: 1024 * 1024, + }; + + fetch_log_req_for_buckets + .entry(leader) + .or_insert_with(Vec::new) + .push(fetch_log_req_for_bucket); + ready_for_fetch_count += 1; + } } } } @@ -662,7 +720,8 @@ impl LogFetcher { // Get buckets that are not already in the buffer let buffered = self.log_fetch_buffer.buffered_buckets(); let buffered_set: HashSet<TableBucket> = buffered.into_iter().collect(); - self.log_scanner_status.fetchable_buckets(|tb| !buffered_set.contains(tb)) + self.log_scanner_status + .fetchable_buckets(|tb| !buffered_set.contains(tb)) } fn get_table_bucket_leader(&self, tb: &TableBucket) -> Option<i32> { diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 9295713..7fa878e 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -39,6 +39,7 @@ use arrow_schema::SchemaRef; use arrow_schema::{DataType as ArrowDataType, Field}; use byteorder::WriteBytesExt; use byteorder::{ByteOrder, LittleEndian}; +use bytes::Bytes; use crc32c::crc32c; use parking_lot::Mutex; use std::{ @@ -347,17 +348,17 @@ pub trait ToArrow { fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()>; } -pub struct LogRecordsBatchs<'a> { - data: &'a [u8], +pub struct LogRecordsBatches { + data: Bytes, current_pos: usize, remaining_bytes: usize, } -impl<'a> LogRecordsBatchs<'a> { - pub fn new(data: &'a [u8]) -> Self { +impl LogRecordsBatches { + pub fn new(data: Vec<u8>) -> Self { let remaining_bytes: usize = data.len(); Self { - data, + data: Bytes::from(data), current_pos: 0, remaining_bytes, } } @@ -378,14 +379,17 @@ } } -impl<'a> Iterator for &'a mut LogRecordsBatchs<'a> { - type Item = LogRecordBatch<'a>; +impl Iterator for LogRecordsBatches { + type Item = LogRecordBatch; fn next(&mut self) -> Option<Self::Item> { match self.next_batch_size() { Some(batch_size) => { - let data_slice = &self.data[self.current_pos..self.current_pos + batch_size]; - let record_batch = LogRecordBatch::new(data_slice); + let start = self.current_pos; + let end = start + batch_size; + // `data` is an owned, refcounted `Bytes`, so this slice is zero-copy + // and the returned batch keeps the underlying buffer alive on its own
let record_batch = LogRecordBatch::new(self.data.slice(start..end)); self.current_pos += batch_size; self.remaining_bytes -= batch_size; Some(record_batch) } @@ -395,13 +399,13 @@ } } -pub struct LogRecordBatch<'a> { - data: &'a [u8], +pub struct LogRecordBatch { + data: Bytes, } #[allow(dead_code)] -impl<'a> LogRecordBatch<'a> { - pub fn new(data: &'a [u8]) -> Self { +impl LogRecordBatch { + pub fn new(data: Bytes) -> Self { LogRecordBatch { data } } @@ -710,6 +714,7 @@ pub struct ReadContext { target_schema: SchemaRef, full_schema: SchemaRef, projection: Option<Projection>, + is_from_remote: bool, } #[derive(Clone)] @@ -723,24 +728,35 @@ struct Projection { } impl ReadContext { - pub fn new(arrow_schema: SchemaRef) -> ReadContext { + pub fn new(arrow_schema: SchemaRef, is_from_remote: bool) -> ReadContext { ReadContext { target_schema: arrow_schema.clone(), full_schema: arrow_schema, projection: None, + is_from_remote, } } pub fn with_projection_pushdown( arrow_schema: SchemaRef, projected_fields: Vec<usize>, + is_from_remote: bool, ) -> ReadContext { let target_schema = Self::project_schema(arrow_schema.clone(), projected_fields.as_slice()); - let mut sorted_fields = projected_fields.clone(); - sorted_fields.sort_unstable(); + let (need_do_reorder, sorted_fields) = { + // currently, for remote reads, the arrow log doesn't support projection pushdown, + // so reordering is only needed when not reading from remote + if !is_from_remote { + let mut sorted_fields = projected_fields.clone(); + sorted_fields.sort_unstable(); + (!sorted_fields.eq(&projected_fields), sorted_fields) + } else { + (false, vec![]) + } + }; let project = { - if !sorted_fields.eq(&projected_fields) { + if need_do_reorder { // reordering is required // Calculate reordering indexes to transform from sorted order to user-requested order let mut reordering_indexes = Vec::with_capacity(projected_fields.len()); @@ -778,6 +794,7 @@ impl ReadContext { target_schema, full_schema: arrow_schema, projection: Some(project), + is_from_remote, } } @@ -805,17 +822,24 @@ pub fn record_batch(&self, data: &[u8]) -> Result<RecordBatch> { let (batch_metadata, body_buffer, version) = parse_ipc_message(data)?; - // the record batch from server must be ordered by field pos, - // according to project to decide what arrow schema to use - // to parse the record batch - let resolve_schema = match self.projection { - Some(ref projection) => { - // projection, should use ordered schema by project field pos - projection.ordered_schema.clone() - } - None => { - // no projection, use target output schema - self.target_schema.clone() + let resolve_schema = { + // if reading from remote, no projection was pushed down, so the full schema is needed + if self.is_from_remote { + self.full_schema.clone() + } else { + // the record batch from server must be ordered by field pos, + // according to project to decide what arrow schema to use + // to parse the record batch + match self.projection { + Some(ref projection) => { + // projection, should use ordered schema by project field pos + projection.ordered_schema.clone() + } + None => { + // no projection, use target output schema + self.target_schema.clone() + } + } } }; @@ -829,14 +853,27 @@ )?; let record_batch = match &self.projection { - Some(projection) if projection.reordering_needed => { - // Reorder columns if needed (when projection pushdown with non-sorted order) - let reordered_columns: Vec<_> = projection -
.reordering_indexes - .iter() - .map(|&idx| record_batch.column(idx).clone()) - .collect(); - RecordBatch::try_new(self.target_schema.clone(), reordered_columns)? + Some(projection) => { + let reordered_columns = { + // need to do reorder + if self.is_from_remote { + Some(&projection.projected_fields) + } else if projection.reordering_needed { + Some(&projection.reordering_indexes) + } else { + None + } + }; + match reordered_columns { + Some(reordered_columns) => { + let arrow_columns = reordered_columns + .iter() + .map(|&idx| record_batch.column(idx).clone()) + .collect(); + RecordBatch::try_new(self.target_schema.clone(), arrow_columns)? + } + _ => record_batch, + } } _ => record_batch, }; diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs index a54f469..9eec98e 100644 --- a/crates/fluss/tests/integration/table.rs +++ b/crates/fluss/tests/integration/table.rs @@ -178,7 +178,7 @@ mod table_test { } let scan_records_projected = log_scanner_projected - .poll(std::time::Duration::from_secs(5)) + .poll(std::time::Duration::from_secs(10)) .await .expect("Failed to poll"); @@ -227,7 +227,7 @@ mod table_test { // Poll for records let scan_records = log_scanner - .poll(tokio::time::Duration::from_secs(5)) + .poll(tokio::time::Duration::from_secs(10)) .await .expect("Failed to poll records"); diff --git a/crates/fluss/tests/integration/table_remote_scan.rs b/crates/fluss/tests/integration/table_remote_scan.rs index ca61ff8..bdbced9 100644 --- a/crates/fluss/tests/integration/table_remote_scan.rs +++ b/crates/fluss/tests/integration/table_remote_scan.rs @@ -175,6 +175,8 @@ mod table_remote_scan_test { let num_buckets = table.table_info().get_num_buckets(); let log_scanner = table .new_scan() + .project(&[1, 0]) + .unwrap() .create_log_scanner() .expect("Failed to create log scanner"); for bucket_id in 0..num_buckets { @@ -186,7 +188,7 @@ mod table_remote_scan_test { let mut records = Vec::with_capacity(record_count); let start = std::time::Instant::now(); - const MAX_WAIT_DURATION: Duration = Duration::from_secs(30); + const MAX_WAIT_DURATION: Duration = Duration::from_secs(60); while records.len() < record_count { if start.elapsed() > MAX_WAIT_DURATION { panic!( @@ -208,8 +210,8 @@ mod table_remote_scan_test { let row = record.row(); let expected_c1 = i as i32; let expected_c2 = format!("v{}", i); - assert_eq!(row.get_int(0), expected_c1, "c1 mismatch at index {}", i); - assert_eq!(row.get_string(1), expected_c2, "c2 mismatch at index {}", i); + assert_eq!(row.get_int(1), expected_c1, "c1 mismatch at index {}", i); + assert_eq!(row.get_string(0), expected_c2, "c2 mismatch at index {}", i); } }
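A note on the new `scopeguard` dependency: the spawned fetch task in `send_fetches` now has several early-return paths (connection failure, fetch failure), and the guard guarantees the leader id is removed from `nodes_with_pending_fetch_requests` on every one of them, panics included. A minimal, self-contained sketch of the same pattern, using `std::sync::Mutex` in place of the crate's `parking_lot` (illustrative only, not Fluss code):

    use std::collections::HashSet;
    use std::sync::{Arc, Mutex};

    fn main() {
        let pending: Arc<Mutex<HashSet<i32>>> = Arc::new(Mutex::new(HashSet::new()));
        let leader = 42;
        pending.lock().unwrap().insert(leader);

        {
            let pending = Arc::clone(&pending);
            // The closure runs when `_guard` is dropped: normal exit,
            // early return, or panic all reach it.
            let _guard = scopeguard::guard((), move |_| {
                pending.lock().unwrap().remove(&leader);
            });
            // ... send the fetch request; any exit path drops `_guard` ...
        }

        assert!(!pending.lock().unwrap().contains(&leader));
    }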
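`LogFetchBuffer::await_not_empty` follows a re-check loop: test the condition, then race `Notify::notified()` against the remaining time and loop. A compact sketch of the same wait shape (`await_ready` and the `ready` closure are illustrative names, not crate API):

    use std::time::{Duration, Instant};
    use tokio::sync::Notify;

    // Wait until `ready()` holds or the timeout elapses.
    async fn await_ready(notify: &Notify, ready: impl Fn() -> bool, timeout: Duration) -> bool {
        let deadline = Instant::now() + timeout;
        loop {
            if ready() {
                return true;
            }
            let now = Instant::now();
            if now >= deadline {
                return false;
            }
            tokio::select! {
                _ = notify.notified() => {} // woken by notify_waiters()
                _ = tokio::time::sleep(deadline - now) => {} // timed out; loop re-checks
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let notify = Notify::new();
        // The condition never holds here, so this returns false at the deadline.
        assert!(!await_ready(&notify, || false, Duration::from_millis(50)).await);
    }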
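The `LogRecordsBatchs<'a>` to `LogRecordsBatches` change is more than a typo fix: the iterator now owns its buffer as `bytes::Bytes`, and `Bytes::slice` hands each `LogRecordBatch` a refcounted view rather than a borrow, which is what lets `DefaultCompletedFetch` keep the current batch across `fetch_records` calls. The property in isolation (plain `bytes` crate):

    use bytes::Bytes;

    fn main() {
        let data = Bytes::from(vec![0u8; 16]);
        // Zero-copy: `slice` bumps a refcount instead of borrowing,
        // so the batch can outlive the iterator that produced it.
        let batch = data.slice(4..12);
        drop(data); // the underlying buffer stays alive through `batch`
        assert_eq!(batch.len(), 8);
    }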
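Finally, `CredentialsCache::get_or_refresh` drops its read lock before refreshing, so the lock is never held across the security-token RPC. A synchronous sketch of that locking shape, with `fetch_token` as a stand-in for the real `GetSecurityTokenRequest` round trip:

    use parking_lot::RwLock;
    use std::time::{Duration, Instant};

    struct Cache {
        inner: RwLock<Option<(Instant, String)>>,
    }

    impl Cache {
        fn get_or_refresh(&self) -> String {
            {
                // Fast path under a read lock, released before any refresh work.
                let guard = self.inner.read();
                if let Some((fetched_at, token)) = guard.as_ref() {
                    if fetched_at.elapsed() < Duration::from_secs(60) {
                        return token.clone();
                    }
                }
            }
            let token = fetch_token(); // stand-in for the token RPC
            *self.inner.write() = Some((Instant::now(), token.clone()));
            token
        }
    }

    fn fetch_token() -> String {
        "token".to_string()
    }

    fn main() {
        let cache = Cache { inner: RwLock::new(None) };
        assert_eq!(cache.get_or_refresh(), "token");
    }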
