luoyuxia commented on code in PR #83:
URL: https://github.com/apache/fluss-rust/pull/83#discussion_r2613983457


##########
bindings/cpp/examples/example.cpp:
##########
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "fluss.hpp"
+
+#include <iostream>
+#include <vector>
+
+static void check(const char* step, const fluss::Result& r) {
+    if (!r.Ok()) {
+        std::cerr << step << " failed: code=" << r.error_code
+                  << " msg=" << r.error_message << std::endl;
+        std::exit(1);
+    }
+}
+
+int main() {
+    // 1) Connect
+    fluss::Connection conn;
+    check("connect", fluss::Connection::Connect("127.0.0.1:9123", conn));
+
+    // 2) Admin
+    fluss::Admin admin;
+    check("get_admin", conn.GetAdmin(admin));
+
+    // 3) Schema & descriptor
+    auto schema = fluss::Schema::NewBuilder()
+                        .AddColumn("id", fluss::DataType::Int)
+                        .AddColumn("name", fluss::DataType::String)
+                        .AddColumn("score", fluss::DataType::Float)
+                        .AddColumn("age", fluss::DataType::Int)
+                        .Build();
+
+    auto descriptor = fluss::TableDescriptor::NewBuilder()
+                          .SetSchema(schema)
+                          .SetBucketCount(1)
+                          .SetProperty("table.log.arrow.compression.type", 
"NONE")

Review Comment:
   curious about why set compression type to NONE? Is there any bug when 
compression is not null?



##########
bindings/cpp/include/fluss.hpp:
##########
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+namespace fluss {
+
+namespace ffi {
+    struct Connection;
+    struct Admin;
+    struct Table;
+    struct AppendWriter;
+    struct LogScanner;
+}  // namespace ffi
+
+enum class DataType {
+    Boolean = 1,
+    TinyInt = 2,
+    SmallInt = 3,
+    Int = 4,
+    BigInt = 5,
+    Float = 6,
+    Double = 7,
+    String = 8,
+    Bytes = 9,
+    Date = 10,
+    Time = 11,
+    Timestamp = 12,
+    TimestampLtz = 13,
+};
+
+enum class DatumType {
+    Null = 0,
+    Bool = 1,
+    Int32 = 2,
+    Int64 = 3,
+    Float32 = 4,
+    Float64 = 5,
+    String = 6,
+    Bytes = 7,
+};
+
+struct Result {
+    int32_t error_code{0};
+    std::string error_message;
+
+    bool Ok() const { return error_code == 0; }
+};
+
+struct TablePath {
+    std::string database_name;
+    std::string table_name;
+
+    TablePath() = default;
+    TablePath(std::string db, std::string tbl)
+        : database_name(std::move(db)), table_name(std::move(tbl)) {}
+
+    std::string ToString() const { return database_name + "." + table_name; }
+};
+
+struct Column {
+    std::string name;
+    DataType data_type;
+    std::string comment;
+};
+
+struct Schema {
+    std::vector<Column> columns;
+    std::vector<std::string> primary_keys;
+
+    class Builder {
+    public:
+        Builder& AddColumn(std::string name, DataType type,
+                           std::string comment = "") {
+            columns_.push_back({std::move(name), type, std::move(comment)});
+            return *this;
+        }
+
+        Builder& SetPrimaryKeys(std::vector<std::string> keys) {
+            primary_keys_ = std::move(keys);
+            return *this;
+        }
+
+        Schema Build() {
+            return Schema{std::move(columns_), std::move(primary_keys_)};
+        }
+
+    private:
+        std::vector<Column> columns_;
+        std::vector<std::string> primary_keys_;
+    };
+
+    static Builder NewBuilder() { return Builder(); }
+};
+
+struct TableDescriptor {
+    Schema schema;
+    std::vector<std::string> partition_keys;
+    int32_t bucket_count{0};
+    std::vector<std::string> bucket_keys;
+    std::unordered_map<std::string, std::string> properties;
+    std::string comment;
+
+    class Builder {
+    public:
+        Builder& SetSchema(Schema s) {
+            schema_ = std::move(s);
+            return *this;
+        }
+
+        Builder& SetPartitionKeys(std::vector<std::string> keys) {
+            partition_keys_ = std::move(keys);
+            return *this;
+        }
+
+        Builder& SetBucketCount(int32_t count) {
+            bucket_count_ = count;
+            return *this;
+        }
+
+        Builder& SetBucketKeys(std::vector<std::string> keys) {
+            bucket_keys_ = std::move(keys);
+            return *this;
+        }
+
+        Builder& SetProperty(std::string key, std::string value) {
+            properties_[std::move(key)] = std::move(value);
+            return *this;
+        }
+
+        Builder& SetComment(std::string comment) {
+            comment_ = std::move(comment);
+            return *this;
+        }
+
+        TableDescriptor Build() {
+            return TableDescriptor{std::move(schema_),
+                                   std::move(partition_keys_),
+                                   bucket_count_,
+                                   std::move(bucket_keys_),
+                                   std::move(properties_),
+                                   std::move(comment_)};
+        }
+
+    private:
+        Schema schema_;
+        std::vector<std::string> partition_keys_;
+        int32_t bucket_count_{0};
+        std::vector<std::string> bucket_keys_;
+        std::unordered_map<std::string, std::string> properties_;
+        std::string comment_;
+    };
+
+    static Builder NewBuilder() { return Builder(); }
+};
+
+struct TableInfo {
+    int64_t table_id;
+    int32_t schema_id;
+    TablePath table_path;
+    int64_t created_time;
+    int64_t modified_time;
+    std::vector<std::string> primary_keys;
+    std::vector<std::string> bucket_keys;
+    std::vector<std::string> partition_keys;
+    int32_t num_buckets;
+    bool has_primary_key;
+    bool is_partitioned;
+    std::unordered_map<std::string, std::string> properties;
+    std::string comment;
+    Schema schema;
+};
+
+struct Datum {

Review Comment:
   Seem for even a bool value, `Datum` will occupy more bytes, right?
   We can consider to optimze it in the future version. Two thought in here:
   - use cpp variant
   - rust side emit arrow record batch, and cpp side wrap the arrow record 
batch to provide row api



##########
bindings/cpp/src/types.rs:
##########
@@ -0,0 +1,498 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ffi;
+use anyhow::{anyhow, Result};
+use arrow::array::{
+    Date32Array, LargeBinaryArray, LargeStringArray, Time32MillisecondArray, 
Time32SecondArray,
+    Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
+    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
+};
+use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
+use fluss as fcore;
+use fcore::row::InternalRow;
+
+pub const DATA_TYPE_BOOLEAN: i32 = 1;
+pub const DATA_TYPE_TINYINT: i32 = 2;
+pub const DATA_TYPE_SMALLINT: i32 = 3;
+pub const DATA_TYPE_INT: i32 = 4;
+pub const DATA_TYPE_BIGINT: i32 = 5;
+pub const DATA_TYPE_FLOAT: i32 = 6;
+pub const DATA_TYPE_DOUBLE: i32 = 7;
+pub const DATA_TYPE_STRING: i32 = 8;
+pub const DATA_TYPE_BYTES: i32 = 9;
+pub const DATA_TYPE_DATE: i32 = 10;
+pub const DATA_TYPE_TIME: i32 = 11;
+pub const DATA_TYPE_TIMESTAMP: i32 = 12;
+pub const DATA_TYPE_TIMESTAMP_LTZ: i32 = 13;
+
+pub const DATUM_TYPE_NULL: i32 = 0;
+pub const DATUM_TYPE_BOOL: i32 = 1;
+pub const DATUM_TYPE_INT32: i32 = 2;
+pub const DATUM_TYPE_INT64: i32 = 3;
+pub const DATUM_TYPE_FLOAT32: i32 = 4;
+pub const DATUM_TYPE_FLOAT64: i32 = 5;
+pub const DATUM_TYPE_STRING: i32 = 6;
+pub const DATUM_TYPE_BYTES: i32 = 7;
+
+fn ffi_data_type_to_core(dt: i32) -> Result<fcore::metadata::DataType> {
+    match dt {
+        DATA_TYPE_BOOLEAN => Ok(fcore::metadata::DataTypes::boolean()),
+        DATA_TYPE_TINYINT => Ok(fcore::metadata::DataTypes::tinyint()),
+        DATA_TYPE_SMALLINT => Ok(fcore::metadata::DataTypes::smallint()),
+        DATA_TYPE_INT => Ok(fcore::metadata::DataTypes::int()),
+        DATA_TYPE_BIGINT => Ok(fcore::metadata::DataTypes::bigint()),
+        DATA_TYPE_FLOAT => Ok(fcore::metadata::DataTypes::float()),
+        DATA_TYPE_DOUBLE => Ok(fcore::metadata::DataTypes::double()),
+        DATA_TYPE_STRING => Ok(fcore::metadata::DataTypes::string()),
+        DATA_TYPE_BYTES => Ok(fcore::metadata::DataTypes::bytes()),
+        DATA_TYPE_DATE => Ok(fcore::metadata::DataTypes::date()),
+        DATA_TYPE_TIME => Ok(fcore::metadata::DataTypes::time()),
+        DATA_TYPE_TIMESTAMP => Ok(fcore::metadata::DataTypes::timestamp()),
+        DATA_TYPE_TIMESTAMP_LTZ => 
Ok(fcore::metadata::DataTypes::timestamp_ltz()),
+        _ => Err(anyhow!("Unknown data type: {}", dt)),
+    }
+}
+
+fn core_data_type_to_ffi(dt: &fcore::metadata::DataType) -> i32 {
+    match dt {
+        fcore::metadata::DataType::Boolean(_) => DATA_TYPE_BOOLEAN,
+        fcore::metadata::DataType::TinyInt(_) => DATA_TYPE_TINYINT,
+        fcore::metadata::DataType::SmallInt(_) => DATA_TYPE_SMALLINT,
+        fcore::metadata::DataType::Int(_) => DATA_TYPE_INT,
+        fcore::metadata::DataType::BigInt(_) => DATA_TYPE_BIGINT,
+        fcore::metadata::DataType::Float(_) => DATA_TYPE_FLOAT,
+        fcore::metadata::DataType::Double(_) => DATA_TYPE_DOUBLE,
+        fcore::metadata::DataType::String(_) => DATA_TYPE_STRING,
+        fcore::metadata::DataType::Bytes(_) => DATA_TYPE_BYTES,
+        fcore::metadata::DataType::Date(_) => DATA_TYPE_DATE,
+        fcore::metadata::DataType::Time(_) => DATA_TYPE_TIME,
+        fcore::metadata::DataType::Timestamp(_) => DATA_TYPE_TIMESTAMP,
+        fcore::metadata::DataType::TimestampLTz(_) => DATA_TYPE_TIMESTAMP_LTZ,
+        _ => 0,
+    }
+}
+
+pub fn ffi_descriptor_to_core(
+    descriptor: &ffi::FfiTableDescriptor,
+) -> Result<fcore::metadata::TableDescriptor> {
+    let mut schema_builder = fcore::metadata::Schema::builder();
+
+    for col in &descriptor.schema.columns {
+        let dt = ffi_data_type_to_core(col.data_type)?;
+        schema_builder = schema_builder.column(&col.name, dt);
+        if !col.comment.is_empty() {
+            schema_builder = schema_builder.with_comment(&col.comment);
+        }
+    }
+
+    if !descriptor.schema.primary_keys.is_empty() {
+        schema_builder = 
schema_builder.primary_key(descriptor.schema.primary_keys.clone());
+    }
+
+    let schema = schema_builder.build()?;
+
+    let mut builder = fcore::metadata::TableDescriptor::builder()
+        .schema(schema)
+        .partitioned_by(descriptor.partition_keys.clone());
+
+    if descriptor.bucket_count > 0 {
+        builder = builder.distributed_by(Some(descriptor.bucket_count), 
descriptor.bucket_keys.clone());
+    } else {
+        builder = builder.distributed_by(None, descriptor.bucket_keys.clone());
+    }
+
+    for prop in &descriptor.properties {
+        builder = builder.property(&prop.key, &prop.value);
+    }
+
+    if !descriptor.comment.is_empty() {
+        builder = builder.comment(&descriptor.comment);
+    }
+
+    Ok(builder.build()?)
+}
+
+pub fn core_table_info_to_ffi(info: &fcore::metadata::TableInfo) -> 
ffi::FfiTableInfo {
+    let schema = info.get_schema();
+    let columns: Vec<ffi::FfiColumn> = schema
+        .columns()
+        .iter()
+        .map(|col| ffi::FfiColumn {
+            name: col.name().to_string(),
+            data_type: core_data_type_to_ffi(col.data_type()),
+            comment: col.comment().unwrap_or("").to_string(),
+        })
+        .collect();
+
+    let primary_keys: Vec<String> = schema
+        .primary_key()
+        .map(|pk| pk.column_names().to_vec())
+        .unwrap_or_default();
+
+    let properties: Vec<ffi::HashMapValue> = info
+        .get_properties()
+        .iter()
+        .map(|(k, v)| ffi::HashMapValue {
+            key: k.clone(),
+            value: v.clone(),
+        })
+        .collect();
+
+    ffi::FfiTableInfo {
+        table_id: info.get_table_id(),
+        schema_id: info.get_schema_id(),
+        table_path: ffi::FfiTablePath {
+            database_name: info.get_table_path().database().to_string(),
+            table_name: info.get_table_path().table().to_string(),
+        },
+        created_time: info.get_created_time(),
+        modified_time: info.get_modified_time(),
+        primary_keys: info.get_primary_keys().clone(),
+        bucket_keys: info.get_bucket_keys().to_vec(),
+        partition_keys: info.get_partition_keys().to_vec(),
+        num_buckets: info.get_num_buckets(),
+        has_primary_key: info.has_primary_key(),
+        is_partitioned: info.is_partitioned(),
+        properties,
+        comment: info.get_comment().unwrap_or("").to_string(),
+        schema: ffi::FfiSchema {
+            columns,
+            primary_keys,
+        },
+    }
+}
+
+pub fn empty_table_info() -> ffi::FfiTableInfo {
+    ffi::FfiTableInfo {
+        table_id: 0,
+        schema_id: 0,
+        table_path: ffi::FfiTablePath {
+            database_name: String::new(),
+            table_name: String::new(),
+        },
+        created_time: 0,
+        modified_time: 0,
+        primary_keys: vec![],
+        bucket_keys: vec![],
+        partition_keys: vec![],
+        num_buckets: 0,
+        has_primary_key: false,
+        is_partitioned: false,
+        properties: vec![],
+        comment: String::new(),
+        schema: ffi::FfiSchema {
+            columns: vec![],
+            primary_keys: vec![],
+        },
+    }
+}
+
+pub struct OwnedRowData {
+    strings: Vec<String>,
+}
+
+impl OwnedRowData {
+    pub fn new() -> Self {
+        Self { strings: Vec::new() }
+    }
+
+    pub fn collect_strings(&mut self, row: &ffi::FfiGenericRow) {
+        for field in &row.fields {
+            if field.datum_type == DATUM_TYPE_STRING {
+                self.strings.push(field.string_val.to_string());
+            }
+        }
+    }
+
+    pub fn get_strings(&self) -> &[String] {
+        &self.strings
+    }
+}
+
+pub fn ffi_row_to_core<'a>(
+    row: &ffi::FfiGenericRow,
+    owner: &'a OwnedRowData,
+) -> fcore::row::GenericRow<'a> {
+    use fcore::row::{Blob, Datum, F32, F64};
+
+    let mut generic_row = fcore::row::GenericRow::new();
+    let mut string_idx = 0;
+
+    for (idx, field) in row.fields.iter().enumerate() {
+        let datum = match field.datum_type {
+            DATUM_TYPE_NULL => Datum::Null,
+            DATUM_TYPE_BOOL => Datum::Bool(field.bool_val),
+            DATUM_TYPE_INT32 => Datum::Int32(field.i32_val),
+            DATUM_TYPE_INT64 => Datum::Int64(field.i64_val),
+            DATUM_TYPE_FLOAT32 => Datum::Float32(F32::from(field.f32_val)),
+            DATUM_TYPE_FLOAT64 => Datum::Float64(F64::from(field.f64_val)),
+            DATUM_TYPE_STRING => {
+                let str_ref = owner.get_strings()[string_idx].as_str();
+                string_idx += 1;
+                Datum::String(str_ref)
+            }
+            DATUM_TYPE_BYTES => 
Datum::Blob(Blob::from(field.bytes_val.clone())),
+            _ => Datum::Null,
+        };
+        generic_row.set_field(idx, datum);
+    }
+
+    generic_row
+}
+
+pub fn core_scan_records_to_ffi(records: &fcore::record::ScanRecords) -> 
ffi::FfiScanRecords {
+    let mut ffi_records = Vec::new();
+    
+    // Iterate over all buckets and their records
+    for bucket_records in records.records_by_buckets().values() {
+        for record in bucket_records {
+            let row = record.row();
+            let fields = core_row_to_ffi_fields(row);
+
+            ffi_records.push(ffi::FfiScanRecord {
+                offset: record.offset(),
+                timestamp: record.timestamp(),
+                row: ffi::FfiGenericRow { fields },
+            });
+        }
+    }
+
+    ffi::FfiScanRecords { records: ffi_records }
+}
+
+fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> Vec<ffi::FfiDatum> 
{
+    fn new_datum(datum_type: i32) -> ffi::FfiDatum {
+        ffi::FfiDatum {
+            datum_type,
+            bool_val: false,
+            i32_val: 0,
+            i64_val: 0,
+            f32_val: 0.0,
+            f64_val: 0.0,
+            string_val: String::new(),
+            bytes_val: vec![],
+        }
+    }
+
+    let record_batch = row.get_record_batch();
+    let schema = record_batch.schema();
+    let row_id = row.get_row_id();
+
+    let mut fields = Vec::with_capacity(schema.fields().len());
+
+    for (i, field) in schema.fields().iter().enumerate() {
+        if row.is_null_at(i) {
+            fields.push(new_datum(DATUM_TYPE_NULL));
+            continue;
+        }
+
+        let datum = match field.data_type() {
+            ArrowDataType::Boolean => {
+                let mut datum = new_datum(DATUM_TYPE_BOOL);
+                datum.bool_val = row.get_boolean(i);
+                datum
+            }
+            ArrowDataType::Int8 => {
+                let mut datum = new_datum(DATUM_TYPE_INT32);
+                datum.i32_val = row.get_byte(i) as i32;
+                datum
+            }
+            ArrowDataType::Int16 => {
+                let mut datum = new_datum(DATUM_TYPE_INT32);
+                datum.i32_val = row.get_short(i) as i32;
+                datum
+            }
+            ArrowDataType::Int32 => {
+                let mut datum = new_datum(DATUM_TYPE_INT32);
+                datum.i32_val = row.get_int(i);
+                datum
+            }
+            ArrowDataType::Int64 => {
+                let mut datum = new_datum(DATUM_TYPE_INT64);
+                datum.i64_val = row.get_long(i);
+                datum
+            }
+            ArrowDataType::Float32 => {
+                let mut datum = new_datum(DATUM_TYPE_FLOAT32);
+                datum.f32_val = row.get_float(i);
+                datum
+            }
+            ArrowDataType::Float64 => {
+                let mut datum = new_datum(DATUM_TYPE_FLOAT64);
+                datum.f64_val = row.get_double(i);
+                datum
+            }
+            ArrowDataType::Utf8 => {
+                let mut datum = new_datum(DATUM_TYPE_STRING);
+                datum.string_val = row.get_string(i).to_string();
+                datum
+            }
+            ArrowDataType::LargeUtf8 => {
+                let array = record_batch
+                    .column(i)
+                    .as_any()
+                    .downcast_ref::<LargeStringArray>()
+                    .expect("LargeUtf8 column expected");
+                let mut datum = new_datum(DATUM_TYPE_STRING);
+                datum.string_val = array.value(row_id).to_string();
+                datum
+            }
+            ArrowDataType::Binary => {
+                let mut datum = new_datum(DATUM_TYPE_BYTES);
+                datum.bytes_val = row.get_bytes(i);
+                datum
+            }
+            ArrowDataType::FixedSizeBinary(len) => {
+                let mut datum = new_datum(DATUM_TYPE_BYTES);
+                datum.bytes_val = row.get_binary(i, *len as usize);
+                datum
+            }
+            ArrowDataType::LargeBinary => {
+                let array = record_batch
+                    .column(i)
+                    .as_any()
+                    .downcast_ref::<LargeBinaryArray>()
+                    .expect("LargeBinary column expected");
+                let mut datum = new_datum(DATUM_TYPE_BYTES);
+                datum.bytes_val = array.value(row_id).to_vec();
+                datum
+            }
+            ArrowDataType::Date32 => {
+                let array = record_batch
+                    .column(i)
+                    .as_any()
+                    .downcast_ref::<Date32Array>()
+                    .expect("Date32 column expected");
+                let mut datum = new_datum(DATUM_TYPE_INT32);
+                datum.i32_val = array.value(row_id);
+                datum
+            }
+            ArrowDataType::Timestamp(unit, _) => match unit {
+                TimeUnit::Second => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<TimestampSecondArray>()
+                        .expect("Timestamp(second) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT64);
+                    datum.i64_val = array.value(row_id);
+                    datum
+                }
+                TimeUnit::Millisecond => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<TimestampMillisecondArray>()
+                        .expect("Timestamp(millisecond) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT64);
+                    datum.i64_val = array.value(row_id);
+                    datum
+                }
+                TimeUnit::Microsecond => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<TimestampMicrosecondArray>()
+                        .expect("Timestamp(microsecond) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT64);
+                    datum.i64_val = array.value(row_id);
+                    datum
+                }
+                TimeUnit::Nanosecond => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<TimestampNanosecondArray>()
+                        .expect("Timestamp(nanosecond) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT64);
+                    datum.i64_val = array.value(row_id);
+                    datum
+                }
+            },
+            ArrowDataType::Time32(unit) => match unit {
+                TimeUnit::Second => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<Time32SecondArray>()
+                        .expect("Time32(second) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT32);
+                    datum.i32_val = array.value(row_id);
+                    datum
+                }
+                TimeUnit::Millisecond => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<Time32MillisecondArray>()
+                        .expect("Time32(millisecond) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT32);
+                    datum.i32_val = array.value(row_id);
+                    datum
+                }
+                _ => panic!("Unsupported Time32 unit for column {}", i),

Review Comment:
   add comment like:
   "will never come to here"



##########
bindings/cpp/src/types.rs:
##########
@@ -0,0 +1,498 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ffi;
+use anyhow::{anyhow, Result};
+use arrow::array::{
+    Date32Array, LargeBinaryArray, LargeStringArray, Time32MillisecondArray, 
Time32SecondArray,
+    Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
+    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
+};
+use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
+use fluss as fcore;
+use fcore::row::InternalRow;
+
+pub const DATA_TYPE_BOOLEAN: i32 = 1;
+pub const DATA_TYPE_TINYINT: i32 = 2;
+pub const DATA_TYPE_SMALLINT: i32 = 3;
+pub const DATA_TYPE_INT: i32 = 4;
+pub const DATA_TYPE_BIGINT: i32 = 5;
+pub const DATA_TYPE_FLOAT: i32 = 6;
+pub const DATA_TYPE_DOUBLE: i32 = 7;
+pub const DATA_TYPE_STRING: i32 = 8;
+pub const DATA_TYPE_BYTES: i32 = 9;
+pub const DATA_TYPE_DATE: i32 = 10;
+pub const DATA_TYPE_TIME: i32 = 11;
+pub const DATA_TYPE_TIMESTAMP: i32 = 12;
+pub const DATA_TYPE_TIMESTAMP_LTZ: i32 = 13;
+
+pub const DATUM_TYPE_NULL: i32 = 0;
+pub const DATUM_TYPE_BOOL: i32 = 1;
+pub const DATUM_TYPE_INT32: i32 = 2;
+pub const DATUM_TYPE_INT64: i32 = 3;
+pub const DATUM_TYPE_FLOAT32: i32 = 4;
+pub const DATUM_TYPE_FLOAT64: i32 = 5;
+pub const DATUM_TYPE_STRING: i32 = 6;
+pub const DATUM_TYPE_BYTES: i32 = 7;
+
+fn ffi_data_type_to_core(dt: i32) -> Result<fcore::metadata::DataType> {
+    match dt {
+        DATA_TYPE_BOOLEAN => Ok(fcore::metadata::DataTypes::boolean()),
+        DATA_TYPE_TINYINT => Ok(fcore::metadata::DataTypes::tinyint()),
+        DATA_TYPE_SMALLINT => Ok(fcore::metadata::DataTypes::smallint()),
+        DATA_TYPE_INT => Ok(fcore::metadata::DataTypes::int()),
+        DATA_TYPE_BIGINT => Ok(fcore::metadata::DataTypes::bigint()),
+        DATA_TYPE_FLOAT => Ok(fcore::metadata::DataTypes::float()),
+        DATA_TYPE_DOUBLE => Ok(fcore::metadata::DataTypes::double()),
+        DATA_TYPE_STRING => Ok(fcore::metadata::DataTypes::string()),
+        DATA_TYPE_BYTES => Ok(fcore::metadata::DataTypes::bytes()),
+        DATA_TYPE_DATE => Ok(fcore::metadata::DataTypes::date()),
+        DATA_TYPE_TIME => Ok(fcore::metadata::DataTypes::time()),
+        DATA_TYPE_TIMESTAMP => Ok(fcore::metadata::DataTypes::timestamp()),
+        DATA_TYPE_TIMESTAMP_LTZ => 
Ok(fcore::metadata::DataTypes::timestamp_ltz()),
+        _ => Err(anyhow!("Unknown data type: {}", dt)),
+    }
+}
+
+fn core_data_type_to_ffi(dt: &fcore::metadata::DataType) -> i32 {
+    match dt {
+        fcore::metadata::DataType::Boolean(_) => DATA_TYPE_BOOLEAN,
+        fcore::metadata::DataType::TinyInt(_) => DATA_TYPE_TINYINT,
+        fcore::metadata::DataType::SmallInt(_) => DATA_TYPE_SMALLINT,
+        fcore::metadata::DataType::Int(_) => DATA_TYPE_INT,
+        fcore::metadata::DataType::BigInt(_) => DATA_TYPE_BIGINT,
+        fcore::metadata::DataType::Float(_) => DATA_TYPE_FLOAT,
+        fcore::metadata::DataType::Double(_) => DATA_TYPE_DOUBLE,
+        fcore::metadata::DataType::String(_) => DATA_TYPE_STRING,
+        fcore::metadata::DataType::Bytes(_) => DATA_TYPE_BYTES,
+        fcore::metadata::DataType::Date(_) => DATA_TYPE_DATE,
+        fcore::metadata::DataType::Time(_) => DATA_TYPE_TIME,
+        fcore::metadata::DataType::Timestamp(_) => DATA_TYPE_TIMESTAMP,
+        fcore::metadata::DataType::TimestampLTz(_) => DATA_TYPE_TIMESTAMP_LTZ,
+        _ => 0,
+    }
+}
+
+pub fn ffi_descriptor_to_core(
+    descriptor: &ffi::FfiTableDescriptor,
+) -> Result<fcore::metadata::TableDescriptor> {
+    let mut schema_builder = fcore::metadata::Schema::builder();
+
+    for col in &descriptor.schema.columns {
+        let dt = ffi_data_type_to_core(col.data_type)?;
+        schema_builder = schema_builder.column(&col.name, dt);
+        if !col.comment.is_empty() {
+            schema_builder = schema_builder.with_comment(&col.comment);
+        }
+    }
+
+    if !descriptor.schema.primary_keys.is_empty() {
+        schema_builder = 
schema_builder.primary_key(descriptor.schema.primary_keys.clone());
+    }
+
+    let schema = schema_builder.build()?;
+
+    let mut builder = fcore::metadata::TableDescriptor::builder()
+        .schema(schema)
+        .partitioned_by(descriptor.partition_keys.clone());
+
+    if descriptor.bucket_count > 0 {
+        builder = builder.distributed_by(Some(descriptor.bucket_count), 
descriptor.bucket_keys.clone());
+    } else {
+        builder = builder.distributed_by(None, descriptor.bucket_keys.clone());
+    }
+
+    for prop in &descriptor.properties {
+        builder = builder.property(&prop.key, &prop.value);
+    }
+
+    if !descriptor.comment.is_empty() {
+        builder = builder.comment(&descriptor.comment);
+    }
+
+    Ok(builder.build()?)
+}
+
+pub fn core_table_info_to_ffi(info: &fcore::metadata::TableInfo) -> 
ffi::FfiTableInfo {
+    let schema = info.get_schema();
+    let columns: Vec<ffi::FfiColumn> = schema
+        .columns()
+        .iter()
+        .map(|col| ffi::FfiColumn {
+            name: col.name().to_string(),
+            data_type: core_data_type_to_ffi(col.data_type()),
+            comment: col.comment().unwrap_or("").to_string(),
+        })
+        .collect();
+
+    let primary_keys: Vec<String> = schema
+        .primary_key()
+        .map(|pk| pk.column_names().to_vec())
+        .unwrap_or_default();
+
+    let properties: Vec<ffi::HashMapValue> = info
+        .get_properties()
+        .iter()
+        .map(|(k, v)| ffi::HashMapValue {
+            key: k.clone(),
+            value: v.clone(),
+        })
+        .collect();
+
+    ffi::FfiTableInfo {
+        table_id: info.get_table_id(),
+        schema_id: info.get_schema_id(),
+        table_path: ffi::FfiTablePath {
+            database_name: info.get_table_path().database().to_string(),
+            table_name: info.get_table_path().table().to_string(),
+        },
+        created_time: info.get_created_time(),
+        modified_time: info.get_modified_time(),
+        primary_keys: info.get_primary_keys().clone(),
+        bucket_keys: info.get_bucket_keys().to_vec(),
+        partition_keys: info.get_partition_keys().to_vec(),
+        num_buckets: info.get_num_buckets(),
+        has_primary_key: info.has_primary_key(),
+        is_partitioned: info.is_partitioned(),
+        properties,
+        comment: info.get_comment().unwrap_or("").to_string(),
+        schema: ffi::FfiSchema {
+            columns,
+            primary_keys,
+        },
+    }
+}
+
+pub fn empty_table_info() -> ffi::FfiTableInfo {
+    ffi::FfiTableInfo {
+        table_id: 0,
+        schema_id: 0,
+        table_path: ffi::FfiTablePath {
+            database_name: String::new(),
+            table_name: String::new(),
+        },
+        created_time: 0,
+        modified_time: 0,
+        primary_keys: vec![],
+        bucket_keys: vec![],
+        partition_keys: vec![],
+        num_buckets: 0,
+        has_primary_key: false,
+        is_partitioned: false,
+        properties: vec![],
+        comment: String::new(),
+        schema: ffi::FfiSchema {
+            columns: vec![],
+            primary_keys: vec![],
+        },
+    }
+}
+
+pub struct OwnedRowData {
+    strings: Vec<String>,
+}
+
+impl OwnedRowData {
+    pub fn new() -> Self {
+        Self { strings: Vec::new() }
+    }
+
+    pub fn collect_strings(&mut self, row: &ffi::FfiGenericRow) {
+        for field in &row.fields {
+            if field.datum_type == DATUM_TYPE_STRING {
+                self.strings.push(field.string_val.to_string());
+            }
+        }
+    }
+
+    pub fn get_strings(&self) -> &[String] {
+        &self.strings
+    }
+}
+
+pub fn ffi_row_to_core<'a>(
+    row: &ffi::FfiGenericRow,
+    owner: &'a OwnedRowData,
+) -> fcore::row::GenericRow<'a> {
+    use fcore::row::{Blob, Datum, F32, F64};
+
+    let mut generic_row = fcore::row::GenericRow::new();
+    let mut string_idx = 0;
+
+    for (idx, field) in row.fields.iter().enumerate() {
+        let datum = match field.datum_type {
+            DATUM_TYPE_NULL => Datum::Null,
+            DATUM_TYPE_BOOL => Datum::Bool(field.bool_val),
+            DATUM_TYPE_INT32 => Datum::Int32(field.i32_val),
+            DATUM_TYPE_INT64 => Datum::Int64(field.i64_val),
+            DATUM_TYPE_FLOAT32 => Datum::Float32(F32::from(field.f32_val)),
+            DATUM_TYPE_FLOAT64 => Datum::Float64(F64::from(field.f64_val)),
+            DATUM_TYPE_STRING => {
+                let str_ref = owner.get_strings()[string_idx].as_str();
+                string_idx += 1;
+                Datum::String(str_ref)
+            }
+            DATUM_TYPE_BYTES => 
Datum::Blob(Blob::from(field.bytes_val.clone())),
+            _ => Datum::Null,
+        };
+        generic_row.set_field(idx, datum);
+    }
+
+    generic_row
+}
+
+pub fn core_scan_records_to_ffi(records: &fcore::record::ScanRecords) -> 
ffi::FfiScanRecords {
+    let mut ffi_records = Vec::new();
+    
+    // Iterate over all buckets and their records
+    for bucket_records in records.records_by_buckets().values() {
+        for record in bucket_records {
+            let row = record.row();
+            let fields = core_row_to_ffi_fields(row);
+
+            ffi_records.push(ffi::FfiScanRecord {
+                offset: record.offset(),
+                timestamp: record.timestamp(),
+                row: ffi::FfiGenericRow { fields },
+            });
+        }
+    }
+
+    ffi::FfiScanRecords { records: ffi_records }
+}
+
+fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> Vec<ffi::FfiDatum> 
{
+    fn new_datum(datum_type: i32) -> ffi::FfiDatum {
+        ffi::FfiDatum {
+            datum_type,
+            bool_val: false,
+            i32_val: 0,
+            i64_val: 0,
+            f32_val: 0.0,
+            f64_val: 0.0,
+            string_val: String::new(),
+            bytes_val: vec![],
+        }
+    }
+
+    let record_batch = row.get_record_batch();
+    let schema = record_batch.schema();
+    let row_id = row.get_row_id();
+
+    let mut fields = Vec::with_capacity(schema.fields().len());
+
+    for (i, field) in schema.fields().iter().enumerate() {
+        if row.is_null_at(i) {
+            fields.push(new_datum(DATUM_TYPE_NULL));
+            continue;
+        }
+
+        let datum = match field.data_type() {
+            ArrowDataType::Boolean => {
+                let mut datum = new_datum(DATUM_TYPE_BOOL);
+                datum.bool_val = row.get_boolean(i);
+                datum
+            }
+            ArrowDataType::Int8 => {
+                let mut datum = new_datum(DATUM_TYPE_INT32);
+                datum.i32_val = row.get_byte(i) as i32;
+                datum
+            }
+            ArrowDataType::Int16 => {
+                let mut datum = new_datum(DATUM_TYPE_INT32);
+                datum.i32_val = row.get_short(i) as i32;
+                datum
+            }
+            ArrowDataType::Int32 => {
+                let mut datum = new_datum(DATUM_TYPE_INT32);
+                datum.i32_val = row.get_int(i);
+                datum
+            }
+            ArrowDataType::Int64 => {
+                let mut datum = new_datum(DATUM_TYPE_INT64);
+                datum.i64_val = row.get_long(i);
+                datum
+            }
+            ArrowDataType::Float32 => {
+                let mut datum = new_datum(DATUM_TYPE_FLOAT32);
+                datum.f32_val = row.get_float(i);
+                datum
+            }
+            ArrowDataType::Float64 => {
+                let mut datum = new_datum(DATUM_TYPE_FLOAT64);
+                datum.f64_val = row.get_double(i);
+                datum
+            }
+            ArrowDataType::Utf8 => {
+                let mut datum = new_datum(DATUM_TYPE_STRING);
+                datum.string_val = row.get_string(i).to_string();

Review Comment:
   It aslo need a string copy. Not sure whether it's easy or not to avoid this 
copy. We can left a todo to remind us the string copy wil happen in here.



##########
bindings/cpp/src/types.rs:
##########
@@ -0,0 +1,498 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ffi;
+use anyhow::{anyhow, Result};
+use arrow::array::{
+    Date32Array, LargeBinaryArray, LargeStringArray, Time32MillisecondArray, 
Time32SecondArray,
+    Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
+    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
+};
+use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
+use fluss as fcore;
+use fcore::row::InternalRow;
+
+pub const DATA_TYPE_BOOLEAN: i32 = 1;
+pub const DATA_TYPE_TINYINT: i32 = 2;
+pub const DATA_TYPE_SMALLINT: i32 = 3;
+pub const DATA_TYPE_INT: i32 = 4;
+pub const DATA_TYPE_BIGINT: i32 = 5;
+pub const DATA_TYPE_FLOAT: i32 = 6;
+pub const DATA_TYPE_DOUBLE: i32 = 7;
+pub const DATA_TYPE_STRING: i32 = 8;
+pub const DATA_TYPE_BYTES: i32 = 9;
+pub const DATA_TYPE_DATE: i32 = 10;
+pub const DATA_TYPE_TIME: i32 = 11;
+pub const DATA_TYPE_TIMESTAMP: i32 = 12;
+pub const DATA_TYPE_TIMESTAMP_LTZ: i32 = 13;
+
+pub const DATUM_TYPE_NULL: i32 = 0;
+pub const DATUM_TYPE_BOOL: i32 = 1;
+pub const DATUM_TYPE_INT32: i32 = 2;
+pub const DATUM_TYPE_INT64: i32 = 3;
+pub const DATUM_TYPE_FLOAT32: i32 = 4;
+pub const DATUM_TYPE_FLOAT64: i32 = 5;
+pub const DATUM_TYPE_STRING: i32 = 6;
+pub const DATUM_TYPE_BYTES: i32 = 7;
+
+fn ffi_data_type_to_core(dt: i32) -> Result<fcore::metadata::DataType> {
+    match dt {
+        DATA_TYPE_BOOLEAN => Ok(fcore::metadata::DataTypes::boolean()),
+        DATA_TYPE_TINYINT => Ok(fcore::metadata::DataTypes::tinyint()),
+        DATA_TYPE_SMALLINT => Ok(fcore::metadata::DataTypes::smallint()),
+        DATA_TYPE_INT => Ok(fcore::metadata::DataTypes::int()),
+        DATA_TYPE_BIGINT => Ok(fcore::metadata::DataTypes::bigint()),
+        DATA_TYPE_FLOAT => Ok(fcore::metadata::DataTypes::float()),
+        DATA_TYPE_DOUBLE => Ok(fcore::metadata::DataTypes::double()),
+        DATA_TYPE_STRING => Ok(fcore::metadata::DataTypes::string()),
+        DATA_TYPE_BYTES => Ok(fcore::metadata::DataTypes::bytes()),
+        DATA_TYPE_DATE => Ok(fcore::metadata::DataTypes::date()),
+        DATA_TYPE_TIME => Ok(fcore::metadata::DataTypes::time()),
+        DATA_TYPE_TIMESTAMP => Ok(fcore::metadata::DataTypes::timestamp()),
+        DATA_TYPE_TIMESTAMP_LTZ => 
Ok(fcore::metadata::DataTypes::timestamp_ltz()),
+        _ => Err(anyhow!("Unknown data type: {}", dt)),
+    }
+}
+
+fn core_data_type_to_ffi(dt: &fcore::metadata::DataType) -> i32 {
+    match dt {
+        fcore::metadata::DataType::Boolean(_) => DATA_TYPE_BOOLEAN,
+        fcore::metadata::DataType::TinyInt(_) => DATA_TYPE_TINYINT,
+        fcore::metadata::DataType::SmallInt(_) => DATA_TYPE_SMALLINT,
+        fcore::metadata::DataType::Int(_) => DATA_TYPE_INT,
+        fcore::metadata::DataType::BigInt(_) => DATA_TYPE_BIGINT,
+        fcore::metadata::DataType::Float(_) => DATA_TYPE_FLOAT,
+        fcore::metadata::DataType::Double(_) => DATA_TYPE_DOUBLE,
+        fcore::metadata::DataType::String(_) => DATA_TYPE_STRING,
+        fcore::metadata::DataType::Bytes(_) => DATA_TYPE_BYTES,
+        fcore::metadata::DataType::Date(_) => DATA_TYPE_DATE,
+        fcore::metadata::DataType::Time(_) => DATA_TYPE_TIME,
+        fcore::metadata::DataType::Timestamp(_) => DATA_TYPE_TIMESTAMP,
+        fcore::metadata::DataType::TimestampLTz(_) => DATA_TYPE_TIMESTAMP_LTZ,
+        _ => 0,
+    }
+}
+
+pub fn ffi_descriptor_to_core(
+    descriptor: &ffi::FfiTableDescriptor,
+) -> Result<fcore::metadata::TableDescriptor> {
+    let mut schema_builder = fcore::metadata::Schema::builder();
+
+    for col in &descriptor.schema.columns {
+        let dt = ffi_data_type_to_core(col.data_type)?;
+        schema_builder = schema_builder.column(&col.name, dt);
+        if !col.comment.is_empty() {
+            schema_builder = schema_builder.with_comment(&col.comment);
+        }
+    }
+
+    if !descriptor.schema.primary_keys.is_empty() {
+        schema_builder = 
schema_builder.primary_key(descriptor.schema.primary_keys.clone());
+    }
+
+    let schema = schema_builder.build()?;
+
+    let mut builder = fcore::metadata::TableDescriptor::builder()
+        .schema(schema)
+        .partitioned_by(descriptor.partition_keys.clone());
+
+    if descriptor.bucket_count > 0 {
+        builder = builder.distributed_by(Some(descriptor.bucket_count), 
descriptor.bucket_keys.clone());
+    } else {
+        builder = builder.distributed_by(None, descriptor.bucket_keys.clone());
+    }
+
+    for prop in &descriptor.properties {
+        builder = builder.property(&prop.key, &prop.value);
+    }
+
+    if !descriptor.comment.is_empty() {
+        builder = builder.comment(&descriptor.comment);
+    }
+
+    Ok(builder.build()?)
+}
+
+pub fn core_table_info_to_ffi(info: &fcore::metadata::TableInfo) -> 
ffi::FfiTableInfo {
+    let schema = info.get_schema();
+    let columns: Vec<ffi::FfiColumn> = schema
+        .columns()
+        .iter()
+        .map(|col| ffi::FfiColumn {
+            name: col.name().to_string(),
+            data_type: core_data_type_to_ffi(col.data_type()),
+            comment: col.comment().unwrap_or("").to_string(),
+        })
+        .collect();
+
+    let primary_keys: Vec<String> = schema
+        .primary_key()
+        .map(|pk| pk.column_names().to_vec())
+        .unwrap_or_default();
+
+    let properties: Vec<ffi::HashMapValue> = info
+        .get_properties()
+        .iter()
+        .map(|(k, v)| ffi::HashMapValue {
+            key: k.clone(),
+            value: v.clone(),
+        })
+        .collect();
+
+    ffi::FfiTableInfo {
+        table_id: info.get_table_id(),
+        schema_id: info.get_schema_id(),
+        table_path: ffi::FfiTablePath {
+            database_name: info.get_table_path().database().to_string(),
+            table_name: info.get_table_path().table().to_string(),
+        },
+        created_time: info.get_created_time(),
+        modified_time: info.get_modified_time(),
+        primary_keys: info.get_primary_keys().clone(),
+        bucket_keys: info.get_bucket_keys().to_vec(),
+        partition_keys: info.get_partition_keys().to_vec(),
+        num_buckets: info.get_num_buckets(),
+        has_primary_key: info.has_primary_key(),
+        is_partitioned: info.is_partitioned(),
+        properties,
+        comment: info.get_comment().unwrap_or("").to_string(),
+        schema: ffi::FfiSchema {
+            columns,
+            primary_keys,
+        },
+    }
+}
+
+pub fn empty_table_info() -> ffi::FfiTableInfo {
+    ffi::FfiTableInfo {
+        table_id: 0,
+        schema_id: 0,
+        table_path: ffi::FfiTablePath {
+            database_name: String::new(),
+            table_name: String::new(),
+        },
+        created_time: 0,
+        modified_time: 0,
+        primary_keys: vec![],
+        bucket_keys: vec![],
+        partition_keys: vec![],
+        num_buckets: 0,
+        has_primary_key: false,
+        is_partitioned: false,
+        properties: vec![],
+        comment: String::new(),
+        schema: ffi::FfiSchema {
+            columns: vec![],
+            primary_keys: vec![],
+        },
+    }
+}
+
+pub struct OwnedRowData {
+    strings: Vec<String>,
+}
+
+impl OwnedRowData {
+    pub fn new() -> Self {
+        Self { strings: Vec::new() }
+    }
+
+    pub fn collect_strings(&mut self, row: &ffi::FfiGenericRow) {
+        for field in &row.fields {
+            if field.datum_type == DATUM_TYPE_STRING {
+                self.strings.push(field.string_val.to_string());
+            }
+        }
+    }
+
+    pub fn get_strings(&self) -> &[String] {
+        &self.strings
+    }
+}
+
+pub fn ffi_row_to_core<'a>(
+    row: &ffi::FfiGenericRow,
+    owner: &'a OwnedRowData,
+) -> fcore::row::GenericRow<'a> {
+    use fcore::row::{Blob, Datum, F32, F64};
+
+    let mut generic_row = fcore::row::GenericRow::new();
+    let mut string_idx = 0;
+
+    for (idx, field) in row.fields.iter().enumerate() {
+        let datum = match field.datum_type {
+            DATUM_TYPE_NULL => Datum::Null,
+            DATUM_TYPE_BOOL => Datum::Bool(field.bool_val),
+            DATUM_TYPE_INT32 => Datum::Int32(field.i32_val),
+            DATUM_TYPE_INT64 => Datum::Int64(field.i64_val),
+            DATUM_TYPE_FLOAT32 => Datum::Float32(F32::from(field.f32_val)),
+            DATUM_TYPE_FLOAT64 => Datum::Float64(F64::from(field.f64_val)),
+            DATUM_TYPE_STRING => {
+                let str_ref = owner.get_strings()[string_idx].as_str();
+                string_idx += 1;
+                Datum::String(str_ref)
+            }
+            DATUM_TYPE_BYTES => 
Datum::Blob(Blob::from(field.bytes_val.clone())),
+            _ => Datum::Null,
+        };
+        generic_row.set_field(idx, datum);
+    }
+
+    generic_row
+}
+
+pub fn core_scan_records_to_ffi(records: &fcore::record::ScanRecords) -> 
ffi::FfiScanRecords {
+    let mut ffi_records = Vec::new();
+    
+    // Iterate over all buckets and their records
+    for bucket_records in records.records_by_buckets().values() {
+        for record in bucket_records {
+            let row = record.row();
+            let fields = core_row_to_ffi_fields(row);
+
+            ffi_records.push(ffi::FfiScanRecord {
+                offset: record.offset(),
+                timestamp: record.timestamp(),
+                row: ffi::FfiGenericRow { fields },
+            });
+        }
+    }
+
+    ffi::FfiScanRecords { records: ffi_records }
+}
+
+fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> Vec<ffi::FfiDatum> 
{
+    fn new_datum(datum_type: i32) -> ffi::FfiDatum {
+        ffi::FfiDatum {
+            datum_type,
+            bool_val: false,
+            i32_val: 0,
+            i64_val: 0,
+            f32_val: 0.0,
+            f64_val: 0.0,
+            string_val: String::new(),
+            bytes_val: vec![],
+        }
+    }
+
+    let record_batch = row.get_record_batch();
+    let schema = record_batch.schema();
+    let row_id = row.get_row_id();
+
+    let mut fields = Vec::with_capacity(schema.fields().len());
+
+    for (i, field) in schema.fields().iter().enumerate() {
+        if row.is_null_at(i) {
+            fields.push(new_datum(DATUM_TYPE_NULL));
+            continue;
+        }
+
+        let datum = match field.data_type() {
+            ArrowDataType::Boolean => {
+                let mut datum = new_datum(DATUM_TYPE_BOOL);
+                datum.bool_val = row.get_boolean(i);
+                datum
+            }
+            ArrowDataType::Int8 => {
+                let mut datum = new_datum(DATUM_TYPE_INT32);
+                datum.i32_val = row.get_byte(i) as i32;
+                datum
+            }
+            ArrowDataType::Int16 => {
+                let mut datum = new_datum(DATUM_TYPE_INT32);
+                datum.i32_val = row.get_short(i) as i32;
+                datum
+            }
+            ArrowDataType::Int32 => {
+                let mut datum = new_datum(DATUM_TYPE_INT32);
+                datum.i32_val = row.get_int(i);
+                datum
+            }
+            ArrowDataType::Int64 => {
+                let mut datum = new_datum(DATUM_TYPE_INT64);
+                datum.i64_val = row.get_long(i);
+                datum
+            }
+            ArrowDataType::Float32 => {
+                let mut datum = new_datum(DATUM_TYPE_FLOAT32);
+                datum.f32_val = row.get_float(i);
+                datum
+            }
+            ArrowDataType::Float64 => {
+                let mut datum = new_datum(DATUM_TYPE_FLOAT64);
+                datum.f64_val = row.get_double(i);
+                datum
+            }
+            ArrowDataType::Utf8 => {
+                let mut datum = new_datum(DATUM_TYPE_STRING);
+                datum.string_val = row.get_string(i).to_string();
+                datum
+            }
+            ArrowDataType::LargeUtf8 => {
+                let array = record_batch
+                    .column(i)
+                    .as_any()
+                    .downcast_ref::<LargeStringArray>()
+                    .expect("LargeUtf8 column expected");
+                let mut datum = new_datum(DATUM_TYPE_STRING);
+                datum.string_val = array.value(row_id).to_string();
+                datum
+            }
+            ArrowDataType::Binary => {
+                let mut datum = new_datum(DATUM_TYPE_BYTES);
+                datum.bytes_val = row.get_bytes(i);

Review Comment:
   dito



##########
bindings/cpp/include/fluss.hpp:
##########
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+namespace fluss {
+
+namespace ffi {
+    struct Connection;
+    struct Admin;
+    struct Table;
+    struct AppendWriter;
+    struct LogScanner;
+}  // namespace ffi
+
+enum class DataType {
+    Boolean = 1,
+    TinyInt = 2,
+    SmallInt = 3,
+    Int = 4,
+    BigInt = 5,
+    Float = 6,
+    Double = 7,
+    String = 8,
+    Bytes = 9,
+    Date = 10,
+    Time = 11,
+    Timestamp = 12,
+    TimestampLtz = 13,
+};
+
+enum class DatumType {
+    Null = 0,
+    Bool = 1,
+    Int32 = 2,
+    Int64 = 3,
+    Float32 = 4,
+    Float64 = 5,
+    String = 6,
+    Bytes = 7,
+};
+
+struct Result {
+    int32_t error_code{0};
+    std::string error_message;
+
+    bool Ok() const { return error_code == 0; }
+};
+
+struct TablePath {
+    std::string database_name;
+    std::string table_name;
+
+    TablePath() = default;
+    TablePath(std::string db, std::string tbl)
+        : database_name(std::move(db)), table_name(std::move(tbl)) {}
+
+    std::string ToString() const { return database_name + "." + table_name; }
+};
+
+struct Column {
+    std::string name;
+    DataType data_type;
+    std::string comment;
+};
+
+struct Schema {
+    std::vector<Column> columns;
+    std::vector<std::string> primary_keys;
+
+    class Builder {
+    public:
+        Builder& AddColumn(std::string name, DataType type,
+                           std::string comment = "") {
+            columns_.push_back({std::move(name), type, std::move(comment)});
+            return *this;
+        }
+
+        Builder& SetPrimaryKeys(std::vector<std::string> keys) {
+            primary_keys_ = std::move(keys);
+            return *this;
+        }
+
+        Schema Build() {
+            return Schema{std::move(columns_), std::move(primary_keys_)};
+        }
+
+    private:
+        std::vector<Column> columns_;
+        std::vector<std::string> primary_keys_;
+    };
+
+    static Builder NewBuilder() { return Builder(); }
+};
+
+struct TableDescriptor {
+    Schema schema;
+    std::vector<std::string> partition_keys;
+    int32_t bucket_count{0};
+    std::vector<std::string> bucket_keys;
+    std::unordered_map<std::string, std::string> properties;
+    std::string comment;
+
+    class Builder {
+    public:
+        Builder& SetSchema(Schema s) {
+            schema_ = std::move(s);
+            return *this;
+        }
+
+        Builder& SetPartitionKeys(std::vector<std::string> keys) {
+            partition_keys_ = std::move(keys);
+            return *this;
+        }
+
+        Builder& SetBucketCount(int32_t count) {
+            bucket_count_ = count;
+            return *this;
+        }
+
+        Builder& SetBucketKeys(std::vector<std::string> keys) {
+            bucket_keys_ = std::move(keys);
+            return *this;
+        }
+
+        Builder& SetProperty(std::string key, std::string value) {
+            properties_[std::move(key)] = std::move(value);
+            return *this;
+        }
+
+        Builder& SetComment(std::string comment) {
+            comment_ = std::move(comment);
+            return *this;
+        }
+
+        TableDescriptor Build() {
+            return TableDescriptor{std::move(schema_),
+                                   std::move(partition_keys_),
+                                   bucket_count_,
+                                   std::move(bucket_keys_),
+                                   std::move(properties_),
+                                   std::move(comment_)};
+        }
+
+    private:
+        Schema schema_;
+        std::vector<std::string> partition_keys_;
+        int32_t bucket_count_{0};
+        std::vector<std::string> bucket_keys_;
+        std::unordered_map<std::string, std::string> properties_;
+        std::string comment_;
+    };
+
+    static Builder NewBuilder() { return Builder(); }
+};
+
+struct TableInfo {
+    int64_t table_id;
+    int32_t schema_id;
+    TablePath table_path;
+    int64_t created_time;
+    int64_t modified_time;
+    std::vector<std::string> primary_keys;
+    std::vector<std::string> bucket_keys;
+    std::vector<std::string> partition_keys;
+    int32_t num_buckets;
+    bool has_primary_key;
+    bool is_partitioned;
+    std::unordered_map<std::string, std::string> properties;
+    std::string comment;
+    Schema schema;
+};
+
+struct Datum {
+    DatumType type{DatumType::Null};
+    bool bool_val{false};
+    int32_t i32_val{0};
+    int64_t i64_val{0};
+    float f32_val{0.0F};
+    double f64_val{0.0};
+    std::string string_val;
+    std::vector<uint8_t> bytes_val;
+
+    static Datum Null() { return Datum(); }

Review Comment:
   nit:
   ```
   static Datum Null() { return {}; }
   ```
   ?



##########
bindings/cpp/src/types.rs:
##########
@@ -0,0 +1,498 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ffi;
+use anyhow::{anyhow, Result};
+use arrow::array::{
+    Date32Array, LargeBinaryArray, LargeStringArray, Time32MillisecondArray, 
Time32SecondArray,
+    Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
+    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
+};
+use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
+use fluss as fcore;
+use fcore::row::InternalRow;
+
+pub const DATA_TYPE_BOOLEAN: i32 = 1;
+pub const DATA_TYPE_TINYINT: i32 = 2;
+pub const DATA_TYPE_SMALLINT: i32 = 3;
+pub const DATA_TYPE_INT: i32 = 4;
+pub const DATA_TYPE_BIGINT: i32 = 5;
+pub const DATA_TYPE_FLOAT: i32 = 6;
+pub const DATA_TYPE_DOUBLE: i32 = 7;
+pub const DATA_TYPE_STRING: i32 = 8;
+pub const DATA_TYPE_BYTES: i32 = 9;
+pub const DATA_TYPE_DATE: i32 = 10;
+pub const DATA_TYPE_TIME: i32 = 11;
+pub const DATA_TYPE_TIMESTAMP: i32 = 12;
+pub const DATA_TYPE_TIMESTAMP_LTZ: i32 = 13;
+
+pub const DATUM_TYPE_NULL: i32 = 0;
+pub const DATUM_TYPE_BOOL: i32 = 1;
+pub const DATUM_TYPE_INT32: i32 = 2;
+pub const DATUM_TYPE_INT64: i32 = 3;
+pub const DATUM_TYPE_FLOAT32: i32 = 4;
+pub const DATUM_TYPE_FLOAT64: i32 = 5;
+pub const DATUM_TYPE_STRING: i32 = 6;
+pub const DATUM_TYPE_BYTES: i32 = 7;
+
+fn ffi_data_type_to_core(dt: i32) -> Result<fcore::metadata::DataType> {
+    match dt {
+        DATA_TYPE_BOOLEAN => Ok(fcore::metadata::DataTypes::boolean()),
+        DATA_TYPE_TINYINT => Ok(fcore::metadata::DataTypes::tinyint()),
+        DATA_TYPE_SMALLINT => Ok(fcore::metadata::DataTypes::smallint()),
+        DATA_TYPE_INT => Ok(fcore::metadata::DataTypes::int()),
+        DATA_TYPE_BIGINT => Ok(fcore::metadata::DataTypes::bigint()),
+        DATA_TYPE_FLOAT => Ok(fcore::metadata::DataTypes::float()),
+        DATA_TYPE_DOUBLE => Ok(fcore::metadata::DataTypes::double()),
+        DATA_TYPE_STRING => Ok(fcore::metadata::DataTypes::string()),
+        DATA_TYPE_BYTES => Ok(fcore::metadata::DataTypes::bytes()),
+        DATA_TYPE_DATE => Ok(fcore::metadata::DataTypes::date()),
+        DATA_TYPE_TIME => Ok(fcore::metadata::DataTypes::time()),
+        DATA_TYPE_TIMESTAMP => Ok(fcore::metadata::DataTypes::timestamp()),
+        DATA_TYPE_TIMESTAMP_LTZ => 
Ok(fcore::metadata::DataTypes::timestamp_ltz()),
+        _ => Err(anyhow!("Unknown data type: {}", dt)),
+    }
+}
+
+fn core_data_type_to_ffi(dt: &fcore::metadata::DataType) -> i32 {
+    match dt {
+        fcore::metadata::DataType::Boolean(_) => DATA_TYPE_BOOLEAN,
+        fcore::metadata::DataType::TinyInt(_) => DATA_TYPE_TINYINT,
+        fcore::metadata::DataType::SmallInt(_) => DATA_TYPE_SMALLINT,
+        fcore::metadata::DataType::Int(_) => DATA_TYPE_INT,
+        fcore::metadata::DataType::BigInt(_) => DATA_TYPE_BIGINT,
+        fcore::metadata::DataType::Float(_) => DATA_TYPE_FLOAT,
+        fcore::metadata::DataType::Double(_) => DATA_TYPE_DOUBLE,
+        fcore::metadata::DataType::String(_) => DATA_TYPE_STRING,
+        fcore::metadata::DataType::Bytes(_) => DATA_TYPE_BYTES,
+        fcore::metadata::DataType::Date(_) => DATA_TYPE_DATE,
+        fcore::metadata::DataType::Time(_) => DATA_TYPE_TIME,
+        fcore::metadata::DataType::Timestamp(_) => DATA_TYPE_TIMESTAMP,
+        fcore::metadata::DataType::TimestampLTz(_) => DATA_TYPE_TIMESTAMP_LTZ,
+        _ => 0,
+    }
+}
+
+pub fn ffi_descriptor_to_core(
+    descriptor: &ffi::FfiTableDescriptor,
+) -> Result<fcore::metadata::TableDescriptor> {
+    let mut schema_builder = fcore::metadata::Schema::builder();
+
+    for col in &descriptor.schema.columns {
+        let dt = ffi_data_type_to_core(col.data_type)?;
+        schema_builder = schema_builder.column(&col.name, dt);
+        if !col.comment.is_empty() {
+            schema_builder = schema_builder.with_comment(&col.comment);
+        }
+    }
+
+    if !descriptor.schema.primary_keys.is_empty() {
+        schema_builder = 
schema_builder.primary_key(descriptor.schema.primary_keys.clone());
+    }
+
+    let schema = schema_builder.build()?;
+
+    let mut builder = fcore::metadata::TableDescriptor::builder()
+        .schema(schema)
+        .partitioned_by(descriptor.partition_keys.clone());
+
+    if descriptor.bucket_count > 0 {
+        builder = builder.distributed_by(Some(descriptor.bucket_count), 
descriptor.bucket_keys.clone());
+    } else {
+        builder = builder.distributed_by(None, descriptor.bucket_keys.clone());
+    }
+
+    for prop in &descriptor.properties {
+        builder = builder.property(&prop.key, &prop.value);
+    }
+
+    if !descriptor.comment.is_empty() {
+        builder = builder.comment(&descriptor.comment);
+    }
+
+    Ok(builder.build()?)
+}
+
+pub fn core_table_info_to_ffi(info: &fcore::metadata::TableInfo) -> 
ffi::FfiTableInfo {
+    let schema = info.get_schema();
+    let columns: Vec<ffi::FfiColumn> = schema
+        .columns()
+        .iter()
+        .map(|col| ffi::FfiColumn {
+            name: col.name().to_string(),
+            data_type: core_data_type_to_ffi(col.data_type()),
+            comment: col.comment().unwrap_or("").to_string(),
+        })
+        .collect();
+
+    let primary_keys: Vec<String> = schema
+        .primary_key()
+        .map(|pk| pk.column_names().to_vec())
+        .unwrap_or_default();
+
+    let properties: Vec<ffi::HashMapValue> = info
+        .get_properties()
+        .iter()
+        .map(|(k, v)| ffi::HashMapValue {
+            key: k.clone(),
+            value: v.clone(),
+        })
+        .collect();
+
+    ffi::FfiTableInfo {
+        table_id: info.get_table_id(),
+        schema_id: info.get_schema_id(),
+        table_path: ffi::FfiTablePath {
+            database_name: info.get_table_path().database().to_string(),
+            table_name: info.get_table_path().table().to_string(),
+        },
+        created_time: info.get_created_time(),
+        modified_time: info.get_modified_time(),
+        primary_keys: info.get_primary_keys().clone(),
+        bucket_keys: info.get_bucket_keys().to_vec(),
+        partition_keys: info.get_partition_keys().to_vec(),
+        num_buckets: info.get_num_buckets(),
+        has_primary_key: info.has_primary_key(),
+        is_partitioned: info.is_partitioned(),
+        properties,
+        comment: info.get_comment().unwrap_or("").to_string(),
+        schema: ffi::FfiSchema {
+            columns,
+            primary_keys,
+        },
+    }
+}
+
+pub fn empty_table_info() -> ffi::FfiTableInfo {
+    ffi::FfiTableInfo {
+        table_id: 0,
+        schema_id: 0,
+        table_path: ffi::FfiTablePath {
+            database_name: String::new(),
+            table_name: String::new(),
+        },
+        created_time: 0,
+        modified_time: 0,
+        primary_keys: vec![],
+        bucket_keys: vec![],
+        partition_keys: vec![],
+        num_buckets: 0,
+        has_primary_key: false,
+        is_partitioned: false,
+        properties: vec![],
+        comment: String::new(),
+        schema: ffi::FfiSchema {
+            columns: vec![],
+            primary_keys: vec![],
+        },
+    }
+}
+
+pub struct OwnedRowData {
+    strings: Vec<String>,
+}
+
+impl OwnedRowData {
+    pub fn new() -> Self {
+        Self { strings: Vec::new() }
+    }
+
+    pub fn collect_strings(&mut self, row: &ffi::FfiGenericRow) {
+        for field in &row.fields {
+            if field.datum_type == DATUM_TYPE_STRING {
+                self.strings.push(field.string_val.to_string());
+            }
+        }
+    }
+
+    pub fn get_strings(&self) -> &[String] {
+        &self.strings
+    }
+}
+
+pub fn ffi_row_to_core<'a>(
+    row: &ffi::FfiGenericRow,
+    owner: &'a OwnedRowData,
+) -> fcore::row::GenericRow<'a> {
+    use fcore::row::{Blob, Datum, F32, F64};
+
+    let mut generic_row = fcore::row::GenericRow::new();
+    let mut string_idx = 0;
+
+    for (idx, field) in row.fields.iter().enumerate() {
+        let datum = match field.datum_type {
+            DATUM_TYPE_NULL => Datum::Null,
+            DATUM_TYPE_BOOL => Datum::Bool(field.bool_val),
+            DATUM_TYPE_INT32 => Datum::Int32(field.i32_val),
+            DATUM_TYPE_INT64 => Datum::Int64(field.i64_val),
+            DATUM_TYPE_FLOAT32 => Datum::Float32(F32::from(field.f32_val)),

Review Comment:
   nit:
   DATUM_TYPE_FLOAT32 => Datum::Float32(field.f32_val.into()),
               DATUM_TYPE_FLOAT64 => Datum::Float64(field.f64_val.into()),



##########
bindings/cpp/src/ffi_converter.hpp:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include "fluss.hpp"
+#include "lib.rs.h"
+
+namespace fluss {
+namespace utils {
+
+inline Result make_error(int32_t code, std::string msg) {
+    return Result{code, std::move(msg)};
+}
+
+inline Result make_ok() {
+    return Result{0, {}};
+}
+
+inline Result from_ffi_result(const ffi::FfiResult& ffi_result) {
+    return Result{ffi_result.error_code, 
std::string(ffi_result.error_message)};
+}
+
+inline ffi::FfiTablePath to_ffi_table_path(const TablePath& path) {
+    ffi::FfiTablePath ffi_path;
+    ffi_path.database_name = rust::String(path.database_name);
+    ffi_path.table_name = rust::String(path.table_name);
+    return ffi_path;
+}
+
+inline ffi::FfiColumn to_ffi_column(const Column& col) {
+    ffi::FfiColumn ffi_col;
+    ffi_col.name = rust::String(col.name);
+    ffi_col.data_type = static_cast<int32_t>(col.data_type);
+    ffi_col.comment = rust::String(col.comment);
+    return ffi_col;
+}
+
+inline ffi::FfiSchema to_ffi_schema(const Schema& schema) {
+    ffi::FfiSchema ffi_schema;
+
+    rust::Vec<ffi::FfiColumn> cols;
+    for (const auto& col : schema.columns) {
+        cols.push_back(to_ffi_column(col));
+    }
+    ffi_schema.columns = std::move(cols);
+
+    rust::Vec<rust::String> pks;
+    for (const auto& pk : schema.primary_keys) {
+        pks.push_back(rust::String(pk));
+    }
+    ffi_schema.primary_keys = std::move(pks);
+
+    return ffi_schema;
+}
+
+inline ffi::FfiTableDescriptor to_ffi_table_descriptor(const TableDescriptor& 
desc) {
+    ffi::FfiTableDescriptor ffi_desc;
+
+    ffi_desc.schema = to_ffi_schema(desc.schema);
+
+    rust::Vec<rust::String> partition_keys;
+    for (const auto& pk : desc.partition_keys) {
+        partition_keys.push_back(rust::String(pk));
+    }
+    ffi_desc.partition_keys = std::move(partition_keys);
+
+    ffi_desc.bucket_count = desc.bucket_count;
+
+    rust::Vec<rust::String> bucket_keys;
+    for (const auto& bk : desc.bucket_keys) {
+        bucket_keys.push_back(rust::String(bk));
+    }
+    ffi_desc.bucket_keys = std::move(bucket_keys);
+
+    rust::Vec<ffi::HashMapValue> props;
+    for (const auto& [k, v] : desc.properties) {
+        ffi::HashMapValue prop;
+        prop.key = rust::String(k);
+        prop.value = rust::String(v);
+        props.push_back(prop);
+    }
+    ffi_desc.properties = std::move(props);
+
+    ffi_desc.comment = rust::String(desc.comment);
+
+    return ffi_desc;
+}
+
+inline ffi::FfiDatum to_ffi_datum(const Datum& datum) {
+    ffi::FfiDatum ffi_datum;
+    ffi_datum.datum_type = static_cast<int32_t>(datum.type);
+    ffi_datum.bool_val = datum.bool_val;
+    ffi_datum.i32_val = datum.i32_val;
+    ffi_datum.i64_val = datum.i64_val;
+    ffi_datum.f32_val = datum.f32_val;
+    ffi_datum.f64_val = datum.f64_val;
+    ffi_datum.string_val = rust::String(datum.string_val);
+
+    rust::Vec<uint8_t> bytes;
+    for (auto b : datum.bytes_val) {
+        bytes.push_back(b);
+    }
+    ffi_datum.bytes_val = std::move(bytes);
+
+    return ffi_datum;
+}
+
+inline ffi::FfiGenericRow to_ffi_generic_row(const GenericRow& row) {
+    ffi::FfiGenericRow ffi_row;
+
+    rust::Vec<ffi::FfiDatum> fields;
+    for (const auto& field : row.fields) {
+        fields.push_back(to_ffi_datum(field));
+    }
+    ffi_row.fields = std::move(fields);
+
+    return ffi_row;
+}
+
+inline Column from_ffi_column(const ffi::FfiColumn& ffi_col) {
+    return Column{
+        std::string(ffi_col.name),
+        static_cast<DataType>(ffi_col.data_type),
+        std::string(ffi_col.comment)};
+}
+
+inline Schema from_ffi_schema(const ffi::FfiSchema& ffi_schema) {
+    Schema schema;
+
+    for (const auto& col : ffi_schema.columns) {
+        schema.columns.push_back(from_ffi_column(col));
+    }
+
+    for (const auto& pk : ffi_schema.primary_keys) {
+        schema.primary_keys.push_back(std::string(pk));
+    }
+
+    return schema;
+}
+
+inline TableInfo from_ffi_table_info(const ffi::FfiTableInfo& ffi_info) {
+    TableInfo info;
+
+    info.table_id = ffi_info.table_id;
+    info.schema_id = ffi_info.schema_id;
+    info.table_path = TablePath{
+        std::string(ffi_info.table_path.database_name),
+        std::string(ffi_info.table_path.table_name)};
+    info.created_time = ffi_info.created_time;
+    info.modified_time = ffi_info.modified_time;
+
+    for (const auto& pk : ffi_info.primary_keys) {
+        info.primary_keys.push_back(std::string(pk));
+    }
+
+    for (const auto& bk : ffi_info.bucket_keys) {
+        info.bucket_keys.push_back(std::string(bk));
+    }
+
+    for (const auto& pk : ffi_info.partition_keys) {
+        info.partition_keys.push_back(std::string(pk));
+    }
+
+    info.num_buckets = ffi_info.num_buckets;
+    info.has_primary_key = ffi_info.has_primary_key;
+    info.is_partitioned = ffi_info.is_partitioned;
+
+    for (const auto& prop : ffi_info.properties) {
+        info.properties[std::string(prop.key)] = std::string(prop.value);
+    }
+
+    info.comment = std::string(ffi_info.comment);
+    info.schema = from_ffi_schema(ffi_info.schema);
+
+    return info;
+}
+
+inline Datum from_ffi_datum(const ffi::FfiDatum& ffi_datum) {
+    Datum datum;
+    datum.type = static_cast<DatumType>(ffi_datum.datum_type);
+    datum.bool_val = ffi_datum.bool_val;
+    datum.i32_val = ffi_datum.i32_val;
+    datum.i64_val = ffi_datum.i64_val;
+    datum.f32_val = ffi_datum.f32_val;
+    datum.f64_val = ffi_datum.f64_val;
+    datum.string_val = std::string(ffi_datum.string_val);

Review Comment:
   seem here need to a string copy?



##########
bindings/cpp/src/ffi_converter.hpp:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include "fluss.hpp"
+#include "lib.rs.h"
+
+namespace fluss {
+namespace utils {
+
+inline Result make_error(int32_t code, std::string msg) {
+    return Result{code, std::move(msg)};
+}
+
+inline Result make_ok() {
+    return Result{0, {}};
+}
+
+inline Result from_ffi_result(const ffi::FfiResult& ffi_result) {
+    return Result{ffi_result.error_code, 
std::string(ffi_result.error_message)};
+}
+
+inline ffi::FfiTablePath to_ffi_table_path(const TablePath& path) {
+    ffi::FfiTablePath ffi_path;
+    ffi_path.database_name = rust::String(path.database_name);
+    ffi_path.table_name = rust::String(path.table_name);
+    return ffi_path;
+}
+
+inline ffi::FfiColumn to_ffi_column(const Column& col) {
+    ffi::FfiColumn ffi_col;
+    ffi_col.name = rust::String(col.name);
+    ffi_col.data_type = static_cast<int32_t>(col.data_type);
+    ffi_col.comment = rust::String(col.comment);
+    return ffi_col;
+}
+
+inline ffi::FfiSchema to_ffi_schema(const Schema& schema) {
+    ffi::FfiSchema ffi_schema;
+
+    rust::Vec<ffi::FfiColumn> cols;
+    for (const auto& col : schema.columns) {
+        cols.push_back(to_ffi_column(col));
+    }
+    ffi_schema.columns = std::move(cols);
+
+    rust::Vec<rust::String> pks;
+    for (const auto& pk : schema.primary_keys) {
+        pks.push_back(rust::String(pk));
+    }
+    ffi_schema.primary_keys = std::move(pks);
+
+    return ffi_schema;
+}
+
+inline ffi::FfiTableDescriptor to_ffi_table_descriptor(const TableDescriptor& 
desc) {
+    ffi::FfiTableDescriptor ffi_desc;
+
+    ffi_desc.schema = to_ffi_schema(desc.schema);
+
+    rust::Vec<rust::String> partition_keys;
+    for (const auto& pk : desc.partition_keys) {
+        partition_keys.push_back(rust::String(pk));
+    }
+    ffi_desc.partition_keys = std::move(partition_keys);
+
+    ffi_desc.bucket_count = desc.bucket_count;
+
+    rust::Vec<rust::String> bucket_keys;
+    for (const auto& bk : desc.bucket_keys) {
+        bucket_keys.push_back(rust::String(bk));
+    }
+    ffi_desc.bucket_keys = std::move(bucket_keys);
+
+    rust::Vec<ffi::HashMapValue> props;
+    for (const auto& [k, v] : desc.properties) {
+        ffi::HashMapValue prop;
+        prop.key = rust::String(k);
+        prop.value = rust::String(v);
+        props.push_back(prop);
+    }
+    ffi_desc.properties = std::move(props);
+
+    ffi_desc.comment = rust::String(desc.comment);
+
+    return ffi_desc;
+}
+
+inline ffi::FfiDatum to_ffi_datum(const Datum& datum) {
+    ffi::FfiDatum ffi_datum;
+    ffi_datum.datum_type = static_cast<int32_t>(datum.type);
+    ffi_datum.bool_val = datum.bool_val;
+    ffi_datum.i32_val = datum.i32_val;
+    ffi_datum.i64_val = datum.i64_val;
+    ffi_datum.f32_val = datum.f32_val;
+    ffi_datum.f64_val = datum.f64_val;
+    ffi_datum.string_val = rust::String(datum.string_val);
+
+    rust::Vec<uint8_t> bytes;
+    for (auto b : datum.bytes_val) {
+        bytes.push_back(b);
+    }
+    ffi_datum.bytes_val = std::move(bytes);
+
+    return ffi_datum;
+}
+
+inline ffi::FfiGenericRow to_ffi_generic_row(const GenericRow& row) {
+    ffi::FfiGenericRow ffi_row;
+
+    rust::Vec<ffi::FfiDatum> fields;
+    for (const auto& field : row.fields) {
+        fields.push_back(to_ffi_datum(field));
+    }
+    ffi_row.fields = std::move(fields);
+
+    return ffi_row;
+}
+
+inline Column from_ffi_column(const ffi::FfiColumn& ffi_col) {
+    return Column{
+        std::string(ffi_col.name),
+        static_cast<DataType>(ffi_col.data_type),
+        std::string(ffi_col.comment)};
+}
+
+inline Schema from_ffi_schema(const ffi::FfiSchema& ffi_schema) {
+    Schema schema;
+
+    for (const auto& col : ffi_schema.columns) {
+        schema.columns.push_back(from_ffi_column(col));
+    }
+
+    for (const auto& pk : ffi_schema.primary_keys) {
+        schema.primary_keys.push_back(std::string(pk));
+    }
+
+    return schema;
+}
+
+inline TableInfo from_ffi_table_info(const ffi::FfiTableInfo& ffi_info) {
+    TableInfo info;
+
+    info.table_id = ffi_info.table_id;
+    info.schema_id = ffi_info.schema_id;
+    info.table_path = TablePath{
+        std::string(ffi_info.table_path.database_name),
+        std::string(ffi_info.table_path.table_name)};
+    info.created_time = ffi_info.created_time;
+    info.modified_time = ffi_info.modified_time;
+
+    for (const auto& pk : ffi_info.primary_keys) {
+        info.primary_keys.push_back(std::string(pk));
+    }
+
+    for (const auto& bk : ffi_info.bucket_keys) {
+        info.bucket_keys.push_back(std::string(bk));
+    }
+
+    for (const auto& pk : ffi_info.partition_keys) {
+        info.partition_keys.push_back(std::string(pk));
+    }
+
+    info.num_buckets = ffi_info.num_buckets;
+    info.has_primary_key = ffi_info.has_primary_key;
+    info.is_partitioned = ffi_info.is_partitioned;
+
+    for (const auto& prop : ffi_info.properties) {
+        info.properties[std::string(prop.key)] = std::string(prop.value);
+    }
+
+    info.comment = std::string(ffi_info.comment);
+    info.schema = from_ffi_schema(ffi_info.schema);
+
+    return info;
+}
+
+inline Datum from_ffi_datum(const ffi::FfiDatum& ffi_datum) {
+    Datum datum;
+    datum.type = static_cast<DatumType>(ffi_datum.datum_type);
+    datum.bool_val = ffi_datum.bool_val;
+    datum.i32_val = ffi_datum.i32_val;
+    datum.i64_val = ffi_datum.i64_val;
+    datum.f32_val = ffi_datum.f32_val;
+    datum.f64_val = ffi_datum.f64_val;
+    datum.string_val = std::string(ffi_datum.string_val);

Review Comment:
   I'm not sure is there any effient way. If complex, maybe left a todo to mark 
it. It'll reminds us if we find any bottle neck



##########
bindings/cpp/src/types.rs:
##########
@@ -0,0 +1,498 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ffi;
+use anyhow::{anyhow, Result};
+use arrow::array::{
+    Date32Array, LargeBinaryArray, LargeStringArray, Time32MillisecondArray, 
Time32SecondArray,
+    Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
+    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
+};
+use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
+use fluss as fcore;
+use fcore::row::InternalRow;
+
+pub const DATA_TYPE_BOOLEAN: i32 = 1;
+pub const DATA_TYPE_TINYINT: i32 = 2;
+pub const DATA_TYPE_SMALLINT: i32 = 3;
+pub const DATA_TYPE_INT: i32 = 4;
+pub const DATA_TYPE_BIGINT: i32 = 5;
+pub const DATA_TYPE_FLOAT: i32 = 6;
+pub const DATA_TYPE_DOUBLE: i32 = 7;
+pub const DATA_TYPE_STRING: i32 = 8;
+pub const DATA_TYPE_BYTES: i32 = 9;
+pub const DATA_TYPE_DATE: i32 = 10;
+pub const DATA_TYPE_TIME: i32 = 11;
+pub const DATA_TYPE_TIMESTAMP: i32 = 12;
+pub const DATA_TYPE_TIMESTAMP_LTZ: i32 = 13;
+
+pub const DATUM_TYPE_NULL: i32 = 0;
+pub const DATUM_TYPE_BOOL: i32 = 1;
+pub const DATUM_TYPE_INT32: i32 = 2;
+pub const DATUM_TYPE_INT64: i32 = 3;
+pub const DATUM_TYPE_FLOAT32: i32 = 4;
+pub const DATUM_TYPE_FLOAT64: i32 = 5;
+pub const DATUM_TYPE_STRING: i32 = 6;
+pub const DATUM_TYPE_BYTES: i32 = 7;
+
+fn ffi_data_type_to_core(dt: i32) -> Result<fcore::metadata::DataType> {
+    match dt {
+        DATA_TYPE_BOOLEAN => Ok(fcore::metadata::DataTypes::boolean()),
+        DATA_TYPE_TINYINT => Ok(fcore::metadata::DataTypes::tinyint()),
+        DATA_TYPE_SMALLINT => Ok(fcore::metadata::DataTypes::smallint()),
+        DATA_TYPE_INT => Ok(fcore::metadata::DataTypes::int()),
+        DATA_TYPE_BIGINT => Ok(fcore::metadata::DataTypes::bigint()),
+        DATA_TYPE_FLOAT => Ok(fcore::metadata::DataTypes::float()),
+        DATA_TYPE_DOUBLE => Ok(fcore::metadata::DataTypes::double()),
+        DATA_TYPE_STRING => Ok(fcore::metadata::DataTypes::string()),
+        DATA_TYPE_BYTES => Ok(fcore::metadata::DataTypes::bytes()),
+        DATA_TYPE_DATE => Ok(fcore::metadata::DataTypes::date()),
+        DATA_TYPE_TIME => Ok(fcore::metadata::DataTypes::time()),
+        DATA_TYPE_TIMESTAMP => Ok(fcore::metadata::DataTypes::timestamp()),
+        DATA_TYPE_TIMESTAMP_LTZ => 
Ok(fcore::metadata::DataTypes::timestamp_ltz()),
+        _ => Err(anyhow!("Unknown data type: {}", dt)),
+    }
+}
+
+fn core_data_type_to_ffi(dt: &fcore::metadata::DataType) -> i32 {
+    match dt {
+        fcore::metadata::DataType::Boolean(_) => DATA_TYPE_BOOLEAN,
+        fcore::metadata::DataType::TinyInt(_) => DATA_TYPE_TINYINT,
+        fcore::metadata::DataType::SmallInt(_) => DATA_TYPE_SMALLINT,
+        fcore::metadata::DataType::Int(_) => DATA_TYPE_INT,
+        fcore::metadata::DataType::BigInt(_) => DATA_TYPE_BIGINT,
+        fcore::metadata::DataType::Float(_) => DATA_TYPE_FLOAT,
+        fcore::metadata::DataType::Double(_) => DATA_TYPE_DOUBLE,
+        fcore::metadata::DataType::String(_) => DATA_TYPE_STRING,
+        fcore::metadata::DataType::Bytes(_) => DATA_TYPE_BYTES,
+        fcore::metadata::DataType::Date(_) => DATA_TYPE_DATE,
+        fcore::metadata::DataType::Time(_) => DATA_TYPE_TIME,
+        fcore::metadata::DataType::Timestamp(_) => DATA_TYPE_TIMESTAMP,
+        fcore::metadata::DataType::TimestampLTz(_) => DATA_TYPE_TIMESTAMP_LTZ,
+        _ => 0,
+    }
+}
+
+pub fn ffi_descriptor_to_core(
+    descriptor: &ffi::FfiTableDescriptor,
+) -> Result<fcore::metadata::TableDescriptor> {
+    let mut schema_builder = fcore::metadata::Schema::builder();
+
+    for col in &descriptor.schema.columns {
+        let dt = ffi_data_type_to_core(col.data_type)?;
+        schema_builder = schema_builder.column(&col.name, dt);
+        if !col.comment.is_empty() {
+            schema_builder = schema_builder.with_comment(&col.comment);
+        }
+    }
+
+    if !descriptor.schema.primary_keys.is_empty() {
+        schema_builder = 
schema_builder.primary_key(descriptor.schema.primary_keys.clone());
+    }
+
+    let schema = schema_builder.build()?;
+
+    let mut builder = fcore::metadata::TableDescriptor::builder()
+        .schema(schema)
+        .partitioned_by(descriptor.partition_keys.clone());
+
+    if descriptor.bucket_count > 0 {
+        builder = builder.distributed_by(Some(descriptor.bucket_count), 
descriptor.bucket_keys.clone());
+    } else {
+        builder = builder.distributed_by(None, descriptor.bucket_keys.clone());
+    }
+
+    for prop in &descriptor.properties {
+        builder = builder.property(&prop.key, &prop.value);
+    }
+
+    if !descriptor.comment.is_empty() {
+        builder = builder.comment(&descriptor.comment);
+    }
+
+    Ok(builder.build()?)
+}
+
+pub fn core_table_info_to_ffi(info: &fcore::metadata::TableInfo) -> 
ffi::FfiTableInfo {
+    let schema = info.get_schema();
+    let columns: Vec<ffi::FfiColumn> = schema
+        .columns()
+        .iter()
+        .map(|col| ffi::FfiColumn {
+            name: col.name().to_string(),
+            data_type: core_data_type_to_ffi(col.data_type()),
+            comment: col.comment().unwrap_or("").to_string(),
+        })
+        .collect();
+
+    let primary_keys: Vec<String> = schema
+        .primary_key()
+        .map(|pk| pk.column_names().to_vec())
+        .unwrap_or_default();
+
+    let properties: Vec<ffi::HashMapValue> = info
+        .get_properties()
+        .iter()
+        .map(|(k, v)| ffi::HashMapValue {
+            key: k.clone(),
+            value: v.clone(),
+        })
+        .collect();
+
+    ffi::FfiTableInfo {
+        table_id: info.get_table_id(),
+        schema_id: info.get_schema_id(),
+        table_path: ffi::FfiTablePath {
+            database_name: info.get_table_path().database().to_string(),
+            table_name: info.get_table_path().table().to_string(),
+        },
+        created_time: info.get_created_time(),
+        modified_time: info.get_modified_time(),
+        primary_keys: info.get_primary_keys().clone(),
+        bucket_keys: info.get_bucket_keys().to_vec(),
+        partition_keys: info.get_partition_keys().to_vec(),
+        num_buckets: info.get_num_buckets(),
+        has_primary_key: info.has_primary_key(),
+        is_partitioned: info.is_partitioned(),
+        properties,
+        comment: info.get_comment().unwrap_or("").to_string(),
+        schema: ffi::FfiSchema {
+            columns,
+            primary_keys,
+        },
+    }
+}
+
+pub fn empty_table_info() -> ffi::FfiTableInfo {
+    ffi::FfiTableInfo {
+        table_id: 0,
+        schema_id: 0,
+        table_path: ffi::FfiTablePath {
+            database_name: String::new(),
+            table_name: String::new(),
+        },
+        created_time: 0,
+        modified_time: 0,
+        primary_keys: vec![],
+        bucket_keys: vec![],
+        partition_keys: vec![],
+        num_buckets: 0,
+        has_primary_key: false,
+        is_partitioned: false,
+        properties: vec![],
+        comment: String::new(),
+        schema: ffi::FfiSchema {
+            columns: vec![],
+            primary_keys: vec![],
+        },
+    }
+}
+
+pub struct OwnedRowData {
+    strings: Vec<String>,
+}
+
+impl OwnedRowData {
+    pub fn new() -> Self {
+        Self { strings: Vec::new() }
+    }
+
+    pub fn collect_strings(&mut self, row: &ffi::FfiGenericRow) {
+        for field in &row.fields {
+            if field.datum_type == DATUM_TYPE_STRING {
+                self.strings.push(field.string_val.to_string());
+            }
+        }
+    }
+
+    pub fn get_strings(&self) -> &[String] {
+        &self.strings
+    }
+}
+
+pub fn ffi_row_to_core<'a>(
+    row: &ffi::FfiGenericRow,
+    owner: &'a OwnedRowData,
+) -> fcore::row::GenericRow<'a> {
+    use fcore::row::{Blob, Datum, F32, F64};
+
+    let mut generic_row = fcore::row::GenericRow::new();
+    let mut string_idx = 0;
+
+    for (idx, field) in row.fields.iter().enumerate() {
+        let datum = match field.datum_type {
+            DATUM_TYPE_NULL => Datum::Null,
+            DATUM_TYPE_BOOL => Datum::Bool(field.bool_val),
+            DATUM_TYPE_INT32 => Datum::Int32(field.i32_val),
+            DATUM_TYPE_INT64 => Datum::Int64(field.i64_val),
+            DATUM_TYPE_FLOAT32 => Datum::Float32(F32::from(field.f32_val)),

Review Comment:
   Can the method be 
   ```
   pub fn ffi_row_to_core(
       row: &ffi::FfiGenericRow
   ) -> fcore::row::GenericRow {
       use fcore::row::Datum;
   
       let mut generic_row = fcore::row::GenericRow::new();
   
       for (idx, field) in row.fields.iter().enumerate() {
           let datum = match field.datum_type {
               DATUM_TYPE_NULL => Datum::Null,
               DATUM_TYPE_BOOL => Datum::Bool(field.bool_val),
               DATUM_TYPE_INT32 => Datum::Int32(field.i32_val),
               DATUM_TYPE_INT64 => Datum::Int64(field.i64_val),
               DATUM_TYPE_FLOAT32 => Datum::Float32(field.f32_val.into()),
               DATUM_TYPE_FLOAT64 => Datum::Float64(field.f64_val.into()),
               DATUM_TYPE_STRING => Datum::String(field.string_val.as_str()),
               // todo: avoid copy bytes for blob
               DATUM_TYPE_BYTES => Datum::Blob(field.bytes_val.clone().into()),
               _ => Datum::Null,
           };
           generic_row.set_field(idx, datum);
       }
   
       generic_row
   }
   ```
   ?



##########
bindings/cpp/src/types.rs:
##########
@@ -0,0 +1,498 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ffi;
+use anyhow::{anyhow, Result};
+use arrow::array::{
+    Date32Array, LargeBinaryArray, LargeStringArray, Time32MillisecondArray, 
Time32SecondArray,
+    Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
+    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
+};
+use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
+use fluss as fcore;
+use fcore::row::InternalRow;
+
+pub const DATA_TYPE_BOOLEAN: i32 = 1;
+pub const DATA_TYPE_TINYINT: i32 = 2;
+pub const DATA_TYPE_SMALLINT: i32 = 3;
+pub const DATA_TYPE_INT: i32 = 4;
+pub const DATA_TYPE_BIGINT: i32 = 5;
+pub const DATA_TYPE_FLOAT: i32 = 6;
+pub const DATA_TYPE_DOUBLE: i32 = 7;
+pub const DATA_TYPE_STRING: i32 = 8;
+pub const DATA_TYPE_BYTES: i32 = 9;
+pub const DATA_TYPE_DATE: i32 = 10;
+pub const DATA_TYPE_TIME: i32 = 11;
+pub const DATA_TYPE_TIMESTAMP: i32 = 12;
+pub const DATA_TYPE_TIMESTAMP_LTZ: i32 = 13;
+
+pub const DATUM_TYPE_NULL: i32 = 0;
+pub const DATUM_TYPE_BOOL: i32 = 1;
+pub const DATUM_TYPE_INT32: i32 = 2;
+pub const DATUM_TYPE_INT64: i32 = 3;
+pub const DATUM_TYPE_FLOAT32: i32 = 4;
+pub const DATUM_TYPE_FLOAT64: i32 = 5;
+pub const DATUM_TYPE_STRING: i32 = 6;
+pub const DATUM_TYPE_BYTES: i32 = 7;
+
+fn ffi_data_type_to_core(dt: i32) -> Result<fcore::metadata::DataType> {
+    match dt {
+        DATA_TYPE_BOOLEAN => Ok(fcore::metadata::DataTypes::boolean()),
+        DATA_TYPE_TINYINT => Ok(fcore::metadata::DataTypes::tinyint()),
+        DATA_TYPE_SMALLINT => Ok(fcore::metadata::DataTypes::smallint()),
+        DATA_TYPE_INT => Ok(fcore::metadata::DataTypes::int()),
+        DATA_TYPE_BIGINT => Ok(fcore::metadata::DataTypes::bigint()),
+        DATA_TYPE_FLOAT => Ok(fcore::metadata::DataTypes::float()),
+        DATA_TYPE_DOUBLE => Ok(fcore::metadata::DataTypes::double()),
+        DATA_TYPE_STRING => Ok(fcore::metadata::DataTypes::string()),
+        DATA_TYPE_BYTES => Ok(fcore::metadata::DataTypes::bytes()),
+        DATA_TYPE_DATE => Ok(fcore::metadata::DataTypes::date()),
+        DATA_TYPE_TIME => Ok(fcore::metadata::DataTypes::time()),
+        DATA_TYPE_TIMESTAMP => Ok(fcore::metadata::DataTypes::timestamp()),
+        DATA_TYPE_TIMESTAMP_LTZ => 
Ok(fcore::metadata::DataTypes::timestamp_ltz()),
+        _ => Err(anyhow!("Unknown data type: {}", dt)),
+    }
+}
+
+fn core_data_type_to_ffi(dt: &fcore::metadata::DataType) -> i32 {
+    match dt {
+        fcore::metadata::DataType::Boolean(_) => DATA_TYPE_BOOLEAN,
+        fcore::metadata::DataType::TinyInt(_) => DATA_TYPE_TINYINT,
+        fcore::metadata::DataType::SmallInt(_) => DATA_TYPE_SMALLINT,
+        fcore::metadata::DataType::Int(_) => DATA_TYPE_INT,
+        fcore::metadata::DataType::BigInt(_) => DATA_TYPE_BIGINT,
+        fcore::metadata::DataType::Float(_) => DATA_TYPE_FLOAT,
+        fcore::metadata::DataType::Double(_) => DATA_TYPE_DOUBLE,
+        fcore::metadata::DataType::String(_) => DATA_TYPE_STRING,
+        fcore::metadata::DataType::Bytes(_) => DATA_TYPE_BYTES,
+        fcore::metadata::DataType::Date(_) => DATA_TYPE_DATE,
+        fcore::metadata::DataType::Time(_) => DATA_TYPE_TIME,
+        fcore::metadata::DataType::Timestamp(_) => DATA_TYPE_TIMESTAMP,
+        fcore::metadata::DataType::TimestampLTz(_) => DATA_TYPE_TIMESTAMP_LTZ,
+        _ => 0,
+    }
+}
+
+pub fn ffi_descriptor_to_core(
+    descriptor: &ffi::FfiTableDescriptor,
+) -> Result<fcore::metadata::TableDescriptor> {
+    let mut schema_builder = fcore::metadata::Schema::builder();
+
+    for col in &descriptor.schema.columns {
+        let dt = ffi_data_type_to_core(col.data_type)?;
+        schema_builder = schema_builder.column(&col.name, dt);
+        if !col.comment.is_empty() {
+            schema_builder = schema_builder.with_comment(&col.comment);
+        }
+    }
+
+    if !descriptor.schema.primary_keys.is_empty() {
+        schema_builder = 
schema_builder.primary_key(descriptor.schema.primary_keys.clone());
+    }
+
+    let schema = schema_builder.build()?;
+
+    let mut builder = fcore::metadata::TableDescriptor::builder()
+        .schema(schema)
+        .partitioned_by(descriptor.partition_keys.clone());
+
+    if descriptor.bucket_count > 0 {
+        builder = builder.distributed_by(Some(descriptor.bucket_count), 
descriptor.bucket_keys.clone());
+    } else {
+        builder = builder.distributed_by(None, descriptor.bucket_keys.clone());
+    }
+
+    for prop in &descriptor.properties {
+        builder = builder.property(&prop.key, &prop.value);
+    }
+
+    if !descriptor.comment.is_empty() {
+        builder = builder.comment(&descriptor.comment);
+    }
+
+    Ok(builder.build()?)
+}
+
+pub fn core_table_info_to_ffi(info: &fcore::metadata::TableInfo) -> 
ffi::FfiTableInfo {
+    let schema = info.get_schema();
+    let columns: Vec<ffi::FfiColumn> = schema
+        .columns()
+        .iter()
+        .map(|col| ffi::FfiColumn {
+            name: col.name().to_string(),
+            data_type: core_data_type_to_ffi(col.data_type()),
+            comment: col.comment().unwrap_or("").to_string(),
+        })
+        .collect();
+
+    let primary_keys: Vec<String> = schema
+        .primary_key()
+        .map(|pk| pk.column_names().to_vec())
+        .unwrap_or_default();
+
+    let properties: Vec<ffi::HashMapValue> = info
+        .get_properties()
+        .iter()
+        .map(|(k, v)| ffi::HashMapValue {
+            key: k.clone(),
+            value: v.clone(),
+        })
+        .collect();
+
+    ffi::FfiTableInfo {
+        table_id: info.get_table_id(),
+        schema_id: info.get_schema_id(),
+        table_path: ffi::FfiTablePath {
+            database_name: info.get_table_path().database().to_string(),
+            table_name: info.get_table_path().table().to_string(),
+        },
+        created_time: info.get_created_time(),
+        modified_time: info.get_modified_time(),
+        primary_keys: info.get_primary_keys().clone(),
+        bucket_keys: info.get_bucket_keys().to_vec(),
+        partition_keys: info.get_partition_keys().to_vec(),
+        num_buckets: info.get_num_buckets(),
+        has_primary_key: info.has_primary_key(),
+        is_partitioned: info.is_partitioned(),
+        properties,
+        comment: info.get_comment().unwrap_or("").to_string(),
+        schema: ffi::FfiSchema {
+            columns,
+            primary_keys,
+        },
+    }
+}
+
+pub fn empty_table_info() -> ffi::FfiTableInfo {
+    ffi::FfiTableInfo {
+        table_id: 0,
+        schema_id: 0,
+        table_path: ffi::FfiTablePath {
+            database_name: String::new(),
+            table_name: String::new(),
+        },
+        created_time: 0,
+        modified_time: 0,
+        primary_keys: vec![],
+        bucket_keys: vec![],
+        partition_keys: vec![],
+        num_buckets: 0,
+        has_primary_key: false,
+        is_partitioned: false,
+        properties: vec![],
+        comment: String::new(),
+        schema: ffi::FfiSchema {
+            columns: vec![],
+            primary_keys: vec![],
+        },
+    }
+}
+
+pub struct OwnedRowData {
+    strings: Vec<String>,
+}
+
+impl OwnedRowData {
+    pub fn new() -> Self {
+        Self { strings: Vec::new() }
+    }
+
+    pub fn collect_strings(&mut self, row: &ffi::FfiGenericRow) {
+        for field in &row.fields {
+            if field.datum_type == DATUM_TYPE_STRING {
+                self.strings.push(field.string_val.to_string());
+            }
+        }
+    }
+
+    pub fn get_strings(&self) -> &[String] {
+        &self.strings
+    }
+}
+
+pub fn ffi_row_to_core<'a>(
+    row: &ffi::FfiGenericRow,
+    owner: &'a OwnedRowData,
+) -> fcore::row::GenericRow<'a> {
+    use fcore::row::{Blob, Datum, F32, F64};
+
+    let mut generic_row = fcore::row::GenericRow::new();
+    let mut string_idx = 0;
+
+    for (idx, field) in row.fields.iter().enumerate() {
+        let datum = match field.datum_type {
+            DATUM_TYPE_NULL => Datum::Null,
+            DATUM_TYPE_BOOL => Datum::Bool(field.bool_val),
+            DATUM_TYPE_INT32 => Datum::Int32(field.i32_val),
+            DATUM_TYPE_INT64 => Datum::Int64(field.i64_val),
+            DATUM_TYPE_FLOAT32 => Datum::Float32(F32::from(field.f32_val)),
+            DATUM_TYPE_FLOAT64 => Datum::Float64(F64::from(field.f64_val)),
+            DATUM_TYPE_STRING => {
+                let str_ref = owner.get_strings()[string_idx].as_str();
+                string_idx += 1;
+                Datum::String(str_ref)
+            }
+            DATUM_TYPE_BYTES => 
Datum::Blob(Blob::from(field.bytes_val.clone())),
+            _ => Datum::Null,
+        };
+        generic_row.set_field(idx, datum);
+    }
+
+    generic_row
+}
+
+pub fn core_scan_records_to_ffi(records: &fcore::record::ScanRecords) -> 
ffi::FfiScanRecords {
+    let mut ffi_records = Vec::new();
+    
+    // Iterate over all buckets and their records
+    for bucket_records in records.records_by_buckets().values() {
+        for record in bucket_records {
+            let row = record.row();
+            let fields = core_row_to_ffi_fields(row);
+
+            ffi_records.push(ffi::FfiScanRecord {
+                offset: record.offset(),
+                timestamp: record.timestamp(),
+                row: ffi::FfiGenericRow { fields },
+            });
+        }
+    }
+
+    ffi::FfiScanRecords { records: ffi_records }
+}
+
+fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> Vec<ffi::FfiDatum> 
{
+    fn new_datum(datum_type: i32) -> ffi::FfiDatum {
+        ffi::FfiDatum {
+            datum_type,
+            bool_val: false,
+            i32_val: 0,
+            i64_val: 0,
+            f32_val: 0.0,
+            f64_val: 0.0,
+            string_val: String::new(),
+            bytes_val: vec![],
+        }
+    }
+
+    let record_batch = row.get_record_batch();
+    let schema = record_batch.schema();
+    let row_id = row.get_row_id();
+
+    let mut fields = Vec::with_capacity(schema.fields().len());
+
+    for (i, field) in schema.fields().iter().enumerate() {
+        if row.is_null_at(i) {
+            fields.push(new_datum(DATUM_TYPE_NULL));
+            continue;
+        }
+
+        let datum = match field.data_type() {
+            ArrowDataType::Boolean => {
+                let mut datum = new_datum(DATUM_TYPE_BOOL);
+                datum.bool_val = row.get_boolean(i);
+                datum
+            }
+            ArrowDataType::Int8 => {
+                let mut datum = new_datum(DATUM_TYPE_INT32);
+                datum.i32_val = row.get_byte(i) as i32;
+                datum
+            }
+            ArrowDataType::Int16 => {
+                let mut datum = new_datum(DATUM_TYPE_INT32);
+                datum.i32_val = row.get_short(i) as i32;
+                datum
+            }
+            ArrowDataType::Int32 => {
+                let mut datum = new_datum(DATUM_TYPE_INT32);
+                datum.i32_val = row.get_int(i);
+                datum
+            }
+            ArrowDataType::Int64 => {
+                let mut datum = new_datum(DATUM_TYPE_INT64);
+                datum.i64_val = row.get_long(i);
+                datum
+            }
+            ArrowDataType::Float32 => {
+                let mut datum = new_datum(DATUM_TYPE_FLOAT32);
+                datum.f32_val = row.get_float(i);
+                datum
+            }
+            ArrowDataType::Float64 => {
+                let mut datum = new_datum(DATUM_TYPE_FLOAT64);
+                datum.f64_val = row.get_double(i);
+                datum
+            }
+            ArrowDataType::Utf8 => {
+                let mut datum = new_datum(DATUM_TYPE_STRING);
+                datum.string_val = row.get_string(i).to_string();
+                datum
+            }
+            ArrowDataType::LargeUtf8 => {
+                let array = record_batch
+                    .column(i)
+                    .as_any()
+                    .downcast_ref::<LargeStringArray>()
+                    .expect("LargeUtf8 column expected");
+                let mut datum = new_datum(DATUM_TYPE_STRING);
+                datum.string_val = array.value(row_id).to_string();
+                datum
+            }
+            ArrowDataType::Binary => {
+                let mut datum = new_datum(DATUM_TYPE_BYTES);
+                datum.bytes_val = row.get_bytes(i);
+                datum
+            }
+            ArrowDataType::FixedSizeBinary(len) => {
+                let mut datum = new_datum(DATUM_TYPE_BYTES);
+                datum.bytes_val = row.get_binary(i, *len as usize);
+                datum
+            }
+            ArrowDataType::LargeBinary => {
+                let array = record_batch
+                    .column(i)
+                    .as_any()
+                    .downcast_ref::<LargeBinaryArray>()
+                    .expect("LargeBinary column expected");
+                let mut datum = new_datum(DATUM_TYPE_BYTES);
+                datum.bytes_val = array.value(row_id).to_vec();
+                datum
+            }
+            ArrowDataType::Date32 => {
+                let array = record_batch
+                    .column(i)
+                    .as_any()
+                    .downcast_ref::<Date32Array>()
+                    .expect("Date32 column expected");
+                let mut datum = new_datum(DATUM_TYPE_INT32);
+                datum.i32_val = array.value(row_id);
+                datum
+            }
+            ArrowDataType::Timestamp(unit, _) => match unit {
+                TimeUnit::Second => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<TimestampSecondArray>()
+                        .expect("Timestamp(second) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT64);
+                    datum.i64_val = array.value(row_id);
+                    datum
+                }
+                TimeUnit::Millisecond => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<TimestampMillisecondArray>()
+                        .expect("Timestamp(millisecond) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT64);
+                    datum.i64_val = array.value(row_id);
+                    datum
+                }
+                TimeUnit::Microsecond => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<TimestampMicrosecondArray>()
+                        .expect("Timestamp(microsecond) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT64);
+                    datum.i64_val = array.value(row_id);
+                    datum
+                }
+                TimeUnit::Nanosecond => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<TimestampNanosecondArray>()
+                        .expect("Timestamp(nanosecond) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT64);
+                    datum.i64_val = array.value(row_id);
+                    datum
+                }
+            },
+            ArrowDataType::Time32(unit) => match unit {
+                TimeUnit::Second => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<Time32SecondArray>()
+                        .expect("Time32(second) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT32);
+                    datum.i32_val = array.value(row_id);
+                    datum
+                }
+                TimeUnit::Millisecond => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<Time32MillisecondArray>()
+                        .expect("Time32(millisecond) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT32);
+                    datum.i32_val = array.value(row_id);
+                    datum
+                }
+                _ => panic!("Unsupported Time32 unit for column {}", i),
+            },
+            ArrowDataType::Time64(unit) => match unit {
+                TimeUnit::Microsecond => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<Time64MicrosecondArray>()
+                        .expect("Time64(microsecond) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT64);
+                    datum.i64_val = array.value(row_id);
+                    datum
+                }
+                TimeUnit::Nanosecond => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<Time64NanosecondArray>()
+                        .expect("Time64(nanosecond) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT64);
+                    datum.i64_val = array.value(row_id);
+                    datum
+                }
+                _ => panic!("Unsupported Time64 unit for column {}", i),

Review Comment:
   dito



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to