Copilot commented on code in PR #230:
URL: https://github.com/apache/fluss-rust/pull/230#discussion_r2752983689


##########
bindings/cpp/src/types.rs:
##########
@@ -478,3 +481,30 @@ pub fn core_lake_snapshot_to_ffi(snapshot: 
&fcore::metadata::LakeSnapshot) -> ff
         bucket_offsets,
     }
 }
+
+pub fn core_scan_batches_to_ffi(
+    batches: &[fcore::record::ScanBatch],
+) -> ffi::FfiArrowRecordBatches {
+    let mut ffi_batches = Vec::new();
+    for batch in batches {
+        let record_batch = batch.batch();
+        // Convert RecordBatch to StructArray first, then get the data
+        let struct_array = arrow::array::StructArray::from(record_batch.clone());
+        let ffi_array = Box::new(FFI_ArrowArray::new(&struct_array.into_data()));
+        let ffi_schema =
+            Box::new(FFI_ArrowSchema::try_from(record_batch.schema().as_ref()).unwrap());

Review Comment:
   Using `unwrap()` can cause a panic if the schema conversion fails. This 
should be handled gracefully and propagated as an error to the caller instead. 
Consider returning a Result type or handling the error case explicitly.



##########
bindings/cpp/include/fluss.hpp:
##########
@@ -25,6 +25,12 @@
 #include <string>
 #include <unordered_map>
 #include <vector>
+#include <memory>  // for shared_ptr

Review Comment:
   The include for `memory` (for shared_ptr) is duplicated on lines 23 and 28. 
Remove the duplicate on line 28.
   ```suggestion
   
   ```



##########
bindings/cpp/src/table.cpp:
##########
@@ -242,4 +278,89 @@ Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& 
out) {
     return utils::make_ok();
 }
 
+ArrowRecordBatch::ArrowRecordBatch(
+    std::shared_ptr<arrow::RecordBatch> batch,
+    int64_t table_id,
+    int64_t partition_id,
+    int32_t bucket_id,
+    int64_t base_offset) noexcept
+    : batch_(std::move(batch)),
+      table_id_(table_id),
+      partition_id_(partition_id),
+      bucket_id_(bucket_id),
+      base_offset_(base_offset) {}
+
+bool ArrowRecordBatch::Available() const { return batch_ != nullptr; }
+
+int64_t ArrowRecordBatch::NumRows() const {
+    if (!Available()) return 0;
+    return batch_->num_rows();
+}
+
+
+int64_t ArrowRecordBatch::GetTableId() const {
+    if (!Available()) return 0;
+    return this->table_id_;
+}
+
+int64_t ArrowRecordBatch::GetPartitionId() const {
+    if (!Available()) return -1;
+    return this->partition_id_;
+}
+
+int32_t ArrowRecordBatch::GetBucketId() const {
+    if (!Available()) return -1;
+    return this->bucket_id_;
+}
+
+int64_t ArrowRecordBatch::GetBaseOffset() const {
+    if (!Available()) return -1;
+    return this->base_offset_;
+}
+
+int64_t ArrowRecordBatch::GetLastOffset() const {
+    if (!Available()) return -1;
+    return this->NumRows() + this->base_offset_;
+}
+
+Result LogScanner::PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& 
out) const {
+    if (!Available()) {
+        return utils::make_error(1, "LogScanner not available");
+    }
+
+    auto ffi_result = scanner_->poll_record_batch(timeout_ms);
+    auto result = utils::from_ffi_result(ffi_result.result);
+    if (!result.Ok()) {
+        return result;
+    }
+
+    // Convert the FFI Arrow record batches to C++ ArrowRecordBatch objects
+    out.batches.clear();
+    for (const auto& ffi_batch : ffi_result.arrow_batches.batches) {
+        auto* c_array = reinterpret_cast<struct 
ArrowArray*>(ffi_batch.array_ptr);
+        auto* c_schema = reinterpret_cast<struct 
ArrowSchema*>(ffi_batch.schema_ptr);
+
+        auto import_result = arrow::ImportRecordBatch(c_array, c_schema);
+        if (import_result.ok()) {
+            auto batch_ptr = import_result.ValueOrDie();
+            auto batch_wrapper = std::unique_ptr<ArrowRecordBatch>(new 
ArrowRecordBatch(
+                std::move(batch_ptr),
+                ffi_batch.table_id,
+                ffi_batch.partition_id,
+                ffi_batch.bucket_id,
+                ffi_batch.base_offset
+            ));
+            out.batches.push_back(std::move(batch_wrapper));
+            
+            // Free the container structures that were allocated in Rust after successful import
+            ffi::free_arrow_ffi_structures(ffi_batch.array_ptr, ffi_batch.schema_ptr);
+        } else {
+            // Even if import failed, we should free the container structures to avoid leaks
+            ffi::free_arrow_ffi_structures(ffi_batch.array_ptr, ffi_batch.schema_ptr);
+        }

Review Comment:
   When ImportRecordBatch fails, the function silently continues and returns 
success even though some batches may have failed to import. This could lead to 
data loss or unexpected behavior. The function should either return an error 
when any batch fails to import, or at minimum log the failure and provide 
feedback about partial success. Consider accumulating errors and returning 
them, or returning early on the first failure.



##########
bindings/cpp/examples/example.cpp:
##########
@@ -250,5 +253,56 @@ int main() {
         std::cout << "  ... and " << (batch_records.Size() - 5) << " more 
records" << std::endl;
     }
 
+    // 9) Test the new Arrow record batch polling functionality
+    std::cout << "\n=== Testing Arrow Record Batch Polling ===" << std::endl;
+    
+    fluss::LogScanner arrow_scanner;
+    check("new_record_batch_log_scanner", 
table.NewRecordBatchLogScanner(arrow_scanner));
+    
+    // Subscribe to all buckets starting from offset 0
+    for (int b = 0; b < buckets; ++b) {
+        check("subscribe_arrow", arrow_scanner.Subscribe(b, 0));
+    }
+    
+    fluss::ArrowRecordBatches arrow_batches;
+    check("poll_record_batch", arrow_scanner.PollRecordBatch(5000, 
arrow_batches));
+    
+    std::cout << "Polled " << arrow_batches.Size() << " Arrow record batches" 
<< std::endl;
+    for (size_t i = 0; i < arrow_batches.Size(); ++i) {
+        const auto& batch = arrow_batches[i];
+        if (batch->Available()) {
+            std::cout << "  Batch " << i << ": " << batch->NumRows() << " 
rows, " 
+                      <<batch->GetArrowRecordBatch()->num_rows() << " bytes of 
serialized data" << std::endl;
+        } else {
+            std::cout << "  Batch " << i << ": not available" << std::endl;
+        }
+    }
+    
+    // 10) Test the new Arrow record batch polling with projection
+    std::cout << "\n=== Testing Arrow Record Batch Polling with Projection 
===" << std::endl;
+    
+    fluss::LogScanner projected_arrow_scanner;
+    check("new_record_batch_log_scanner_with_projection", 
+          table.NewRecordBatchLogScannerWithProjection(projected_columns, 
projected_arrow_scanner));
+    
+    // Subscribe to all buckets starting from offset 0
+    for (int b = 0; b < buckets; ++b) {
+        check("subscribe_projected_arrow", 
projected_arrow_scanner.Subscribe(b, 0));
+    }
+    
+    fluss::ArrowRecordBatches projected_arrow_batches;
+    check("poll_projected_record_batch", 
projected_arrow_scanner.PollRecordBatch(5000, projected_arrow_batches));
+    
+    std::cout << "Polled " << projected_arrow_batches.Size() << " projected 
Arrow record batches" << std::endl;
+    for (size_t i = 0; i < projected_arrow_batches.Size(); ++i) {
+        const auto& batch = projected_arrow_batches[i];
+        if (batch->Available()) {
+            std::cout << "  Batch " << i << ": " << batch->NumRows() << " 
rows, " 
+                      << batch->GetArrowRecordBatch()->num_rows() << " bytes 
of serialized data" << std::endl;

Review Comment:
   The output message is misleading: it prints the value of 
`batch->GetArrowRecordBatch()->num_rows()` but labels it "bytes of serialized 
data". `num_rows()` returns the number of rows, not bytes. Either use an 
appropriate method to get the byte size of the batch, or change the label to 
say "rows" instead of "bytes of serialized data".



##########
bindings/cpp/examples/example.cpp:
##########
@@ -250,5 +253,56 @@ int main() {
         std::cout << "  ... and " << (batch_records.Size() - 5) << " more 
records" << std::endl;
     }
 
+    // 9) Test the new Arrow record batch polling functionality
+    std::cout << "\n=== Testing Arrow Record Batch Polling ===" << std::endl;
+    
+    fluss::LogScanner arrow_scanner;
+    check("new_record_batch_log_scanner", 
table.NewRecordBatchLogScanner(arrow_scanner));
+    
+    // Subscribe to all buckets starting from offset 0
+    for (int b = 0; b < buckets; ++b) {
+        check("subscribe_arrow", arrow_scanner.Subscribe(b, 0));
+    }
+    
+    fluss::ArrowRecordBatches arrow_batches;
+    check("poll_record_batch", arrow_scanner.PollRecordBatch(5000, 
arrow_batches));
+    
+    std::cout << "Polled " << arrow_batches.Size() << " Arrow record batches" 
<< std::endl;
+    for (size_t i = 0; i < arrow_batches.Size(); ++i) {
+        const auto& batch = arrow_batches[i];
+        if (batch->Available()) {
+            std::cout << "  Batch " << i << ": " << batch->NumRows() << " 
rows, " 
+                      <<batch->GetArrowRecordBatch()->num_rows() << " bytes of 
serialized data" << std::endl;

Review Comment:
   The output message is misleading: it prints the value of 
`batch->GetArrowRecordBatch()->num_rows()` but labels it "bytes of serialized 
data". `num_rows()` returns the number of rows, not bytes. Either use an 
appropriate method to get the byte size of the batch, or change the label to 
say "rows" instead of "bytes of serialized data".



##########
bindings/cpp/src/table.cpp:
##########
@@ -242,4 +278,89 @@ Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& 
out) {
     return utils::make_ok();
 }
 
+ArrowRecordBatch::ArrowRecordBatch(
+    std::shared_ptr<arrow::RecordBatch> batch,
+    int64_t table_id,
+    int64_t partition_id,
+    int32_t bucket_id,
+    int64_t base_offset) noexcept
+    : batch_(std::move(batch)),
+      table_id_(table_id),
+      partition_id_(partition_id),
+      bucket_id_(bucket_id),
+      base_offset_(base_offset) {}
+
+bool ArrowRecordBatch::Available() const { return batch_ != nullptr; }
+
+int64_t ArrowRecordBatch::NumRows() const {
+    if (!Available()) return 0;
+    return batch_->num_rows();
+}
+
+
+int64_t ArrowRecordBatch::GetTableId() const {
+    if (!Available()) return 0;
+    return this->table_id_;
+}
+
+int64_t ArrowRecordBatch::GetPartitionId() const {
+    if (!Available()) return -1;
+    return this->partition_id_;
+}
+
+int32_t ArrowRecordBatch::GetBucketId() const {
+    if (!Available()) return -1;
+    return this->bucket_id_;
+}
+
+int64_t ArrowRecordBatch::GetBaseOffset() const {
+    if (!Available()) return -1;
+    return this->base_offset_;
+}
+
+int64_t ArrowRecordBatch::GetLastOffset() const {
+    if (!Available()) return -1;
+    return this->NumRows() + this->base_offset_;

Review Comment:
   GetLastOffset returns base_offset + num_rows, but this appears to be off by 
one. If base_offset is 10 and there are 5 rows (offsets 10, 11, 12, 13, 14), 
the last offset should be 14, not 15. The calculation should be base_offset + 
NumRows() - 1.
   ```suggestion
       return this->base_offset_ + this->NumRows() - 1;
   ```



##########
bindings/cpp/src/table.cpp:
##########
@@ -242,4 +278,89 @@ Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& 
out) {
     return utils::make_ok();
 }
 
+ArrowRecordBatch::ArrowRecordBatch(
+    std::shared_ptr<arrow::RecordBatch> batch,
+    int64_t table_id,
+    int64_t partition_id,
+    int32_t bucket_id,
+    int64_t base_offset) noexcept
+    : batch_(std::move(batch)),
+      table_id_(table_id),
+      partition_id_(partition_id),
+      bucket_id_(bucket_id),
+      base_offset_(base_offset) {}
+
+bool ArrowRecordBatch::Available() const { return batch_ != nullptr; }
+
+int64_t ArrowRecordBatch::NumRows() const {
+    if (!Available()) return 0;
+    return batch_->num_rows();
+}
+
+
+int64_t ArrowRecordBatch::GetTableId() const {
+    if (!Available()) return 0;
+    return this->table_id_;
+}
+
+int64_t ArrowRecordBatch::GetPartitionId() const {
+    if (!Available()) return -1;
+    return this->partition_id_;
+}
+
+int32_t ArrowRecordBatch::GetBucketId() const {
+    if (!Available()) return -1;
+    return this->bucket_id_;
+}
+
+int64_t ArrowRecordBatch::GetBaseOffset() const {
+    if (!Available()) return -1;
+    return this->base_offset_;
+}
+
+int64_t ArrowRecordBatch::GetLastOffset() const {
+    if (!Available()) return -1;
+    return this->NumRows() + this->base_offset_;
+}
+
+Result LogScanner::PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& 
out) const {
+    if (!Available()) {
+        return utils::make_error(1, "LogScanner not available");
+    }
+
+    auto ffi_result = scanner_->poll_record_batch(timeout_ms);
+    auto result = utils::from_ffi_result(ffi_result.result);
+    if (!result.Ok()) {
+        return result;
+    }
+
+    // Convert the FFI Arrow record batches to C++ ArrowRecordBatch objects
+    out.batches.clear();
+    for (const auto& ffi_batch : ffi_result.arrow_batches.batches) {
+        auto* c_array = reinterpret_cast<struct 
ArrowArray*>(ffi_batch.array_ptr);
+        auto* c_schema = reinterpret_cast<struct 
ArrowSchema*>(ffi_batch.schema_ptr);
+
+        auto import_result = arrow::ImportRecordBatch(c_array, c_schema);
+        if (import_result.ok()) {
+            auto batch_ptr = import_result.ValueOrDie();
+            auto batch_wrapper = std::unique_ptr<ArrowRecordBatch>(new 
ArrowRecordBatch(
+                std::move(batch_ptr),
+                ffi_batch.table_id,
+                ffi_batch.partition_id,
+                ffi_batch.bucket_id,
+                ffi_batch.base_offset
+            ));
+            out.batches.push_back(std::move(batch_wrapper));
+            
+            // Free the container structures that were allocated in Rust after successful import
+            ffi::free_arrow_ffi_structures(ffi_batch.array_ptr, ffi_batch.schema_ptr);
+        } else {
+            // Even if import failed, we should free the container structures to avoid leaks
+            ffi::free_arrow_ffi_structures(ffi_batch.array_ptr, ffi_batch.schema_ptr);

Review Comment:
   This manual freeing of Arrow FFI structures after ImportRecordBatch is 
incorrect and will cause a double-free. Arrow's ImportRecordBatch takes 
ownership of the ArrowArray and ArrowSchema structures and will automatically 
call their release callbacks when they are no longer needed. The FFI_ArrowArray 
and FFI_ArrowSchema containers should not be manually freed here. Remove both 
calls to free_arrow_ffi_structures and let Arrow handle the cleanup through its 
release callbacks.
   ```suggestion
           } else {
               // Import failed; Arrow will handle cleanup of FFI structures via release callbacks
   ```



##########
bindings/cpp/src/lib.rs:
##########
@@ -517,41 +543,101 @@ impl Table {
     }
 
     fn new_log_scanner(&self) -> Result<*mut LogScanner, String> {
-        let fluss_table = fcore::client::FlussTable::new(
-            &self.connection,
-            self.metadata.clone(),
-            self.table_info.clone(),
-        );
+        RUNTIME.block_on(async {
+            let fluss_table = fcore::client::FlussTable::new(
+                &self.connection,
+                self.metadata.clone(),
+                self.table_info.clone(),
+            );
+
+            let scanner = match fluss_table.new_scan().create_log_scanner() {
+                Ok(a) => a,
+                Err(e) => return Err(format!("Failed to create log scanner: 
{e}")),
+            };
+
+            let scanner_ptr = Box::into_raw(Box::new(LogScanner {
+                inner: Some(scanner),
+                inner_batch: None,
+            }));
 
-        let scanner = match fluss_table.new_scan().create_log_scanner() {
-            Ok(a) => a,
-            Err(e) => return Err(format!("Failed to create log scanner: {e}")),
-        };
-        let scanner = Box::into_raw(Box::new(LogScanner { inner: scanner }));
-        Ok(scanner)
+            Ok(scanner_ptr)
+        })

Review Comment:
   The `RUNTIME.block_on(async { ... })` wrapper is unnecessary here because 
`create_log_scanner()` is a synchronous function that doesn't return a Future. 
This adds unnecessary overhead. The function should be called directly without 
the async wrapper.



##########
bindings/cpp/src/lib.rs:
##########
@@ -517,41 +543,101 @@ impl Table {
     }
 
     fn new_log_scanner(&self) -> Result<*mut LogScanner, String> {
-        let fluss_table = fcore::client::FlussTable::new(
-            &self.connection,
-            self.metadata.clone(),
-            self.table_info.clone(),
-        );
+        RUNTIME.block_on(async {
+            let fluss_table = fcore::client::FlussTable::new(
+                &self.connection,
+                self.metadata.clone(),
+                self.table_info.clone(),
+            );
+
+            let scanner = match fluss_table.new_scan().create_log_scanner() {
+                Ok(a) => a,
+                Err(e) => return Err(format!("Failed to create log scanner: 
{e}")),
+            };
+
+            let scanner_ptr = Box::into_raw(Box::new(LogScanner {
+                inner: Some(scanner),
+                inner_batch: None,
+            }));
 
-        let scanner = match fluss_table.new_scan().create_log_scanner() {
-            Ok(a) => a,
-            Err(e) => return Err(format!("Failed to create log scanner: {e}")),
-        };
-        let scanner = Box::into_raw(Box::new(LogScanner { inner: scanner }));
-        Ok(scanner)
+            Ok(scanner_ptr)
+        })
     }
 
     fn new_log_scanner_with_projection(
         &self,
         column_indices: Vec<usize>,
     ) -> Result<*mut LogScanner, String> {
-        let fluss_table = fcore::client::FlussTable::new(
-            &self.connection,
-            self.metadata.clone(),
-            self.table_info.clone(),
-        );
+        RUNTIME.block_on(async {
+            let fluss_table = fcore::client::FlussTable::new(
+                &self.connection,
+                self.metadata.clone(),
+                self.table_info.clone(),
+            );
+
+            let scan = fluss_table.new_scan();
+            let scan = match scan.project(&column_indices) {
+                Ok(s) => s,
+                Err(e) => return Err(format!("Failed to project columns: 
{e}")),
+            };
+            let scanner = match scan.create_log_scanner() {
+                Ok(a) => a,
+                Err(e) => return Err(format!("Failed to create log scanner: 
{e}")),
+            };
+            let scanner = Box::into_raw(Box::new(LogScanner {
+                inner: Some(scanner),
+                inner_batch: None,
+            }));
+            Ok(scanner)
+        })
+    }
+
+    fn new_record_batch_log_scanner(&self) -> Result<*mut LogScanner, String> {
+        RUNTIME.block_on(async {
+            let fluss_table = fcore::client::FlussTable::new(
+                &self.connection,
+                self.metadata.clone(),
+                self.table_info.clone(),
+            );
+
+            let scanner = match 
fluss_table.new_scan().create_record_batch_log_scanner() {
+                Ok(a) => a,
+                Err(e) => return Err(format!("Failed to create record batch 
log scanner: {e}")),
+            };
+            let scanner = Box::into_raw(Box::new(LogScanner {
+                inner: None,
+                inner_batch: Some(scanner),
+            }));
+            Ok(scanner)
+        })
+    }
 
-        let scan = fluss_table.new_scan();
-        let scan = match scan.project(&column_indices) {
-            Ok(s) => s,
-            Err(e) => return Err(format!("Failed to project columns: {e}")),
-        };
-        let scanner = match scan.create_log_scanner() {
-            Ok(a) => a,
-            Err(e) => return Err(format!("Failed to create log scanner: {e}")),
-        };
-        let scanner = Box::into_raw(Box::new(LogScanner { inner: scanner }));
-        Ok(scanner)
+    fn new_record_batch_log_scanner_with_projection(
+        &self,
+        column_indices: Vec<usize>,
+    ) -> Result<*mut LogScanner, String> {
+        RUNTIME.block_on(async {
+            let fluss_table = fcore::client::FlussTable::new(
+                &self.connection,
+                self.metadata.clone(),
+                self.table_info.clone(),
+            );
+
+            let scan = fluss_table.new_scan();
+            let scan = match scan.project(&column_indices) {
+                Ok(s) => s,
+                Err(e) => return Err(format!("Failed to project columns: 
{e}")),
+            };
+            let scanner = match scan.create_record_batch_log_scanner() {
+                Ok(a) => a,
+                Err(e) => return Err(format!("Failed to create record batch 
log scanner: {e}")),
+            };
+            let scanner = Box::into_raw(Box::new(LogScanner {
+                inner: None,
+                inner_batch: Some(scanner),
+            }));
+            Ok(scanner)
+        })

Review Comment:
   The `RUNTIME.block_on(async { ... })` wrapper is unnecessary here because 
`create_record_batch_log_scanner()` is a synchronous function that doesn't 
return a Future. This adds unnecessary overhead. The function should be called 
directly without the async wrapper.



##########
bindings/cpp/src/lib.rs:
##########
@@ -517,41 +543,101 @@ impl Table {
     }
 
     fn new_log_scanner(&self) -> Result<*mut LogScanner, String> {
-        let fluss_table = fcore::client::FlussTable::new(
-            &self.connection,
-            self.metadata.clone(),
-            self.table_info.clone(),
-        );
+        RUNTIME.block_on(async {
+            let fluss_table = fcore::client::FlussTable::new(
+                &self.connection,
+                self.metadata.clone(),
+                self.table_info.clone(),
+            );
+
+            let scanner = match fluss_table.new_scan().create_log_scanner() {
+                Ok(a) => a,
+                Err(e) => return Err(format!("Failed to create log scanner: 
{e}")),
+            };
+
+            let scanner_ptr = Box::into_raw(Box::new(LogScanner {
+                inner: Some(scanner),
+                inner_batch: None,
+            }));
 
-        let scanner = match fluss_table.new_scan().create_log_scanner() {
-            Ok(a) => a,
-            Err(e) => return Err(format!("Failed to create log scanner: {e}")),
-        };
-        let scanner = Box::into_raw(Box::new(LogScanner { inner: scanner }));
-        Ok(scanner)
+            Ok(scanner_ptr)
+        })
     }
 
     fn new_log_scanner_with_projection(
         &self,
         column_indices: Vec<usize>,
     ) -> Result<*mut LogScanner, String> {
-        let fluss_table = fcore::client::FlussTable::new(
-            &self.connection,
-            self.metadata.clone(),
-            self.table_info.clone(),
-        );
+        RUNTIME.block_on(async {
+            let fluss_table = fcore::client::FlussTable::new(
+                &self.connection,
+                self.metadata.clone(),
+                self.table_info.clone(),
+            );
+
+            let scan = fluss_table.new_scan();
+            let scan = match scan.project(&column_indices) {
+                Ok(s) => s,
+                Err(e) => return Err(format!("Failed to project columns: 
{e}")),
+            };
+            let scanner = match scan.create_log_scanner() {
+                Ok(a) => a,
+                Err(e) => return Err(format!("Failed to create log scanner: 
{e}")),
+            };
+            let scanner = Box::into_raw(Box::new(LogScanner {
+                inner: Some(scanner),
+                inner_batch: None,
+            }));
+            Ok(scanner)
+        })

Review Comment:
   The `RUNTIME.block_on(async { ... })` wrapper is unnecessary here because 
`create_log_scanner()` is a synchronous function that doesn't return a Future. 
This adds unnecessary overhead. The function should be called directly without 
the async wrapper.



##########
bindings/cpp/include/fluss.hpp:
##########
@@ -481,6 +535,7 @@ class LogScanner {
     Result Subscribe(int32_t bucket_id, int64_t start_offset);
     Result Subscribe(const std::vector<BucketSubscription>& bucket_offsets);
     Result Poll(int64_t timeout_ms, ScanRecords& out);
+    Result PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& out) const;

Review Comment:
   The PollRecordBatch method is marked as const, but Poll is not. This is 
inconsistent. Both methods call poll on the underlying scanner which likely 
modifies its internal state (e.g., tracking offsets). Either both should be 
const or both should be non-const. Given that Poll is non-const and polling 
operations typically modify internal state, PollRecordBatch should also be 
non-const for consistency.
   ```suggestion
       Result PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& out);
   ```



##########
bindings/cpp/src/table.cpp:
##########
@@ -242,4 +278,89 @@ Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& 
out) {
     return utils::make_ok();
 }
 
+ArrowRecordBatch::ArrowRecordBatch(
+    std::shared_ptr<arrow::RecordBatch> batch,
+    int64_t table_id,
+    int64_t partition_id,
+    int32_t bucket_id,
+    int64_t base_offset) noexcept
+    : batch_(std::move(batch)),
+      table_id_(table_id),
+      partition_id_(partition_id),
+      bucket_id_(bucket_id),
+      base_offset_(base_offset) {}
+
+bool ArrowRecordBatch::Available() const { return batch_ != nullptr; }
+
+int64_t ArrowRecordBatch::NumRows() const {
+    if (!Available()) return 0;
+    return batch_->num_rows();
+}
+
+
+int64_t ArrowRecordBatch::GetTableId() const {
+    if (!Available()) return 0;
+    return this->table_id_;
+}
+
+int64_t ArrowRecordBatch::GetPartitionId() const {
+    if (!Available()) return -1;
+    return this->partition_id_;
+}
+
+int32_t ArrowRecordBatch::GetBucketId() const {
+    if (!Available()) return -1;
+    return this->bucket_id_;
+}
+
+int64_t ArrowRecordBatch::GetBaseOffset() const {
+    if (!Available()) return -1;
+    return this->base_offset_;
+}
+
+int64_t ArrowRecordBatch::GetLastOffset() const {
+    if (!Available()) return -1;
+    return this->NumRows() + this->base_offset_;
+}
+
+Result LogScanner::PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& 
out) const {

Review Comment:
   The PollRecordBatch method is marked as const, but Poll is not. This is 
inconsistent. Both methods call poll on the underlying scanner which likely 
modifies its internal state (e.g., tracking offsets). Either both should be 
const or both should be non-const. Given that Poll is non-const and polling 
operations typically modify internal state, PollRecordBatch should also be 
non-const for consistency.
   ```suggestion
   Result LogScanner::PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& out) {
   ```



##########
bindings/cpp/src/lib.rs:
##########
@@ -517,41 +543,101 @@ impl Table {
     }
 
     fn new_log_scanner(&self) -> Result<*mut LogScanner, String> {
-        let fluss_table = fcore::client::FlussTable::new(
-            &self.connection,
-            self.metadata.clone(),
-            self.table_info.clone(),
-        );
+        RUNTIME.block_on(async {
+            let fluss_table = fcore::client::FlussTable::new(
+                &self.connection,
+                self.metadata.clone(),
+                self.table_info.clone(),
+            );
+
+            let scanner = match fluss_table.new_scan().create_log_scanner() {
+                Ok(a) => a,
+                Err(e) => return Err(format!("Failed to create log scanner: 
{e}")),
+            };
+
+            let scanner_ptr = Box::into_raw(Box::new(LogScanner {
+                inner: Some(scanner),
+                inner_batch: None,
+            }));
 
-        let scanner = match fluss_table.new_scan().create_log_scanner() {
-            Ok(a) => a,
-            Err(e) => return Err(format!("Failed to create log scanner: {e}")),
-        };
-        let scanner = Box::into_raw(Box::new(LogScanner { inner: scanner }));
-        Ok(scanner)
+            Ok(scanner_ptr)
+        })
     }
 
     fn new_log_scanner_with_projection(
         &self,
         column_indices: Vec<usize>,
     ) -> Result<*mut LogScanner, String> {
-        let fluss_table = fcore::client::FlussTable::new(
-            &self.connection,
-            self.metadata.clone(),
-            self.table_info.clone(),
-        );
+        RUNTIME.block_on(async {
+            let fluss_table = fcore::client::FlussTable::new(
+                &self.connection,
+                self.metadata.clone(),
+                self.table_info.clone(),
+            );
+
+            let scan = fluss_table.new_scan();
+            let scan = match scan.project(&column_indices) {
+                Ok(s) => s,
+                Err(e) => return Err(format!("Failed to project columns: 
{e}")),
+            };
+            let scanner = match scan.create_log_scanner() {
+                Ok(a) => a,
+                Err(e) => return Err(format!("Failed to create log scanner: 
{e}")),
+            };
+            let scanner = Box::into_raw(Box::new(LogScanner {
+                inner: Some(scanner),
+                inner_batch: None,
+            }));
+            Ok(scanner)
+        })
+    }
+
+    fn new_record_batch_log_scanner(&self) -> Result<*mut LogScanner, String> {
+        RUNTIME.block_on(async {
+            let fluss_table = fcore::client::FlussTable::new(
+                &self.connection,
+                self.metadata.clone(),
+                self.table_info.clone(),
+            );
+
+            let scanner = match 
fluss_table.new_scan().create_record_batch_log_scanner() {
+                Ok(a) => a,
+                Err(e) => return Err(format!("Failed to create record batch 
log scanner: {e}")),
+            };
+            let scanner = Box::into_raw(Box::new(LogScanner {
+                inner: None,
+                inner_batch: Some(scanner),
+            }));
+            Ok(scanner)
+        })

Review Comment:
   The `RUNTIME.block_on(async { ... })` wrapper is unnecessary here because 
`create_record_batch_log_scanner()` is a synchronous function that doesn't 
return a Future. This adds unnecessary overhead. The function should be called 
directly without the async wrapper.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to