fresh-borzoni commented on code in PR #411:
URL: https://github.com/apache/fluss-rust/pull/411#discussion_r2918722599


##########
crates/fluss/src/client/table/lookup.rs:
##########
@@ -59,32 +61,80 @@ impl LookupResult {
     /// `CompactedRow` borrows from this result set and cannot outlive it.
     ///
     /// # Returns
-    /// - `Ok(Some(row))`: If exactly one row exists.
-    /// - `Ok(None)`: If the result set is empty.
-    /// - `Err(Error::UnexpectedError)`: If the result set contains more than 
one row.

Review Comment:
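   Why did we remove this? These `# Returns` entries document `get_single_row`, which still returns `Ok(None)` for an empty result set and an error for multiple rows; the replacement text (`Ok(rows)` - "All rows in the result set") appears to describe `get_rows` instead.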



##########
crates/fluss/src/client/table/lookup.rs:
##########
@@ -59,32 +61,80 @@ impl LookupResult {
     /// `CompactedRow` borrows from this result set and cannot outlive it.
     ///
     /// # Returns
-    /// - `Ok(Some(row))`: If exactly one row exists.
-    /// - `Ok(None)`: If the result set is empty.
-    /// - `Err(Error::UnexpectedError)`: If the result set contains more than 
one row.
+    /// - `Ok(rows)` - All rows in the result set.
+    /// - `Err(Error)` - If any row payload is too short to contain a schema 
id.
     ///
     pub fn get_single_row(&self) -> Result<Option<CompactedRow<'_>>> {
         match self.rows.len() {
             0 => Ok(None),
-            1 => Ok(Some(CompactedRow::from_bytes(
-                &self.row_type,
-                &self.rows[0][SCHEMA_ID_LENGTH..],
-            ))),
+            1 => {
+                let payload =
+                    self.rows[0]
+                        .get(SCHEMA_ID_LENGTH..)
+                        .ok_or_else(|| Error::UnexpectedError {
+                            message: format!(
+                                "Row payload too short: {} bytes, need at 
least {} for schema id",
+                                self.rows[0].len(),
+                                SCHEMA_ID_LENGTH
+                            ),
+                            source: None,
+                        })?;
+                Ok(Some(CompactedRow::from_bytes(&self.row_type, payload)))
+            }
             _ => Err(Error::UnexpectedError {
                 message: "LookupResult contains multiple rows, use get_rows() 
instead".to_string(),
                 source: None,
             }),
         }
     }
 
-    /// Returns all rows as CompactedRows.
-    pub fn get_rows(&self) -> Vec<CompactedRow<'_>> {
+    pub fn get_rows(&self) -> Result<Vec<CompactedRow<'_>>> {
         self.rows
             .iter()
-            // TODO Add schema id check and fetch when implementing prefix 
lookup
-            .map(|bytes| CompactedRow::from_bytes(&self.row_type, 
&bytes[SCHEMA_ID_LENGTH..]))
+            .map(|bytes| {
+                let payload =
+                    bytes
+                        .get(SCHEMA_ID_LENGTH..)

Review Comment:
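   The schema-id length checks are duplicated (the same `get(SCHEMA_ID_LENGTH..)` check appears in `get_single_row`, `get_rows`, and `to_record_batch`); let's move them into a helper. WDYT?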



##########
crates/fluss/src/client/table/lookup.rs:
##########
@@ -59,32 +61,80 @@ impl LookupResult {
     /// `CompactedRow` borrows from this result set and cannot outlive it.
     ///
     /// # Returns
-    /// - `Ok(Some(row))`: If exactly one row exists.
-    /// - `Ok(None)`: If the result set is empty.
-    /// - `Err(Error::UnexpectedError)`: If the result set contains more than 
one row.
+    /// - `Ok(rows)` - All rows in the result set.
+    /// - `Err(Error)` - If any row payload is too short to contain a schema 
id.
     ///
     pub fn get_single_row(&self) -> Result<Option<CompactedRow<'_>>> {
         match self.rows.len() {
             0 => Ok(None),
-            1 => Ok(Some(CompactedRow::from_bytes(
-                &self.row_type,
-                &self.rows[0][SCHEMA_ID_LENGTH..],
-            ))),
+            1 => {
+                let payload =
+                    self.rows[0]
+                        .get(SCHEMA_ID_LENGTH..)
+                        .ok_or_else(|| Error::UnexpectedError {
+                            message: format!(
+                                "Row payload too short: {} bytes, need at 
least {} for schema id",
+                                self.rows[0].len(),
+                                SCHEMA_ID_LENGTH
+                            ),
+                            source: None,
+                        })?;
+                Ok(Some(CompactedRow::from_bytes(&self.row_type, payload)))
+            }
             _ => Err(Error::UnexpectedError {
                 message: "LookupResult contains multiple rows, use get_rows() 
instead".to_string(),
                 source: None,
             }),
         }
     }
 
-    /// Returns all rows as CompactedRows.
-    pub fn get_rows(&self) -> Vec<CompactedRow<'_>> {
+    pub fn get_rows(&self) -> Result<Vec<CompactedRow<'_>>> {
         self.rows
             .iter()
-            // TODO Add schema id check and fetch when implementing prefix 
lookup

Review Comment:
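   This `TODO` comment should stay: the new code only checks that the payload is long enough to contain a schema id; it still does not validate the schema id or fetch the matching schema.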



##########
crates/fluss/src/client/table/lookup.rs:
##########
@@ -59,32 +61,80 @@ impl LookupResult {
     /// `CompactedRow` borrows from this result set and cannot outlive it.
     ///
     /// # Returns
-    /// - `Ok(Some(row))`: If exactly one row exists.
-    /// - `Ok(None)`: If the result set is empty.
-    /// - `Err(Error::UnexpectedError)`: If the result set contains more than 
one row.
+    /// - `Ok(rows)` - All rows in the result set.
+    /// - `Err(Error)` - If any row payload is too short to contain a schema 
id.
     ///
     pub fn get_single_row(&self) -> Result<Option<CompactedRow<'_>>> {
         match self.rows.len() {
             0 => Ok(None),
-            1 => Ok(Some(CompactedRow::from_bytes(
-                &self.row_type,
-                &self.rows[0][SCHEMA_ID_LENGTH..],
-            ))),
+            1 => {
+                let payload =
+                    self.rows[0]
+                        .get(SCHEMA_ID_LENGTH..)
+                        .ok_or_else(|| Error::UnexpectedError {
+                            message: format!(
+                                "Row payload too short: {} bytes, need at 
least {} for schema id",
+                                self.rows[0].len(),
+                                SCHEMA_ID_LENGTH
+                            ),
+                            source: None,
+                        })?;
+                Ok(Some(CompactedRow::from_bytes(&self.row_type, payload)))
+            }
             _ => Err(Error::UnexpectedError {
                 message: "LookupResult contains multiple rows, use get_rows() 
instead".to_string(),
                 source: None,
             }),
         }
     }
 
-    /// Returns all rows as CompactedRows.
-    pub fn get_rows(&self) -> Vec<CompactedRow<'_>> {
+    pub fn get_rows(&self) -> Result<Vec<CompactedRow<'_>>> {
         self.rows
             .iter()
-            // TODO Add schema id check and fetch when implementing prefix 
lookup
-            .map(|bytes| CompactedRow::from_bytes(&self.row_type, 
&bytes[SCHEMA_ID_LENGTH..]))
+            .map(|bytes| {
+                let payload =
+                    bytes
+                        .get(SCHEMA_ID_LENGTH..)
+                        .ok_or_else(|| Error::UnexpectedError {
+                            message: format!(
+                                "Row payload too short: {} bytes, need at 
least {} for schema id",
+                                bytes.len(),
+                                SCHEMA_ID_LENGTH
+                            ),
+                            source: None,
+                        })?;
+                Ok(CompactedRow::from_bytes(&self.row_type, payload))
+            })
             .collect()
     }
+    /// Converts all rows in this result into an Arrow [`RecordBatch`].
+    ///
+    /// This is useful for integration with DataFusion or other Arrow-based 
tools.
+    ///
+    /// # Returns
+    /// - `Ok(RecordBatch)` - All rows in columnar Arrow format. Returns an 
empty
+    ///   batch (with the correct schema) if the result set is empty.
+    /// - `Err(Error)` - If the conversion fails.
+    pub fn to_record_batch(&self) -> Result<RecordBatch> {
+        let mut builder = RowAppendRecordBatchBuilder::new(&self.row_type)?;
+
+        for bytes in &self.rows {
+            let payload = bytes.get(SCHEMA_ID_LENGTH..).ok_or_else(|| {
+                Error::RowConvertError {

Review Comment:
   The same schema check uses different error types (`UnexpectedError` in `get_single_row`/`get_rows`, `RowConvertError` here); let's use `RowConvertError` in the other places too.



##########
crates/fluss/src/client/table/lookup.rs:
##########
@@ -59,32 +61,80 @@ impl LookupResult {
     /// `CompactedRow` borrows from this result set and cannot outlive it.
     ///
     /// # Returns
-    /// - `Ok(Some(row))`: If exactly one row exists.
-    /// - `Ok(None)`: If the result set is empty.
-    /// - `Err(Error::UnexpectedError)`: If the result set contains more than 
one row.
+    /// - `Ok(rows)` - All rows in the result set.
+    /// - `Err(Error)` - If any row payload is too short to contain a schema 
id.
     ///
     pub fn get_single_row(&self) -> Result<Option<CompactedRow<'_>>> {
         match self.rows.len() {
             0 => Ok(None),
-            1 => Ok(Some(CompactedRow::from_bytes(
-                &self.row_type,
-                &self.rows[0][SCHEMA_ID_LENGTH..],
-            ))),
+            1 => {
+                let payload =
+                    self.rows[0]
+                        .get(SCHEMA_ID_LENGTH..)
+                        .ok_or_else(|| Error::UnexpectedError {
+                            message: format!(
+                                "Row payload too short: {} bytes, need at 
least {} for schema id",
+                                self.rows[0].len(),
+                                SCHEMA_ID_LENGTH
+                            ),
+                            source: None,
+                        })?;
+                Ok(Some(CompactedRow::from_bytes(&self.row_type, payload)))
+            }
             _ => Err(Error::UnexpectedError {
                 message: "LookupResult contains multiple rows, use get_rows() 
instead".to_string(),
                 source: None,
             }),
         }
     }
 
-    /// Returns all rows as CompactedRows.
-    pub fn get_rows(&self) -> Vec<CompactedRow<'_>> {
+    pub fn get_rows(&self) -> Result<Vec<CompactedRow<'_>>> {

Review Comment:
   We're changing a public API (`get_rows` now returns `Result`), so shall we update the docs?
   cc @luoyuxia: also, do we want to make a breaking change for this schema check, or do we trust the server everywhere?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to