luoyuxia commented on code in PR #143:
URL: https://github.com/apache/fluss-rust/pull/143#discussion_r2679366958


##########
crates/fluss/src/client/table/log_fetch_buffer.rs:
##########
@@ -259,6 +294,121 @@ impl PendingFetch for CompletedPendingFetch {
     }
 }
 
+struct ErrorCompletedFetch {

Review Comment:
   why we need another `ErrorCompletedFetch`? Can we just reuse the 
`CompletedFetch` just like what did in java side? I feel some logic are 
duplicated.



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -1231,3 +1527,274 @@ impl BucketScanStatus {
         *self.high_watermark.write() = high_watermark
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::client::WriteRecord;
+    use crate::client::metadata::Metadata;
+    use crate::cluster::{BucketLocation, Cluster, ServerNode, ServerType};
+    use crate::compression::{
+        ArrowCompressionInfo, ArrowCompressionType, 
DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+    };
+    use crate::metadata::{DataField, DataTypes, Schema, TableDescriptor, 
TableInfo, TablePath};
+    use crate::record::MemoryLogRecordsArrowBuilder;
+    use crate::row::{Datum, GenericRow};
+    use crate::rpc::FlussError;
+    use std::sync::atomic::{AtomicUsize, Ordering};
+    use tokio::task::yield_now;
+    use tokio::time::sleep;
+
+    fn build_table_info(table_path: TablePath, table_id: i64) -> TableInfo {

Review Comment:
   can we use `build_table_info` in `test_utils`?



##########
crates/fluss/src/client/table/log_fetch_buffer.rs:
##########
@@ -272,6 +422,9 @@ pub struct DefaultCompletedFetch {
     records_read: usize,
     current_record_iterator: Option<LogRecordIterator>,
     current_record_batch: Option<LogRecordBatch>,
+    last_record: Option<ScanRecord>,
+    cached_record_error: Option<String>,
+    corrupt_last_record: bool,
 }
 
 impl DefaultCompletedFetch {

Review Comment:
   actually, we don't make it return `Result`
   ```
   pub fn new(
           table_bucket: TableBucket,
           log_record_batch: LogRecordsBatches,
           size_in_bytes: usize,
           read_context: ReadContext,
           fetch_offset: i64,
           high_watermark: i64,
       ) -> Self {
           Self {
               table_bucket,
               log_record_batch,
               read_context,
               next_fetch_offset: fetch_offset,
               high_watermark,
               size_in_bytes,
               consumed: false,
               initialized: false,
               records_read: 0,
               current_record_iterator: None,
               current_record_batch: None,
               last_record: None,
               cached_record_error: None,
               corrupt_last_record: false,
           }
       }
   ```



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -46,6 +49,7 @@ const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
 const LOG_FETCH_MAX_BYTES_FOR_BUCKET: i32 = 1024;
 const LOG_FETCH_MIN_BYTES: i32 = 1;
 const LOG_FETCH_WAIT_MAX_TIME: i32 = 500;
+const METADATA_REFRESH_MIN_INTERVAL: Duration = Duration::from_secs(1);

Review Comment:
   we do need metadata refresh, but I'd like to wait java side to introduce 
such mechanism.
   Can we remove this in this pr? 



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -787,7 +994,10 @@ impl LogFetcher {
                                     // todo: do we need to consider it like 
java ?
                                     // self.log_fetch_buffer.poll();
                                 }
-                                return Err(e);
+                                if result.is_empty() {

Review Comment:
   why change this. In java side, it seems the exception always be thrown



##########
crates/fluss/src/client/table/log_fetch_buffer.rs:
##########
@@ -360,22 +529,54 @@ impl CompletedFetch for DefaultCompletedFetch {
         &self.table_bucket
     }
 
+    fn api_error(&self) -> Option<&ApiError> {
+        None
+    }
+
+    fn take_error(&mut self) -> Option<Error> {
+        None
+    }
+
     fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>> 
{
-        // todo: handle corrupt_last_record
+        if self.corrupt_last_record {
+            return Err(self.fetch_error());
+        }
+
         if self.consumed {
             return Ok(Vec::new());
         }
 
         let mut scan_records = Vec::new();
 
         for _ in 0..max_records {
-            if let Some(record) = self.next_fetched_record()? {
-                self.next_fetch_offset = record.offset() + 1;
-                self.records_read += 1;
-                scan_records.push(record);
-            } else {
-                break;
+            if self.cached_record_error.is_none() {

Review Comment:
   I'm wondering whether the following code more clear?
   ```
   for _ in 0..max_records {
               if self.cached_record_error.is_some() {
                   break;
               }
               
               self.corrupt_last_record = true;
               match self.next_fetched_record() {
                   Ok(Some(record)) => {
                       self.corrupt_last_record = false;
                       self.next_fetch_offset = record.offset() + 1;
                       self.records_read += 1;
                       scan_records.push(record);
                   }
                   Ok(None) => {
                       self.corrupt_last_record = false;
                       self.last_record = None;
                       break;
                   }
                   Err(e) => {
                       self.cached_record_error = Some(e);
                       break;
                   }
               }
           }
   ```



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -619,6 +747,21 @@ impl LogFetcher {
                 .await

Review Comment:
   actually, we can make `handle_fetch_response` not to return `Result` so that 
we don't need to handle the error in the `handle_fetch_response`. The only 
blocker to prevent it not to return `Result` is that 
   ```
    credentials_cache.get_or_refresh().await?
   ```
   But in the future, we will extract it from method `handle_fetch_response` 
just like we did in java side. 
   So, I'd like to suggest just  `unwrap` get  credentials in this pr, and 
extract it in another folloing pr. So that, we don't need to care about handle 
the error, which is align with Java side.
   



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -894,6 +1182,11 @@ impl LogFetcher {
                     .update_offset(&table_bucket, next_fetch_offset);
             }
 
+            if next_in_line_fetch.is_consumed() && 
next_in_line_fetch.records_read() > 0 {

Review Comment:
   But it remind me that `next_in_line_fetch.drain()`
   should `moveBucketToEnd` just like java.
   Curernt `drain` won't move bucket to end



##########
crates/fluss/src/client/table/log_fetch_buffer.rs:
##########
@@ -148,8 +176,9 @@ impl LogFetchBuffer {
                                 has_completed = true;
                             }
                             Err(e) => {

Review Comment:
   if meet any error, shouldn't it still mark `has_completed` to `false` and 
retry again?
   Otherwsie, the fail pending fetch will be ignore and start to read next 
fetch, which case miss the data in pending fetch, and we'll nerver know that..



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -661,6 +804,70 @@ impl LogFetcher {
                     continue;
                 };
 
+                let error_code = fetch_log_for_bucket

Review Comment:
   will it be clear using the following code
   ```
   if let Some(error_code) = fetch_log_for_bucket.error_code {
                       let api_error:ApiError = ErrorResponse {
                           error_code,
                           error_message: fetch_log_for_bucket.error_message,
                       }.into();
   
                       
log_scanner_status.move_bucket_to_end(table_bucket.clone());
                       log_fetch_buffer.add_api_error(
                           table_bucket.clone(),
                           api_error.clone(),
                           fetch_offset,
                       );
   .....
   ```



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -894,6 +1182,11 @@ impl LogFetcher {
                     .update_offset(&table_bucket, next_fetch_offset);
             }
 
+            if next_in_line_fetch.is_consumed() && 
next_in_line_fetch.records_read() > 0 {

Review Comment:
   Seem no such logic in java side, right?



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -522,6 +631,26 @@ impl LogFetcher {
             return Ok(());
         }
 
+        if self.is_partitioned {

Review Comment:
   Can we just combine it with the following code in line
   
https://github.com/apache/fluss-rust/pull/143/files#diff-540a310c6f8139ea4592281917afecdd364792ffc3655a65221307465cabd09eR655
   ?



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -862,6 +881,10 @@ impl ReadContext {
             .map(|p| p.ordered_fields.as_slice())
     }
 
+    pub fn is_projection_pushdowned(&self) -> bool {

Review Comment:
   seem this method is not used?
   Also, 
   is_projection_pushdowned -> is_projection_push_downed



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -1231,3 +1527,274 @@ impl BucketScanStatus {
         *self.high_watermark.write() = high_watermark
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::client::WriteRecord;
+    use crate::client::metadata::Metadata;
+    use crate::cluster::{BucketLocation, Cluster, ServerNode, ServerType};
+    use crate::compression::{
+        ArrowCompressionInfo, ArrowCompressionType, 
DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+    };
+    use crate::metadata::{DataField, DataTypes, Schema, TableDescriptor, 
TableInfo, TablePath};
+    use crate::record::MemoryLogRecordsArrowBuilder;
+    use crate::row::{Datum, GenericRow};
+    use crate::rpc::FlussError;
+    use std::sync::atomic::{AtomicUsize, Ordering};
+    use tokio::task::yield_now;
+    use tokio::time::sleep;
+
+    fn build_table_info(table_path: TablePath, table_id: i64) -> TableInfo {
+        let row_type = DataTypes::row(vec![DataField::new(
+            "id".to_string(),
+            DataTypes::int(),
+            None,
+        )]);
+        let mut schema_builder = Schema::builder().with_row_type(&row_type);
+        let schema = schema_builder.build().expect("schema build");
+        let table_descriptor = TableDescriptor::builder()
+            .schema(schema)
+            .distributed_by(Some(1), vec![])
+            .build()
+            .expect("descriptor build");
+        TableInfo::of(table_path, table_id, 1, table_descriptor, 0, 0)
+    }
+
+    fn build_cluster(table_path: &TablePath, table_id: i64) -> Arc<Cluster> {

Review Comment:
   dito



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -833,12 +1055,78 @@ impl LogFetcher {
         &self,
         mut completed_fetch: Box<dyn CompletedFetch>,
     ) -> Result<Option<Box<dyn CompletedFetch>>> {
-        // todo: handle error in initialize fetch
-        let table_bucket = completed_fetch.table_bucket();
+        if let Some(error) = completed_fetch.take_error() {
+            return Err(error);
+        }
+
+        let table_bucket = completed_fetch.table_bucket().clone();
         let fetch_offset = completed_fetch.next_fetch_offset();
 
+        if let Some(api_error) = completed_fetch.api_error() {
+            let error = FlussError::for_code(api_error.code);
+            let error_message = api_error.message.as_str();
+            self.log_scanner_status
+                .move_bucket_to_end(table_bucket.clone());
+            match error {
+                FlussError::NotLeaderOrFollower
+                | FlussError::LogStorageException
+                | FlussError::KvStorageException
+                | FlussError::StorageException
+                | FlussError::FencedLeaderEpochException => {
+                    debug!("Error in fetch for bucket {table_bucket}: 
{error:?}: {error_message}");
+                    self.schedule_metadata_update(error);
+                    return Ok(None);
+                }
+                FlussError::UnknownTableOrBucketException => {
+                    warn!(
+                        "Received unknown table or bucket error in fetch for 
bucket {table_bucket}"
+                    );
+                    self.schedule_metadata_update(error);
+                    return Ok(None);
+                }
+                FlussError::LogOffsetOutOfRangeException => {
+                    return Err(Error::UnexpectedError {
+                        message: format!(
+                            "The fetching offset {fetch_offset} is out of 
range: {error_message}"
+                        ),
+                        source: None,
+                    });
+                }
+                FlussError::AuthorizationException => {
+                    return Err(Error::FlussAPIError {
+                        api_error: ApiError {
+                            code: api_error.code,
+                            message: api_error.message.to_string(),
+                        },
+                    });
+                }
+                FlussError::UnknownServerError => {
+                    warn!(
+                        "Unknown server error while fetching offset 
{fetch_offset} for bucket {table_bucket}: {error_message}"
+                    );
+                    return Ok(None);
+                }
+                FlussError::CorruptMessage => {
+                    return Err(Error::UnexpectedError {
+                        message: format!(
+                            "Encountered corrupt message when fetching offset 
{fetch_offset} for bucket {table_bucket}: {error_message}"
+                        ),
+                        source: None,
+                    });
+                }
+                _ => {
+                    return Err(Error::UnexpectedError {
+                        message: format!(
+                            "Unexpected error code {error:?} while fetching at 
offset {fetch_offset} from bucket {table_bucket}: {error_message}"
+                        ),
+                        source: None,
+                    });
+                }
+            }

Review Comment:
   is it possible to extract the duplicate code?



##########
crates/fluss/src/rpc/message/list_offsets.rs:
##########
@@ -108,22 +108,54 @@ impl ListOffsetsResponse {
         self.buckets_resp
             .iter()
             .map(|resp| {
-                if resp.error_code.is_some() {
-                    // todo: consider use another suitable error
-                    Err(Error::UnexpectedError {
+                if let Some(error_code) = resp.error_code {

Review Comment:
   nit:
   ```
   if let Some(error_code) = resp.error_code && error_code != 
FlussError::None.code() {
                       let api_error = ErrorResponse {
                           error_code,
                           error_message: resp.error_message.clone(),
                       }.into();
                       return Err(Error::FlussAPIError {
                           api_error
                       });
                   }
   ```



##########
crates/fluss/src/client/table/log_fetch_buffer.rs:
##########
@@ -75,34 +78,36 @@ impl LogFetchBuffer {
         self.completed_fetches.lock().is_empty()
     }
 
-    /// Wait for the buffer to become non-empty, with timeout
-    /// Returns true if data became available, false if timeout
-    pub async fn await_not_empty(&self, timeout: Duration) -> bool {
+    /// Wait for the buffer to become non-empty, with timeout.
+    /// Returns true if data became available, false if timeout.
+    pub async fn await_not_empty(&self, timeout: Duration) -> Result<bool> {
         let deadline = std::time::Instant::now() + timeout;
 
         loop {
             // Check if buffer is not empty
             if !self.is_empty() {
-                return true;
+                return Ok(true);
             }
 
             // Check if woken up
             if self.woken_up.swap(false, Ordering::Acquire) {
-                return true;
+                return Err(Error::WakeupError {
+                    message: "The await is wakeup.".to_string(),

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to