Copilot commented on code in PR #83: URL: https://github.com/apache/fluss-rust/pull/83#discussion_r2600834987
########## bindings/cpp/src/table.cpp: ########## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "fluss.hpp" +#include "lib.rs.h" +#include "ffi_converter.hpp" +#include "rust/cxx.h" + +namespace fluss { + +Table::Table() noexcept = default; + +Table::Table(ffi::Table* table) noexcept : table_(table) {} + +Table::~Table() noexcept { Destroy(); } + +void Table::Destroy() noexcept { + if (table_) { + ffi::delete_table(table_); + table_ = nullptr; + } +} + +Table::Table(Table&& other) noexcept : table_(other.table_) { + other.table_ = nullptr; +} + +Table& Table::operator=(Table&& other) noexcept { + if (this != &other) { + Destroy(); + table_ = other.table_; + other.table_ = nullptr; + } + return *this; +} + +bool Table::Available() const { return table_ != nullptr; } + +Result Table::NewAppendWriter(AppendWriter& out) { + if (!Available()) { + return utils::make_error(1, "Table not available"); + } + Review Comment: Potential memory leak: If `out.writer_` is already non-null when `NewAppendWriter` is called, the existing resource will be overwritten without being freed, causing a memory leak. 
Consider calling `out.Destroy()` before assigning the new pointer, or check and free the existing resource first. ```suggestion // Free any existing resource to prevent memory leak out.Destroy(); ``` ########## bindings/cpp/src/connection.cpp: ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "fluss.hpp" +#include "lib.rs.h" +#include "ffi_converter.hpp" +#include "rust/cxx.h" + +namespace fluss { + +Connection::Connection() noexcept = default; + +Connection::~Connection() noexcept { Destroy(); } + +void Connection::Destroy() noexcept { + if (conn_) { + ffi::delete_connection(conn_); + conn_ = nullptr; + } +} + +Connection::Connection(Connection&& other) noexcept : conn_(other.conn_) { + other.conn_ = nullptr; +} + +Connection& Connection::operator=(Connection&& other) noexcept { + if (this != &other) { + Destroy(); + conn_ = other.conn_; + other.conn_ = nullptr; + } + return *this; +} + +Result Connection::Connect(const std::string& bootstrap_server, Connection& out) { + try { Review Comment: Potential memory leak: If `out.conn_` is already non-null when `Connect` is called, the existing resource will be overwritten without being freed, causing a memory leak. 
Consider calling `out.Destroy()` before assigning the new pointer, or check and free the existing resource first. ```suggestion try { out.Destroy(); ``` ########## bindings/cpp/src/connection.cpp: ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "fluss.hpp" +#include "lib.rs.h" +#include "ffi_converter.hpp" +#include "rust/cxx.h" + +namespace fluss { + +Connection::Connection() noexcept = default; + +Connection::~Connection() noexcept { Destroy(); } + +void Connection::Destroy() noexcept { + if (conn_) { + ffi::delete_connection(conn_); + conn_ = nullptr; + } +} + +Connection::Connection(Connection&& other) noexcept : conn_(other.conn_) { + other.conn_ = nullptr; +} + +Connection& Connection::operator=(Connection&& other) noexcept { + if (this != &other) { + Destroy(); + conn_ = other.conn_; + other.conn_ = nullptr; + } + return *this; +} + +Result Connection::Connect(const std::string& bootstrap_server, Connection& out) { + try { + out.conn_ = ffi::new_connection(bootstrap_server); + return utils::make_ok(); + } catch (const rust::Error& e) { + return utils::make_error(1, e.what()); + } catch (const std::exception& e) { + return utils::make_error(1, e.what()); + } +} + +bool Connection::Available() const { return conn_ != nullptr; } + +Result Connection::GetAdmin(Admin& out) { + if (!Available()) { + return utils::make_error(1, "Connection not available"); + } + + try { + out.admin_ = conn_->get_admin(); + return utils::make_ok(); + } catch (const rust::Error& e) { + return utils::make_error(1, e.what()); + } catch (const std::exception& e) { + return utils::make_error(1, e.what()); + } +} + +Result Connection::GetTable(const TablePath& table_path, Table& out) { + if (!Available()) { + return utils::make_error(1, "Connection not available"); + } + Review Comment: Potential memory leak: If `out.table_` is already non-null when `GetTable` is called, the existing resource will be overwritten without being freed, causing a memory leak. Consider calling `out.Destroy()` before assigning the new pointer, or check and free the existing resource first. 
```suggestion // Free any existing resource before overwriting out.table_ out.Destroy(); ``` ########## bindings/cpp/src/table.cpp: ########## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "fluss.hpp" +#include "lib.rs.h" +#include "ffi_converter.hpp" +#include "rust/cxx.h" + +namespace fluss { + +Table::Table() noexcept = default; + +Table::Table(ffi::Table* table) noexcept : table_(table) {} + +Table::~Table() noexcept { Destroy(); } + +void Table::Destroy() noexcept { + if (table_) { + ffi::delete_table(table_); + table_ = nullptr; + } +} + +Table::Table(Table&& other) noexcept : table_(other.table_) { + other.table_ = nullptr; +} + +Table& Table::operator=(Table&& other) noexcept { + if (this != &other) { + Destroy(); + table_ = other.table_; + other.table_ = nullptr; + } + return *this; +} + +bool Table::Available() const { return table_ != nullptr; } + +Result Table::NewAppendWriter(AppendWriter& out) { + if (!Available()) { + return utils::make_error(1, "Table not available"); + } + + try { + out.writer_ = table_->new_append_writer(); + return utils::make_ok(); + } catch (const rust::Error& e) { + return utils::make_error(1, e.what()); + } catch (const std::exception& 
e) { + return utils::make_error(1, e.what()); + } +} + +Result Table::NewLogScanner(LogScanner& out) { + if (!Available()) { + return utils::make_error(1, "Table not available"); + } + + try { + out.scanner_ = table_->new_log_scanner(); + return utils::make_ok(); + } catch (const rust::Error& e) { + return utils::make_error(1, e.what()); + } catch (const std::exception& e) { + return utils::make_error(1, e.what()); + } +} + +Result Table::NewLogScannerWithProjection(const std::vector<size_t>& column_indices, LogScanner& out) { + if (!Available()) { + return utils::make_error(1, "Table not available"); + } + + try { + rust::Vec<size_t> rust_indices; + for (size_t idx : column_indices) { + rust_indices.push_back(idx); + } + out.scanner_ = table_->new_log_scanner_with_projection(std::move(rust_indices)); Review Comment: Potential memory leak: If `out.scanner_` is already non-null when `NewLogScannerWithProjection` is called, the existing resource will be overwritten without being freed, causing a memory leak. Consider calling `out.Destroy()` before assigning the new pointer, or check and free the existing resource first. ########## bindings/cpp/src/connection.cpp: ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "fluss.hpp" +#include "lib.rs.h" +#include "ffi_converter.hpp" +#include "rust/cxx.h" + +namespace fluss { + +Connection::Connection() noexcept = default; + +Connection::~Connection() noexcept { Destroy(); } + +void Connection::Destroy() noexcept { + if (conn_) { + ffi::delete_connection(conn_); + conn_ = nullptr; + } +} + +Connection::Connection(Connection&& other) noexcept : conn_(other.conn_) { + other.conn_ = nullptr; +} + +Connection& Connection::operator=(Connection&& other) noexcept { + if (this != &other) { + Destroy(); + conn_ = other.conn_; + other.conn_ = nullptr; + } + return *this; +} + +Result Connection::Connect(const std::string& bootstrap_server, Connection& out) { + try { + out.conn_ = ffi::new_connection(bootstrap_server); + return utils::make_ok(); + } catch (const rust::Error& e) { + return utils::make_error(1, e.what()); + } catch (const std::exception& e) { + return utils::make_error(1, e.what()); + } +} + +bool Connection::Available() const { return conn_ != nullptr; } + +Result Connection::GetAdmin(Admin& out) { + if (!Available()) { + return utils::make_error(1, "Connection not available"); + } + + try { + out.admin_ = conn_->get_admin(); Review Comment: Potential memory leak: If `out.admin_` or `out.table_` is already non-null when `GetAdmin` or `GetTable` is called, the existing resource will be overwritten without being freed, causing a memory leak. Consider calling `out.Destroy()` before assigning the new pointer, or check and free the existing resource first. ########## bindings/cpp/src/types.rs: ########## @@ -0,0 +1,498 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::ffi; +use anyhow::{anyhow, Result}; +use arrow::array::{ + Date32Array, LargeBinaryArray, LargeStringArray, Time32MillisecondArray, Time32SecondArray, + Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, + TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, +}; +use arrow::datatypes::{DataType as ArrowDataType, TimeUnit}; +use fluss as fcore; +use fcore::row::InternalRow; + +pub const DATA_TYPE_BOOLEAN: i32 = 1; +pub const DATA_TYPE_TINYINT: i32 = 2; +pub const DATA_TYPE_SMALLINT: i32 = 3; +pub const DATA_TYPE_INT: i32 = 4; +pub const DATA_TYPE_BIGINT: i32 = 5; +pub const DATA_TYPE_FLOAT: i32 = 6; +pub const DATA_TYPE_DOUBLE: i32 = 7; +pub const DATA_TYPE_STRING: i32 = 8; +pub const DATA_TYPE_BYTES: i32 = 9; +pub const DATA_TYPE_DATE: i32 = 10; +pub const DATA_TYPE_TIME: i32 = 11; +pub const DATA_TYPE_TIMESTAMP: i32 = 12; +pub const DATA_TYPE_TIMESTAMP_LTZ: i32 = 13; + +pub const DATUM_TYPE_NULL: i32 = 0; +pub const DATUM_TYPE_BOOL: i32 = 1; +pub const DATUM_TYPE_INT32: i32 = 2; +pub const DATUM_TYPE_INT64: i32 = 3; +pub const DATUM_TYPE_FLOAT32: i32 = 4; +pub const DATUM_TYPE_FLOAT64: i32 = 5; +pub const DATUM_TYPE_STRING: i32 = 6; +pub const DATUM_TYPE_BYTES: i32 = 7; + +fn ffi_data_type_to_core(dt: i32) -> Result<fcore::metadata::DataType> { + match dt { + DATA_TYPE_BOOLEAN => Ok(fcore::metadata::DataTypes::boolean()), + 
DATA_TYPE_TINYINT => Ok(fcore::metadata::DataTypes::tinyint()), + DATA_TYPE_SMALLINT => Ok(fcore::metadata::DataTypes::smallint()), + DATA_TYPE_INT => Ok(fcore::metadata::DataTypes::int()), + DATA_TYPE_BIGINT => Ok(fcore::metadata::DataTypes::bigint()), + DATA_TYPE_FLOAT => Ok(fcore::metadata::DataTypes::float()), + DATA_TYPE_DOUBLE => Ok(fcore::metadata::DataTypes::double()), + DATA_TYPE_STRING => Ok(fcore::metadata::DataTypes::string()), + DATA_TYPE_BYTES => Ok(fcore::metadata::DataTypes::bytes()), + DATA_TYPE_DATE => Ok(fcore::metadata::DataTypes::date()), + DATA_TYPE_TIME => Ok(fcore::metadata::DataTypes::time()), + DATA_TYPE_TIMESTAMP => Ok(fcore::metadata::DataTypes::timestamp()), + DATA_TYPE_TIMESTAMP_LTZ => Ok(fcore::metadata::DataTypes::timestamp_ltz()), + _ => Err(anyhow!("Unknown data type: {}", dt)), + } +} + +fn core_data_type_to_ffi(dt: &fcore::metadata::DataType) -> i32 { + match dt { + fcore::metadata::DataType::Boolean(_) => DATA_TYPE_BOOLEAN, + fcore::metadata::DataType::TinyInt(_) => DATA_TYPE_TINYINT, + fcore::metadata::DataType::SmallInt(_) => DATA_TYPE_SMALLINT, + fcore::metadata::DataType::Int(_) => DATA_TYPE_INT, + fcore::metadata::DataType::BigInt(_) => DATA_TYPE_BIGINT, + fcore::metadata::DataType::Float(_) => DATA_TYPE_FLOAT, + fcore::metadata::DataType::Double(_) => DATA_TYPE_DOUBLE, + fcore::metadata::DataType::String(_) => DATA_TYPE_STRING, + fcore::metadata::DataType::Bytes(_) => DATA_TYPE_BYTES, + fcore::metadata::DataType::Date(_) => DATA_TYPE_DATE, + fcore::metadata::DataType::Time(_) => DATA_TYPE_TIME, + fcore::metadata::DataType::Timestamp(_) => DATA_TYPE_TIMESTAMP, + fcore::metadata::DataType::TimestampLTz(_) => DATA_TYPE_TIMESTAMP_LTZ, + _ => 0, + } +} + +pub fn ffi_descriptor_to_core( + descriptor: &ffi::FfiTableDescriptor, +) -> Result<fcore::metadata::TableDescriptor> { + let mut schema_builder = fcore::metadata::Schema::builder(); + + for col in &descriptor.schema.columns { + let dt = ffi_data_type_to_core(col.data_type)?; 
+ schema_builder = schema_builder.column(&col.name, dt); + if !col.comment.is_empty() { + schema_builder = schema_builder.with_comment(&col.comment); + } + } + + if !descriptor.schema.primary_keys.is_empty() { + schema_builder = schema_builder.primary_key(descriptor.schema.primary_keys.clone()); + } + + let schema = schema_builder.build()?; + + let mut builder = fcore::metadata::TableDescriptor::builder() + .schema(schema) + .partitioned_by(descriptor.partition_keys.clone()); + + if descriptor.bucket_count > 0 { + builder = builder.distributed_by(Some(descriptor.bucket_count), descriptor.bucket_keys.clone()); + } else { + builder = builder.distributed_by(None, descriptor.bucket_keys.clone()); + } + + for prop in &descriptor.properties { + builder = builder.property(&prop.key, &prop.value); + } + + if !descriptor.comment.is_empty() { + builder = builder.comment(&descriptor.comment); + } + + Ok(builder.build()?) +} + +pub fn core_table_info_to_ffi(info: &fcore::metadata::TableInfo) -> ffi::FfiTableInfo { + let schema = info.get_schema(); + let columns: Vec<ffi::FfiColumn> = schema + .columns() + .iter() + .map(|col| ffi::FfiColumn { + name: col.name().to_string(), + data_type: core_data_type_to_ffi(col.data_type()), + comment: col.comment().unwrap_or("").to_string(), + }) + .collect(); + + let primary_keys: Vec<String> = schema + .primary_key() + .map(|pk| pk.column_names().to_vec()) + .unwrap_or_default(); + + let properties: Vec<ffi::HashMapValue> = info + .get_properties() + .iter() + .map(|(k, v)| ffi::HashMapValue { + key: k.clone(), + value: v.clone(), + }) + .collect(); + + ffi::FfiTableInfo { + table_id: info.get_table_id(), + schema_id: info.get_schema_id(), + table_path: ffi::FfiTablePath { + database_name: info.get_table_path().database().to_string(), + table_name: info.get_table_path().table().to_string(), + }, + created_time: info.get_created_time(), + modified_time: info.get_modified_time(), + primary_keys: info.get_primary_keys().clone(), + 
bucket_keys: info.get_bucket_keys().to_vec(), + partition_keys: info.get_partition_keys().to_vec(), + num_buckets: info.get_num_buckets(), + has_primary_key: info.has_primary_key(), + is_partitioned: info.is_partitioned(), + properties, + comment: info.get_comment().unwrap_or("").to_string(), + schema: ffi::FfiSchema { + columns, + primary_keys, + }, + } +} + +pub fn empty_table_info() -> ffi::FfiTableInfo { + ffi::FfiTableInfo { + table_id: 0, + schema_id: 0, + table_path: ffi::FfiTablePath { + database_name: String::new(), + table_name: String::new(), + }, + created_time: 0, + modified_time: 0, + primary_keys: vec![], + bucket_keys: vec![], + partition_keys: vec![], + num_buckets: 0, + has_primary_key: false, + is_partitioned: false, + properties: vec![], + comment: String::new(), + schema: ffi::FfiSchema { + columns: vec![], + primary_keys: vec![], + }, + } +} + +pub struct OwnedRowData { + strings: Vec<String>, +} + +impl OwnedRowData { + pub fn new() -> Self { + Self { strings: Vec::new() } + } + + pub fn collect_strings(&mut self, row: &ffi::FfiGenericRow) { + for field in &row.fields { + if field.datum_type == DATUM_TYPE_STRING { + self.strings.push(field.string_val.to_string()); + } + } + } + + pub fn get_strings(&self) -> &[String] { + &self.strings + } +} + +pub fn ffi_row_to_core<'a>( + row: &ffi::FfiGenericRow, + owner: &'a OwnedRowData, +) -> fcore::row::GenericRow<'a> { + use fcore::row::{Blob, Datum, F32, F64}; + + let mut generic_row = fcore::row::GenericRow::new(); + let mut string_idx = 0; + + for (idx, field) in row.fields.iter().enumerate() { + let datum = match field.datum_type { + DATUM_TYPE_NULL => Datum::Null, + DATUM_TYPE_BOOL => Datum::Bool(field.bool_val), + DATUM_TYPE_INT32 => Datum::Int32(field.i32_val), + DATUM_TYPE_INT64 => Datum::Int64(field.i64_val), + DATUM_TYPE_FLOAT32 => Datum::Float32(F32::from(field.f32_val)), + DATUM_TYPE_FLOAT64 => Datum::Float64(F64::from(field.f64_val)), + DATUM_TYPE_STRING => { + let str_ref = 
owner.get_strings()[string_idx].as_str(); + string_idx += 1; + Datum::String(str_ref) + } + DATUM_TYPE_BYTES => Datum::Blob(Blob::from(field.bytes_val.clone())), + _ => Datum::Null, + }; + generic_row.set_field(idx, datum); + } + + generic_row +} + +pub fn core_scan_records_to_ffi(records: &fcore::record::ScanRecords) -> ffi::FfiScanRecords { + let mut ffi_records = Vec::new(); + + // Iterate over all buckets and their records + for bucket_records in records.records_by_buckets().values() { + for record in bucket_records { + let row = record.row(); + let fields = core_row_to_ffi_fields(row); + + ffi_records.push(ffi::FfiScanRecord { + offset: record.offset(), + timestamp: record.timestamp(), + row: ffi::FfiGenericRow { fields }, + }); + } + } + + ffi::FfiScanRecords { records: ffi_records } +} + +fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> Vec<ffi::FfiDatum> { + fn new_datum(datum_type: i32) -> ffi::FfiDatum { + ffi::FfiDatum { + datum_type, + bool_val: false, + i32_val: 0, + i64_val: 0, + f32_val: 0.0, + f64_val: 0.0, + string_val: String::new(), + bytes_val: vec![], + } + } + + let record_batch = row.get_record_batch(); + let schema = record_batch.schema(); + let row_id = row.get_row_id(); + + let mut fields = Vec::with_capacity(schema.fields().len()); + + for (i, field) in schema.fields().iter().enumerate() { + if row.is_null_at(i) { + fields.push(new_datum(DATUM_TYPE_NULL)); + continue; + } + + let datum = match field.data_type() { + ArrowDataType::Boolean => { + let mut datum = new_datum(DATUM_TYPE_BOOL); + datum.bool_val = row.get_boolean(i); + datum + } + ArrowDataType::Int8 => { + let mut datum = new_datum(DATUM_TYPE_INT32); + datum.i32_val = row.get_byte(i) as i32; + datum + } + ArrowDataType::Int16 => { + let mut datum = new_datum(DATUM_TYPE_INT32); + datum.i32_val = row.get_short(i) as i32; + datum + } + ArrowDataType::Int32 => { + let mut datum = new_datum(DATUM_TYPE_INT32); + datum.i32_val = row.get_int(i); + datum + } + 
ArrowDataType::Int64 => { + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = row.get_long(i); + datum + } + ArrowDataType::Float32 => { + let mut datum = new_datum(DATUM_TYPE_FLOAT32); + datum.f32_val = row.get_float(i); + datum + } + ArrowDataType::Float64 => { + let mut datum = new_datum(DATUM_TYPE_FLOAT64); + datum.f64_val = row.get_double(i); + datum + } + ArrowDataType::Utf8 => { + let mut datum = new_datum(DATUM_TYPE_STRING); + datum.string_val = row.get_string(i).to_string(); + datum + } + ArrowDataType::LargeUtf8 => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<LargeStringArray>() + .expect("LargeUtf8 column expected"); + let mut datum = new_datum(DATUM_TYPE_STRING); + datum.string_val = array.value(row_id).to_string(); + datum + } + ArrowDataType::Binary => { + let mut datum = new_datum(DATUM_TYPE_BYTES); + datum.bytes_val = row.get_bytes(i); + datum + } + ArrowDataType::FixedSizeBinary(len) => { + let mut datum = new_datum(DATUM_TYPE_BYTES); + datum.bytes_val = row.get_binary(i, *len as usize); + datum + } + ArrowDataType::LargeBinary => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<LargeBinaryArray>() + .expect("LargeBinary column expected"); + let mut datum = new_datum(DATUM_TYPE_BYTES); + datum.bytes_val = array.value(row_id).to_vec(); + datum + } + ArrowDataType::Date32 => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<Date32Array>() + .expect("Date32 column expected"); + let mut datum = new_datum(DATUM_TYPE_INT32); + datum.i32_val = array.value(row_id); + datum + } + ArrowDataType::Timestamp(unit, _) => match unit { + TimeUnit::Second => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<TimestampSecondArray>() + .expect("Timestamp(second) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = array.value(row_id); + datum + } + TimeUnit::Millisecond => { + let array = record_batch + .column(i) + .as_any() + 
.downcast_ref::<TimestampMillisecondArray>() + .expect("Timestamp(millisecond) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = array.value(row_id); + datum + } + TimeUnit::Microsecond => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<TimestampMicrosecondArray>() + .expect("Timestamp(microsecond) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = array.value(row_id); + datum + } + TimeUnit::Nanosecond => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<TimestampNanosecondArray>() + .expect("Timestamp(nanosecond) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = array.value(row_id); + datum + } + }, + ArrowDataType::Time32(unit) => match unit { + TimeUnit::Second => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<Time32SecondArray>() + .expect("Time32(second) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT32); + datum.i32_val = array.value(row_id); + datum + } + TimeUnit::Millisecond => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<Time32MillisecondArray>() + .expect("Time32(millisecond) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT32); + datum.i32_val = array.value(row_id); + datum + } + _ => panic!("Unsupported Time32 unit for column {}", i), + }, + ArrowDataType::Time64(unit) => match unit { + TimeUnit::Microsecond => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<Time64MicrosecondArray>() + .expect("Time64(microsecond) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = array.value(row_id); + datum + } + TimeUnit::Nanosecond => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<Time64NanosecondArray>() + .expect("Time64(nanosecond) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = array.value(row_id); + datum + } + _ => 
panic!("Unsupported Time64 unit for column {}", i), Review Comment: Panics must not cross FFI boundaries: unwinding into C++ is undefined behavior, and here the panic will not be caught by C++ exception handlers, so it will abort the process. Instead of `panic!`, consider returning an error `Result` or using a default/fallback value. ########## bindings/cpp/src/table.cpp: ########## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "fluss.hpp" +#include "lib.rs.h" +#include "ffi_converter.hpp" +#include "rust/cxx.h" + +namespace fluss { + +Table::Table() noexcept = default; + +Table::Table(ffi::Table* table) noexcept : table_(table) {} + +Table::~Table() noexcept { Destroy(); } + +void Table::Destroy() noexcept { + if (table_) { + ffi::delete_table(table_); + table_ = nullptr; + } +} + +Table::Table(Table&& other) noexcept : table_(other.table_) { + other.table_ = nullptr; +} + +Table& Table::operator=(Table&& other) noexcept { + if (this != &other) { + Destroy(); + table_ = other.table_; + other.table_ = nullptr; + } + return *this; +} + +bool Table::Available() const { return table_ != nullptr; } + +Result Table::NewAppendWriter(AppendWriter& out) { + if (!Available()) { + return utils::make_error(1, "Table not available"); + } + + try { + out.writer_ = table_->new_append_writer(); + return utils::make_ok(); + } catch (const rust::Error& e) { + return utils::make_error(1, e.what()); + } catch (const std::exception& e) { + return utils::make_error(1, e.what()); + } +} + +Result Table::NewLogScanner(LogScanner& out) { + if (!Available()) { + return utils::make_error(1, "Table not available"); + } + Review Comment: Potential memory leak: If `out.scanner_` is already non-null when `NewLogScanner` is called, the existing resource will be overwritten without being freed, causing a memory leak. Consider calling `out.Destroy()` before assigning the new pointer, or check and free the existing resource first. ```suggestion out.Destroy(); ``` ########## bindings/cpp/src/types.rs: ########## @@ -0,0 +1,498 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::ffi; +use anyhow::{anyhow, Result}; +use arrow::array::{ + Date32Array, LargeBinaryArray, LargeStringArray, Time32MillisecondArray, Time32SecondArray, + Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, + TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, +}; +use arrow::datatypes::{DataType as ArrowDataType, TimeUnit}; +use fluss as fcore; +use fcore::row::InternalRow; + +pub const DATA_TYPE_BOOLEAN: i32 = 1; +pub const DATA_TYPE_TINYINT: i32 = 2; +pub const DATA_TYPE_SMALLINT: i32 = 3; +pub const DATA_TYPE_INT: i32 = 4; +pub const DATA_TYPE_BIGINT: i32 = 5; +pub const DATA_TYPE_FLOAT: i32 = 6; +pub const DATA_TYPE_DOUBLE: i32 = 7; +pub const DATA_TYPE_STRING: i32 = 8; +pub const DATA_TYPE_BYTES: i32 = 9; +pub const DATA_TYPE_DATE: i32 = 10; +pub const DATA_TYPE_TIME: i32 = 11; +pub const DATA_TYPE_TIMESTAMP: i32 = 12; +pub const DATA_TYPE_TIMESTAMP_LTZ: i32 = 13; + +pub const DATUM_TYPE_NULL: i32 = 0; +pub const DATUM_TYPE_BOOL: i32 = 1; +pub const DATUM_TYPE_INT32: i32 = 2; +pub const DATUM_TYPE_INT64: i32 = 3; +pub const DATUM_TYPE_FLOAT32: i32 = 4; +pub const DATUM_TYPE_FLOAT64: i32 = 5; +pub const DATUM_TYPE_STRING: i32 = 6; +pub const DATUM_TYPE_BYTES: i32 = 7; + +fn ffi_data_type_to_core(dt: i32) -> Result<fcore::metadata::DataType> { + match dt { + DATA_TYPE_BOOLEAN => Ok(fcore::metadata::DataTypes::boolean()), + 
DATA_TYPE_TINYINT => Ok(fcore::metadata::DataTypes::tinyint()), + DATA_TYPE_SMALLINT => Ok(fcore::metadata::DataTypes::smallint()), + DATA_TYPE_INT => Ok(fcore::metadata::DataTypes::int()), + DATA_TYPE_BIGINT => Ok(fcore::metadata::DataTypes::bigint()), + DATA_TYPE_FLOAT => Ok(fcore::metadata::DataTypes::float()), + DATA_TYPE_DOUBLE => Ok(fcore::metadata::DataTypes::double()), + DATA_TYPE_STRING => Ok(fcore::metadata::DataTypes::string()), + DATA_TYPE_BYTES => Ok(fcore::metadata::DataTypes::bytes()), + DATA_TYPE_DATE => Ok(fcore::metadata::DataTypes::date()), + DATA_TYPE_TIME => Ok(fcore::metadata::DataTypes::time()), + DATA_TYPE_TIMESTAMP => Ok(fcore::metadata::DataTypes::timestamp()), + DATA_TYPE_TIMESTAMP_LTZ => Ok(fcore::metadata::DataTypes::timestamp_ltz()), + _ => Err(anyhow!("Unknown data type: {}", dt)), + } +} + +fn core_data_type_to_ffi(dt: &fcore::metadata::DataType) -> i32 { + match dt { + fcore::metadata::DataType::Boolean(_) => DATA_TYPE_BOOLEAN, + fcore::metadata::DataType::TinyInt(_) => DATA_TYPE_TINYINT, + fcore::metadata::DataType::SmallInt(_) => DATA_TYPE_SMALLINT, + fcore::metadata::DataType::Int(_) => DATA_TYPE_INT, + fcore::metadata::DataType::BigInt(_) => DATA_TYPE_BIGINT, + fcore::metadata::DataType::Float(_) => DATA_TYPE_FLOAT, + fcore::metadata::DataType::Double(_) => DATA_TYPE_DOUBLE, + fcore::metadata::DataType::String(_) => DATA_TYPE_STRING, + fcore::metadata::DataType::Bytes(_) => DATA_TYPE_BYTES, + fcore::metadata::DataType::Date(_) => DATA_TYPE_DATE, + fcore::metadata::DataType::Time(_) => DATA_TYPE_TIME, + fcore::metadata::DataType::Timestamp(_) => DATA_TYPE_TIMESTAMP, + fcore::metadata::DataType::TimestampLTz(_) => DATA_TYPE_TIMESTAMP_LTZ, + _ => 0, + } +} + +pub fn ffi_descriptor_to_core( + descriptor: &ffi::FfiTableDescriptor, +) -> Result<fcore::metadata::TableDescriptor> { + let mut schema_builder = fcore::metadata::Schema::builder(); + + for col in &descriptor.schema.columns { + let dt = ffi_data_type_to_core(col.data_type)?; 
+ schema_builder = schema_builder.column(&col.name, dt); + if !col.comment.is_empty() { + schema_builder = schema_builder.with_comment(&col.comment); + } + } + + if !descriptor.schema.primary_keys.is_empty() { + schema_builder = schema_builder.primary_key(descriptor.schema.primary_keys.clone()); + } + + let schema = schema_builder.build()?; + + let mut builder = fcore::metadata::TableDescriptor::builder() + .schema(schema) + .partitioned_by(descriptor.partition_keys.clone()); + + if descriptor.bucket_count > 0 { + builder = builder.distributed_by(Some(descriptor.bucket_count), descriptor.bucket_keys.clone()); + } else { + builder = builder.distributed_by(None, descriptor.bucket_keys.clone()); + } + + for prop in &descriptor.properties { + builder = builder.property(&prop.key, &prop.value); + } + + if !descriptor.comment.is_empty() { + builder = builder.comment(&descriptor.comment); + } + + Ok(builder.build()?) +} + +pub fn core_table_info_to_ffi(info: &fcore::metadata::TableInfo) -> ffi::FfiTableInfo { + let schema = info.get_schema(); + let columns: Vec<ffi::FfiColumn> = schema + .columns() + .iter() + .map(|col| ffi::FfiColumn { + name: col.name().to_string(), + data_type: core_data_type_to_ffi(col.data_type()), + comment: col.comment().unwrap_or("").to_string(), + }) + .collect(); + + let primary_keys: Vec<String> = schema + .primary_key() + .map(|pk| pk.column_names().to_vec()) + .unwrap_or_default(); + + let properties: Vec<ffi::HashMapValue> = info + .get_properties() + .iter() + .map(|(k, v)| ffi::HashMapValue { + key: k.clone(), + value: v.clone(), + }) + .collect(); + + ffi::FfiTableInfo { + table_id: info.get_table_id(), + schema_id: info.get_schema_id(), + table_path: ffi::FfiTablePath { + database_name: info.get_table_path().database().to_string(), + table_name: info.get_table_path().table().to_string(), + }, + created_time: info.get_created_time(), + modified_time: info.get_modified_time(), + primary_keys: info.get_primary_keys().clone(), + 
bucket_keys: info.get_bucket_keys().to_vec(), + partition_keys: info.get_partition_keys().to_vec(), + num_buckets: info.get_num_buckets(), + has_primary_key: info.has_primary_key(), + is_partitioned: info.is_partitioned(), + properties, + comment: info.get_comment().unwrap_or("").to_string(), + schema: ffi::FfiSchema { + columns, + primary_keys, + }, + } +} + +pub fn empty_table_info() -> ffi::FfiTableInfo { + ffi::FfiTableInfo { + table_id: 0, + schema_id: 0, + table_path: ffi::FfiTablePath { + database_name: String::new(), + table_name: String::new(), + }, + created_time: 0, + modified_time: 0, + primary_keys: vec![], + bucket_keys: vec![], + partition_keys: vec![], + num_buckets: 0, + has_primary_key: false, + is_partitioned: false, + properties: vec![], + comment: String::new(), + schema: ffi::FfiSchema { + columns: vec![], + primary_keys: vec![], + }, + } +} + +pub struct OwnedRowData { + strings: Vec<String>, +} + +impl OwnedRowData { + pub fn new() -> Self { + Self { strings: Vec::new() } + } + + pub fn collect_strings(&mut self, row: &ffi::FfiGenericRow) { + for field in &row.fields { + if field.datum_type == DATUM_TYPE_STRING { + self.strings.push(field.string_val.to_string()); + } + } + } + + pub fn get_strings(&self) -> &[String] { + &self.strings + } +} + +pub fn ffi_row_to_core<'a>( + row: &ffi::FfiGenericRow, + owner: &'a OwnedRowData, +) -> fcore::row::GenericRow<'a> { + use fcore::row::{Blob, Datum, F32, F64}; + + let mut generic_row = fcore::row::GenericRow::new(); + let mut string_idx = 0; + + for (idx, field) in row.fields.iter().enumerate() { + let datum = match field.datum_type { + DATUM_TYPE_NULL => Datum::Null, + DATUM_TYPE_BOOL => Datum::Bool(field.bool_val), + DATUM_TYPE_INT32 => Datum::Int32(field.i32_val), + DATUM_TYPE_INT64 => Datum::Int64(field.i64_val), + DATUM_TYPE_FLOAT32 => Datum::Float32(F32::from(field.f32_val)), + DATUM_TYPE_FLOAT64 => Datum::Float64(F64::from(field.f64_val)), + DATUM_TYPE_STRING => { + let str_ref = 
owner.get_strings()[string_idx].as_str(); + string_idx += 1; + Datum::String(str_ref) + } + DATUM_TYPE_BYTES => Datum::Blob(Blob::from(field.bytes_val.clone())), + _ => Datum::Null, + }; + generic_row.set_field(idx, datum); + } + + generic_row +} + +pub fn core_scan_records_to_ffi(records: &fcore::record::ScanRecords) -> ffi::FfiScanRecords { + let mut ffi_records = Vec::new(); + + // Iterate over all buckets and their records + for bucket_records in records.records_by_buckets().values() { + for record in bucket_records { + let row = record.row(); + let fields = core_row_to_ffi_fields(row); + + ffi_records.push(ffi::FfiScanRecord { + offset: record.offset(), + timestamp: record.timestamp(), + row: ffi::FfiGenericRow { fields }, + }); + } + } + + ffi::FfiScanRecords { records: ffi_records } +} + +fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> Vec<ffi::FfiDatum> { + fn new_datum(datum_type: i32) -> ffi::FfiDatum { + ffi::FfiDatum { + datum_type, + bool_val: false, + i32_val: 0, + i64_val: 0, + f32_val: 0.0, + f64_val: 0.0, + string_val: String::new(), + bytes_val: vec![], + } + } + + let record_batch = row.get_record_batch(); + let schema = record_batch.schema(); + let row_id = row.get_row_id(); + + let mut fields = Vec::with_capacity(schema.fields().len()); + + for (i, field) in schema.fields().iter().enumerate() { + if row.is_null_at(i) { + fields.push(new_datum(DATUM_TYPE_NULL)); + continue; + } + + let datum = match field.data_type() { + ArrowDataType::Boolean => { + let mut datum = new_datum(DATUM_TYPE_BOOL); + datum.bool_val = row.get_boolean(i); + datum + } + ArrowDataType::Int8 => { + let mut datum = new_datum(DATUM_TYPE_INT32); + datum.i32_val = row.get_byte(i) as i32; + datum + } + ArrowDataType::Int16 => { + let mut datum = new_datum(DATUM_TYPE_INT32); + datum.i32_val = row.get_short(i) as i32; + datum + } + ArrowDataType::Int32 => { + let mut datum = new_datum(DATUM_TYPE_INT32); + datum.i32_val = row.get_int(i); + datum + } + 
ArrowDataType::Int64 => { + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = row.get_long(i); + datum + } + ArrowDataType::Float32 => { + let mut datum = new_datum(DATUM_TYPE_FLOAT32); + datum.f32_val = row.get_float(i); + datum + } + ArrowDataType::Float64 => { + let mut datum = new_datum(DATUM_TYPE_FLOAT64); + datum.f64_val = row.get_double(i); + datum + } + ArrowDataType::Utf8 => { + let mut datum = new_datum(DATUM_TYPE_STRING); + datum.string_val = row.get_string(i).to_string(); + datum + } + ArrowDataType::LargeUtf8 => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<LargeStringArray>() + .expect("LargeUtf8 column expected"); + let mut datum = new_datum(DATUM_TYPE_STRING); + datum.string_val = array.value(row_id).to_string(); + datum + } + ArrowDataType::Binary => { + let mut datum = new_datum(DATUM_TYPE_BYTES); + datum.bytes_val = row.get_bytes(i); + datum + } + ArrowDataType::FixedSizeBinary(len) => { + let mut datum = new_datum(DATUM_TYPE_BYTES); + datum.bytes_val = row.get_binary(i, *len as usize); + datum + } + ArrowDataType::LargeBinary => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<LargeBinaryArray>() + .expect("LargeBinary column expected"); + let mut datum = new_datum(DATUM_TYPE_BYTES); + datum.bytes_val = array.value(row_id).to_vec(); + datum + } + ArrowDataType::Date32 => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<Date32Array>() + .expect("Date32 column expected"); + let mut datum = new_datum(DATUM_TYPE_INT32); + datum.i32_val = array.value(row_id); + datum + } + ArrowDataType::Timestamp(unit, _) => match unit { + TimeUnit::Second => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<TimestampSecondArray>() + .expect("Timestamp(second) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = array.value(row_id); + datum + } + TimeUnit::Millisecond => { + let array = record_batch + .column(i) + .as_any() + 
.downcast_ref::<TimestampMillisecondArray>() + .expect("Timestamp(millisecond) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = array.value(row_id); + datum + } + TimeUnit::Microsecond => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<TimestampMicrosecondArray>() + .expect("Timestamp(microsecond) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = array.value(row_id); + datum + } + TimeUnit::Nanosecond => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<TimestampNanosecondArray>() + .expect("Timestamp(nanosecond) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = array.value(row_id); + datum + } + }, + ArrowDataType::Time32(unit) => match unit { + TimeUnit::Second => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<Time32SecondArray>() + .expect("Time32(second) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT32); + datum.i32_val = array.value(row_id); + datum + } + TimeUnit::Millisecond => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<Time32MillisecondArray>() + .expect("Time32(millisecond) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT32); + datum.i32_val = array.value(row_id); + datum + } + _ => panic!("Unsupported Time32 unit for column {}", i), + }, + ArrowDataType::Time64(unit) => match unit { + TimeUnit::Microsecond => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<Time64MicrosecondArray>() + .expect("Time64(microsecond) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = array.value(row_id); + datum + } + TimeUnit::Nanosecond => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<Time64NanosecondArray>() + .expect("Time64(nanosecond) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = array.value(row_id); + datum + } + _ => 
panic!("Unsupported Time64 unit for column {}", i), + }, + other => panic!("Unsupported Arrow data type for column {}: {:?}", i, other), Review Comment: Panics can cause undefined behavior when crossing FFI boundaries. Instead of `panic!`, consider returning an error Result or using a default/fallback value. The panic will not be caught by C++ exception handlers and will abort the process. ########## bindings/cpp/include/fluss.hpp: ########## @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#pragma once + +#include <cstdint> +#include <memory> +#include <optional> +#include <string> +#include <unordered_map> +#include <vector> + +namespace fluss { + +namespace ffi { + struct Connection; + struct Admin; + struct Table; + struct AppendWriter; + struct LogScanner; +} // namespace ffi + +enum class DataType { + Boolean = 1, + TinyInt = 2, + SmallInt = 3, + Int = 4, + BigInt = 5, + Float = 6, + Double = 7, + String = 8, + Bytes = 9, + Date = 10, + Time = 11, + Timestamp = 12, + TimestampLtz = 13, +}; + +enum class DatumType { + Null = 0, + Bool = 1, + Int32 = 2, + Int64 = 3, + Float32 = 4, + Float64 = 5, + String = 6, + Bytes = 7, +}; + +struct Result { + int32_t error_code{0}; + std::string error_message; + + bool Ok() const { return error_code == 0; } +}; + +struct TablePath { + std::string database_name; + std::string table_name; + + TablePath() = default; + TablePath(std::string db, std::string tbl) + : database_name(std::move(db)), table_name(std::move(tbl)) {} + + std::string ToString() const { return database_name + "." 
+ table_name; } +}; + +struct Column { + std::string name; + DataType data_type; + std::string comment; +}; + +struct Schema { + std::vector<Column> columns; + std::vector<std::string> primary_keys; + + class Builder { + public: + Builder& AddColumn(std::string name, DataType type, + std::string comment = "") { + columns_.push_back({std::move(name), type, std::move(comment)}); + return *this; + } + + Builder& SetPrimaryKeys(std::vector<std::string> keys) { + primary_keys_ = std::move(keys); + return *this; + } + + Schema Build() { + return Schema{std::move(columns_), std::move(primary_keys_)}; + } + + private: + std::vector<Column> columns_; + std::vector<std::string> primary_keys_; + }; + + static Builder NewBuilder() { return Builder(); } +}; + +struct TableDescriptor { + Schema schema; + std::vector<std::string> partition_keys; + int32_t bucket_count{0}; + std::vector<std::string> bucket_keys; + std::unordered_map<std::string, std::string> properties; + std::string comment; + + class Builder { + public: + Builder& SetSchema(Schema s) { + schema_ = std::move(s); + return *this; + } + + Builder& SetPartitionKeys(std::vector<std::string> keys) { + partition_keys_ = std::move(keys); + return *this; + } + + Builder& SetBucketCount(int32_t count) { + bucket_count_ = count; + return *this; + } + + Builder& SetBucketKeys(std::vector<std::string> keys) { + bucket_keys_ = std::move(keys); + return *this; + } + + Builder& SetProperty(std::string key, std::string value) { + properties_[std::move(key)] = std::move(value); + return *this; + } + + Builder& SetComment(std::string comment) { + comment_ = std::move(comment); + return *this; + } + + TableDescriptor Build() { + return TableDescriptor{std::move(schema_), + std::move(partition_keys_), + bucket_count_, + std::move(bucket_keys_), + std::move(properties_), + std::move(comment_)}; + } + + private: + Schema schema_; + std::vector<std::string> partition_keys_; + int32_t bucket_count_{0}; + std::vector<std::string> 
bucket_keys_; + std::unordered_map<std::string, std::string> properties_; + std::string comment_; + }; + + static Builder NewBuilder() { return Builder(); } +}; + +struct TableInfo { + int64_t table_id; + int32_t schema_id; + TablePath table_path; + int64_t created_time; + int64_t modified_time; + std::vector<std::string> primary_keys; + std::vector<std::string> bucket_keys; + std::vector<std::string> partition_keys; + int32_t num_buckets; + bool has_primary_key; + bool is_partitioned; + std::unordered_map<std::string, std::string> properties; + std::string comment; + Schema schema; +}; + +struct Datum { + DatumType type; + bool bool_val; + int32_t i32_val; + int64_t i64_val; + float f32_val; + double f64_val; + std::string string_val; + std::vector<uint8_t> bytes_val; + + static Datum Null() { return Datum{DatumType::Null}; } + static Datum Bool(bool v) { return Datum{DatumType::Bool, v}; } + static Datum Int32(int32_t v) { + Datum d; + d.type = DatumType::Int32; + d.i32_val = v; + return d; + } + static Datum Int64(int64_t v) { + Datum d; + d.type = DatumType::Int64; + d.i64_val = v; + return d; + } + static Datum Float32(float v) { + Datum d; + d.type = DatumType::Float32; + d.f32_val = v; + return d; + } + static Datum Float64(double v) { + Datum d; + d.type = DatumType::Float64; + d.f64_val = v; + return d; + } + static Datum String(std::string v) { + Datum d; + d.type = DatumType::String; + d.string_val = std::move(v); + return d; + } + static Datum Bytes(std::vector<uint8_t> v) { + Datum d; + d.type = DatumType::Bytes; + d.bytes_val = std::move(v); + return d; + } Review Comment: The `Datum` static factory methods don't initialize all fields, leaving some with undefined values. For example, `Datum::Int32(v)` only sets `type` and `i32_val`, but leaves `bool_val`, `i64_val`, `f32_val`, `f64_val`, `string_val`, and `bytes_val` uninitialized. 
Consider using aggregate initialization with all fields explicitly set, or add default member initializers to the `Datum` struct. ########## bindings/cpp/src/types.rs: ########## @@ -0,0 +1,498 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::ffi; +use anyhow::{anyhow, Result}; +use arrow::array::{ + Date32Array, LargeBinaryArray, LargeStringArray, Time32MillisecondArray, Time32SecondArray, + Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, + TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, +}; +use arrow::datatypes::{DataType as ArrowDataType, TimeUnit}; +use fluss as fcore; +use fcore::row::InternalRow; + +pub const DATA_TYPE_BOOLEAN: i32 = 1; +pub const DATA_TYPE_TINYINT: i32 = 2; +pub const DATA_TYPE_SMALLINT: i32 = 3; +pub const DATA_TYPE_INT: i32 = 4; +pub const DATA_TYPE_BIGINT: i32 = 5; +pub const DATA_TYPE_FLOAT: i32 = 6; +pub const DATA_TYPE_DOUBLE: i32 = 7; +pub const DATA_TYPE_STRING: i32 = 8; +pub const DATA_TYPE_BYTES: i32 = 9; +pub const DATA_TYPE_DATE: i32 = 10; +pub const DATA_TYPE_TIME: i32 = 11; +pub const DATA_TYPE_TIMESTAMP: i32 = 12; +pub const DATA_TYPE_TIMESTAMP_LTZ: i32 = 13; + +pub const DATUM_TYPE_NULL: i32 = 0; +pub const DATUM_TYPE_BOOL: i32 = 1; +pub const DATUM_TYPE_INT32: i32 = 2; +pub const DATUM_TYPE_INT64: i32 = 3; +pub const DATUM_TYPE_FLOAT32: i32 = 4; +pub const DATUM_TYPE_FLOAT64: i32 = 5; +pub const DATUM_TYPE_STRING: i32 = 6; +pub const DATUM_TYPE_BYTES: i32 = 7; + +fn ffi_data_type_to_core(dt: i32) -> Result<fcore::metadata::DataType> { + match dt { + DATA_TYPE_BOOLEAN => Ok(fcore::metadata::DataTypes::boolean()), + DATA_TYPE_TINYINT => Ok(fcore::metadata::DataTypes::tinyint()), + DATA_TYPE_SMALLINT => Ok(fcore::metadata::DataTypes::smallint()), + DATA_TYPE_INT => Ok(fcore::metadata::DataTypes::int()), + DATA_TYPE_BIGINT => Ok(fcore::metadata::DataTypes::bigint()), + DATA_TYPE_FLOAT => Ok(fcore::metadata::DataTypes::float()), + DATA_TYPE_DOUBLE => Ok(fcore::metadata::DataTypes::double()), + DATA_TYPE_STRING => Ok(fcore::metadata::DataTypes::string()), + DATA_TYPE_BYTES => Ok(fcore::metadata::DataTypes::bytes()), + DATA_TYPE_DATE => Ok(fcore::metadata::DataTypes::date()), + DATA_TYPE_TIME => 
Ok(fcore::metadata::DataTypes::time()), + DATA_TYPE_TIMESTAMP => Ok(fcore::metadata::DataTypes::timestamp()), + DATA_TYPE_TIMESTAMP_LTZ => Ok(fcore::metadata::DataTypes::timestamp_ltz()), + _ => Err(anyhow!("Unknown data type: {}", dt)), + } +} + +fn core_data_type_to_ffi(dt: &fcore::metadata::DataType) -> i32 { + match dt { + fcore::metadata::DataType::Boolean(_) => DATA_TYPE_BOOLEAN, + fcore::metadata::DataType::TinyInt(_) => DATA_TYPE_TINYINT, + fcore::metadata::DataType::SmallInt(_) => DATA_TYPE_SMALLINT, + fcore::metadata::DataType::Int(_) => DATA_TYPE_INT, + fcore::metadata::DataType::BigInt(_) => DATA_TYPE_BIGINT, + fcore::metadata::DataType::Float(_) => DATA_TYPE_FLOAT, + fcore::metadata::DataType::Double(_) => DATA_TYPE_DOUBLE, + fcore::metadata::DataType::String(_) => DATA_TYPE_STRING, + fcore::metadata::DataType::Bytes(_) => DATA_TYPE_BYTES, + fcore::metadata::DataType::Date(_) => DATA_TYPE_DATE, + fcore::metadata::DataType::Time(_) => DATA_TYPE_TIME, + fcore::metadata::DataType::Timestamp(_) => DATA_TYPE_TIMESTAMP, + fcore::metadata::DataType::TimestampLTz(_) => DATA_TYPE_TIMESTAMP_LTZ, + _ => 0, + } +} + +pub fn ffi_descriptor_to_core( + descriptor: &ffi::FfiTableDescriptor, +) -> Result<fcore::metadata::TableDescriptor> { + let mut schema_builder = fcore::metadata::Schema::builder(); + + for col in &descriptor.schema.columns { + let dt = ffi_data_type_to_core(col.data_type)?; + schema_builder = schema_builder.column(&col.name, dt); + if !col.comment.is_empty() { + schema_builder = schema_builder.with_comment(&col.comment); + } + } + + if !descriptor.schema.primary_keys.is_empty() { + schema_builder = schema_builder.primary_key(descriptor.schema.primary_keys.clone()); + } + + let schema = schema_builder.build()?; + + let mut builder = fcore::metadata::TableDescriptor::builder() + .schema(schema) + .partitioned_by(descriptor.partition_keys.clone()); + + if descriptor.bucket_count > 0 { + builder = builder.distributed_by(Some(descriptor.bucket_count), 
descriptor.bucket_keys.clone()); + } else { + builder = builder.distributed_by(None, descriptor.bucket_keys.clone()); + } + + for prop in &descriptor.properties { + builder = builder.property(&prop.key, &prop.value); + } + + if !descriptor.comment.is_empty() { + builder = builder.comment(&descriptor.comment); + } + + Ok(builder.build()?) +} + +pub fn core_table_info_to_ffi(info: &fcore::metadata::TableInfo) -> ffi::FfiTableInfo { + let schema = info.get_schema(); + let columns: Vec<ffi::FfiColumn> = schema + .columns() + .iter() + .map(|col| ffi::FfiColumn { + name: col.name().to_string(), + data_type: core_data_type_to_ffi(col.data_type()), + comment: col.comment().unwrap_or("").to_string(), + }) + .collect(); + + let primary_keys: Vec<String> = schema + .primary_key() + .map(|pk| pk.column_names().to_vec()) + .unwrap_or_default(); + + let properties: Vec<ffi::HashMapValue> = info + .get_properties() + .iter() + .map(|(k, v)| ffi::HashMapValue { + key: k.clone(), + value: v.clone(), + }) + .collect(); + + ffi::FfiTableInfo { + table_id: info.get_table_id(), + schema_id: info.get_schema_id(), + table_path: ffi::FfiTablePath { + database_name: info.get_table_path().database().to_string(), + table_name: info.get_table_path().table().to_string(), + }, + created_time: info.get_created_time(), + modified_time: info.get_modified_time(), + primary_keys: info.get_primary_keys().clone(), + bucket_keys: info.get_bucket_keys().to_vec(), + partition_keys: info.get_partition_keys().to_vec(), + num_buckets: info.get_num_buckets(), + has_primary_key: info.has_primary_key(), + is_partitioned: info.is_partitioned(), + properties, + comment: info.get_comment().unwrap_or("").to_string(), + schema: ffi::FfiSchema { + columns, + primary_keys, + }, + } +} + +pub fn empty_table_info() -> ffi::FfiTableInfo { + ffi::FfiTableInfo { + table_id: 0, + schema_id: 0, + table_path: ffi::FfiTablePath { + database_name: String::new(), + table_name: String::new(), + }, + created_time: 0, + 
modified_time: 0, + primary_keys: vec![], + bucket_keys: vec![], + partition_keys: vec![], + num_buckets: 0, + has_primary_key: false, + is_partitioned: false, + properties: vec![], + comment: String::new(), + schema: ffi::FfiSchema { + columns: vec![], + primary_keys: vec![], + }, + } +} + +pub struct OwnedRowData { + strings: Vec<String>, +} + +impl OwnedRowData { + pub fn new() -> Self { + Self { strings: Vec::new() } + } + + pub fn collect_strings(&mut self, row: &ffi::FfiGenericRow) { + for field in &row.fields { + if field.datum_type == DATUM_TYPE_STRING { + self.strings.push(field.string_val.to_string()); + } + } + } + + pub fn get_strings(&self) -> &[String] { + &self.strings + } +} + +pub fn ffi_row_to_core<'a>( + row: &ffi::FfiGenericRow, + owner: &'a OwnedRowData, +) -> fcore::row::GenericRow<'a> { + use fcore::row::{Blob, Datum, F32, F64}; + + let mut generic_row = fcore::row::GenericRow::new(); + let mut string_idx = 0; + + for (idx, field) in row.fields.iter().enumerate() { + let datum = match field.datum_type { + DATUM_TYPE_NULL => Datum::Null, + DATUM_TYPE_BOOL => Datum::Bool(field.bool_val), + DATUM_TYPE_INT32 => Datum::Int32(field.i32_val), + DATUM_TYPE_INT64 => Datum::Int64(field.i64_val), + DATUM_TYPE_FLOAT32 => Datum::Float32(F32::from(field.f32_val)), + DATUM_TYPE_FLOAT64 => Datum::Float64(F64::from(field.f64_val)), + DATUM_TYPE_STRING => { + let str_ref = owner.get_strings()[string_idx].as_str(); + string_idx += 1; + Datum::String(str_ref) + } + DATUM_TYPE_BYTES => Datum::Blob(Blob::from(field.bytes_val.clone())), + _ => Datum::Null, + }; + generic_row.set_field(idx, datum); + } + + generic_row +} + +pub fn core_scan_records_to_ffi(records: &fcore::record::ScanRecords) -> ffi::FfiScanRecords { + let mut ffi_records = Vec::new(); + + // Iterate over all buckets and their records + for bucket_records in records.records_by_buckets().values() { + for record in bucket_records { + let row = record.row(); + let fields = core_row_to_ffi_fields(row); 
+ + ffi_records.push(ffi::FfiScanRecord { + offset: record.offset(), + timestamp: record.timestamp(), + row: ffi::FfiGenericRow { fields }, + }); + } + } + + ffi::FfiScanRecords { records: ffi_records } +} + +fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> Vec<ffi::FfiDatum> { + fn new_datum(datum_type: i32) -> ffi::FfiDatum { + ffi::FfiDatum { + datum_type, + bool_val: false, + i32_val: 0, + i64_val: 0, + f32_val: 0.0, + f64_val: 0.0, + string_val: String::new(), + bytes_val: vec![], + } + } + + let record_batch = row.get_record_batch(); + let schema = record_batch.schema(); + let row_id = row.get_row_id(); + + let mut fields = Vec::with_capacity(schema.fields().len()); + + for (i, field) in schema.fields().iter().enumerate() { + if row.is_null_at(i) { + fields.push(new_datum(DATUM_TYPE_NULL)); + continue; + } + + let datum = match field.data_type() { + ArrowDataType::Boolean => { + let mut datum = new_datum(DATUM_TYPE_BOOL); + datum.bool_val = row.get_boolean(i); + datum + } + ArrowDataType::Int8 => { + let mut datum = new_datum(DATUM_TYPE_INT32); + datum.i32_val = row.get_byte(i) as i32; + datum + } + ArrowDataType::Int16 => { + let mut datum = new_datum(DATUM_TYPE_INT32); + datum.i32_val = row.get_short(i) as i32; + datum + } + ArrowDataType::Int32 => { + let mut datum = new_datum(DATUM_TYPE_INT32); + datum.i32_val = row.get_int(i); + datum + } + ArrowDataType::Int64 => { + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = row.get_long(i); + datum + } + ArrowDataType::Float32 => { + let mut datum = new_datum(DATUM_TYPE_FLOAT32); + datum.f32_val = row.get_float(i); + datum + } + ArrowDataType::Float64 => { + let mut datum = new_datum(DATUM_TYPE_FLOAT64); + datum.f64_val = row.get_double(i); + datum + } + ArrowDataType::Utf8 => { + let mut datum = new_datum(DATUM_TYPE_STRING); + datum.string_val = row.get_string(i).to_string(); + datum + } + ArrowDataType::LargeUtf8 => { + let array = record_batch + .column(i) + .as_any() + 
.downcast_ref::<LargeStringArray>() + .expect("LargeUtf8 column expected"); + let mut datum = new_datum(DATUM_TYPE_STRING); + datum.string_val = array.value(row_id).to_string(); + datum + } + ArrowDataType::Binary => { + let mut datum = new_datum(DATUM_TYPE_BYTES); + datum.bytes_val = row.get_bytes(i); + datum + } + ArrowDataType::FixedSizeBinary(len) => { + let mut datum = new_datum(DATUM_TYPE_BYTES); + datum.bytes_val = row.get_binary(i, *len as usize); + datum + } + ArrowDataType::LargeBinary => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<LargeBinaryArray>() + .expect("LargeBinary column expected"); + let mut datum = new_datum(DATUM_TYPE_BYTES); + datum.bytes_val = array.value(row_id).to_vec(); + datum + } + ArrowDataType::Date32 => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<Date32Array>() + .expect("Date32 column expected"); + let mut datum = new_datum(DATUM_TYPE_INT32); + datum.i32_val = array.value(row_id); + datum + } + ArrowDataType::Timestamp(unit, _) => match unit { + TimeUnit::Second => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<TimestampSecondArray>() + .expect("Timestamp(second) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = array.value(row_id); + datum + } + TimeUnit::Millisecond => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<TimestampMillisecondArray>() + .expect("Timestamp(millisecond) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = array.value(row_id); + datum + } + TimeUnit::Microsecond => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<TimestampMicrosecondArray>() + .expect("Timestamp(microsecond) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = array.value(row_id); + datum + } + TimeUnit::Nanosecond => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<TimestampNanosecondArray>() + 
.expect("Timestamp(nanosecond) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = array.value(row_id); + datum + } + }, + ArrowDataType::Time32(unit) => match unit { + TimeUnit::Second => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<Time32SecondArray>() + .expect("Time32(second) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT32); + datum.i32_val = array.value(row_id); + datum + } + TimeUnit::Millisecond => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<Time32MillisecondArray>() + .expect("Time32(millisecond) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT32); + datum.i32_val = array.value(row_id); + datum + } + _ => panic!("Unsupported Time32 unit for column {}", i), Review Comment: Panics can cause undefined behavior when crossing FFI boundaries. Instead of `panic!`, consider returning an error Result or using a default/fallback value. The panic will not be caught by C++ exception handlers and will abort the process. ########## bindings/cpp/src/types.rs: ########## @@ -0,0 +1,498 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::ffi; +use anyhow::{anyhow, Result}; +use arrow::array::{ + Date32Array, LargeBinaryArray, LargeStringArray, Time32MillisecondArray, Time32SecondArray, + Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, + TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, +}; +use arrow::datatypes::{DataType as ArrowDataType, TimeUnit}; +use fluss as fcore; +use fcore::row::InternalRow; + +pub const DATA_TYPE_BOOLEAN: i32 = 1; +pub const DATA_TYPE_TINYINT: i32 = 2; +pub const DATA_TYPE_SMALLINT: i32 = 3; +pub const DATA_TYPE_INT: i32 = 4; +pub const DATA_TYPE_BIGINT: i32 = 5; +pub const DATA_TYPE_FLOAT: i32 = 6; +pub const DATA_TYPE_DOUBLE: i32 = 7; +pub const DATA_TYPE_STRING: i32 = 8; +pub const DATA_TYPE_BYTES: i32 = 9; +pub const DATA_TYPE_DATE: i32 = 10; +pub const DATA_TYPE_TIME: i32 = 11; +pub const DATA_TYPE_TIMESTAMP: i32 = 12; +pub const DATA_TYPE_TIMESTAMP_LTZ: i32 = 13; + +pub const DATUM_TYPE_NULL: i32 = 0; +pub const DATUM_TYPE_BOOL: i32 = 1; +pub const DATUM_TYPE_INT32: i32 = 2; +pub const DATUM_TYPE_INT64: i32 = 3; +pub const DATUM_TYPE_FLOAT32: i32 = 4; +pub const DATUM_TYPE_FLOAT64: i32 = 5; +pub const DATUM_TYPE_STRING: i32 = 6; +pub const DATUM_TYPE_BYTES: i32 = 7; + +fn ffi_data_type_to_core(dt: i32) -> Result<fcore::metadata::DataType> { + match dt { + DATA_TYPE_BOOLEAN => Ok(fcore::metadata::DataTypes::boolean()), + DATA_TYPE_TINYINT => Ok(fcore::metadata::DataTypes::tinyint()), + DATA_TYPE_SMALLINT => Ok(fcore::metadata::DataTypes::smallint()), + DATA_TYPE_INT => Ok(fcore::metadata::DataTypes::int()), + DATA_TYPE_BIGINT => Ok(fcore::metadata::DataTypes::bigint()), + DATA_TYPE_FLOAT => Ok(fcore::metadata::DataTypes::float()), + DATA_TYPE_DOUBLE => Ok(fcore::metadata::DataTypes::double()), + DATA_TYPE_STRING => Ok(fcore::metadata::DataTypes::string()), + DATA_TYPE_BYTES => Ok(fcore::metadata::DataTypes::bytes()), + DATA_TYPE_DATE => Ok(fcore::metadata::DataTypes::date()), + DATA_TYPE_TIME => 
Ok(fcore::metadata::DataTypes::time()), + DATA_TYPE_TIMESTAMP => Ok(fcore::metadata::DataTypes::timestamp()), + DATA_TYPE_TIMESTAMP_LTZ => Ok(fcore::metadata::DataTypes::timestamp_ltz()), + _ => Err(anyhow!("Unknown data type: {}", dt)), + } +} + +fn core_data_type_to_ffi(dt: &fcore::metadata::DataType) -> i32 { + match dt { + fcore::metadata::DataType::Boolean(_) => DATA_TYPE_BOOLEAN, + fcore::metadata::DataType::TinyInt(_) => DATA_TYPE_TINYINT, + fcore::metadata::DataType::SmallInt(_) => DATA_TYPE_SMALLINT, + fcore::metadata::DataType::Int(_) => DATA_TYPE_INT, + fcore::metadata::DataType::BigInt(_) => DATA_TYPE_BIGINT, + fcore::metadata::DataType::Float(_) => DATA_TYPE_FLOAT, + fcore::metadata::DataType::Double(_) => DATA_TYPE_DOUBLE, + fcore::metadata::DataType::String(_) => DATA_TYPE_STRING, + fcore::metadata::DataType::Bytes(_) => DATA_TYPE_BYTES, + fcore::metadata::DataType::Date(_) => DATA_TYPE_DATE, + fcore::metadata::DataType::Time(_) => DATA_TYPE_TIME, + fcore::metadata::DataType::Timestamp(_) => DATA_TYPE_TIMESTAMP, + fcore::metadata::DataType::TimestampLTz(_) => DATA_TYPE_TIMESTAMP_LTZ, + _ => 0, + } +} + +pub fn ffi_descriptor_to_core( + descriptor: &ffi::FfiTableDescriptor, +) -> Result<fcore::metadata::TableDescriptor> { + let mut schema_builder = fcore::metadata::Schema::builder(); + + for col in &descriptor.schema.columns { + let dt = ffi_data_type_to_core(col.data_type)?; + schema_builder = schema_builder.column(&col.name, dt); + if !col.comment.is_empty() { + schema_builder = schema_builder.with_comment(&col.comment); + } + } + + if !descriptor.schema.primary_keys.is_empty() { + schema_builder = schema_builder.primary_key(descriptor.schema.primary_keys.clone()); + } + + let schema = schema_builder.build()?; + + let mut builder = fcore::metadata::TableDescriptor::builder() + .schema(schema) + .partitioned_by(descriptor.partition_keys.clone()); + + if descriptor.bucket_count > 0 { + builder = builder.distributed_by(Some(descriptor.bucket_count), 
descriptor.bucket_keys.clone()); + } else { + builder = builder.distributed_by(None, descriptor.bucket_keys.clone()); + } + + for prop in &descriptor.properties { + builder = builder.property(&prop.key, &prop.value); + } + + if !descriptor.comment.is_empty() { + builder = builder.comment(&descriptor.comment); + } + + Ok(builder.build()?) +} + +pub fn core_table_info_to_ffi(info: &fcore::metadata::TableInfo) -> ffi::FfiTableInfo { + let schema = info.get_schema(); + let columns: Vec<ffi::FfiColumn> = schema + .columns() + .iter() + .map(|col| ffi::FfiColumn { + name: col.name().to_string(), + data_type: core_data_type_to_ffi(col.data_type()), + comment: col.comment().unwrap_or("").to_string(), + }) + .collect(); + + let primary_keys: Vec<String> = schema + .primary_key() + .map(|pk| pk.column_names().to_vec()) + .unwrap_or_default(); + + let properties: Vec<ffi::HashMapValue> = info + .get_properties() + .iter() + .map(|(k, v)| ffi::HashMapValue { + key: k.clone(), + value: v.clone(), + }) + .collect(); + + ffi::FfiTableInfo { + table_id: info.get_table_id(), + schema_id: info.get_schema_id(), + table_path: ffi::FfiTablePath { + database_name: info.get_table_path().database().to_string(), + table_name: info.get_table_path().table().to_string(), + }, + created_time: info.get_created_time(), + modified_time: info.get_modified_time(), + primary_keys: info.get_primary_keys().clone(), + bucket_keys: info.get_bucket_keys().to_vec(), + partition_keys: info.get_partition_keys().to_vec(), + num_buckets: info.get_num_buckets(), + has_primary_key: info.has_primary_key(), + is_partitioned: info.is_partitioned(), + properties, + comment: info.get_comment().unwrap_or("").to_string(), + schema: ffi::FfiSchema { + columns, + primary_keys, + }, + } +} + +pub fn empty_table_info() -> ffi::FfiTableInfo { + ffi::FfiTableInfo { + table_id: 0, + schema_id: 0, + table_path: ffi::FfiTablePath { + database_name: String::new(), + table_name: String::new(), + }, + created_time: 0, + 
modified_time: 0, + primary_keys: vec![], + bucket_keys: vec![], + partition_keys: vec![], + num_buckets: 0, + has_primary_key: false, + is_partitioned: false, + properties: vec![], + comment: String::new(), + schema: ffi::FfiSchema { + columns: vec![], + primary_keys: vec![], + }, + } +} + +pub struct OwnedRowData { + strings: Vec<String>, +} + +impl OwnedRowData { + pub fn new() -> Self { + Self { strings: Vec::new() } + } + + pub fn collect_strings(&mut self, row: &ffi::FfiGenericRow) { + for field in &row.fields { + if field.datum_type == DATUM_TYPE_STRING { + self.strings.push(field.string_val.to_string()); + } + } + } + + pub fn get_strings(&self) -> &[String] { + &self.strings + } +} + +pub fn ffi_row_to_core<'a>( + row: &ffi::FfiGenericRow, + owner: &'a OwnedRowData, +) -> fcore::row::GenericRow<'a> { + use fcore::row::{Blob, Datum, F32, F64}; + + let mut generic_row = fcore::row::GenericRow::new(); + let mut string_idx = 0; + + for (idx, field) in row.fields.iter().enumerate() { + let datum = match field.datum_type { + DATUM_TYPE_NULL => Datum::Null, + DATUM_TYPE_BOOL => Datum::Bool(field.bool_val), + DATUM_TYPE_INT32 => Datum::Int32(field.i32_val), + DATUM_TYPE_INT64 => Datum::Int64(field.i64_val), + DATUM_TYPE_FLOAT32 => Datum::Float32(F32::from(field.f32_val)), + DATUM_TYPE_FLOAT64 => Datum::Float64(F64::from(field.f64_val)), + DATUM_TYPE_STRING => { + let str_ref = owner.get_strings()[string_idx].as_str(); + string_idx += 1; + Datum::String(str_ref) + } + DATUM_TYPE_BYTES => Datum::Blob(Blob::from(field.bytes_val.clone())), + _ => Datum::Null, + }; + generic_row.set_field(idx, datum); + } + + generic_row +} + +pub fn core_scan_records_to_ffi(records: &fcore::record::ScanRecords) -> ffi::FfiScanRecords { + let mut ffi_records = Vec::new(); + + // Iterate over all buckets and their records + for bucket_records in records.records_by_buckets().values() { + for record in bucket_records { + let row = record.row(); + let fields = core_row_to_ffi_fields(row); 
+ + ffi_records.push(ffi::FfiScanRecord { + offset: record.offset(), + timestamp: record.timestamp(), + row: ffi::FfiGenericRow { fields }, + }); + } + } + + ffi::FfiScanRecords { records: ffi_records } +} + +fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> Vec<ffi::FfiDatum> { + fn new_datum(datum_type: i32) -> ffi::FfiDatum { + ffi::FfiDatum { + datum_type, + bool_val: false, + i32_val: 0, + i64_val: 0, + f32_val: 0.0, + f64_val: 0.0, + string_val: String::new(), + bytes_val: vec![], + } + } + + let record_batch = row.get_record_batch(); + let schema = record_batch.schema(); + let row_id = row.get_row_id(); + + let mut fields = Vec::with_capacity(schema.fields().len()); + + for (i, field) in schema.fields().iter().enumerate() { + if row.is_null_at(i) { + fields.push(new_datum(DATUM_TYPE_NULL)); + continue; + } + + let datum = match field.data_type() { + ArrowDataType::Boolean => { + let mut datum = new_datum(DATUM_TYPE_BOOL); + datum.bool_val = row.get_boolean(i); + datum + } + ArrowDataType::Int8 => { + let mut datum = new_datum(DATUM_TYPE_INT32); + datum.i32_val = row.get_byte(i) as i32; + datum + } + ArrowDataType::Int16 => { + let mut datum = new_datum(DATUM_TYPE_INT32); + datum.i32_val = row.get_short(i) as i32; + datum + } + ArrowDataType::Int32 => { + let mut datum = new_datum(DATUM_TYPE_INT32); + datum.i32_val = row.get_int(i); + datum + } + ArrowDataType::Int64 => { + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = row.get_long(i); + datum + } + ArrowDataType::Float32 => { + let mut datum = new_datum(DATUM_TYPE_FLOAT32); + datum.f32_val = row.get_float(i); + datum + } + ArrowDataType::Float64 => { + let mut datum = new_datum(DATUM_TYPE_FLOAT64); + datum.f64_val = row.get_double(i); + datum + } + ArrowDataType::Utf8 => { + let mut datum = new_datum(DATUM_TYPE_STRING); + datum.string_val = row.get_string(i).to_string(); + datum + } + ArrowDataType::LargeUtf8 => { + let array = record_batch + .column(i) + .as_any() + 
.downcast_ref::<LargeStringArray>() + .expect("LargeUtf8 column expected"); + let mut datum = new_datum(DATUM_TYPE_STRING); + datum.string_val = array.value(row_id).to_string(); + datum + } + ArrowDataType::Binary => { + let mut datum = new_datum(DATUM_TYPE_BYTES); + datum.bytes_val = row.get_bytes(i); + datum + } + ArrowDataType::FixedSizeBinary(len) => { + let mut datum = new_datum(DATUM_TYPE_BYTES); + datum.bytes_val = row.get_binary(i, *len as usize); + datum + } + ArrowDataType::LargeBinary => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<LargeBinaryArray>() + .expect("LargeBinary column expected"); + let mut datum = new_datum(DATUM_TYPE_BYTES); + datum.bytes_val = array.value(row_id).to_vec(); + datum + } + ArrowDataType::Date32 => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<Date32Array>() + .expect("Date32 column expected"); + let mut datum = new_datum(DATUM_TYPE_INT32); + datum.i32_val = array.value(row_id); + datum + } + ArrowDataType::Timestamp(unit, _) => match unit { + TimeUnit::Second => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<TimestampSecondArray>() + .expect("Timestamp(second) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = array.value(row_id); + datum + } + TimeUnit::Millisecond => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<TimestampMillisecondArray>() + .expect("Timestamp(millisecond) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = array.value(row_id); + datum + } + TimeUnit::Microsecond => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<TimestampMicrosecondArray>() + .expect("Timestamp(microsecond) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = array.value(row_id); + datum + } + TimeUnit::Nanosecond => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<TimestampNanosecondArray>() + 
.expect("Timestamp(nanosecond) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = array.value(row_id); + datum + } + }, + ArrowDataType::Time32(unit) => match unit { + TimeUnit::Second => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<Time32SecondArray>() + .expect("Time32(second) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT32); + datum.i32_val = array.value(row_id); + datum + } + TimeUnit::Millisecond => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<Time32MillisecondArray>() + .expect("Time32(millisecond) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT32); + datum.i32_val = array.value(row_id); + datum + } + _ => panic!("Unsupported Time32 unit for column {}", i), + }, + ArrowDataType::Time64(unit) => match unit { + TimeUnit::Microsecond => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<Time64MicrosecondArray>() + .expect("Time64(microsecond) column expected"); + let mut datum = new_datum(DATUM_TYPE_INT64); + datum.i64_val = array.value(row_id); + datum + } + TimeUnit::Nanosecond => { + let array = record_batch + .column(i) + .as_any() + .downcast_ref::<Time64NanosecondArray>() + .expect("Time64(nanosecond) column expected"); Review Comment: The `expect()` calls throughout this function can panic if the downcasts fail, which will cause undefined behavior when crossing FFI boundaries. Consider using pattern matching with proper error handling instead of `expect()`, or return a Result type that can be properly handled by the C++ caller. ########## bindings/cpp/src/lib.rs: ########## @@ -0,0 +1,509 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod types; + +use std::sync::{Arc, LazyLock}; +use std::time::Duration; + +use fluss as fcore; + +static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() Review Comment: The tokio runtime initialization uses `unwrap()` which will panic if the runtime fails to build. This panic will occur during static initialization and cannot be caught from C++. Consider using `expect()` with a descriptive message or handling the error more gracefully. ```suggestion .expect("Failed to build Tokio runtime for FFI") ``` ########## bindings/cpp/src/lib.rs: ########## @@ -0,0 +1,509 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +mod types; + +use std::sync::{Arc, LazyLock}; +use std::time::Duration; + +use fluss as fcore; + +static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() +}); + +#[cxx::bridge(namespace = "fluss::ffi")] +mod ffi { + struct HashMapValue { + key: String, + value: String, + } + + struct FfiResult { + error_code: i32, + error_message: String, + } + + struct FfiTablePath { + database_name: String, + table_name: String, + } + + struct FfiColumn { + name: String, + data_type: i32, + comment: String, + } + + struct FfiSchema { + columns: Vec<FfiColumn>, + primary_keys: Vec<String>, + } + + struct FfiTableDescriptor { + schema: FfiSchema, + partition_keys: Vec<String>, + bucket_count: i32, + bucket_keys: Vec<String>, + properties: Vec<HashMapValue>, + comment: String, + } + + struct FfiTableInfo { + table_id: i64, + schema_id: i32, + table_path: FfiTablePath, + created_time: i64, + modified_time: i64, + primary_keys: Vec<String>, + bucket_keys: Vec<String>, + partition_keys: Vec<String>, + num_buckets: i32, + has_primary_key: bool, + is_partitioned: bool, + properties: Vec<HashMapValue>, + comment: String, + schema: FfiSchema, + } + + struct FfiTableInfoResult { + result: FfiResult, + table_info: FfiTableInfo, + } + + struct FfiDatum { + datum_type: i32, + bool_val: bool, + i32_val: i32, + i64_val: i64, + f32_val: f32, + f64_val: f64, + string_val: String, + bytes_val: Vec<u8>, + } + + struct FfiGenericRow { + fields: Vec<FfiDatum>, + } + + struct FfiScanRecord { + offset: i64, + timestamp: i64, + row: FfiGenericRow, + } + + struct FfiScanRecords { + records: Vec<FfiScanRecord>, + } + + struct FfiScanRecordsResult { + result: FfiResult, + scan_records: FfiScanRecords, + } + + struct FfiLakeSnapshot { + snapshot_id: i64, + bucket_offsets: Vec<FfiBucketOffset>, + } + + 
struct FfiBucketOffset { + table_id: i64, + partition_id: i64, + bucket_id: i32, + offset: i64, + } + + struct FfiLakeSnapshotResult { + result: FfiResult, + lake_snapshot: FfiLakeSnapshot, + } + + extern "Rust" { + type Connection; + type Admin; + type Table; + type AppendWriter; + type LogScanner; + + // Connection + fn new_connection(bootstrap_server: &str) -> Result<*mut Connection>; + unsafe fn delete_connection(conn: *mut Connection); + fn get_admin(self: &Connection) -> Result<*mut Admin>; + fn get_table(self: &Connection, table_path: &FfiTablePath) -> Result<*mut Table>; + + // Admin + unsafe fn delete_admin(admin: *mut Admin); + fn create_table( + self: &Admin, + table_path: &FfiTablePath, + descriptor: &FfiTableDescriptor, + ignore_if_exists: bool, + ) -> FfiResult; + fn get_table_info(self: &Admin, table_path: &FfiTablePath) -> FfiTableInfoResult; + fn get_latest_lake_snapshot(self: &Admin, table_path: &FfiTablePath) + -> FfiLakeSnapshotResult; + + // Table + unsafe fn delete_table(table: *mut Table); + fn new_append_writer(self: &Table) -> Result<*mut AppendWriter>; + fn new_log_scanner(self: &Table) -> Result<*mut LogScanner>; + fn new_log_scanner_with_projection(self: &Table, column_indices: Vec<usize>) -> Result<*mut LogScanner>; + fn get_table_info_from_table(self: &Table) -> FfiTableInfo; + fn get_table_path(self: &Table) -> FfiTablePath; + fn has_primary_key(self: &Table) -> bool; + + // AppendWriter + unsafe fn delete_append_writer(writer: *mut AppendWriter); + fn append(self: &mut AppendWriter, row: &FfiGenericRow) -> FfiResult; + fn flush(self: &mut AppendWriter) -> FfiResult; + + // LogScanner + unsafe fn delete_log_scanner(scanner: *mut LogScanner); + fn subscribe(self: &LogScanner, bucket_id: i32, start_offset: i64) -> FfiResult; + fn poll(self: &LogScanner, timeout_ms: i64) -> FfiScanRecordsResult; + } +} + +pub struct Connection { + inner: Arc<fcore::client::FlussConnection>, + #[allow(dead_code)] + metadata: 
Option<Arc<fcore::client::Metadata>>, +} + +pub struct Admin { + inner: fcore::client::FlussAdmin, +} + +pub struct Table { + connection: Arc<fcore::client::FlussConnection>, + metadata: Arc<fcore::client::Metadata>, + table_info: fcore::metadata::TableInfo, + table_path: fcore::metadata::TablePath, + has_pk: bool, +} + +pub struct AppendWriter { + inner: fcore::client::AppendWriter, +} + +pub struct LogScanner { + inner: fcore::client::LogScanner, +} + +fn ok_result() -> ffi::FfiResult { + ffi::FfiResult { + error_code: 0, + error_message: String::new(), + } +} + +fn err_result(code: i32, msg: String) -> ffi::FfiResult { + ffi::FfiResult { + error_code: code, + error_message: msg, + } +} + +// Connection implementation +fn new_connection(bootstrap_server: &str) -> Result<*mut Connection, String> { + let mut config = fcore::config::Config::default(); + config.bootstrap_server = Some(bootstrap_server.to_string()); + + let conn = RUNTIME.block_on(async { fcore::client::FlussConnection::new(config).await }); + + match conn { + Ok(c) => { + let conn = Box::into_raw(Box::new(Connection { + inner: Arc::new(c), + metadata: None, + })); + Ok(conn) + } + Err(e) => Err(format!("Failed to connect: {}", e)), + } +} + +unsafe fn delete_connection(conn: *mut Connection) { + if !conn.is_null() { + unsafe { + drop(Box::from_raw(conn)); + } + } +} + +impl Connection { + fn get_admin(&self) -> Result<*mut Admin, String> { + let admin_result = + RUNTIME.block_on(async { self.inner.get_admin().await }); + + match admin_result { + Ok(admin) => { + let admin = Box::into_raw(Box::new(Admin { inner: admin })); + Ok(admin) + } + Err(e) => Err(format!("Failed to get admin: {}", e)), + } + } + + fn get_table(&self, table_path: &ffi::FfiTablePath) -> Result<*mut Table, String> { + let path = fcore::metadata::TablePath::new( + table_path.database_name.clone(), + table_path.table_name.clone(), + ); + + let table_result = RUNTIME.block_on(async { self.inner.get_table(&path).await }); + + match 
table_result { + Ok(t) => { + let table = Box::into_raw(Box::new(Table { + connection: self.inner.clone(), + metadata: t.metadata().clone(), + table_info: t.table_info().clone(), + table_path: t.table_path().clone(), + has_pk: t.has_primary_key(), + })); + Ok(table) + } + Err(e) => Err(format!("Failed to get table: {}", e)), + } + } +} + +// Admin implementation +unsafe fn delete_admin(admin: *mut Admin) { + if !admin.is_null() { + unsafe { + drop(Box::from_raw(admin)); + } + } +} + +impl Admin { + fn create_table( + &self, + table_path: &ffi::FfiTablePath, + descriptor: &ffi::FfiTableDescriptor, + ignore_if_exists: bool, + ) -> ffi::FfiResult { + let path = fcore::metadata::TablePath::new( + table_path.database_name.clone(), + table_path.table_name.clone(), + ); + + let core_descriptor = match types::ffi_descriptor_to_core(descriptor) { + Ok(d) => d, + Err(e) => return err_result(1, e.to_string()), + }; + + let result = RUNTIME.block_on(async { + self.inner + .create_table(&path, &core_descriptor, ignore_if_exists) + .await + }); + + match result { + Ok(_) => ok_result(), + Err(e) => err_result(2, e.to_string()), + } + } + + fn get_table_info(&self, table_path: &ffi::FfiTablePath) -> ffi::FfiTableInfoResult { + let path = fcore::metadata::TablePath::new( + table_path.database_name.clone(), + table_path.table_name.clone(), + ); + + let result = RUNTIME.block_on(async { self.inner.get_table(&path).await }); + + match result { + Ok(info) => ffi::FfiTableInfoResult { + result: ok_result(), + table_info: types::core_table_info_to_ffi(&info), + }, + Err(e) => ffi::FfiTableInfoResult { + result: err_result(1, e.to_string()), + table_info: types::empty_table_info(), + }, + } + } + + fn get_latest_lake_snapshot( + &self, + table_path: &ffi::FfiTablePath, + ) -> ffi::FfiLakeSnapshotResult { + let path = fcore::metadata::TablePath::new( + table_path.database_name.clone(), + table_path.table_name.clone(), + ); + + let result = RUNTIME.block_on(async { 
self.inner.get_latest_lake_snapshot(&path).await }); + + match result { + Ok(snapshot) => ffi::FfiLakeSnapshotResult { + result: ok_result(), + lake_snapshot: types::core_lake_snapshot_to_ffi(&snapshot), + }, + Err(e) => ffi::FfiLakeSnapshotResult { + result: err_result(1, e.to_string()), + lake_snapshot: ffi::FfiLakeSnapshot { + snapshot_id: -1, + bucket_offsets: vec![], + }, + }, + } + } +} + +// Table implementation +unsafe fn delete_table(table: *mut Table) { + if !table.is_null() { + unsafe { + drop(Box::from_raw(table)); + } + } +} + +impl Table { + fn new_append_writer(&self) -> Result<*mut AppendWriter, String> { + let _enter = RUNTIME.enter(); + + let fluss_table = + fcore::client::FlussTable::new(&self.connection, self.metadata.clone(), self.table_info.clone()); + + let table_append = match fluss_table.new_append() { + Ok(a) => a, + Err(e) => { + return Err(format!("Failed to create append: {}", e)) + } + }; + + let writer = table_append.create_writer(); + let writer = Box::into_raw(Box::new(AppendWriter { inner: writer })); + Ok(writer) + } + + fn new_log_scanner(&self) -> Result<*mut LogScanner, String> { + let fluss_table = + fcore::client::FlussTable::new(&self.connection, self.metadata.clone(), self.table_info.clone()); + + let scanner = fluss_table.new_scan().create_log_scanner(); + let scanner = Box::into_raw(Box::new(LogScanner { inner: scanner })); + Ok(scanner) + } + + fn new_log_scanner_with_projection(&self, column_indices: Vec<usize>) -> Result<*mut LogScanner, String> { + let fluss_table = + fcore::client::FlussTable::new(&self.connection, self.metadata.clone(), self.table_info.clone()); + + let scan = fluss_table.new_scan(); + let scan = match scan.project(&column_indices) { + Ok(s) => s, + Err(e) => return Err(format!("Failed to project columns: {}", e)), + }; + let scanner = scan.create_log_scanner(); + let scanner = Box::into_raw(Box::new(LogScanner { inner: scanner })); + Ok(scanner) + } Review Comment: Inconsistent runtime context 
handling: `new_append_writer` uses `RUNTIME.enter()` but `new_log_scanner` and `new_log_scanner_with_projection` do not. If the scanner creation requires async context (similar to the writer), this could lead to runtime panics. Verify whether scanner creation needs runtime context, and if so, add `let _enter = RUNTIME.enter();` to these methods as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
