zhaohaidao commented on code in PR #230:
URL: https://github.com/apache/fluss-rust/pull/230#discussion_r2756810965
##########
bindings/cpp/src/table.cpp:
##########
@@ -21,6 +21,8 @@
#include "lib.rs.h"
#include "ffi_converter.hpp"
#include "rust/cxx.h"
+#include <arrow/c/bridge.h>
Review Comment:
After adding #include <arrow/...> and calling arrow::ImportRecordBatch,
bindings/cpp/BUILD.bazel still doesn't declare Arrow include/link dependencies.
In environments where Bazel does not already provide Arrow, this will fail at
compile or link time.
I can create a separate PR to fix this; this PR is quite valuable, and our
scenario is likely to use it. You can just add a TODO here and let me handle it.
##########
bindings/cpp/src/types.rs:
##########
@@ -478,3 +481,30 @@ pub fn core_lake_snapshot_to_ffi(snapshot: &fcore::metadata::LakeSnapshot) -> ff
bucket_offsets,
}
}
+
+pub fn core_scan_batches_to_ffi(
+ batches: &[fcore::record::ScanBatch],
+) -> ffi::FfiArrowRecordBatches {
+ let mut ffi_batches = Vec::new();
+ for batch in batches {
+ let record_batch = batch.batch();
+ // Convert RecordBatch to StructArray first, then get the data
+ let struct_array = arrow::array::StructArray::from(record_batch.clone());
+ let ffi_array = Box::new(FFI_ArrowArray::new(&struct_array.into_data()));
+ let ffi_schema = Box::new(FFI_ArrowSchema::try_from(record_batch.schema().as_ref()).unwrap());
Review Comment:
unwrap() can panic on export failure, and a panic crossing an FFI boundary
effectively crashes the process.
Fix suggestions:
1. Make core_scan_batches_to_ffi fallible (e.g. return
Result<ffi::FfiArrowRecordBatches, String>), propagate errors with ?/map_err,
and remove unwrap() entirely.
2. In poll_record_batch, handle that error by setting
FfiArrowRecordBatchesResult.result = err_result(...) with an empty batches
payload, consistent with the existing "result + payload" FFI pattern.
A rough sketch of suggestion 1 is below.
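A rough sketch of suggestion 1, assuming the arrow FFI types already imported
in types.rs; the pointer field types (usize here) and the remaining
FfiArrowRecordBatch fields are placeholders following this PR's definitions:
```rust
pub fn core_scan_batches_to_ffi(
    batches: &[fcore::record::ScanBatch],
) -> Result<ffi::FfiArrowRecordBatches, String> {
    let mut ffi_batches = Vec::new();
    for batch in batches {
        let record_batch = batch.batch();
        // Convert RecordBatch to StructArray first, then export its data.
        let struct_array = arrow::array::StructArray::from(record_batch.clone());
        let ffi_array = Box::new(FFI_ArrowArray::new(&struct_array.into_data()));
        // Propagate schema-export failures instead of panicking across FFI.
        let ffi_schema = Box::new(
            FFI_ArrowSchema::try_from(record_batch.schema().as_ref())
                .map_err(|e| format!("failed to export Arrow schema: {e}"))?,
        );
        // Hand the boxed structures to the C++ side as raw pointers.
        ffi_batches.push(ffi::FfiArrowRecordBatch {
            array_ptr: Box::into_raw(ffi_array) as usize,
            schema_ptr: Box::into_raw(ffi_schema) as usize,
            // ...table/bucket/offset metadata fields as defined in this PR
        });
    }
    Ok(ffi::FfiArrowRecordBatches { batches: ffi_batches })
}
```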
##########
bindings/cpp/src/table.cpp:
##########
@@ -242,4 +278,89 @@ Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& out) {
return utils::make_ok();
}
+ArrowRecordBatch::ArrowRecordBatch(
+ std::shared_ptr<arrow::RecordBatch> batch,
+ int64_t table_id,
+ int64_t partition_id,
+ int32_t bucket_id,
+ int64_t base_offset) noexcept
+ : batch_(std::move(batch)),
+ table_id_(table_id),
+ partition_id_(partition_id),
+ bucket_id_(bucket_id),
+ base_offset_(base_offset) {}
+
+bool ArrowRecordBatch::Available() const { return batch_ != nullptr; }
+
+int64_t ArrowRecordBatch::NumRows() const {
+ if (!Available()) return 0;
+ return batch_->num_rows();
+}
+
+
+int64_t ArrowRecordBatch::GetTableId() const {
+ if (!Available()) return 0;
+ return this->table_id_;
+}
+
+int64_t ArrowRecordBatch::GetPartitionId() const {
+ if (!Available()) return -1;
+ return this->partition_id_;
+}
+
+int32_t ArrowRecordBatch::GetBucketId() const {
+ if (!Available()) return -1;
+ return this->bucket_id_;
+}
+
+int64_t ArrowRecordBatch::GetBaseOffset() const {
+ if (!Available()) return -1;
+ return this->base_offset_;
+}
+
+int64_t ArrowRecordBatch::GetLastOffset() const {
+ if (!Available()) return -1;
+ return this->NumRows() + this->base_offset_;
+}
+
+Result LogScanner::PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& out) const {
+ if (!Available()) {
+ return utils::make_error(1, "LogScanner not available");
+ }
+
+ auto ffi_result = scanner_->poll_record_batch(timeout_ms);
+ auto result = utils::from_ffi_result(ffi_result.result);
+ if (!result.Ok()) {
+ return result;
+ }
+
+ // Convert the FFI Arrow record batches to C++ ArrowRecordBatch objects
+ out.batches.clear();
+ for (const auto& ffi_batch : ffi_result.arrow_batches.batches) {
+ auto* c_array = reinterpret_cast<struct ArrowArray*>(ffi_batch.array_ptr);
+ auto* c_schema = reinterpret_cast<struct ArrowSchema*>(ffi_batch.schema_ptr);
+
+ auto import_result = arrow::ImportRecordBatch(c_array, c_schema);
Review Comment:
When arrow::ImportRecordBatch fails, the code still returns
utils::make_ok(). Callers can't distinguish "no data" from "import failed",
which is effectively silent data loss.
Fix suggestion: return an error when arrow::ImportRecordBatch fails, e.g.:
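A minimal sketch inside the existing import loop, reusing utils::make_error
from this file (the error code 1 mirrors the make_error call above; the message
wording is illustrative):
```cpp
auto import_result = arrow::ImportRecordBatch(c_array, c_schema);
if (!import_result.ok()) {
    // Surface the Arrow status instead of silently dropping the batch.
    out.batches.clear();
    return utils::make_error(1, "failed to import Arrow record batch: " +
                                    import_result.status().ToString());
}
auto record_batch = import_result.ValueOrDie();
```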
##########
bindings/cpp/CMakeLists.txt:
##########
@@ -88,12 +90,16 @@ target_sources(fluss_cpp PRIVATE ${RUST_HEADER_FILE})
target_include_directories(fluss_cpp PUBLIC ${CPP_INCLUDE_DIR})
target_link_libraries(fluss_cpp PUBLIC ${RUST_LIB})
target_link_libraries(fluss_cpp PRIVATE ${CMAKE_DL_LIBS} Threads::Threads)
+target_link_libraries(fluss_cpp PRIVATE Arrow::arrow_shared)
Review Comment:
fluss_cpp is a static library and src/table.cpp references Arrow symbols.
With target_link_libraries(fluss_cpp PRIVATE Arrow::arrow_shared), downstream
projects that link only fluss_cpp can fail at link time due to missing Arrow
symbols.
Fix suggestion: change bindings/cpp/CMakeLists.txt:93 to link Arrow as PUBLIC
(or INTERFACE, depending on whether you want Arrow linked for this target
itself as well).
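The one-line change, as applied to the target above:
```cmake
target_link_libraries(fluss_cpp PUBLIC Arrow::arrow_shared)
```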
##########
bindings/cpp/src/table.cpp:
##########
@@ -242,4 +278,89 @@ Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& out) {
return utils::make_ok();
}
+ArrowRecordBatch::ArrowRecordBatch(
+ std::shared_ptr<arrow::RecordBatch> batch,
+ int64_t table_id,
+ int64_t partition_id,
+ int32_t bucket_id,
+ int64_t base_offset) noexcept
+ : batch_(std::move(batch)),
+ table_id_(table_id),
+ partition_id_(partition_id),
+ bucket_id_(bucket_id),
+ base_offset_(base_offset) {}
+
+bool ArrowRecordBatch::Available() const { return batch_ != nullptr; }
+
+int64_t ArrowRecordBatch::NumRows() const {
+ if (!Available()) return 0;
+ return batch_->num_rows();
+}
+
+
+int64_t ArrowRecordBatch::GetTableId() const {
+ if (!Available()) return 0;
+ return this->table_id_;
+}
+
+int64_t ArrowRecordBatch::GetPartitionId() const {
+ if (!Available()) return -1;
+ return this->partition_id_;
+}
+
+int32_t ArrowRecordBatch::GetBucketId() const {
+ if (!Available()) return -1;
+ return this->bucket_id_;
+}
+
+int64_t ArrowRecordBatch::GetBaseOffset() const {
+ if (!Available()) return -1;
+ return this->base_offset_;
+}
+
+int64_t ArrowRecordBatch::GetLastOffset() const {
Review Comment:
The current implementation returns base_offset + NumRows(), which does not
match the Rust ScanBatch::last_offset() inclusive semantics (non-empty:
base + rows - 1; empty: base - 1):
```rust
/// Returns the offset of the last record in this batch.
pub fn last_offset(&self) -> i64 {
if self.batch.num_rows() == 0 {
self.base_offset - 1
} else {
self.base_offset + self.batch.num_rows() as i64 - 1
}
}
```
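A minimal C++ sketch matching those semantics, using the method and member
names from this PR:
```cpp
int64_t ArrowRecordBatch::GetLastOffset() const {
  if (!Available()) return -1;
  // Inclusive last offset: for an empty batch this is base_offset_ - 1.
  if (NumRows() == 0) return base_offset_ - 1;
  return base_offset_ + NumRows() - 1;
}
```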
##########
bindings/cpp/src/types.rs:
##########
@@ -478,3 +481,30 @@ pub fn core_lake_snapshot_to_ffi(snapshot: &fcore::metadata::LakeSnapshot) -> ff
bucket_offsets,
}
}
+
+pub fn core_scan_batches_to_ffi(
+ batches: &[fcore::record::ScanBatch],
+) -> ffi::FfiArrowRecordBatches {
+ let mut ffi_batches = Vec::new();
+ for batch in batches {
+ let record_batch = batch.batch();
+ // Convert RecordBatch to StructArray first, then get the data
+ let struct_array = arrow::array::StructArray::from(record_batch.clone());
+ let ffi_array = Box::new(FFI_ArrowArray::new(&struct_array.into_data()));
+ let ffi_schema = Box::new(FFI_ArrowSchema::try_from(record_batch.schema().as_ref()).unwrap());
Review Comment:
+1