This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new e5adc3d  feat: introduce scan record batch in cpp (#230)
e5adc3d is described below

commit e5adc3db21e9ea86541e6d199081c5d920120cda
Author: yuxia Luo <[email protected]>
AuthorDate: Tue Feb 3 22:08:17 2026 +0800

    feat: introduce scan record batch in cpp (#230)
---
 bindings/cpp/.gitignore           |   1 +
 bindings/cpp/CMakeLists.txt       |   8 +-
 bindings/cpp/Cargo.toml           |   2 +-
 bindings/cpp/examples/example.cpp |  58 ++++++++-
 bindings/cpp/include/fluss.hpp    |  55 ++++++++-
 bindings/cpp/src/lib.rs           | 250 +++++++++++++++++++++++++++++++-------
 bindings/cpp/src/table.cpp        | 127 +++++++++++++++++++
 bindings/cpp/src/types.rs         |  31 +++++
 8 files changed, 479 insertions(+), 53 deletions(-)

diff --git a/bindings/cpp/.gitignore b/bindings/cpp/.gitignore
index 43f761c..da15a58 100644
--- a/bindings/cpp/.gitignore
+++ b/bindings/cpp/.gitignore
@@ -1,5 +1,6 @@
 build/
 cmake-build-*/
+CMakeFiles/
 .idea/
 *.o
 *.a
diff --git a/bindings/cpp/CMakeLists.txt b/bindings/cpp/CMakeLists.txt
index 629f3f0..93cfc41 100644
--- a/bindings/cpp/CMakeLists.txt
+++ b/bindings/cpp/CMakeLists.txt
@@ -29,6 +29,8 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
 
 find_package(Threads REQUIRED)
 
+find_package(Arrow REQUIRED)
+
 if (NOT CMAKE_BUILD_TYPE)
     set(CMAKE_BUILD_TYPE Debug)
 endif()
@@ -88,12 +90,16 @@ target_sources(fluss_cpp PRIVATE ${RUST_HEADER_FILE})
 target_include_directories(fluss_cpp PUBLIC ${CPP_INCLUDE_DIR})
 target_link_libraries(fluss_cpp PUBLIC ${RUST_LIB})
 target_link_libraries(fluss_cpp PRIVATE ${CMAKE_DL_LIBS} Threads::Threads)
+target_link_libraries(fluss_cpp PUBLIC Arrow::arrow_shared)
+target_compile_definitions(fluss_cpp PRIVATE ARROW_FOUND)
 if(APPLE)
     target_link_libraries(fluss_cpp PUBLIC "-framework CoreFoundation" 
"-framework Security")
 endif()
 
 add_executable(fluss_cpp_example examples/example.cpp)
-target_link_libraries(fluss_cpp_example fluss_cpp)
+target_link_libraries(fluss_cpp_example PRIVATE fluss_cpp)
+target_link_libraries(fluss_cpp_example PRIVATE Arrow::arrow_shared)
+target_compile_definitions(fluss_cpp_example PRIVATE ARROW_FOUND)
 target_include_directories(fluss_cpp_example PUBLIC ${CPP_INCLUDE_DIR})
 
 set_target_properties(fluss_cpp
diff --git a/bindings/cpp/Cargo.toml b/bindings/cpp/Cargo.toml
index 2d3d913..0bbcbf0 100644
--- a/bindings/cpp/Cargo.toml
+++ b/bindings/cpp/Cargo.toml
@@ -27,7 +27,7 @@ crate-type = ["staticlib"]
 
 [dependencies]
 anyhow = "1.0"
-arrow = { workspace = true }
+arrow = { workspace = true, features = ["ffi"] }
 cxx = "1.0"
 fluss = { path = "../../crates/fluss" }
 tokio = { version = "1.27", features = ["rt-multi-thread", "macros"] }
diff --git a/bindings/cpp/examples/example.cpp 
b/bindings/cpp/examples/example.cpp
index 6ff2b9b..45f7f9e 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -15,12 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "fluss.hpp"
+#include <arrow/record_batch.h>
 
+#include <chrono>
 #include <iostream>
-#include <vector>
 #include <unordered_map>
-#include <chrono>
+#include <vector>
+
+#include "fluss.hpp"
 
 static void check(const char* step, const fluss::Result& r) {
     if (!r.Ok()) {
@@ -179,6 +181,7 @@ int main() {
     
     // 8.1) Query earliest offsets for all buckets
     std::vector<int32_t> all_bucket_ids;
+    all_bucket_ids.reserve(buckets);
     for (int b = 0; b < buckets; ++b) {
         all_bucket_ids.push_back(b);
     }
@@ -250,5 +253,54 @@ int main() {
         std::cout << "  ... and " << (batch_records.Size() - 5) << " more 
records" << std::endl;
     }
 
+    // 9) Test the new Arrow record batch polling functionality
+    std::cout << "\n=== Testing Arrow Record Batch Polling ===" << std::endl;
+    
+    fluss::LogScanner arrow_scanner;
+    check("new_record_batch_log_scanner", 
table.NewRecordBatchLogScanner(arrow_scanner));
+    
+    // Subscribe to all buckets starting from offset 0
+    for (int b = 0; b < buckets; ++b) {
+        check("subscribe_arrow", arrow_scanner.Subscribe(b, 0));
+    }
+    
+    fluss::ArrowRecordBatches arrow_batches;
+    check("poll_record_batch", arrow_scanner.PollRecordBatch(5000, 
arrow_batches));
+    
+    std::cout << "Polled " << arrow_batches.Size() << " Arrow record batches" 
<< std::endl;
+    for (size_t i = 0; i < arrow_batches.Size(); ++i) {
+        const auto& batch = arrow_batches[i];
+        if (batch->Available()) {
+            std::cout << "  Batch " << i << ": " << 
batch->GetArrowRecordBatch()->num_rows() << " rows. " << std::endl;
+        } else {
+            std::cout << "  Batch " << i << ": not available" << std::endl;
+        }
+    }
+    
+    // 10) Test the new Arrow record batch polling with projection
+    std::cout << "\n=== Testing Arrow Record Batch Polling with Projection 
===" << std::endl;
+    
+    fluss::LogScanner projected_arrow_scanner;
+    check("new_record_batch_log_scanner_with_projection", 
+          table.NewRecordBatchLogScannerWithProjection(projected_columns, 
projected_arrow_scanner));
+    
+    // Subscribe to all buckets starting from offset 0
+    for (int b = 0; b < buckets; ++b) {
+        check("subscribe_projected_arrow", 
projected_arrow_scanner.Subscribe(b, 0));
+    }
+    
+    fluss::ArrowRecordBatches projected_arrow_batches;
+    check("poll_projected_record_batch", 
projected_arrow_scanner.PollRecordBatch(5000, projected_arrow_batches));
+    
+    std::cout << "Polled " << projected_arrow_batches.Size() << " projected 
Arrow record batches" << std::endl;
+    for (size_t i = 0; i < projected_arrow_batches.Size(); ++i) {
+        const auto& batch = projected_arrow_batches[i];
+        if (batch->Available()) {
+            std::cout << "  Batch " << i << ": " << 
batch->GetArrowRecordBatch()->num_rows() << " rows " << std::endl;
+        } else {
+            std::cout << "  Batch " << i << ": not available" << std::endl;
+        }
+    }
+
     return 0;
 }
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 4ef3fe1..968cb06 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -19,13 +19,17 @@
 
 #pragma once
 
-#include <cstdint>
 #include <memory>
 #include <optional>
 #include <string>
 #include <unordered_map>
 #include <vector>
 
+// Forward declare Arrow classes to avoid including heavy Arrow headers in 
header
+namespace arrow {
+    class RecordBatch;
+}
+
 namespace fluss {
 
 namespace ffi {
@@ -336,6 +340,52 @@ struct ScanRecords {
     auto end() const { return records.end(); }
 };
 
+class ArrowRecordBatch {
+public:
+
+    std::shared_ptr<arrow::RecordBatch> GetArrowRecordBatch() const { return 
batch_; }
+
+    bool Available() const;
+
+    // Get number of rows in the batch
+    int64_t NumRows() const;
+    
+    // Get ScanBatch metadata
+    int64_t GetTableId() const;
+    int64_t GetPartitionId() const;
+    int32_t GetBucketId() const;
+    int64_t GetBaseOffset() const;
+    int64_t GetLastOffset() const;
+
+private:
+    friend class LogScanner;
+    explicit ArrowRecordBatch(
+        std::shared_ptr<arrow::RecordBatch> batch,
+        int64_t table_id,
+        int64_t partition_id,
+        int32_t bucket_id,
+        int64_t base_offset) noexcept;
+
+    std::shared_ptr<arrow::RecordBatch> batch_{nullptr};
+
+    int64_t table_id_;
+    int64_t partition_id_;
+    int32_t bucket_id_;
+    int64_t base_offset_;
+};
+
+
+struct ArrowRecordBatches {
+    std::vector<std::unique_ptr<ArrowRecordBatch>> batches;
+
+    size_t Size() const { return batches.size(); }
+    bool Empty() const { return batches.empty(); }
+    const std::unique_ptr<ArrowRecordBatch>& operator[](size_t idx) const { 
return batches[idx]; }
+
+    auto begin() const { return batches.begin(); }
+    auto end() const { return batches.end(); }
+};
+
 struct BucketOffset {
     int64_t table_id;
     int64_t partition_id;
@@ -442,6 +492,8 @@ public:
     Result NewAppendWriter(AppendWriter& out);
     Result NewLogScanner(LogScanner& out);
     Result NewLogScannerWithProjection(const std::vector<size_t>& 
column_indices, LogScanner& out);
+    Result NewRecordBatchLogScanner(LogScanner& out);
+    Result NewRecordBatchLogScannerWithProjection(const std::vector<size_t>& 
column_indices, LogScanner& out);
 
     TableInfo GetTableInfo() const;
     TablePath GetTablePath() const;
@@ -493,6 +545,7 @@ public:
     Result Subscribe(int32_t bucket_id, int64_t start_offset);
     Result Subscribe(const std::vector<BucketSubscription>& bucket_offsets);
     Result Poll(int64_t timeout_ms, ScanRecords& out);
+    Result PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& out);
 
 private:
     friend class Table;
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index b327ba5..e083598 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -119,6 +119,24 @@ mod ffi {
         scan_records: FfiScanRecords,
     }
 
+    struct FfiArrowRecordBatch {
+        array_ptr: usize,
+        schema_ptr: usize,
+        table_id: i64,
+        partition_id: i64,
+        bucket_id: i32,
+        base_offset: i64,
+    }
+
+    struct FfiArrowRecordBatches {
+        batches: Vec<FfiArrowRecordBatch>,
+    }
+
+    struct FfiArrowRecordBatchesResult {
+        result: FfiResult,
+        arrow_batches: FfiArrowRecordBatches,
+    }
+
     struct FfiLakeSnapshot {
         snapshot_id: i64,
         bucket_offsets: Vec<FfiBucketOffset>,
@@ -209,6 +227,11 @@ mod ffi {
             self: &Table,
             column_indices: Vec<usize>,
         ) -> Result<*mut LogScanner>;
+        fn new_record_batch_log_scanner(self: &Table) -> Result<*mut 
LogScanner>;
+        fn new_record_batch_log_scanner_with_projection(
+            self: &Table,
+            column_indices: Vec<usize>,
+        ) -> Result<*mut LogScanner>;
         fn get_table_info_from_table(self: &Table) -> FfiTableInfo;
         fn get_table_path(self: &Table) -> FfiTablePath;
         fn has_primary_key(self: &Table) -> bool;
@@ -226,6 +249,8 @@ mod ffi {
             subscriptions: Vec<FfiBucketSubscription>,
         ) -> FfiResult;
         fn poll(self: &LogScanner, timeout_ms: i64) -> FfiScanRecordsResult;
+        fn poll_record_batch(self: &LogScanner, timeout_ms: i64) -> 
FfiArrowRecordBatchesResult;
+        fn free_arrow_ffi_structures(array_ptr: usize, schema_ptr: usize);
     }
 }
 
@@ -252,7 +277,8 @@ pub struct AppendWriter {
 }
 
 pub struct LogScanner {
-    inner: fcore::client::LogScanner,
+    inner: Option<fcore::client::LogScanner>,
+    inner_batch: Option<fcore::client::RecordBatchLogScanner>,
 }
 
 fn ok_result() -> ffi::FfiResult {
@@ -551,41 +577,101 @@ impl Table {
     }
 
     fn new_log_scanner(&self) -> Result<*mut LogScanner, String> {
-        let fluss_table = fcore::client::FlussTable::new(
-            &self.connection,
-            self.metadata.clone(),
-            self.table_info.clone(),
-        );
+        RUNTIME.block_on(async {
+            let fluss_table = fcore::client::FlussTable::new(
+                &self.connection,
+                self.metadata.clone(),
+                self.table_info.clone(),
+            );
+
+            let scanner = match fluss_table.new_scan().create_log_scanner() {
+                Ok(a) => a,
+                Err(e) => return Err(format!("Failed to create log scanner: 
{e}")),
+            };
+
+            let scanner_ptr = Box::into_raw(Box::new(LogScanner {
+                inner: Some(scanner),
+                inner_batch: None,
+            }));
 
-        let scanner = match fluss_table.new_scan().create_log_scanner() {
-            Ok(a) => a,
-            Err(e) => return Err(format!("Failed to create log scanner: {e}")),
-        };
-        let scanner = Box::into_raw(Box::new(LogScanner { inner: scanner }));
-        Ok(scanner)
+            Ok(scanner_ptr)
+        })
     }
 
     fn new_log_scanner_with_projection(
         &self,
         column_indices: Vec<usize>,
     ) -> Result<*mut LogScanner, String> {
-        let fluss_table = fcore::client::FlussTable::new(
-            &self.connection,
-            self.metadata.clone(),
-            self.table_info.clone(),
-        );
+        RUNTIME.block_on(async {
+            let fluss_table = fcore::client::FlussTable::new(
+                &self.connection,
+                self.metadata.clone(),
+                self.table_info.clone(),
+            );
+
+            let scan = fluss_table.new_scan();
+            let scan = match scan.project(&column_indices) {
+                Ok(s) => s,
+                Err(e) => return Err(format!("Failed to project columns: 
{e}")),
+            };
+            let scanner = match scan.create_log_scanner() {
+                Ok(a) => a,
+                Err(e) => return Err(format!("Failed to create log scanner: 
{e}")),
+            };
+            let scanner = Box::into_raw(Box::new(LogScanner {
+                inner: Some(scanner),
+                inner_batch: None,
+            }));
+            Ok(scanner)
+        })
+    }
+
+    fn new_record_batch_log_scanner(&self) -> Result<*mut LogScanner, String> {
+        RUNTIME.block_on(async {
+            let fluss_table = fcore::client::FlussTable::new(
+                &self.connection,
+                self.metadata.clone(),
+                self.table_info.clone(),
+            );
+
+            let scanner = match 
fluss_table.new_scan().create_record_batch_log_scanner() {
+                Ok(a) => a,
+                Err(e) => return Err(format!("Failed to create record batch 
log scanner: {e}")),
+            };
+            let scanner = Box::into_raw(Box::new(LogScanner {
+                inner: None,
+                inner_batch: Some(scanner),
+            }));
+            Ok(scanner)
+        })
+    }
 
-        let scan = fluss_table.new_scan();
-        let scan = match scan.project(&column_indices) {
-            Ok(s) => s,
-            Err(e) => return Err(format!("Failed to project columns: {e}")),
-        };
-        let scanner = match scan.create_log_scanner() {
-            Ok(a) => a,
-            Err(e) => return Err(format!("Failed to create log scanner: {e}")),
-        };
-        let scanner = Box::into_raw(Box::new(LogScanner { inner: scanner }));
-        Ok(scanner)
+    fn new_record_batch_log_scanner_with_projection(
+        &self,
+        column_indices: Vec<usize>,
+    ) -> Result<*mut LogScanner, String> {
+        RUNTIME.block_on(async {
+            let fluss_table = fcore::client::FlussTable::new(
+                &self.connection,
+                self.metadata.clone(),
+                self.table_info.clone(),
+            );
+
+            let scan = fluss_table.new_scan();
+            let scan = match scan.project(&column_indices) {
+                Ok(s) => s,
+                Err(e) => return Err(format!("Failed to project columns: 
{e}")),
+            };
+            let scanner = match scan.create_record_batch_log_scanner() {
+                Ok(a) => a,
+                Err(e) => return Err(format!("Failed to create record batch 
log scanner: {e}")),
+            };
+            let scanner = Box::into_raw(Box::new(LogScanner {
+                inner: None,
+                inner_batch: Some(scanner),
+            }));
+            Ok(scanner)
+        })
     }
 
     fn get_table_info_from_table(&self) -> ffi::FfiTableInfo {
@@ -644,14 +730,36 @@ unsafe fn delete_log_scanner(scanner: *mut LogScanner) {
     }
 }
 
+// Helper function to free the Arrow FFI structures separately (for use after 
ImportRecordBatch)
+pub extern "C" fn free_arrow_ffi_structures(array_ptr: usize, schema_ptr: 
usize) {
+    use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
+    if array_ptr != 0 {
+        let _array = unsafe { Box::from_raw(array_ptr as *mut FFI_ArrowArray) 
};
+    }
+    if schema_ptr != 0 {
+        let _schema = unsafe { Box::from_raw(schema_ptr as *mut 
FFI_ArrowSchema) };
+    }
+}
+
 impl LogScanner {
     fn subscribe(&self, bucket_id: i32, start_offset: i64) -> ffi::FfiResult {
-        let result =
-            RUNTIME.block_on(async { self.inner.subscribe(bucket_id, 
start_offset).await });
+        if let Some(ref inner) = self.inner {
+            let result = RUNTIME.block_on(async { inner.subscribe(bucket_id, 
start_offset).await });
 
-        match result {
-            Ok(_) => ok_result(),
-            Err(e) => err_result(1, e.to_string()),
+            match result {
+                Ok(_) => ok_result(),
+                Err(e) => err_result(1, e.to_string()),
+            }
+        } else if let Some(ref inner_batch) = self.inner_batch {
+            let result =
+                RUNTIME.block_on(async { inner_batch.subscribe(bucket_id, 
start_offset).await });
+
+            match result {
+                Ok(_) => ok_result(),
+                Err(e) => err_result(1, e.to_string()),
+            }
+        } else {
+            err_result(1, "LogScanner not initialized".to_string())
         }
     }
 
@@ -662,27 +770,75 @@ impl LogScanner {
             bucket_offsets.insert(sub.bucket_id, sub.offset);
         }
 
-        let result = RUNTIME.block_on(async { 
self.inner.subscribe_batch(&bucket_offsets).await });
+        if let Some(ref inner) = self.inner {
+            let result = RUNTIME.block_on(async { 
inner.subscribe_batch(&bucket_offsets).await });
 
-        match result {
-            Ok(_) => ok_result(),
-            Err(e) => err_result(1, e.to_string()),
+            match result {
+                Ok(_) => ok_result(),
+                Err(e) => err_result(1, e.to_string()),
+            }
+        } else if let Some(ref inner_batch) = self.inner_batch {
+            let result =
+                RUNTIME.block_on(async { 
inner_batch.subscribe_batch(&bucket_offsets).await });
+
+            match result {
+                Ok(_) => ok_result(),
+                Err(e) => err_result(1, e.to_string()),
+            }
+        } else {
+            err_result(1, "LogScanner not initialized".to_string())
         }
     }
 
     fn poll(&self, timeout_ms: i64) -> ffi::FfiScanRecordsResult {
-        let timeout = Duration::from_millis(timeout_ms as u64);
-        let result = RUNTIME.block_on(async { self.inner.poll(timeout).await 
});
+        if let Some(ref inner) = self.inner {
+            let timeout = Duration::from_millis(timeout_ms as u64);
+            let result = RUNTIME.block_on(async { inner.poll(timeout).await });
 
-        match result {
-            Ok(records) => ffi::FfiScanRecordsResult {
-                result: ok_result(),
-                scan_records: types::core_scan_records_to_ffi(&records),
-            },
-            Err(e) => ffi::FfiScanRecordsResult {
-                result: err_result(1, e.to_string()),
+            match result {
+                Ok(records) => ffi::FfiScanRecordsResult {
+                    result: ok_result(),
+                    scan_records: types::core_scan_records_to_ffi(&records),
+                },
+                Err(e) => ffi::FfiScanRecordsResult {
+                    result: err_result(1, e.to_string()),
+                    scan_records: ffi::FfiScanRecords { records: vec![] },
+                },
+            }
+        } else {
+            ffi::FfiScanRecordsResult {
+                result: err_result(1, "Record-based scanner not 
available".to_string()),
                 scan_records: ffi::FfiScanRecords { records: vec![] },
-            },
+            }
+        }
+    }
+
+    fn poll_record_batch(&self, timeout_ms: i64) -> 
ffi::FfiArrowRecordBatchesResult {
+        if let Some(ref inner_batch) = self.inner_batch {
+            let timeout = Duration::from_millis(timeout_ms as u64);
+            let result = RUNTIME.block_on(async { 
inner_batch.poll(timeout).await });
+
+            match result {
+                Ok(batches) => match types::core_scan_batches_to_ffi(&batches) 
{
+                    Ok(arrow_batches) => ffi::FfiArrowRecordBatchesResult {
+                        result: ok_result(),
+                        arrow_batches,
+                    },
+                    Err(e) => ffi::FfiArrowRecordBatchesResult {
+                        result: err_result(1, e),
+                        arrow_batches: ffi::FfiArrowRecordBatches { batches: 
vec![] },
+                    },
+                },
+                Err(e) => ffi::FfiArrowRecordBatchesResult {
+                    result: err_result(1, e.to_string()),
+                    arrow_batches: ffi::FfiArrowRecordBatches { batches: 
vec![] },
+                },
+            }
+        } else {
+            ffi::FfiArrowRecordBatchesResult {
+                result: err_result(1, "Batch-based scanner not 
available".to_string()),
+                arrow_batches: ffi::FfiArrowRecordBatches { batches: vec![] },
+            }
         }
     }
 }
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index d42e1a2..118ca3c 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -21,6 +21,10 @@
 #include "lib.rs.h"
 #include "ffi_converter.hpp"
 #include "rust/cxx.h"
+#include <arrow/c/bridge.h>
+// todo:  bindings/cpp/BUILD.bazel still doesn’t declare Arrow include/link 
dependencies.
+// In environments where Bazel does not already have Arrow available, this 
will fail at compile/link time.
+#include <arrow/record_batch.h>
 
 namespace fluss {
 
@@ -101,6 +105,40 @@ Result Table::NewLogScannerWithProjection(const 
std::vector<size_t>& column_indi
     }
 }
 
+Result Table::NewRecordBatchLogScanner(LogScanner& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        out.scanner_ = table_->new_record_batch_log_scanner();
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+Result Table::NewRecordBatchLogScannerWithProjection(const 
std::vector<size_t>& column_indices, LogScanner& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        rust::Vec<size_t> rust_indices;
+        for (size_t idx : column_indices) {
+            rust_indices.push_back(idx);
+        }
+        out.scanner_ = 
table_->new_record_batch_log_scanner_with_projection(std::move(rust_indices));
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
 TableInfo Table::GetTableInfo() const {
     if (!Available()) {
         return TableInfo{};
@@ -242,4 +280,93 @@ Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& 
out) {
     return utils::make_ok();
 }
 
+ArrowRecordBatch::ArrowRecordBatch(
+    std::shared_ptr<arrow::RecordBatch> batch,
+    int64_t table_id,
+    int64_t partition_id,
+    int32_t bucket_id,
+    int64_t base_offset) noexcept
+    : batch_(std::move(batch)),
+      table_id_(table_id),
+      partition_id_(partition_id),
+      bucket_id_(bucket_id),
+      base_offset_(base_offset) {}
+
+bool ArrowRecordBatch::Available() const { return batch_ != nullptr; }
+
+int64_t ArrowRecordBatch::NumRows() const {
+    if (!Available()) return 0;
+    return batch_->num_rows();
+}
+
+
+int64_t ArrowRecordBatch::GetTableId() const {
+    if (!Available()) return 0;
+    return this->table_id_;
+}
+
+int64_t ArrowRecordBatch::GetPartitionId() const {
+    if (!Available()) return -1;
+    return this->partition_id_;
+}
+
+int32_t ArrowRecordBatch::GetBucketId() const {
+    if (!Available()) return -1;
+    return this->bucket_id_;
+}
+
+int64_t ArrowRecordBatch::GetBaseOffset() const {
+    if (!Available()) return -1;
+    return this->base_offset_;
+}
+
+int64_t ArrowRecordBatch::GetLastOffset() const {
+    if (!Available()) return -1;
+    return this->base_offset_ + this->NumRows() - 1;
+}
+
+Result LogScanner::PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& 
out) {
+    if (!Available()) {
+        return utils::make_error(1, "LogScanner not available");
+    }
+
+    auto ffi_result = scanner_->poll_record_batch(timeout_ms);
+    auto result = utils::from_ffi_result(ffi_result.result);
+    if (!result.Ok()) {
+        return result;
+    }
+
+    // Convert the FFI Arrow record batches to C++ ArrowRecordBatch objects
+    out.batches.clear();
+    for (const auto& ffi_batch : ffi_result.arrow_batches.batches) {
+        auto* c_array = reinterpret_cast<struct 
ArrowArray*>(ffi_batch.array_ptr);
+        auto* c_schema = reinterpret_cast<struct 
ArrowSchema*>(ffi_batch.schema_ptr);
+
+        auto import_result = arrow::ImportRecordBatch(c_array, c_schema);
+        if (import_result.ok()) {
+            auto batch_ptr = import_result.ValueOrDie();
+            auto batch_wrapper = std::unique_ptr<ArrowRecordBatch>(new 
ArrowRecordBatch(
+                std::move(batch_ptr),
+                ffi_batch.table_id,
+                ffi_batch.partition_id,
+                ffi_batch.bucket_id,
+                ffi_batch.base_offset
+            ));
+            out.batches.push_back(std::move(batch_wrapper));
+            
+            // Free the container structures that were allocated in Rust after 
successful import
+            ffi::free_arrow_ffi_structures(ffi_batch.array_ptr, 
ffi_batch.schema_ptr);
+        } else {
+            // Import failed, free the container structures to avoid leaks and 
return error
+            ffi::free_arrow_ffi_structures(ffi_batch.array_ptr, 
ffi_batch.schema_ptr);
+            
+            // Return an error indicating that the import failed
+            std::string error_msg = "Failed to import Arrow record batch: " + 
import_result.status().ToString();
+            return utils::make_error(1, error_msg);
+        }
+    }
+    
+    return utils::make_ok();
+}
+
 }  // namespace fluss
diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
index 726e3d1..91d6e26 100644
--- a/bindings/cpp/src/types.rs
+++ b/bindings/cpp/src/types.rs
@@ -23,10 +23,13 @@ use arrow::array::{
     TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
 };
 use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
+use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
 use fcore::row::InternalRow;
 use fluss as fcore;
 use std::borrow::Cow;
 
+use arrow::array::Array;
+
 pub const DATA_TYPE_BOOLEAN: i32 = 1;
 pub const DATA_TYPE_TINYINT: i32 = 2;
 pub const DATA_TYPE_SMALLINT: i32 = 3;
@@ -478,3 +481,31 @@ pub fn core_lake_snapshot_to_ffi(snapshot: 
&fcore::metadata::LakeSnapshot) -> ff
         bucket_offsets,
     }
 }
+
+pub fn core_scan_batches_to_ffi(
+    batches: &[fcore::record::ScanBatch],
+) -> Result<ffi::FfiArrowRecordBatches, String> {
+    let mut ffi_batches = Vec::new();
+    for batch in batches {
+        let record_batch = batch.batch();
+        // Convert RecordBatch to StructArray first, then get the data
+        let struct_array = 
arrow::array::StructArray::from(record_batch.clone());
+        let ffi_array = 
Box::new(FFI_ArrowArray::new(&struct_array.into_data()));
+        let ffi_schema = Box::new(
+            
FFI_ArrowSchema::try_from(record_batch.schema().as_ref()).map_err(|e| 
e.to_string())?,
+        );
+        // Export as raw pointers
+        ffi_batches.push(ffi::FfiArrowRecordBatch {
+            array_ptr: Box::into_raw(ffi_array) as usize,
+            schema_ptr: Box::into_raw(ffi_schema) as usize,
+            table_id: batch.bucket().table_id(),
+            partition_id: batch.bucket().partition_id().unwrap_or(-1),
+            bucket_id: batch.bucket().bucket_id(),
+            base_offset: batch.base_offset(),
+        });
+    }
+
+    Ok(ffi::FfiArrowRecordBatches {
+        batches: ffi_batches,
+    })
+}

Reply via email to