This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new e5adc3d feat: introduce scan record batch in cpp (#230)
e5adc3d is described below
commit e5adc3db21e9ea86541e6d199081c5d920120cda
Author: yuxia Luo <[email protected]>
AuthorDate: Tue Feb 3 22:08:17 2026 +0800
feat: introduce scan record batch in cpp (#230)
---
bindings/cpp/.gitignore | 1 +
bindings/cpp/CMakeLists.txt | 8 +-
bindings/cpp/Cargo.toml | 2 +-
bindings/cpp/examples/example.cpp | 58 ++++++++-
bindings/cpp/include/fluss.hpp | 55 ++++++++-
bindings/cpp/src/lib.rs | 250 +++++++++++++++++++++++++++++++-------
bindings/cpp/src/table.cpp | 127 +++++++++++++++++++
bindings/cpp/src/types.rs | 31 +++++
8 files changed, 479 insertions(+), 53 deletions(-)
diff --git a/bindings/cpp/.gitignore b/bindings/cpp/.gitignore
index 43f761c..da15a58 100644
--- a/bindings/cpp/.gitignore
+++ b/bindings/cpp/.gitignore
@@ -1,5 +1,6 @@
build/
cmake-build-*/
+CMakeFiles/
.idea/
*.o
*.a
diff --git a/bindings/cpp/CMakeLists.txt b/bindings/cpp/CMakeLists.txt
index 629f3f0..93cfc41 100644
--- a/bindings/cpp/CMakeLists.txt
+++ b/bindings/cpp/CMakeLists.txt
@@ -29,6 +29,8 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
find_package(Threads REQUIRED)
+find_package(Arrow REQUIRED)
+
if (NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE Debug)
endif()
@@ -88,12 +90,16 @@ target_sources(fluss_cpp PRIVATE ${RUST_HEADER_FILE})
target_include_directories(fluss_cpp PUBLIC ${CPP_INCLUDE_DIR})
target_link_libraries(fluss_cpp PUBLIC ${RUST_LIB})
target_link_libraries(fluss_cpp PRIVATE ${CMAKE_DL_LIBS} Threads::Threads)
+target_link_libraries(fluss_cpp PUBLIC Arrow::arrow_shared)
+target_compile_definitions(fluss_cpp PRIVATE ARROW_FOUND)
if(APPLE)
target_link_libraries(fluss_cpp PUBLIC "-framework CoreFoundation"
"-framework Security")
endif()
add_executable(fluss_cpp_example examples/example.cpp)
-target_link_libraries(fluss_cpp_example fluss_cpp)
+target_link_libraries(fluss_cpp_example PRIVATE fluss_cpp)
+target_link_libraries(fluss_cpp_example PRIVATE Arrow::arrow_shared)
+target_compile_definitions(fluss_cpp_example PRIVATE ARROW_FOUND)
target_include_directories(fluss_cpp_example PUBLIC ${CPP_INCLUDE_DIR})
set_target_properties(fluss_cpp
diff --git a/bindings/cpp/Cargo.toml b/bindings/cpp/Cargo.toml
index 2d3d913..0bbcbf0 100644
--- a/bindings/cpp/Cargo.toml
+++ b/bindings/cpp/Cargo.toml
@@ -27,7 +27,7 @@ crate-type = ["staticlib"]
[dependencies]
anyhow = "1.0"
-arrow = { workspace = true }
+arrow = { workspace = true, features = ["ffi"] }
cxx = "1.0"
fluss = { path = "../../crates/fluss" }
tokio = { version = "1.27", features = ["rt-multi-thread", "macros"] }
diff --git a/bindings/cpp/examples/example.cpp
b/bindings/cpp/examples/example.cpp
index 6ff2b9b..45f7f9e 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -15,12 +15,14 @@
// specific language governing permissions and limitations
// under the License.
-#include "fluss.hpp"
+#include <arrow/record_batch.h>
+#include <chrono>
#include <iostream>
-#include <vector>
#include <unordered_map>
-#include <chrono>
+#include <vector>
+
+#include "fluss.hpp"
static void check(const char* step, const fluss::Result& r) {
if (!r.Ok()) {
@@ -179,6 +181,7 @@ int main() {
// 8.1) Query earliest offsets for all buckets
std::vector<int32_t> all_bucket_ids;
+ all_bucket_ids.reserve(buckets);
for (int b = 0; b < buckets; ++b) {
all_bucket_ids.push_back(b);
}
@@ -250,5 +253,54 @@ int main() {
std::cout << " ... and " << (batch_records.Size() - 5) << " more
records" << std::endl;
}
+ // 9) Test the new Arrow record batch polling functionality
+ std::cout << "\n=== Testing Arrow Record Batch Polling ===" << std::endl;
+
+ fluss::LogScanner arrow_scanner;
+ check("new_record_batch_log_scanner",
table.NewRecordBatchLogScanner(arrow_scanner));
+
+ // Subscribe to all buckets starting from offset 0
+ for (int b = 0; b < buckets; ++b) {
+ check("subscribe_arrow", arrow_scanner.Subscribe(b, 0));
+ }
+
+ fluss::ArrowRecordBatches arrow_batches;
+ check("poll_record_batch", arrow_scanner.PollRecordBatch(5000,
arrow_batches));
+
+ std::cout << "Polled " << arrow_batches.Size() << " Arrow record batches"
<< std::endl;
+ for (size_t i = 0; i < arrow_batches.Size(); ++i) {
+ const auto& batch = arrow_batches[i];
+ if (batch->Available()) {
+ std::cout << " Batch " << i << ": " <<
batch->GetArrowRecordBatch()->num_rows() << " rows. " << std::endl;
+ } else {
+ std::cout << " Batch " << i << ": not available" << std::endl;
+ }
+ }
+
+ // 10) Test the new Arrow record batch polling with projection
+ std::cout << "\n=== Testing Arrow Record Batch Polling with Projection
===" << std::endl;
+
+ fluss::LogScanner projected_arrow_scanner;
+ check("new_record_batch_log_scanner_with_projection",
+ table.NewRecordBatchLogScannerWithProjection(projected_columns,
projected_arrow_scanner));
+
+ // Subscribe to all buckets starting from offset 0
+ for (int b = 0; b < buckets; ++b) {
+ check("subscribe_projected_arrow",
projected_arrow_scanner.Subscribe(b, 0));
+ }
+
+ fluss::ArrowRecordBatches projected_arrow_batches;
+ check("poll_projected_record_batch",
projected_arrow_scanner.PollRecordBatch(5000, projected_arrow_batches));
+
+ std::cout << "Polled " << projected_arrow_batches.Size() << " projected
Arrow record batches" << std::endl;
+ for (size_t i = 0; i < projected_arrow_batches.Size(); ++i) {
+ const auto& batch = projected_arrow_batches[i];
+ if (batch->Available()) {
+ std::cout << " Batch " << i << ": " <<
batch->GetArrowRecordBatch()->num_rows() << " rows " << std::endl;
+ } else {
+ std::cout << " Batch " << i << ": not available" << std::endl;
+ }
+ }
+
return 0;
}
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 4ef3fe1..968cb06 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -19,13 +19,17 @@
#pragma once
-#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>
+// Forward declare Arrow classes to avoid including heavy Arrow headers in
header
+namespace arrow {
+ class RecordBatch;
+}
+
namespace fluss {
namespace ffi {
@@ -336,6 +340,52 @@ struct ScanRecords {
auto end() const { return records.end(); }
};
+class ArrowRecordBatch {
+public:
+
+ std::shared_ptr<arrow::RecordBatch> GetArrowRecordBatch() const { return
batch_; }
+
+ bool Available() const;
+
+ // Get number of rows in the batch
+ int64_t NumRows() const;
+
+ // Get ScanBatch metadata
+ int64_t GetTableId() const;
+ int64_t GetPartitionId() const;
+ int32_t GetBucketId() const;
+ int64_t GetBaseOffset() const;
+ int64_t GetLastOffset() const;
+
+private:
+ friend class LogScanner;
+ explicit ArrowRecordBatch(
+ std::shared_ptr<arrow::RecordBatch> batch,
+ int64_t table_id,
+ int64_t partition_id,
+ int32_t bucket_id,
+ int64_t base_offset) noexcept;
+
+ std::shared_ptr<arrow::RecordBatch> batch_{nullptr};
+
+ int64_t table_id_;
+ int64_t partition_id_;
+ int32_t bucket_id_;
+ int64_t base_offset_;
+};
+
+
+struct ArrowRecordBatches {
+ std::vector<std::unique_ptr<ArrowRecordBatch>> batches;
+
+ size_t Size() const { return batches.size(); }
+ bool Empty() const { return batches.empty(); }
+ const std::unique_ptr<ArrowRecordBatch>& operator[](size_t idx) const {
return batches[idx]; }
+
+ auto begin() const { return batches.begin(); }
+ auto end() const { return batches.end(); }
+};
+
struct BucketOffset {
int64_t table_id;
int64_t partition_id;
@@ -442,6 +492,8 @@ public:
Result NewAppendWriter(AppendWriter& out);
Result NewLogScanner(LogScanner& out);
Result NewLogScannerWithProjection(const std::vector<size_t>&
column_indices, LogScanner& out);
+ Result NewRecordBatchLogScanner(LogScanner& out);
+ Result NewRecordBatchLogScannerWithProjection(const std::vector<size_t>&
column_indices, LogScanner& out);
TableInfo GetTableInfo() const;
TablePath GetTablePath() const;
@@ -493,6 +545,7 @@ public:
Result Subscribe(int32_t bucket_id, int64_t start_offset);
Result Subscribe(const std::vector<BucketSubscription>& bucket_offsets);
Result Poll(int64_t timeout_ms, ScanRecords& out);
+ Result PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& out);
private:
friend class Table;
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index b327ba5..e083598 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -119,6 +119,24 @@ mod ffi {
scan_records: FfiScanRecords,
}
+ struct FfiArrowRecordBatch {
+ array_ptr: usize,
+ schema_ptr: usize,
+ table_id: i64,
+ partition_id: i64,
+ bucket_id: i32,
+ base_offset: i64,
+ }
+
+ struct FfiArrowRecordBatches {
+ batches: Vec<FfiArrowRecordBatch>,
+ }
+
+ struct FfiArrowRecordBatchesResult {
+ result: FfiResult,
+ arrow_batches: FfiArrowRecordBatches,
+ }
+
struct FfiLakeSnapshot {
snapshot_id: i64,
bucket_offsets: Vec<FfiBucketOffset>,
@@ -209,6 +227,11 @@ mod ffi {
self: &Table,
column_indices: Vec<usize>,
) -> Result<*mut LogScanner>;
+ fn new_record_batch_log_scanner(self: &Table) -> Result<*mut
LogScanner>;
+ fn new_record_batch_log_scanner_with_projection(
+ self: &Table,
+ column_indices: Vec<usize>,
+ ) -> Result<*mut LogScanner>;
fn get_table_info_from_table(self: &Table) -> FfiTableInfo;
fn get_table_path(self: &Table) -> FfiTablePath;
fn has_primary_key(self: &Table) -> bool;
@@ -226,6 +249,8 @@ mod ffi {
subscriptions: Vec<FfiBucketSubscription>,
) -> FfiResult;
fn poll(self: &LogScanner, timeout_ms: i64) -> FfiScanRecordsResult;
+ fn poll_record_batch(self: &LogScanner, timeout_ms: i64) ->
FfiArrowRecordBatchesResult;
+ fn free_arrow_ffi_structures(array_ptr: usize, schema_ptr: usize);
}
}
@@ -252,7 +277,8 @@ pub struct AppendWriter {
}
pub struct LogScanner {
- inner: fcore::client::LogScanner,
+ inner: Option<fcore::client::LogScanner>,
+ inner_batch: Option<fcore::client::RecordBatchLogScanner>,
}
fn ok_result() -> ffi::FfiResult {
@@ -551,41 +577,101 @@ impl Table {
}
fn new_log_scanner(&self) -> Result<*mut LogScanner, String> {
- let fluss_table = fcore::client::FlussTable::new(
- &self.connection,
- self.metadata.clone(),
- self.table_info.clone(),
- );
+ RUNTIME.block_on(async {
+ let fluss_table = fcore::client::FlussTable::new(
+ &self.connection,
+ self.metadata.clone(),
+ self.table_info.clone(),
+ );
+
+ let scanner = match fluss_table.new_scan().create_log_scanner() {
+ Ok(a) => a,
+ Err(e) => return Err(format!("Failed to create log scanner:
{e}")),
+ };
+
+ let scanner_ptr = Box::into_raw(Box::new(LogScanner {
+ inner: Some(scanner),
+ inner_batch: None,
+ }));
- let scanner = match fluss_table.new_scan().create_log_scanner() {
- Ok(a) => a,
- Err(e) => return Err(format!("Failed to create log scanner: {e}")),
- };
- let scanner = Box::into_raw(Box::new(LogScanner { inner: scanner }));
- Ok(scanner)
+ Ok(scanner_ptr)
+ })
}
fn new_log_scanner_with_projection(
&self,
column_indices: Vec<usize>,
) -> Result<*mut LogScanner, String> {
- let fluss_table = fcore::client::FlussTable::new(
- &self.connection,
- self.metadata.clone(),
- self.table_info.clone(),
- );
+ RUNTIME.block_on(async {
+ let fluss_table = fcore::client::FlussTable::new(
+ &self.connection,
+ self.metadata.clone(),
+ self.table_info.clone(),
+ );
+
+ let scan = fluss_table.new_scan();
+ let scan = match scan.project(&column_indices) {
+ Ok(s) => s,
+ Err(e) => return Err(format!("Failed to project columns:
{e}")),
+ };
+ let scanner = match scan.create_log_scanner() {
+ Ok(a) => a,
+ Err(e) => return Err(format!("Failed to create log scanner:
{e}")),
+ };
+ let scanner = Box::into_raw(Box::new(LogScanner {
+ inner: Some(scanner),
+ inner_batch: None,
+ }));
+ Ok(scanner)
+ })
+ }
+
+ fn new_record_batch_log_scanner(&self) -> Result<*mut LogScanner, String> {
+ RUNTIME.block_on(async {
+ let fluss_table = fcore::client::FlussTable::new(
+ &self.connection,
+ self.metadata.clone(),
+ self.table_info.clone(),
+ );
+
+ let scanner = match
fluss_table.new_scan().create_record_batch_log_scanner() {
+ Ok(a) => a,
+ Err(e) => return Err(format!("Failed to create record batch
log scanner: {e}")),
+ };
+ let scanner = Box::into_raw(Box::new(LogScanner {
+ inner: None,
+ inner_batch: Some(scanner),
+ }));
+ Ok(scanner)
+ })
+ }
- let scan = fluss_table.new_scan();
- let scan = match scan.project(&column_indices) {
- Ok(s) => s,
- Err(e) => return Err(format!("Failed to project columns: {e}")),
- };
- let scanner = match scan.create_log_scanner() {
- Ok(a) => a,
- Err(e) => return Err(format!("Failed to create log scanner: {e}")),
- };
- let scanner = Box::into_raw(Box::new(LogScanner { inner: scanner }));
- Ok(scanner)
+ fn new_record_batch_log_scanner_with_projection(
+ &self,
+ column_indices: Vec<usize>,
+ ) -> Result<*mut LogScanner, String> {
+ RUNTIME.block_on(async {
+ let fluss_table = fcore::client::FlussTable::new(
+ &self.connection,
+ self.metadata.clone(),
+ self.table_info.clone(),
+ );
+
+ let scan = fluss_table.new_scan();
+ let scan = match scan.project(&column_indices) {
+ Ok(s) => s,
+ Err(e) => return Err(format!("Failed to project columns:
{e}")),
+ };
+ let scanner = match scan.create_record_batch_log_scanner() {
+ Ok(a) => a,
+ Err(e) => return Err(format!("Failed to create record batch
log scanner: {e}")),
+ };
+ let scanner = Box::into_raw(Box::new(LogScanner {
+ inner: None,
+ inner_batch: Some(scanner),
+ }));
+ Ok(scanner)
+ })
}
fn get_table_info_from_table(&self) -> ffi::FfiTableInfo {
@@ -644,14 +730,36 @@ unsafe fn delete_log_scanner(scanner: *mut LogScanner) {
}
}
+// Helper function to free the Arrow FFI structures separately (for use after
ImportRecordBatch)
+pub extern "C" fn free_arrow_ffi_structures(array_ptr: usize, schema_ptr:
usize) {
+ use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
+ if array_ptr != 0 {
+ let _array = unsafe { Box::from_raw(array_ptr as *mut FFI_ArrowArray)
};
+ }
+ if schema_ptr != 0 {
+ let _schema = unsafe { Box::from_raw(schema_ptr as *mut
FFI_ArrowSchema) };
+ }
+}
+
impl LogScanner {
fn subscribe(&self, bucket_id: i32, start_offset: i64) -> ffi::FfiResult {
- let result =
- RUNTIME.block_on(async { self.inner.subscribe(bucket_id,
start_offset).await });
+ if let Some(ref inner) = self.inner {
+ let result = RUNTIME.block_on(async { inner.subscribe(bucket_id,
start_offset).await });
- match result {
- Ok(_) => ok_result(),
- Err(e) => err_result(1, e.to_string()),
+ match result {
+ Ok(_) => ok_result(),
+ Err(e) => err_result(1, e.to_string()),
+ }
+ } else if let Some(ref inner_batch) = self.inner_batch {
+ let result =
+ RUNTIME.block_on(async { inner_batch.subscribe(bucket_id,
start_offset).await });
+
+ match result {
+ Ok(_) => ok_result(),
+ Err(e) => err_result(1, e.to_string()),
+ }
+ } else {
+ err_result(1, "LogScanner not initialized".to_string())
}
}
@@ -662,27 +770,75 @@ impl LogScanner {
bucket_offsets.insert(sub.bucket_id, sub.offset);
}
- let result = RUNTIME.block_on(async {
self.inner.subscribe_batch(&bucket_offsets).await });
+ if let Some(ref inner) = self.inner {
+ let result = RUNTIME.block_on(async {
inner.subscribe_batch(&bucket_offsets).await });
- match result {
- Ok(_) => ok_result(),
- Err(e) => err_result(1, e.to_string()),
+ match result {
+ Ok(_) => ok_result(),
+ Err(e) => err_result(1, e.to_string()),
+ }
+ } else if let Some(ref inner_batch) = self.inner_batch {
+ let result =
+ RUNTIME.block_on(async {
inner_batch.subscribe_batch(&bucket_offsets).await });
+
+ match result {
+ Ok(_) => ok_result(),
+ Err(e) => err_result(1, e.to_string()),
+ }
+ } else {
+ err_result(1, "LogScanner not initialized".to_string())
}
}
fn poll(&self, timeout_ms: i64) -> ffi::FfiScanRecordsResult {
- let timeout = Duration::from_millis(timeout_ms as u64);
- let result = RUNTIME.block_on(async { self.inner.poll(timeout).await
});
+ if let Some(ref inner) = self.inner {
+ let timeout = Duration::from_millis(timeout_ms as u64);
+ let result = RUNTIME.block_on(async { inner.poll(timeout).await });
- match result {
- Ok(records) => ffi::FfiScanRecordsResult {
- result: ok_result(),
- scan_records: types::core_scan_records_to_ffi(&records),
- },
- Err(e) => ffi::FfiScanRecordsResult {
- result: err_result(1, e.to_string()),
+ match result {
+ Ok(records) => ffi::FfiScanRecordsResult {
+ result: ok_result(),
+ scan_records: types::core_scan_records_to_ffi(&records),
+ },
+ Err(e) => ffi::FfiScanRecordsResult {
+ result: err_result(1, e.to_string()),
+ scan_records: ffi::FfiScanRecords { records: vec![] },
+ },
+ }
+ } else {
+ ffi::FfiScanRecordsResult {
+ result: err_result(1, "Record-based scanner not
available".to_string()),
scan_records: ffi::FfiScanRecords { records: vec![] },
- },
+ }
+ }
+ }
+
+ fn poll_record_batch(&self, timeout_ms: i64) ->
ffi::FfiArrowRecordBatchesResult {
+ if let Some(ref inner_batch) = self.inner_batch {
+ let timeout = Duration::from_millis(timeout_ms as u64);
+ let result = RUNTIME.block_on(async {
inner_batch.poll(timeout).await });
+
+ match result {
+ Ok(batches) => match types::core_scan_batches_to_ffi(&batches)
{
+ Ok(arrow_batches) => ffi::FfiArrowRecordBatchesResult {
+ result: ok_result(),
+ arrow_batches,
+ },
+ Err(e) => ffi::FfiArrowRecordBatchesResult {
+ result: err_result(1, e),
+ arrow_batches: ffi::FfiArrowRecordBatches { batches:
vec![] },
+ },
+ },
+ Err(e) => ffi::FfiArrowRecordBatchesResult {
+ result: err_result(1, e.to_string()),
+ arrow_batches: ffi::FfiArrowRecordBatches { batches:
vec![] },
+ },
+ }
+ } else {
+ ffi::FfiArrowRecordBatchesResult {
+ result: err_result(1, "Batch-based scanner not
available".to_string()),
+ arrow_batches: ffi::FfiArrowRecordBatches { batches: vec![] },
+ }
}
}
}
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index d42e1a2..118ca3c 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -21,6 +21,10 @@
#include "lib.rs.h"
#include "ffi_converter.hpp"
#include "rust/cxx.h"
+#include <arrow/c/bridge.h>
+// TODO: bindings/cpp/BUILD.bazel still doesn't declare Arrow include/link
dependencies.
+// In environments where Bazel does not already have Arrow available, this
will fail at compile/link time.
+#include <arrow/record_batch.h>
namespace fluss {
@@ -101,6 +105,40 @@ Result Table::NewLogScannerWithProjection(const
std::vector<size_t>& column_indi
}
}
+Result Table::NewRecordBatchLogScanner(LogScanner& out) {
+ if (!Available()) {
+ return utils::make_error(1, "Table not available");
+ }
+
+ try {
+ out.scanner_ = table_->new_record_batch_log_scanner();
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+Result Table::NewRecordBatchLogScannerWithProjection(const
std::vector<size_t>& column_indices, LogScanner& out) {
+ if (!Available()) {
+ return utils::make_error(1, "Table not available");
+ }
+
+ try {
+ rust::Vec<size_t> rust_indices;
+ for (size_t idx : column_indices) {
+ rust_indices.push_back(idx);
+ }
+ out.scanner_ =
table_->new_record_batch_log_scanner_with_projection(std::move(rust_indices));
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
TableInfo Table::GetTableInfo() const {
if (!Available()) {
return TableInfo{};
@@ -242,4 +280,93 @@ Result LogScanner::Poll(int64_t timeout_ms, ScanRecords&
out) {
return utils::make_ok();
}
+ArrowRecordBatch::ArrowRecordBatch(
+ std::shared_ptr<arrow::RecordBatch> batch,
+ int64_t table_id,
+ int64_t partition_id,
+ int32_t bucket_id,
+ int64_t base_offset) noexcept
+ : batch_(std::move(batch)),
+ table_id_(table_id),
+ partition_id_(partition_id),
+ bucket_id_(bucket_id),
+ base_offset_(base_offset) {}
+
+bool ArrowRecordBatch::Available() const { return batch_ != nullptr; }
+
+int64_t ArrowRecordBatch::NumRows() const {
+ if (!Available()) return 0;
+ return batch_->num_rows();
+}
+
+
+int64_t ArrowRecordBatch::GetTableId() const {
+ if (!Available()) return 0;
+ return this->table_id_;
+}
+
+int64_t ArrowRecordBatch::GetPartitionId() const {
+ if (!Available()) return -1;
+ return this->partition_id_;
+}
+
+int32_t ArrowRecordBatch::GetBucketId() const {
+ if (!Available()) return -1;
+ return this->bucket_id_;
+}
+
+int64_t ArrowRecordBatch::GetBaseOffset() const {
+ if (!Available()) return -1;
+ return this->base_offset_;
+}
+
+int64_t ArrowRecordBatch::GetLastOffset() const {
+ if (!Available()) return -1;
+ return this->base_offset_ + this->NumRows() - 1;
+}
+
+Result LogScanner::PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches&
out) {
+ if (!Available()) {
+ return utils::make_error(1, "LogScanner not available");
+ }
+
+ auto ffi_result = scanner_->poll_record_batch(timeout_ms);
+ auto result = utils::from_ffi_result(ffi_result.result);
+ if (!result.Ok()) {
+ return result;
+ }
+
+ // Convert the FFI Arrow record batches to C++ ArrowRecordBatch objects
+ out.batches.clear();
+ for (const auto& ffi_batch : ffi_result.arrow_batches.batches) {
+ auto* c_array = reinterpret_cast<struct
ArrowArray*>(ffi_batch.array_ptr);
+ auto* c_schema = reinterpret_cast<struct
ArrowSchema*>(ffi_batch.schema_ptr);
+
+ auto import_result = arrow::ImportRecordBatch(c_array, c_schema);
+ if (import_result.ok()) {
+ auto batch_ptr = import_result.ValueOrDie();
+ auto batch_wrapper = std::unique_ptr<ArrowRecordBatch>(new
ArrowRecordBatch(
+ std::move(batch_ptr),
+ ffi_batch.table_id,
+ ffi_batch.partition_id,
+ ffi_batch.bucket_id,
+ ffi_batch.base_offset
+ ));
+ out.batches.push_back(std::move(batch_wrapper));
+
+ // Free the container structures that were allocated in Rust after
successful import
+ ffi::free_arrow_ffi_structures(ffi_batch.array_ptr,
ffi_batch.schema_ptr);
+ } else {
+ // Import failed, free the container structures to avoid leaks and
return error
+ ffi::free_arrow_ffi_structures(ffi_batch.array_ptr,
ffi_batch.schema_ptr);
+
+ // Return an error indicating that the import failed
+ std::string error_msg = "Failed to import Arrow record batch: " +
import_result.status().ToString();
+ return utils::make_error(1, error_msg);
+ }
+ }
+
+ return utils::make_ok();
+}
+
} // namespace fluss
diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
index 726e3d1..91d6e26 100644
--- a/bindings/cpp/src/types.rs
+++ b/bindings/cpp/src/types.rs
@@ -23,10 +23,13 @@ use arrow::array::{
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
};
use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
+use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
use fcore::row::InternalRow;
use fluss as fcore;
use std::borrow::Cow;
+use arrow::array::Array;
+
pub const DATA_TYPE_BOOLEAN: i32 = 1;
pub const DATA_TYPE_TINYINT: i32 = 2;
pub const DATA_TYPE_SMALLINT: i32 = 3;
@@ -478,3 +481,31 @@ pub fn core_lake_snapshot_to_ffi(snapshot:
&fcore::metadata::LakeSnapshot) -> ff
bucket_offsets,
}
}
+
+pub fn core_scan_batches_to_ffi(
+ batches: &[fcore::record::ScanBatch],
+) -> Result<ffi::FfiArrowRecordBatches, String> {
+ let mut ffi_batches = Vec::new();
+ for batch in batches {
+ let record_batch = batch.batch();
+ // Convert RecordBatch to StructArray first, then get the data
+ let struct_array =
arrow::array::StructArray::from(record_batch.clone());
+ let ffi_array =
Box::new(FFI_ArrowArray::new(&struct_array.into_data()));
+ let ffi_schema = Box::new(
+
FFI_ArrowSchema::try_from(record_batch.schema().as_ref()).map_err(|e|
e.to_string())?,
+ );
+ // Export as raw pointers
+ ffi_batches.push(ffi::FfiArrowRecordBatch {
+ array_ptr: Box::into_raw(ffi_array) as usize,
+ schema_ptr: Box::into_raw(ffi_schema) as usize,
+ table_id: batch.bucket().table_id(),
+ partition_id: batch.bucket().partition_id().unwrap_or(-1),
+ bucket_id: batch.bucket().bucket_id(),
+ base_offset: batch.base_offset(),
+ });
+ }
+
+ Ok(ffi::FfiArrowRecordBatches {
+ batches: ffi_batches,
+ })
+}