luoyuxia commented on code in PR #132:
URL: https://github.com/apache/fluss-rust/pull/132#discussion_r2678587336
##########
crates/fluss/src/client/write/batch.rs:
##########
@@ -175,7 +218,13 @@ impl ArrowLogWriteBatch {
}
pub fn build(&self) -> Result<Vec<u8>> {
- self.arrow_builder.build()
+ let mut cached = self.built_records.lock();
+ if let Some(bytes) = cached.as_ref() {
+ return Ok(bytes.clone());
+ }
+ let bytes = self.arrow_builder.build()?;
+ *cached = Some(bytes.clone());
Review Comment:
This introduces another copy of the bytes here, which costs both time and memory.
The root cause is that the proto field `PbProduceLogReqForBucket#records` only
accepts `Vec<u8>` for now, so we need another copy.
But we can make `PbProduceLogReqForBucket#records` accept `Bytes`, whose
clone just clones a reference instead of copying the data.
In `build.rs`, we can do it like this:
```
fn main() -> Result<()> {
let mut config = prost_build::Config::new();
config.bytes([".proto.PbProduceLogReqForBucket.records"]);
config.compile_protos(&["src/proto/fluss_api.proto"], &["src/proto"])?;
Ok(())
}
```
##########
crates/fluss/src/client/metadata.rs:
##########
@@ -135,7 +135,101 @@ impl Metadata {
guard.clone()
}
- pub fn leader_for(&self, _table_bucket: &TableBucket) ->
Option<&ServerNode> {
- todo!()
+ pub fn leader_for(&self, table_bucket: &TableBucket) -> Option<ServerNode>
{
+ let cluster = self.cluster.read();
+ cluster.leader_for(table_bucket).cloned()
+ }
+}
+
+#[cfg(test)]
+impl Metadata {
+ pub(crate) fn new_for_test(cluster: Arc<Cluster>) -> Self {
+ Metadata {
+ cluster: RwLock::new(cluster),
+ connections: Arc::new(RpcClient::new()),
+ bootstrap: Arc::from(""),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::cluster::{BucketLocation, Cluster, ServerNode, ServerType};
+ use crate::metadata::{DataField, DataTypes, Schema, TableDescriptor,
TableInfo, TablePath};
+ use std::collections::HashMap;
+
+ fn build_table_info(table_path: TablePath, table_id: i64) -> TableInfo {
Review Comment:
I found that the accumulator also has `build_table_info` and `build_cluster`.
Could we extract them into a shared method?
##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -132,78 +133,502 @@ impl Sender {
&self,
destination: i32,
acks: i16,
- batches: &Vec<Arc<ReadyWriteBatch>>,
+ batches: Vec<ReadyWriteBatch>,
) -> Result<()> {
if batches.is_empty() {
return Ok(());
}
- let mut records_by_bucket = HashMap::new();
- let mut write_batch_by_table = HashMap::new();
+ let mut records_by_bucket: HashMap<TableBucket, ReadyWriteBatch> =
HashMap::new();
+ let mut write_batch_by_table: HashMap<i64, Vec<TableBucket>> =
HashMap::new();
for batch in batches {
- records_by_bucket.insert(batch.table_bucket.clone(),
batch.clone());
+ let table_bucket = batch.table_bucket.clone();
write_batch_by_table
- .entry(batch.table_bucket.table_id())
- .or_insert_with(Vec::new)
- .push(batch);
+ .entry(table_bucket.table_id())
+ .or_default()
+ .push(table_bucket.clone());
+ records_by_bucket.insert(table_bucket, batch);
}
let cluster = self.metadata.get_cluster();
- let destination_node =
- cluster
- .get_tablet_server(destination)
- .ok_or(Error::LeaderNotAvailable {
- message: format!("destination node not found in metadata
cache {destination}."),
- })?;
- let connection = self.metadata.get_connection(destination_node).await?;
+ let destination_node = match cluster.get_tablet_server(destination) {
+ Some(node) => node,
+ None => {
+ self.handle_batches_with_error(
+ records_by_bucket.into_values().collect(),
+ FlussError::LeaderNotAvailableException,
+ format!("Destination node not found in metadata cache
{destination}."),
+ )
+ .await?;
+ return Ok(());
+ }
+ };
+ let connection = match
self.metadata.get_connection(destination_node).await {
+ Ok(connection) => connection,
+ Err(e) => {
+ self.handle_batches_with_error(
+ records_by_bucket.into_values().collect(),
+ FlussError::NetworkException,
+ format!("Failed to connect destination node {destination}:
{e}"),
+ )
+ .await?;
+ return Ok(());
+ }
+ };
+
+ for (table_id, table_buckets) in write_batch_by_table {
+ let request_batches: Vec<&ReadyWriteBatch> = table_buckets
+ .iter()
+ .filter_map(|bucket| records_by_bucket.get(bucket))
+ .collect();
+ if request_batches.is_empty() {
+ continue;
+ }
+ let request = match ProduceLogRequest::new(
+ table_id,
+ acks,
+ self.max_request_timeout_ms,
+ request_batches.as_slice(),
+ ) {
+ Ok(request) => request,
+ Err(e) => {
+ self.handle_batches_with_error(
+ table_buckets
+ .iter()
+ .filter_map(|bucket|
records_by_bucket.remove(bucket))
+ .collect(),
+ FlussError::UnknownServerError,
Review Comment:
Actually, this is not an unknown server error; it's just a client error that
occurs while building the request.
##########
crates/fluss/src/client/write/accumulator.rs:
##########
@@ -342,6 +342,29 @@ impl RecordAccumulator {
self.incomplete_batches.write().remove(&batch_id);
}
+ pub async fn re_enqueue(&self, ready_write_batch: ReadyWriteBatch) {
+ ready_write_batch.write_batch.re_enqueued();
+ let table_path = ready_write_batch.write_batch.table_path().clone();
+ let bucket_id = ready_write_batch.table_bucket.bucket_id();
+ let table_id =
u64::try_from(ready_write_batch.table_bucket.table_id()).unwrap_or(0);
+ let mut binding =
+ self.write_batches
+ .entry(table_path)
+ .or_insert_with(|| BucketAndWriteBatches {
+ table_id,
+ is_partitioned_table: false,
+ partition_id: None,
+ batches: Default::default(),
+ });
+ let bucket_and_batches = binding.value_mut();
+ let dq = bucket_and_batches
+ .batches
+ .entry(bucket_id)
+ .or_insert_with(|| Mutex::new(VecDeque::new()));
+ let mut dq_guard = dq.lock().await;
Review Comment:
I found an issue here: while it `await`s the lock, it still holds the
`write_batches` lock, which may cause a panic or a deadlock.
I created issue #135 to track it, but for now, let's keep it as is in this PR.
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -74,49 +74,11 @@ impl<'a> TableScan<'a> {
/// Returns an error if `column_indices` is empty or if any column index
is out of range.
///
/// # Example
- /// ```
- /// # use fluss::client::FlussConnection;
- /// # use fluss::config::Config;
- /// # use fluss::error::Result;
- /// # use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
- /// # use fluss::row::InternalRow;
- /// # use std::time::Duration;
- ///
- /// # pub async fn example() -> Result<()> {
- /// let mut config = Config::default();
- /// config.bootstrap_server = Some("127.0.0.1:9123".to_string());
- /// let conn = FlussConnection::new(config).await?;
- ///
- /// let table_descriptor = TableDescriptor::builder()
- /// .schema(
- /// Schema::builder()
- /// .column("col1", DataTypes::int())
- /// .column("col2", DataTypes::string())
- /// .column("col3", DataTypes::string())
- /// .column("col3", DataTypes::string())
- /// .build()?,
- /// ).build()?;
- /// let table_path = TablePath::new("fluss".to_owned(),
"rust_test_long".to_owned());
- /// let admin = conn.get_admin().await?;
- /// admin.create_table(&table_path, &table_descriptor, true)
- /// .await?;
- /// let table_info = admin.get_table(&table_path).await?;
- /// let table = conn.get_table(&table_path).await?;
- ///
- /// // Project columns by indices
- /// let scanner = table.new_scan().project(&[0, 2,
3])?.create_log_scanner()?;
- /// let scan_records = scanner.poll(Duration::from_secs(10)).await?;
- /// for record in scan_records {
- /// let row = record.row();
- /// println!(
- /// "{{{}, {}, {}}}@{}",
- /// row.get_int(0),
- /// row.get_string(2),
- /// row.get_string(3),
- /// record.offset()
- /// );
- /// }
- /// # Ok(())
+ /// ```no_run
+ /// # fn example() -> fluss::error::Result<()> {
+ /// # let table: fluss::client::FlussTable<'_> = todo!("requires a Fluss
connection");
Review Comment:
Why remove the original example code and leave a todo? Could we just reuse some
of the original example code?
##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -132,78 +133,502 @@ impl Sender {
&self,
destination: i32,
acks: i16,
- batches: &Vec<Arc<ReadyWriteBatch>>,
+ batches: Vec<ReadyWriteBatch>,
) -> Result<()> {
if batches.is_empty() {
return Ok(());
}
- let mut records_by_bucket = HashMap::new();
- let mut write_batch_by_table = HashMap::new();
+ let mut records_by_bucket: HashMap<TableBucket, ReadyWriteBatch> =
HashMap::new();
+ let mut write_batch_by_table: HashMap<i64, Vec<TableBucket>> =
HashMap::new();
for batch in batches {
- records_by_bucket.insert(batch.table_bucket.clone(),
batch.clone());
+ let table_bucket = batch.table_bucket.clone();
write_batch_by_table
- .entry(batch.table_bucket.table_id())
- .or_insert_with(Vec::new)
- .push(batch);
+ .entry(table_bucket.table_id())
+ .or_default()
+ .push(table_bucket.clone());
+ records_by_bucket.insert(table_bucket, batch);
}
let cluster = self.metadata.get_cluster();
- let destination_node =
- cluster
- .get_tablet_server(destination)
- .ok_or(Error::LeaderNotAvailable {
- message: format!("destination node not found in metadata
cache {destination}."),
- })?;
- let connection = self.metadata.get_connection(destination_node).await?;
+ let destination_node = match cluster.get_tablet_server(destination) {
+ Some(node) => node,
+ None => {
+ self.handle_batches_with_error(
+ records_by_bucket.into_values().collect(),
+ FlussError::LeaderNotAvailableException,
+ format!("Destination node not found in metadata cache
{destination}."),
+ )
+ .await?;
+ return Ok(());
+ }
+ };
+ let connection = match
self.metadata.get_connection(destination_node).await {
+ Ok(connection) => connection,
+ Err(e) => {
+ self.handle_batches_with_error(
+ records_by_bucket.into_values().collect(),
+ FlussError::NetworkException,
+ format!("Failed to connect destination node {destination}:
{e}"),
+ )
+ .await?;
+ return Ok(());
+ }
+ };
+
+ for (table_id, table_buckets) in write_batch_by_table {
+ let request_batches: Vec<&ReadyWriteBatch> = table_buckets
+ .iter()
+ .filter_map(|bucket| records_by_bucket.get(bucket))
+ .collect();
+ if request_batches.is_empty() {
+ continue;
+ }
+ let request = match ProduceLogRequest::new(
+ table_id,
+ acks,
+ self.max_request_timeout_ms,
+ request_batches.as_slice(),
+ ) {
+ Ok(request) => request,
+ Err(e) => {
+ self.handle_batches_with_error(
+ table_buckets
+ .iter()
+ .filter_map(|bucket|
records_by_bucket.remove(bucket))
+ .collect(),
+ FlussError::UnknownServerError,
+ format!("Failed to build produce request: {e}"),
+ )
+ .await?;
+ continue;
+ }
+ };
- for (table_id, write_batches) in write_batch_by_table {
- let request =
- ProduceLogRequest::new(table_id, acks,
self.max_request_timeout_ms, write_batches)?;
- let response = connection.request(request).await?;
- self.handle_produce_response(table_id, &records_by_bucket,
response)?
+ let response = match connection.request(request).await {
+ Ok(response) => response,
+ Err(e) => {
+ self.handle_batches_with_error(
+ table_buckets
+ .iter()
+ .filter_map(|bucket|
records_by_bucket.remove(bucket))
+ .collect(),
+ FlussError::NetworkException,
+ format!("Failed to send produce request: {e}"),
+ )
+ .await?;
+ continue;
+ }
+ };
+
+ self.handle_produce_response(
+ table_id,
+ &table_buckets,
+ &mut records_by_bucket,
+ response,
+ )
+ .await?;
}
Ok(())
}
- fn handle_produce_response(
+ async fn handle_produce_response(
&self,
table_id: i64,
- records_by_bucket: &HashMap<TableBucket, Arc<ReadyWriteBatch>>,
+ request_buckets: &[TableBucket],
+ records_by_bucket: &mut HashMap<TableBucket, ReadyWriteBatch>,
response: ProduceLogResponse,
) -> Result<()> {
+ let mut invalid_metadata_tables: HashSet<TablePath> = HashSet::new();
+ let mut pending_buckets: HashSet<TableBucket> =
request_buckets.iter().cloned().collect();
for produce_log_response_for_bucket in response.buckets_resp.iter() {
let tb = TableBucket::new(table_id,
produce_log_response_for_bucket.bucket_id);
- let ready_batch = records_by_bucket.get(&tb).unwrap();
+ let Some(ready_batch) = records_by_bucket.remove(&tb) else {
+ warn!("Missing ready batch for table bucket {tb}");
+ continue;
+ };
+ pending_buckets.remove(&tb);
+
if let Some(error_code) =
produce_log_response_for_bucket.error_code {
- todo!("handle_produce_response error: {}", error_code)
+ if error_code == FlussError::None.code() {
+ self.complete_batch(ready_batch);
+ continue;
+ }
+
+ let error = FlussError::for_code(error_code);
+ let message = produce_log_response_for_bucket
+ .error_message
+ .clone()
+ .unwrap_or_else(|| error.message().to_string());
+ if let Some(table_path) = self
+ .handle_write_batch_error(ready_batch, error, message)
+ .await?
+ {
+ invalid_metadata_tables.insert(table_path);
+ }
} else {
self.complete_batch(ready_batch)
}
}
+ if !pending_buckets.is_empty() {
+ for bucket in pending_buckets {
+ if let Some(ready_batch) = records_by_bucket.remove(&bucket) {
+ let message =
+ format!("Missing response for table bucket {bucket} in
produce response.");
Review Comment:
When will this happen? I think if it can't happen, could we just panic to keep
the code clean?
##########
crates/fluss/src/client/write/mod.rs:
##########
@@ -81,10 +81,16 @@ impl ResultHandle {
}
pub fn result(&self, batch_result: BatchWriteResult) -> Result<(), Error> {
- // do nothing, just return empty result
- batch_result.map_err(|e| Error::UnexpectedError {
- message: format!("Fail to get write result {e:?}"),
- source: None,
+ batch_result.map_err(|e| match e {
+ crate::client::broadcast::Error::WriteFailed { code, message } => {
Review Comment:
nit:
```suggestion
broadcast::Error::WriteFailed { code, message } => {
```
##########
crates/fluss/src/client/write/mod.rs:
##########
@@ -81,10 +81,16 @@ impl ResultHandle {
}
pub fn result(&self, batch_result: BatchWriteResult) -> Result<(), Error> {
- // do nothing, just return empty result
- batch_result.map_err(|e| Error::UnexpectedError {
- message: format!("Fail to get write result {e:?}"),
- source: None,
+ batch_result.map_err(|e| match e {
+ crate::client::broadcast::Error::WriteFailed { code, message } => {
+ Error::FlussAPIError {
+ api_error: crate::rpc::ApiError { code, message },
+ }
+ }
+ crate::client::broadcast::Error::Dropped => Error::UnexpectedError
{
Review Comment:
ditto
##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -132,78 +133,502 @@ impl Sender {
&self,
destination: i32,
acks: i16,
- batches: &Vec<Arc<ReadyWriteBatch>>,
+ batches: Vec<ReadyWriteBatch>,
) -> Result<()> {
if batches.is_empty() {
return Ok(());
}
- let mut records_by_bucket = HashMap::new();
- let mut write_batch_by_table = HashMap::new();
+ let mut records_by_bucket: HashMap<TableBucket, ReadyWriteBatch> =
HashMap::new();
Review Comment:
nit:
```
let mut records_by_bucket = HashMap::new();
```
##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -132,78 +133,502 @@ impl Sender {
&self,
destination: i32,
acks: i16,
- batches: &Vec<Arc<ReadyWriteBatch>>,
+ batches: Vec<ReadyWriteBatch>,
) -> Result<()> {
if batches.is_empty() {
return Ok(());
}
- let mut records_by_bucket = HashMap::new();
- let mut write_batch_by_table = HashMap::new();
+ let mut records_by_bucket: HashMap<TableBucket, ReadyWriteBatch> =
HashMap::new();
+ let mut write_batch_by_table: HashMap<i64, Vec<TableBucket>> =
HashMap::new();
for batch in batches {
- records_by_bucket.insert(batch.table_bucket.clone(),
batch.clone());
+ let table_bucket = batch.table_bucket.clone();
write_batch_by_table
- .entry(batch.table_bucket.table_id())
- .or_insert_with(Vec::new)
- .push(batch);
+ .entry(table_bucket.table_id())
+ .or_default()
+ .push(table_bucket.clone());
+ records_by_bucket.insert(table_bucket, batch);
}
let cluster = self.metadata.get_cluster();
- let destination_node =
- cluster
- .get_tablet_server(destination)
- .ok_or(Error::LeaderNotAvailable {
- message: format!("destination node not found in metadata
cache {destination}."),
- })?;
- let connection = self.metadata.get_connection(destination_node).await?;
+ let destination_node = match cluster.get_tablet_server(destination) {
+ Some(node) => node,
+ None => {
+ self.handle_batches_with_error(
+ records_by_bucket.into_values().collect(),
+ FlussError::LeaderNotAvailableException,
+ format!("Destination node not found in metadata cache
{destination}."),
+ )
+ .await?;
+ return Ok(());
+ }
+ };
+ let connection = match
self.metadata.get_connection(destination_node).await {
+ Ok(connection) => connection,
+ Err(e) => {
+ self.handle_batches_with_error(
+ records_by_bucket.into_values().collect(),
+ FlussError::NetworkException,
+ format!("Failed to connect destination node {destination}:
{e}"),
+ )
+ .await?;
+ return Ok(());
+ }
+ };
+
+ for (table_id, table_buckets) in write_batch_by_table {
+ let request_batches: Vec<&ReadyWriteBatch> = table_buckets
+ .iter()
+ .filter_map(|bucket| records_by_bucket.get(bucket))
+ .collect();
+ if request_batches.is_empty() {
+ continue;
+ }
+ let request = match ProduceLogRequest::new(
+ table_id,
+ acks,
+ self.max_request_timeout_ms,
+ request_batches.as_slice(),
+ ) {
+ Ok(request) => request,
+ Err(e) => {
+ self.handle_batches_with_error(
+ table_buckets
+ .iter()
+ .filter_map(|bucket|
records_by_bucket.remove(bucket))
+ .collect(),
+ FlussError::UnknownServerError,
+ format!("Failed to build produce request: {e}"),
+ )
+ .await?;
+ continue;
+ }
+ };
- for (table_id, write_batches) in write_batch_by_table {
- let request =
- ProduceLogRequest::new(table_id, acks,
self.max_request_timeout_ms, write_batches)?;
- let response = connection.request(request).await?;
- self.handle_produce_response(table_id, &records_by_bucket,
response)?
+ let response = match connection.request(request).await {
+ Ok(response) => response,
+ Err(e) => {
+ self.handle_batches_with_error(
+ table_buckets
+ .iter()
+ .filter_map(|bucket|
records_by_bucket.remove(bucket))
+ .collect(),
+ FlussError::NetworkException,
+ format!("Failed to send produce request: {e}"),
+ )
+ .await?;
+ continue;
+ }
+ };
+
+ self.handle_produce_response(
+ table_id,
+ &table_buckets,
+ &mut records_by_bucket,
+ response,
+ )
+ .await?;
}
Ok(())
}
- fn handle_produce_response(
+ async fn handle_produce_response(
&self,
table_id: i64,
- records_by_bucket: &HashMap<TableBucket, Arc<ReadyWriteBatch>>,
+ request_buckets: &[TableBucket],
+ records_by_bucket: &mut HashMap<TableBucket, ReadyWriteBatch>,
response: ProduceLogResponse,
) -> Result<()> {
+ let mut invalid_metadata_tables: HashSet<TablePath> = HashSet::new();
+ let mut pending_buckets: HashSet<TableBucket> =
request_buckets.iter().cloned().collect();
for produce_log_response_for_bucket in response.buckets_resp.iter() {
let tb = TableBucket::new(table_id,
produce_log_response_for_bucket.bucket_id);
- let ready_batch = records_by_bucket.get(&tb).unwrap();
+ let Some(ready_batch) = records_by_bucket.remove(&tb) else {
+ warn!("Missing ready batch for table bucket {tb}");
+ continue;
+ };
+ pending_buckets.remove(&tb);
+
if let Some(error_code) =
produce_log_response_for_bucket.error_code {
- todo!("handle_produce_response error: {}", error_code)
+ if error_code == FlussError::None.code() {
+ self.complete_batch(ready_batch);
+ continue;
+ }
+
+ let error = FlussError::for_code(error_code);
+ let message = produce_log_response_for_bucket
+ .error_message
+ .clone()
+ .unwrap_or_else(|| error.message().to_string());
+ if let Some(table_path) = self
+ .handle_write_batch_error(ready_batch, error, message)
+ .await?
+ {
+ invalid_metadata_tables.insert(table_path);
+ }
} else {
self.complete_batch(ready_batch)
}
}
+ if !pending_buckets.is_empty() {
+ for bucket in pending_buckets {
+ if let Some(ready_batch) = records_by_bucket.remove(&bucket) {
+ let message =
+ format!("Missing response for table bucket {bucket} in
produce response.");
+ let error = FlussError::UnknownServerError;
+ if let Some(table_path) = self
+ .handle_write_batch_error(ready_batch, error, message)
+ .await?
+ {
+ invalid_metadata_tables.insert(table_path);
+ }
+ }
+ }
+ }
+ self.update_metadata_if_needed(invalid_metadata_tables)
+ .await;
Ok(())
}
- fn complete_batch(&self, ready_write_batch: &Arc<ReadyWriteBatch>) {
- if ready_write_batch.write_batch.complete(Ok(())) {
- // remove from in flight batches
- let mut in_flight_guard = self.in_flight_batches.lock();
- if let Some(in_flight) =
in_flight_guard.get_mut(&ready_write_batch.table_bucket) {
- in_flight.retain(|b| !Arc::ptr_eq(b, ready_write_batch));
- if in_flight.is_empty() {
- in_flight_guard.remove(&ready_write_batch.table_bucket);
- }
- }
+ fn complete_batch(&self, ready_write_batch: ReadyWriteBatch) {
+ self.finish_batch(ready_write_batch, Ok(()));
+ }
+
+ fn fail_batch(&self, ready_write_batch: ReadyWriteBatch, error:
broadcast::Error) {
+ self.finish_batch(ready_write_batch, Err(error));
+ }
+
+ fn finish_batch(&self, ready_write_batch: ReadyWriteBatch, result:
broadcast::Result<()>) {
+ if ready_write_batch.write_batch.complete(result) {
+ self.remove_from_inflight_batches(&ready_write_batch);
// remove from incomplete batches
self.accumulator
.remove_incomplete_batches(ready_write_batch.write_batch.batch_id())
}
}
+ async fn handle_batches_with_error(
+ &self,
+ batches: Vec<ReadyWriteBatch>,
+ error: FlussError,
+ message: String,
+ ) -> Result<()> {
+ let mut invalid_metadata_tables: HashSet<TablePath> = HashSet::new();
+ for batch in batches {
+ if let Some(table_path) = self
+ .handle_write_batch_error(batch, error, message.clone())
+ .await?
+ {
+ invalid_metadata_tables.insert(table_path);
+ }
+ }
+ self.update_metadata_if_needed(invalid_metadata_tables)
+ .await;
+ Ok(())
+ }
+
+ async fn handle_write_batch_error(
+ &self,
+ ready_write_batch: ReadyWriteBatch,
+ error: FlussError,
+ message: String,
+ ) -> Result<Option<TablePath>> {
+ let table_path = ready_write_batch.write_batch.table_path().clone();
+ if self.can_retry(&ready_write_batch, error) {
+ warn!(
+ "Retrying write batch for {table_path} on bucket {} after
error {error:?}: {message}",
+ ready_write_batch.table_bucket.bucket_id()
+ );
+ self.re_enqueue_batch(ready_write_batch).await;
+ return
Ok(Self::is_invalid_metadata_error(error).then_some(table_path));
+ }
+
+ if error == FlussError::DuplicateSequenceException {
+ self.complete_batch(ready_write_batch);
+ return Ok(None);
+ }
+
+ self.fail_batch(
Review Comment:
nit: add a warn log here, just like the Java side does?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]