zhaohaidao commented on code in PR #230:
URL: https://github.com/apache/fluss-rust/pull/230#discussion_r2757549680


##########
bindings/cpp/src/table.cpp:
##########
@@ -242,4 +278,89 @@ Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& 
out) {
     return utils::make_ok();
 }
 
+ArrowRecordBatch::ArrowRecordBatch(
+    std::shared_ptr<arrow::RecordBatch> batch,
+    int64_t table_id,
+    int64_t partition_id,
+    int32_t bucket_id,
+    int64_t base_offset) noexcept
+    : batch_(std::move(batch)),
+      table_id_(table_id),
+      partition_id_(partition_id),
+      bucket_id_(bucket_id),
+      base_offset_(base_offset) {}
+
+bool ArrowRecordBatch::Available() const { return batch_ != nullptr; }
+
+int64_t ArrowRecordBatch::NumRows() const {
+    if (!Available()) return 0;
+    return batch_->num_rows();
+}
+
+
+int64_t ArrowRecordBatch::GetTableId() const {
+    if (!Available()) return 0;
+    return this->table_id_;
+}
+
+int64_t ArrowRecordBatch::GetPartitionId() const {
+    if (!Available()) return -1;
+    return this->partition_id_;
+}
+
+int32_t ArrowRecordBatch::GetBucketId() const {
+    if (!Available()) return -1;
+    return this->bucket_id_;
+}
+
+int64_t ArrowRecordBatch::GetBaseOffset() const {
+    if (!Available()) return -1;
+    return this->base_offset_;
+}
+
+int64_t ArrowRecordBatch::GetLastOffset() const {
+    if (!Available()) return -1;
+    return this->NumRows() + this->base_offset_;
+}
+
+Result LogScanner::PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& 
out) const {
+    if (!Available()) {
+        return utils::make_error(1, "LogScanner not available");
+    }
+
+    auto ffi_result = scanner_->poll_record_batch(timeout_ms);
+    auto result = utils::from_ffi_result(ffi_result.result);
+    if (!result.Ok()) {
+        return result;
+    }
+
+    // Convert the FFI Arrow record batches to C++ ArrowRecordBatch objects
+    out.batches.clear();
+    for (const auto& ffi_batch : ffi_result.arrow_batches.batches) {
+        auto* c_array = reinterpret_cast<struct 
ArrowArray*>(ffi_batch.array_ptr);
+        auto* c_schema = reinterpret_cast<struct 
ArrowSchema*>(ffi_batch.schema_ptr);
+
+        auto import_result = arrow::ImportRecordBatch(c_array, c_schema);
+        if (import_result.ok()) {
+            auto batch_ptr = import_result.ValueOrDie();
+            auto batch_wrapper = std::unique_ptr<ArrowRecordBatch>(new 
ArrowRecordBatch(
+                std::move(batch_ptr),
+                ffi_batch.table_id,
+                ffi_batch.partition_id,
+                ffi_batch.bucket_id,
+                ffi_batch.base_offset
+            ));
+            out.batches.push_back(std::move(batch_wrapper));
+            
+            // Free the container structures that were allocated in Rust after 
successful import
+            ffi::free_arrow_ffi_structures(ffi_batch.array_ptr, 
ffi_batch.schema_ptr);
+        } else {
+            // Even if import failed, we should free the container structures 
to avoid leaks
+            ffi::free_arrow_ffi_structures(ffi_batch.array_ptr, 
ffi_batch.schema_ptr);

Review Comment:
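   -1
   
   `arrow::ImportRecordBatch` takes ownership of the underlying Arrow data (via
   release callbacks), but it does not free the memory that holds the
   `ArrowArray` / `ArrowSchema` structs themselves. As the excerpts below show,
   the array importer copies the struct into its own storage with
   `ArrowArrayMove` (a `memcpy` followed by marking the source as released), and
   the schema importer only invokes the struct's `release` callback through
   `ArrowSchemaRelease`; neither one deallocates the struct that was passed in.
   
   In this code, `FFI_ArrowArray` / `FFI_ArrowSchema` are heap-allocated on the
   Rust side via `Box`. If the C++ side stops explicitly freeing these two
   container objects, you will leak at least two heap allocations per batch.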
   
   ```cpp
     // File: cpp/src/arrow/c/bridge.cc
     Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(struct ArrowArray* 
array,
                                                            
std::shared_ptr<Schema> schema) {
       auto type = struct_(schema->fields());
       ArrayImporter importer(type);
       RETURN_NOT_OK(importer.Import(array));
       return importer.MakeRecordBatch(std::move(schema));
     }
     
     Status Import(struct ArrowArray* src) {
       if (ArrowArrayIsReleased(src)) {
         return Status::Invalid("Cannot import released ArrowArray");
       }
       recursion_level_ = 0;
       import_ = std::make_shared<ImportedArrayData>();
       c_struct_ = &import_->array_;
       import_->device_type_ = device_type_;
       ArrowArrayMove(src, c_struct_);
       return DoImport();
     }
       inline void ArrowArrayMarkReleased(struct ArrowArray* array) { 
array->release = NULL; }
   
     inline void ArrowArrayMove(struct ArrowArray* src, struct ArrowArray* 
dest) {
       assert(dest != src);
       assert(!ArrowArrayIsReleased(src));
       memcpy(dest, src, sizeof(struct ArrowArray));
       ArrowArrayMarkReleased(src);
     }
     // File: cpp/src/arrow/c/helpers.h
     inline void ArrowArrayRelease(struct ArrowArray* array) {
       if (!ArrowArrayIsReleased(array)) {
         array->release(array);
         ARROW_C_ASSERT(ArrowArrayIsReleased(array),
                        "ArrowArrayRelease did not cleanup release callback");
       }
     }
   
     inline void ArrowSchemaRelease(struct ArrowSchema* schema) {
       if (!ArrowSchemaIsReleased(schema)) {
         schema->release(schema);
         ARROW_C_ASSERT(ArrowSchemaIsReleased(schema),
                        "ArrowSchemaRelease did not cleanup release callback");
       }
     }
     
       // File: cpp/src/arrow/c/bridge.cc
   
     struct SchemaImporter {
       Status Import(struct ArrowSchema* src) {
         if (ArrowSchemaIsReleased(src)) {
           return Status::Invalid("Cannot import released ArrowSchema");
         }
         guard_.Reset(src);
         recursion_level_ = 0;
         c_struct_ = src;
         return DoImport();
       }
   
       struct ArrowSchema* c_struct_{nullptr};
       SchemaExportGuard guard_{nullptr};
     };
   
     // File: cpp/src/arrow/c/util_internal.h
   
     template <typename Traits>
     class ExportGuard {
      public:
       ~ExportGuard() { Release(); }
       void Reset(CType* c_export) { c_export_ = c_export; }
       void Release() {
         if (c_export_) {
           Traits::ReleaseFunc(c_export_);
           c_export_ = nullptr;
         }
       }
     };
   
     struct SchemaExportTraits {
       typedef struct ArrowSchema CType;
       static constexpr auto ReleaseFunc = &ArrowSchemaRelease;
     };
   
     using SchemaExportGuard = ExportGuard<SchemaExportTraits>;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to