Copilot commented on code in PR #132:
URL: https://github.com/apache/fluss-rust/pull/132#discussion_r2678550082
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -74,49 +74,11 @@ impl<'a> TableScan<'a> {
/// Returns an error if `column_indices` is empty or if any column index is out of range.
///
/// # Example
- /// ```
- /// # use fluss::client::FlussConnection;
- /// # use fluss::config::Config;
- /// # use fluss::error::Result;
- /// # use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
- /// # use fluss::row::InternalRow;
- /// # use std::time::Duration;
- ///
- /// # pub async fn example() -> Result<()> {
- /// let mut config = Config::default();
- /// config.bootstrap_server = Some("127.0.0.1:9123".to_string());
- /// let conn = FlussConnection::new(config).await?;
- ///
- /// let table_descriptor = TableDescriptor::builder()
- /// .schema(
- /// Schema::builder()
- /// .column("col1", DataTypes::int())
- /// .column("col2", DataTypes::string())
- /// .column("col3", DataTypes::string())
- /// .column("col3", DataTypes::string())
- /// .build()?,
- /// ).build()?;
- /// let table_path = TablePath::new("fluss".to_owned(), "rust_test_long".to_owned());
- /// let admin = conn.get_admin().await?;
- /// admin.create_table(&table_path, &table_descriptor, true)
- /// .await?;
- /// let table_info = admin.get_table(&table_path).await?;
- /// let table = conn.get_table(&table_path).await?;
- ///
- /// // Project columns by indices
- /// let scanner = table.new_scan().project(&[0, 2, 3])?.create_log_scanner()?;
- /// let scan_records = scanner.poll(Duration::from_secs(10)).await?;
- /// for record in scan_records {
- /// let row = record.row();
- /// println!(
- /// "{{{}, {}, {}}}@{}",
- /// row.get_int(0),
- /// row.get_string(2),
- /// row.get_string(3),
- /// record.offset()
- /// );
- /// }
- /// # Ok(())
+ /// ```no_run
+ /// # fn example() -> fluss::error::Result<()> {
+ /// # let table: fluss::client::FlussTable<'_> = todo!("requires a Fluss connection");
+ /// let scanner = table.new_scan().project(&[0, 2, 3])?.create_log_scanner()?;
Review Comment:
The simplified example now uses `no_run`, which is appropriate since it requires a live Fluss connection. However, the example is declared as a synchronous `fn example()`, while `TableScan` operations such as `create_log_scanner()` may be async in actual usage. Verify that the example compiles as written, or mark the function `async` if needed.
```suggestion
/// let scan = table.new_scan().project(&[0, 2, 3])?;
```
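For reference, an async form of the shortened doctest might read as below. This is only a sketch built from the calls already shown in the quoted diff (`project`, `create_log_scanner`, and the async `poll`); whether `poll` belongs in the shortened example at all is the author's call:
```rust
// Hypothetical async form of the shortened no_run doctest.
async fn example() -> fluss::error::Result<()> {
    // Placeholder, as in the diff: a real table needs a Fluss connection.
    let table: fluss::client::FlussTable<'_> = todo!("requires a Fluss connection");
    let scanner = table.new_scan().project(&[0, 2, 3])?.create_log_scanner()?;
    // poll() is async per the removed example, hence the async signature.
    let _records = scanner.poll(std::time::Duration::from_secs(10)).await?;
    Ok(())
}
```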
##########
crates/fluss/src/client/write/batch.rs:
##########
@@ -175,7 +218,13 @@ impl ArrowLogWriteBatch {
}
pub fn build(&self) -> Result<Vec<u8>> {
- self.arrow_builder.build()
+ let mut cached = self.built_records.lock();
+ if let Some(bytes) = cached.as_ref() {
+ return Ok(bytes.clone());
+ }
+ let bytes = self.arrow_builder.build()?;
+ *cached = Some(bytes.clone());
+ Ok(bytes)
}
Review Comment:
The `built_records` cache is never cleared. Once a batch is built and cached, the cached `Vec<u8>` is cloned on every subsequent call, but the entry is never invalidated, even after the batch completes or is re-enqueued. This can lead to memory accumulation, especially when batches are retried multiple times or many batches are in flight.
Consider clearing the cache when the batch is re-enqueued (in the `re_enqueued` method) or when it completes, or evaluate whether caching is needed at all, given that `build()` should only be called once per successful send attempt.
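A minimal sketch of the suggested invalidation, assuming `built_records` is a parking_lot-style `Mutex<Option<Vec<u8>>>` (as the bare `.lock()` call in the diff implies) and that `re_enqueued` is where a failed batch is put back for retry; the method's location and body here are illustrative, not the actual implementation:
```rust
pub fn re_enqueued(&self) {
    // Drop the cached encoding so a retried batch is rebuilt from the
    // current builder state instead of serving a stale copy.
    *self.built_records.lock() = None;
    // ... existing re-enqueue bookkeeping (attempts counter, etc.) ...
}
```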
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -150,47 +112,14 @@ impl<'a> TableScan<'a> {
/// Returns an error if `column_names` is empty or if any column name is not found in the table schema.
///
/// # Example
- /// ```
- /// # use fluss::client::FlussConnection;
- /// # use fluss::config::Config;
- /// # use fluss::error::Result;
- /// # use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
- /// # use fluss::row::InternalRow;
- /// # use std::time::Duration;
- ///
- /// # pub async fn example() -> Result<()> {
- /// let mut config = Config::default();
- /// config.bootstrap_server = Some("127.0.0.1:9123".to_string());
- /// let conn = FlussConnection::new(config).await?;
- ///
- /// let table_descriptor = TableDescriptor::builder()
- /// .schema(
- /// Schema::builder()
- /// .column("col1", DataTypes::int())
- /// .column("col2", DataTypes::string())
- /// .column("col3", DataTypes::string())
- /// .build()?,
- /// ).build()?;
- /// let table_path = TablePath::new("fluss".to_owned(), "rust_test_long".to_owned());
- /// let admin = conn.get_admin().await?;
- /// admin.create_table(&table_path, &table_descriptor, true)
- /// .await?;
- /// let table_info = admin.get_table(&table_path).await?;
- /// let table = conn.get_table(&table_path).await?;
- ///
- /// // Project columns by column names
- /// let scanner = table.new_scan().project_by_name(&["col1", "col3"])?.create_log_scanner()?;
- /// let scan_records = scanner.poll(Duration::from_secs(10)).await?;
- /// for record in scan_records {
- /// let row = record.row();
- /// println!(
- /// "{{{}, {}}}@{}",
- /// row.get_int(0),
- /// row.get_string(1),
- /// record.offset()
- /// );
- /// }
- /// # Ok(())
+ /// ```no_run
+ /// # fn example() -> fluss::error::Result<()> {
+ /// # let table: fluss::client::FlussTable<'_> = todo!("requires a Fluss connection");
+ /// let scanner = table
+ /// .new_scan()
+ /// .project_by_name(&["col1", "col3"])?
+ /// .create_log_scanner()?;
+ /// # Ok(())
Review Comment:
Similar to the `project()` example, this simplified example uses a synchronous function signature but may require an async context. Verify that the example compiles as written, or mark the function `async` if needed.
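As with `project()`, an async variant would only change the example's signature; a sketch under the same assumptions as above:
```rust
// Hypothetical async form; only the signature differs from the diff.
async fn example() -> fluss::error::Result<()> {
    let table: fluss::client::FlussTable<'_> = todo!("requires a Fluss connection");
    let _scanner = table
        .new_scan()
        .project_by_name(&["col1", "col3"])?
        .create_log_scanner()?;
    Ok(())
}
```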
##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -132,78 +133,502 @@ impl Sender {
&self,
destination: i32,
acks: i16,
- batches: &Vec<Arc<ReadyWriteBatch>>,
+ batches: Vec<ReadyWriteBatch>,
) -> Result<()> {
if batches.is_empty() {
return Ok(());
}
- let mut records_by_bucket = HashMap::new();
- let mut write_batch_by_table = HashMap::new();
+ let mut records_by_bucket: HashMap<TableBucket, ReadyWriteBatch> = HashMap::new();
+ let mut write_batch_by_table: HashMap<i64, Vec<TableBucket>> = HashMap::new();
for batch in batches {
- records_by_bucket.insert(batch.table_bucket.clone(), batch.clone());
+ let table_bucket = batch.table_bucket.clone();
write_batch_by_table
- .entry(batch.table_bucket.table_id())
- .or_insert_with(Vec::new)
- .push(batch);
+ .entry(table_bucket.table_id())
+ .or_default()
+ .push(table_bucket.clone());
+ records_by_bucket.insert(table_bucket, batch);
}
let cluster = self.metadata.get_cluster();
- let destination_node =
- cluster
- .get_tablet_server(destination)
- .ok_or(Error::LeaderNotAvailable {
- message: format!("destination node not found in metadata cache {destination}."),
- })?;
- let connection = self.metadata.get_connection(destination_node).await?;
+ let destination_node = match cluster.get_tablet_server(destination) {
+ Some(node) => node,
+ None => {
+ self.handle_batches_with_error(
+ records_by_bucket.into_values().collect(),
+ FlussError::LeaderNotAvailableException,
+ format!("Destination node not found in metadata cache
{destination}."),
+ )
+ .await?;
+ return Ok(());
+ }
+ };
+ let connection = match self.metadata.get_connection(destination_node).await {
+ Ok(connection) => connection,
+ Err(e) => {
+ self.handle_batches_with_error(
+ records_by_bucket.into_values().collect(),
+ FlussError::NetworkException,
+ format!("Failed to connect destination node {destination}:
{e}"),
+ )
+ .await?;
+ return Ok(());
+ }
+ };
+
+ for (table_id, table_buckets) in write_batch_by_table {
+ let request_batches: Vec<&ReadyWriteBatch> = table_buckets
+ .iter()
+ .filter_map(|bucket| records_by_bucket.get(bucket))
+ .collect();
+ if request_batches.is_empty() {
+ continue;
+ }
+ let request = match ProduceLogRequest::new(
+ table_id,
+ acks,
+ self.max_request_timeout_ms,
+ request_batches.as_slice(),
+ ) {
+ Ok(request) => request,
+ Err(e) => {
+ self.handle_batches_with_error(
+ table_buckets
+ .iter()
+ .filter_map(|bucket| records_by_bucket.remove(bucket))
+ .collect(),
+ FlussError::UnknownServerError,
+ format!("Failed to build produce request: {e}"),
+ )
+ .await?;
+ continue;
+ }
+ };
- for (table_id, write_batches) in write_batch_by_table {
- let request =
- ProduceLogRequest::new(table_id, acks, self.max_request_timeout_ms, write_batches)?;
- let response = connection.request(request).await?;
- self.handle_produce_response(table_id, &records_by_bucket, response)?
+ let response = match connection.request(request).await {
+ Ok(response) => response,
+ Err(e) => {
+ self.handle_batches_with_error(
+ table_buckets
+ .iter()
+ .filter_map(|bucket| records_by_bucket.remove(bucket))
+ .collect(),
+ FlussError::NetworkException,
+ format!("Failed to send produce request: {e}"),
+ )
+ .await?;
+ continue;
+ }
+ };
+
+ self.handle_produce_response(
+ table_id,
+ &table_buckets,
+ &mut records_by_bucket,
+ response,
+ )
+ .await?;
}
Ok(())
}
- fn handle_produce_response(
+ async fn handle_produce_response(
&self,
table_id: i64,
- records_by_bucket: &HashMap<TableBucket, Arc<ReadyWriteBatch>>,
+ request_buckets: &[TableBucket],
+ records_by_bucket: &mut HashMap<TableBucket, ReadyWriteBatch>,
response: ProduceLogResponse,
) -> Result<()> {
+ let mut invalid_metadata_tables: HashSet<TablePath> = HashSet::new();
+ let mut pending_buckets: HashSet<TableBucket> = request_buckets.iter().cloned().collect();
for produce_log_response_for_bucket in response.buckets_resp.iter() {
let tb = TableBucket::new(table_id, produce_log_response_for_bucket.bucket_id);
- let ready_batch = records_by_bucket.get(&tb).unwrap();
+ let Some(ready_batch) = records_by_bucket.remove(&tb) else {
+ warn!("Missing ready batch for table bucket {tb}");
+ continue;
+ };
Review Comment:
The logic for handling buckets in the produce response may have a gap. When a bucket appears in the response but not in `request_buckets` (line 247), a warning is logged and the loop continues after the batch has been removed from `records_by_bucket`. Such a bucket was never in `pending_buckets` (initialized from `request_buckets` on line 245), so it is not handled by the `pending_buckets` loop (lines 276-290) either.
This means that if the server returns a response for a bucket that was not in the request, that batch is silently dropped from `records_by_bucket` without ever being completed or failed. Consider treating this scenario as an error condition, or handle the batch explicitly.
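One way to make the unexpected-bucket case explicit, sketched with the names from the diff (`pending_buckets`, `records_by_bucket`, `TableBucket`) and assuming the same logging macros are in scope; the completion step at the end stands in for the existing code:
```rust
for resp in response.buckets_resp.iter() {
    let tb = TableBucket::new(table_id, resp.bucket_id);
    // A response for a bucket we never requested: leave records_by_bucket
    // untouched and surface the anomaly instead of dropping the batch.
    if !pending_buckets.remove(&tb) {
        error!("Produce response for unrequested table bucket {tb}; ignoring");
        continue;
    }
    let Some(ready_batch) = records_by_bucket.remove(&tb) else {
        warn!("Missing ready batch for table bucket {tb}");
        continue;
    };
    // ... complete or fail `ready_batch` as before ...
}
```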
##########
crates/fluss/src/client/write/batch.rs:
##########
@@ -53,15 +57,36 @@ impl InnerWriteBatch {
}
fn complete(&self, write_result: BatchWriteResult) -> bool {
- if !self.completed {
- self.results.broadcast(write_result);
+ if self
+ .completed
+ .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
+ .is_err()
+ {
+ return false;
}
+ self.results.broadcast(write_result);
true
}
fn drained(&mut self, now_ms: i64) {
self.drained_ms = max(self.drained_ms, now_ms);
Review Comment:
The `drained_ms` field in `InnerWriteBatch` is not protected by any synchronization mechanism (it is not an atomic type and is accessed via `&mut self`). While `drained()` currently takes `&mut self`, which prevents concurrent modification, the field could become a source of data races if the API is later changed or if `WriteBatch` is shared across threads in unexpected ways. Consider making `drained_ms` an `AtomicI64` for consistency with the other shared-state fields (`completed` and `attempts`), or document that `WriteBatch` must not be shared across threads while mutable operations like `drained()` are in progress.
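A sketch of the `AtomicI64` variant, preserving the existing `max(self.drained_ms, now_ms)` semantics while allowing `&self` access; field names follow the diff, and the rest of the struct is elided:
```rust
use std::sync::atomic::{AtomicI64, Ordering};

struct InnerWriteBatch {
    drained_ms: AtomicI64,
    // ... completed, attempts, results, ... (unchanged)
}

impl InnerWriteBatch {
    fn drained(&self, now_ms: i64) {
        // fetch_max keeps the later of the stored and supplied timestamps,
        // matching the old max() logic but safe under shared access.
        self.drained_ms.fetch_max(now_ms, Ordering::AcqRel);
    }
}
```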
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]