This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 8ccd283  feat: support temporal types in CPP (#266)
8ccd283 is described below

commit 8ccd28384f5211211d20b2b302a462c104cc7b1e
Author: Anton Borisov <[email protected]>
AuthorDate: Sun Feb 8 01:06:03 2026 +0000

    feat: support temporal types in CPP (#266)
---
 bindings/cpp/examples/example.cpp | 229 +++++++++++++++++++++++---------------
 bindings/cpp/include/fluss.hpp    | 114 +++++++++++++++++++
 bindings/cpp/src/lib.rs           |  38 ++++++-
 bindings/cpp/src/table.cpp        |  48 ++++++++
 bindings/cpp/src/types.rs         | 136 +++++++++++++---------
 5 files changed, 422 insertions(+), 143 deletions(-)

diff --git a/bindings/cpp/examples/example.cpp 
b/bindings/cpp/examples/example.cpp
index 10266c7..92ebe9c 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -42,7 +42,7 @@ int main() {
     check("get_admin", conn.GetAdmin(admin));
 
     fluss::TablePath table_path("fluss", "sample_table_cpp_v1");
-    
+
     // 2.1) Drop table if exists
     std::cout << "Dropping table if exists..." << std::endl;
     auto drop_result = admin.DropTable(table_path, true);
@@ -52,12 +52,16 @@ int main() {
         std::cout << "Table drop result: " << drop_result.error_message << 
std::endl;
     }
 
-    // 3) Schema & descriptor
+    // 3) Schema with scalar and temporal columns
     auto schema = fluss::Schema::NewBuilder()
                         .AddColumn("id", fluss::DataType::Int)
                         .AddColumn("name", fluss::DataType::String)
                         .AddColumn("score", fluss::DataType::Float)
                         .AddColumn("age", fluss::DataType::Int)
+                        .AddColumn("event_date", fluss::DataType::Date)
+                        .AddColumn("event_time", fluss::DataType::Time)
+                        .AddColumn("created_at", fluss::DataType::Timestamp)
+                        .AddColumn("updated_at", fluss::DataType::TimestampLtz)
                         .Build();
 
     auto descriptor = fluss::TableDescriptor::NewBuilder()
@@ -66,7 +70,6 @@ int main() {
                           .SetComment("cpp example table with 3 buckets")
                           .Build();
 
-    // 3.1) Create table with 3 buckets
     std::cout << "Creating table with 3 buckets..." << std::endl;
     check("create_table", admin.CreateTable(table_path, descriptor, false));
 
@@ -74,7 +77,7 @@ int main() {
     fluss::Table table;
     check("get_table", conn.GetTable(table_path, table));
 
-    // 5) Writer
+    // 5) Write rows with scalar and temporal values
     fluss::AppendWriter writer;
     check("new_append_writer", table.NewAppendWriter(writer));
 
@@ -83,12 +86,26 @@ int main() {
         const char* name;
         float score;
         int age;
+        fluss::Date date;
+        fluss::Time time;
+        fluss::Timestamp ts_ntz;
+        fluss::Timestamp ts_ltz;
     };
 
+    auto tp_now = std::chrono::system_clock::now();
     std::vector<RowData> rows = {
-        {1, "Alice", 95.2f, 25},
-        {2, "Bob", 87.2f, 30},
-        {3, "Charlie", 92.1f, 35},
+        {1, "Alice", 95.2f, 25,
+         fluss::Date::FromYMD(2024, 6, 15), fluss::Time::FromHMS(14, 30, 45),
+         fluss::Timestamp::FromTimePoint(tp_now),
+         fluss::Timestamp::FromMillis(1718467200000)},
+        {2, "Bob", 87.2f, 30,
+         fluss::Date::FromYMD(2025, 1, 1), fluss::Time::FromHMS(0, 0, 0),
+         fluss::Timestamp::FromMillis(1735689600000),
+         fluss::Timestamp::FromMillisNanos(1735689600000, 500000)},
+        {3, "Charlie", 92.1f, 35,
+         fluss::Date::FromYMD(1999, 12, 31), fluss::Time::FromHMS(23, 59, 59),
+         fluss::Timestamp::FromMillis(946684799999),
+         fluss::Timestamp::FromMillis(946684799999)},
     };
 
     // Fire-and-forget: queue rows, flush at end
@@ -98,6 +115,10 @@ int main() {
         row.SetString(1, r.name);
         row.SetFloat32(2, r.score);
         row.SetInt32(3, r.age);
+        row.SetDate(4, r.date);
+        row.SetTime(5, r.time);
+        row.SetTimestampNtz(6, r.ts_ntz);
+        row.SetTimestampLtz(7, r.ts_ltz);
         check("append", writer.Append(row));
     }
     check("flush", writer.Flush());
@@ -116,7 +137,7 @@ int main() {
         std::cout << "Row acknowledged by server" << std::endl;
     }
 
-    // 6) Scan
+    // 6) Full scan — verify all column types including temporal
     fluss::LogScanner scanner;
     check("new_log_scanner", table.NewScan().CreateLogScanner(scanner));
 
@@ -129,188 +150,216 @@ int main() {
     fluss::ScanRecords records;
     check("poll", scanner.Poll(5000, records));
 
-    std::cout << "Scanned records: " << records.records.size() << std::endl;
+    std::cout << "Scanned records: " << records.Size() << std::endl;
+    bool scan_ok = true;
     for (const auto& rec : records.records) {
-        std::cout << " offset=" << rec.offset << " id=" << 
rec.row.fields[0].i32_val
-                  << " name=" << rec.row.fields[1].string_val
-                  << " score=" << rec.row.fields[2].f32_val << " age=" << 
rec.row.fields[3].i32_val
-                  << " ts=" << rec.timestamp << std::endl;
+        const auto& f = rec.row.fields;
+
+        if (f[4].type != fluss::DatumType::Date) {
+            std::cerr << "ERROR: field 4 expected Date, got "
+                      << static_cast<int>(f[4].type) << std::endl;
+            scan_ok = false;
+        }
+        if (f[5].type != fluss::DatumType::Time) {
+            std::cerr << "ERROR: field 5 expected Time, got "
+                      << static_cast<int>(f[5].type) << std::endl;
+            scan_ok = false;
+        }
+        if (f[6].type != fluss::DatumType::TimestampNtz) {
+            std::cerr << "ERROR: field 6 expected TimestampNtz, got "
+                      << static_cast<int>(f[6].type) << std::endl;
+            scan_ok = false;
+        }
+        if (f[7].type != fluss::DatumType::TimestampLtz) {
+            std::cerr << "ERROR: field 7 expected TimestampLtz, got "
+                      << static_cast<int>(f[7].type) << std::endl;
+            scan_ok = false;
+        }
+
+        auto date = f[4].GetDate();
+        auto time = f[5].GetTime();
+        auto ts_ntz = f[6].GetTimestamp();
+        auto ts_ltz = f[7].GetTimestamp();
+
+        std::cout << "  id=" << f[0].i32_val
+                  << " name=" << f[1].string_val
+                  << " score=" << f[2].f32_val
+                  << " age=" << f[3].i32_val
+                  << " date=" << date.Year() << "-" << date.Month() << "-" << 
date.Day()
+                  << " time=" << time.Hour() << ":" << time.Minute() << ":" << 
time.Second()
+                  << " ts_ntz=" << ts_ntz.epoch_millis
+                  << " ts_ltz=" << ts_ltz.epoch_millis
+                  << "+" << ts_ltz.nano_of_millisecond << "ns"
+                  << std::endl;
+    }
+
+    if (!scan_ok) {
+        std::cerr << "Full scan type verification FAILED!" << std::endl;
+        std::exit(1);
     }
-    
-    // 7) Project only id (0) and name (1) columns
-    std::vector<size_t> projected_columns = {0, 1};
+
+    // 7) Projected scan — project [id, updated_at(TimestampLtz)] to verify
+    //    NTZ/LTZ disambiguation works with column index remapping
+    std::vector<size_t> projected_columns = {0, 7};
     fluss::LogScanner projected_scanner;
     check("new_log_scanner_with_projection",
           
table.NewScan().Project(projected_columns).CreateLogScanner(projected_scanner));
-    
+
     for (int b = 0; b < buckets; ++b) {
         check("subscribe_projected", projected_scanner.Subscribe(b, 0));
     }
-    
+
     fluss::ScanRecords projected_records;
     check("poll_projected", projected_scanner.Poll(5000, projected_records));
-    
-    std::cout << "Projected records: " << projected_records.records.size() << 
std::endl;
-    
-    bool projection_verified = true;
-    for (size_t i = 0; i < projected_records.records.size(); ++i) {
-        const auto& rec = projected_records.records[i];
-        const auto& row = rec.row;
-        
-        if (row.fields.size() != projected_columns.size()) {
-            std::cerr << "ERROR: Record " << i << " has " << row.fields.size() 
-                      << " fields, expected " << projected_columns.size() << 
std::endl;
-            projection_verified = false;
+
+    std::cout << "Projected records: " << projected_records.Size() << 
std::endl;
+    for (const auto& rec : projected_records.records) {
+        const auto& f = rec.row.fields;
+
+        if (f.size() != 2) {
+            std::cerr << "ERROR: expected 2 fields, got " << f.size() << 
std::endl;
+            scan_ok = false;
             continue;
         }
-        
-        // Verify field types match expected columns
-        // Column 0 (id) should be Int32, Column 1 (name) should be String
-        if (row.fields[0].type != fluss::DatumType::Int32) {
-            std::cerr << "ERROR: Record " << i << " field 0 type mismatch, 
expected Int32" << std::endl;
-            projection_verified = false;
+        if (f[0].type != fluss::DatumType::Int32) {
+            std::cerr << "ERROR: projected field 0 expected Int32, got "
+                      << static_cast<int>(f[0].type) << std::endl;
+            scan_ok = false;
         }
-        if (row.fields[1].type != fluss::DatumType::String) {
-            std::cerr << "ERROR: Record " << i << " field 1 type mismatch, 
expected String" << std::endl;
-            projection_verified = false;
-        }
-        
-        // Print projected data
-        if (row.fields[0].type == fluss::DatumType::Int32 && 
-            row.fields[1].type == fluss::DatumType::String) {
-            std::cout << "  Record " << i << ": id=" << row.fields[0].i32_val 
-                      << ", name=" << row.fields[1].string_val << std::endl;
+        if (f[1].type != fluss::DatumType::TimestampLtz) {
+            std::cerr << "ERROR: projected field 1 expected TimestampLtz, got "
+                      << static_cast<int>(f[1].type) << std::endl;
+            scan_ok = false;
         }
+
+        auto ts = f[1].GetTimestamp();
+        std::cout << "  id=" << f[0].i32_val
+                  << " updated_at=" << ts.epoch_millis
+                  << "+" << ts.nano_of_millisecond << "ns" << std::endl;
     }
-    
-    if (projection_verified) {
-        std::cout << "Column pruning verification passed!" << std::endl;
+
+    if (scan_ok) {
+        std::cout << "Scan verification passed!" << std::endl;
     } else {
-        std::cerr << "Column pruning verification failed!" << std::endl;
+        std::cerr << "Scan verification FAILED!" << std::endl;
         std::exit(1);
     }
 
     // 8) List offsets examples
     std::cout << "\n=== List Offsets Examples ===" << std::endl;
-    
-    // 8.1) Query earliest offsets for all buckets
+
     std::vector<int32_t> all_bucket_ids;
     all_bucket_ids.reserve(buckets);
     for (int b = 0; b < buckets; ++b) {
         all_bucket_ids.push_back(b);
     }
-    
+
     std::unordered_map<int32_t, int64_t> earliest_offsets;
-    check("list_earliest_offsets", 
-          admin.ListOffsets(table_path, all_bucket_ids, 
-                           fluss::OffsetQuery::Earliest(), 
+    check("list_earliest_offsets",
+          admin.ListOffsets(table_path, all_bucket_ids,
+                           fluss::OffsetQuery::Earliest(),
                            earliest_offsets));
     std::cout << "Earliest offsets:" << std::endl;
     for (const auto& [bucket_id, offset] : earliest_offsets) {
         std::cout << "  Bucket " << bucket_id << ": offset=" << offset << 
std::endl;
     }
-    
-    // 8.2) Query latest offsets for all buckets
+
     std::unordered_map<int32_t, int64_t> latest_offsets;
-    check("list_latest_offsets", 
-          admin.ListOffsets(table_path, all_bucket_ids, 
-                           fluss::OffsetQuery::Latest(), 
+    check("list_latest_offsets",
+          admin.ListOffsets(table_path, all_bucket_ids,
+                           fluss::OffsetQuery::Latest(),
                            latest_offsets));
     std::cout << "Latest offsets:" << std::endl;
     for (const auto& [bucket_id, offset] : latest_offsets) {
         std::cout << "  Bucket " << bucket_id << ": offset=" << offset << 
std::endl;
     }
-    
-    // 8.3) Query offsets for a specific timestamp (current time - 1 hour)
+
     auto now = std::chrono::system_clock::now();
     auto one_hour_ago = now - std::chrono::hours(1);
     auto timestamp_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
         one_hour_ago.time_since_epoch()).count();
-    
+
     std::unordered_map<int32_t, int64_t> timestamp_offsets;
-    check("list_timestamp_offsets", 
-          admin.ListOffsets(table_path, all_bucket_ids, 
-                           fluss::OffsetQuery::FromTimestamp(timestamp_ms), 
+    check("list_timestamp_offsets",
+          admin.ListOffsets(table_path, all_bucket_ids,
+                           fluss::OffsetQuery::FromTimestamp(timestamp_ms),
                            timestamp_offsets));
     std::cout << "Offsets for timestamp " << timestamp_ms << " (1 hour ago):" 
<< std::endl;
     for (const auto& [bucket_id, offset] : timestamp_offsets) {
         std::cout << "  Bucket " << bucket_id << ": offset=" << offset << 
std::endl;
     }
-    
-    // 8.4) Use batch subscribe with offsets from list_offsets
+
+    // 9) Batch subscribe
     std::cout << "\n=== Batch Subscribe Example ===" << std::endl;
     fluss::LogScanner batch_scanner;
     check("new_log_scanner_for_batch", 
table.NewScan().CreateLogScanner(batch_scanner));
-    
+
     std::vector<fluss::BucketSubscription> subscriptions;
     for (const auto& [bucket_id, offset] : earliest_offsets) {
         subscriptions.push_back({bucket_id, offset});
-        std::cout << "Preparing subscription: bucket=" << bucket_id 
+        std::cout << "Preparing subscription: bucket=" << bucket_id
                   << ", offset=" << offset << std::endl;
     }
-    
+
     check("subscribe_buckets", batch_scanner.Subscribe(subscriptions));
     std::cout << "Batch subscribed to " << subscriptions.size() << " buckets" 
<< std::endl;
-    
-    // 8.5) Poll and verify bucket_id in records
+
     fluss::ScanRecords batch_records;
     check("poll_batch", batch_scanner.Poll(5000, batch_records));
-    
+
     std::cout << "Scanned " << batch_records.Size() << " records from batch 
subscription" << std::endl;
     for (size_t i = 0; i < batch_records.Size() && i < 5; ++i) {
         const auto& rec = batch_records[i];
-        std::cout << "  Record " << i << ": bucket_id=" << rec.bucket_id 
-                  << ", offset=" << rec.offset 
+        std::cout << "  Record " << i << ": bucket_id=" << rec.bucket_id
+                  << ", offset=" << rec.offset
                   << ", timestamp=" << rec.timestamp << std::endl;
     }
     if (batch_records.Size() > 5) {
         std::cout << "  ... and " << (batch_records.Size() - 5) << " more 
records" << std::endl;
     }
 
-    // 9) Test the new Arrow record batch polling functionality
+    // 10) Arrow record batch polling
     std::cout << "\n=== Testing Arrow Record Batch Polling ===" << std::endl;
 
     fluss::LogScanner arrow_scanner;
     check("new_record_batch_log_scanner", 
table.NewScan().CreateRecordBatchScanner(arrow_scanner));
-    
-    // Subscribe to all buckets starting from offset 0
+
     for (int b = 0; b < buckets; ++b) {
         check("subscribe_arrow", arrow_scanner.Subscribe(b, 0));
     }
-    
+
     fluss::ArrowRecordBatches arrow_batches;
     check("poll_record_batch", arrow_scanner.PollRecordBatch(5000, 
arrow_batches));
-    
+
     std::cout << "Polled " << arrow_batches.Size() << " Arrow record batches" 
<< std::endl;
     for (size_t i = 0; i < arrow_batches.Size(); ++i) {
         const auto& batch = arrow_batches[i];
         if (batch->Available()) {
-            std::cout << "  Batch " << i << ": " << 
batch->GetArrowRecordBatch()->num_rows() << " rows. " << std::endl;
+            std::cout << "  Batch " << i << ": " << 
batch->GetArrowRecordBatch()->num_rows() << " rows" << std::endl;
         } else {
             std::cout << "  Batch " << i << ": not available" << std::endl;
         }
     }
-    
-    // 10) Test the new Arrow record batch polling with projection
+
+    // 11) Arrow record batch polling with projection
     std::cout << "\n=== Testing Arrow Record Batch Polling with Projection 
===" << std::endl;
 
     fluss::LogScanner projected_arrow_scanner;
     check("new_record_batch_log_scanner_with_projection",
           
table.NewScan().Project(projected_columns).CreateRecordBatchScanner(projected_arrow_scanner));
-    
-    // Subscribe to all buckets starting from offset 0
+
     for (int b = 0; b < buckets; ++b) {
         check("subscribe_projected_arrow", 
projected_arrow_scanner.Subscribe(b, 0));
     }
-    
+
     fluss::ArrowRecordBatches projected_arrow_batches;
     check("poll_projected_record_batch", 
projected_arrow_scanner.PollRecordBatch(5000, projected_arrow_batches));
-    
+
     std::cout << "Polled " << projected_arrow_batches.Size() << " projected 
Arrow record batches" << std::endl;
     for (size_t i = 0; i < projected_arrow_batches.Size(); ++i) {
         const auto& batch = projected_arrow_batches[i];
         if (batch->Available()) {
-            std::cout << "  Batch " << i << ": " << 
batch->GetArrowRecordBatch()->num_rows() << " rows " << std::endl;
+            std::cout << "  Batch " << i << ": " << 
batch->GetArrowRecordBatch()->num_rows() << " rows" << std::endl;
         } else {
             std::cout << "  Batch " << i << ": not available" << std::endl;
         }
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 6c20717..239d9a4 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -19,6 +19,7 @@
 
 #pragma once
 
+#include <chrono>
 #include <memory>
 #include <optional>
 #include <string>
@@ -41,6 +42,64 @@ namespace ffi {
     struct LogScanner;
 }  // namespace ffi
 
+struct Date {
+    int32_t days_since_epoch{0};
+
+    static Date FromDays(int32_t days) { return {days}; }
+    static Date FromYMD(int year, int month, int day);
+
+    int Year() const;
+    int Month() const;
+    int Day() const;
+};
+
+struct Time {
+    static constexpr int32_t kMillisPerSecond = 1000;
+    static constexpr int32_t kMillisPerMinute = 60 * kMillisPerSecond;
+    static constexpr int32_t kMillisPerHour = 60 * kMillisPerMinute;
+
+    int32_t millis_since_midnight{0};
+
+    static Time FromMillis(int32_t ms) { return {ms}; }
+    static Time FromHMS(int hour, int minute, int second, int millis = 0) {
+        return {hour * kMillisPerHour + minute * kMillisPerMinute +
+                second * kMillisPerSecond + millis};
+    }
+
+    int Hour() const { return millis_since_midnight / kMillisPerHour; }
+    int Minute() const { return (millis_since_midnight % kMillisPerHour) / 
kMillisPerMinute; }
+    int Second() const { return (millis_since_midnight % kMillisPerMinute) / 
kMillisPerSecond; }
+    int Millis() const { return millis_since_midnight % kMillisPerSecond; }
+};
+
+struct Timestamp {
+    static constexpr int32_t kMaxNanoOfMillisecond = 999999;
+    static constexpr int64_t kNanosPerMilli = 1000000;
+
+    int64_t epoch_millis{0};
+    int32_t nano_of_millisecond{0};
+
+    static Timestamp FromMillis(int64_t ms) { return {ms, 0}; }
+    static Timestamp FromMillisNanos(int64_t ms, int32_t nanos) {
+        if (nanos < 0) nanos = 0;
+        if (nanos > kMaxNanoOfMillisecond) nanos = kMaxNanoOfMillisecond;
+        return {ms, nanos};
+    }
+    static Timestamp FromTimePoint(std::chrono::system_clock::time_point tp) {
+        auto duration = tp.time_since_epoch();
+        auto ns =
+            std::chrono::duration_cast<std::chrono::nanoseconds>(duration)
+                .count();
+        auto ms = ns / kNanosPerMilli;
+        auto nano_of_ms = static_cast<int32_t>(ns % kNanosPerMilli);
+        if (nano_of_ms < 0) {
+            nano_of_ms += kNanosPerMilli;
+            ms -= 1;
+        }
+        return {ms, nano_of_ms};
+    }
+};
+
 enum class DataType {
     Boolean = 1,
     TinyInt = 2,
@@ -66,6 +125,11 @@ enum class DatumType {
     Float64 = 5,
     String = 6,
     Bytes = 7,
+    // 8-10 reserved for decimal types
+    Date = 11,
+    Time = 12,
+    TimestampNtz = 13,
+    TimestampLtz = 14,
 };
 
 constexpr int64_t EARLIEST_OFFSET = -2;
@@ -270,6 +334,36 @@ struct Datum {
         d.bytes_val = std::move(v);
         return d;
     }
+    static Datum Date(fluss::Date d) {
+        Datum dat;
+        dat.type = DatumType::Date;
+        dat.i32_val = d.days_since_epoch;
+        return dat;
+    }
+    static Datum Time(fluss::Time t) {
+        Datum dat;
+        dat.type = DatumType::Time;
+        dat.i32_val = t.millis_since_midnight;
+        return dat;
+    }
+    static Datum TimestampNtz(fluss::Timestamp ts) {
+        Datum dat;
+        dat.type = DatumType::TimestampNtz;
+        dat.i64_val = ts.epoch_millis;
+        dat.i32_val = ts.nano_of_millisecond;
+        return dat;
+    }
+    static Datum TimestampLtz(fluss::Timestamp ts) {
+        Datum dat;
+        dat.type = DatumType::TimestampLtz;
+        dat.i64_val = ts.epoch_millis;
+        dat.i32_val = ts.nano_of_millisecond;
+        return dat;
+    }
+
+    fluss::Date GetDate() const { return {i32_val}; }
+    fluss::Time GetTime() const { return {i32_val}; }
+    fluss::Timestamp GetTimestamp() const { return {i64_val, i32_val}; }
 };
 
 struct GenericRow {
@@ -315,6 +409,26 @@ struct GenericRow {
         fields[idx] = Datum::Bytes(std::move(v));
     }
 
+    void SetDate(size_t idx, fluss::Date d) {
+        EnsureSize(idx);
+        fields[idx] = Datum::Date(d);
+    }
+
+    void SetTime(size_t idx, fluss::Time t) {
+        EnsureSize(idx);
+        fields[idx] = Datum::Time(t);
+    }
+
+    void SetTimestampNtz(size_t idx, fluss::Timestamp ts) {
+        EnsureSize(idx);
+        fields[idx] = Datum::TimestampNtz(ts);
+    }
+
+    void SetTimestampLtz(size_t idx, fluss::Timestamp ts) {
+        EnsureSize(idx);
+        fields[idx] = Datum::TimestampLtz(ts);
+    }
+
 private:
     void EnsureSize(size_t idx) {
         if (fields.size() <= idx) {
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 4957c99..5a26613 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -310,6 +310,9 @@ pub struct WriteResult {
 pub struct LogScanner {
     inner: Option<fcore::client::LogScanner>,
     inner_batch: Option<fcore::client::RecordBatchLogScanner>,
+    /// Fluss columns matching the projected Arrow fields (1:1 by index).
+    /// For non-projected scanners this is the full table schema columns.
+    projected_columns: Vec<fcore::metadata::Column>,
 }
 
 fn ok_result() -> ffi::FfiResult {
@@ -653,6 +656,7 @@ impl Table {
             let scanner_ptr = Box::into_raw(Box::new(LogScanner {
                 inner: Some(scanner),
                 inner_batch: None,
+                projected_columns: 
self.table_info.get_schema().columns().to_vec(),
             }));
 
             Ok(scanner_ptr)
@@ -670,6 +674,19 @@ impl Table {
                 self.table_info.clone(),
             );
 
+            let all_columns = self.table_info.get_schema().columns();
+            let projected_columns: Vec<_> = column_indices
+                .iter()
+                .map(|&i| {
+                    all_columns.get(i).cloned().ok_or_else(|| {
+                        format!(
+                            "Invalid column index {i}: schema has {} columns",
+                            all_columns.len()
+                        )
+                    })
+                })
+                .collect::<Result<_, String>>()?;
+
             let log_scanner = fluss_table
                 .new_scan()
                 .project(&column_indices)
@@ -680,6 +697,7 @@ impl Table {
             let scanner = Box::into_raw(Box::new(LogScanner {
                 inner: Some(log_scanner),
                 inner_batch: None,
+                projected_columns,
             }));
             Ok(scanner)
         })
@@ -701,6 +719,7 @@ impl Table {
             let scanner = Box::into_raw(Box::new(LogScanner {
                 inner: None,
                 inner_batch: Some(batch_scanner),
+                projected_columns: 
self.table_info.get_schema().columns().to_vec(),
             }));
             Ok(scanner)
         })
@@ -717,6 +736,19 @@ impl Table {
                 self.table_info.clone(),
             );
 
+            let all_columns = self.table_info.get_schema().columns();
+            let projected_columns: Vec<_> = column_indices
+                .iter()
+                .map(|&i| {
+                    all_columns.get(i).cloned().ok_or_else(|| {
+                        format!(
+                            "Invalid column index {i}: schema has {} columns",
+                            all_columns.len()
+                        )
+                    })
+                })
+                .collect::<Result<_, String>>()?;
+
             let batch_scanner = fluss_table
                 .new_scan()
                 .project(&column_indices)
@@ -727,6 +759,7 @@ impl Table {
             let scanner = Box::into_raw(Box::new(LogScanner {
                 inner: None,
                 inner_batch: Some(batch_scanner),
+                projected_columns,
             }));
             Ok(scanner)
         })
@@ -925,7 +958,10 @@ impl LogScanner {
             match result {
                 Ok(records) => ffi::FfiScanRecordsResult {
                     result: ok_result(),
-                    scan_records: types::core_scan_records_to_ffi(&records),
+                    scan_records: types::core_scan_records_to_ffi(
+                        &records,
+                        &self.projected_columns,
+                    ),
                 },
                 Err(e) => ffi::FfiScanRecordsResult {
                     result: err_result(1, e.to_string()),
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index 24be8d4..04d1846 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -22,12 +22,60 @@
 #include "ffi_converter.hpp"
 #include "rust/cxx.h"
 #include <arrow/c/bridge.h>
+#include <ctime>
 // todo:  bindings/cpp/BUILD.bazel still doesn’t declare Arrow include/link 
dependencies.
 // In environments where Bazel does not already have Arrow available, this 
will fail at compile/link time.
 #include <arrow/record_batch.h>
 
 namespace fluss {
 
+static constexpr int kSecondsPerDay = 24 * 60 * 60;
+
+static std::time_t timegm_utc(std::tm* tm) {
+#if defined(_WIN32)
+    return _mkgmtime(tm);
+#else
+    return ::timegm(tm);
+#endif
+}
+
+static std::tm gmtime_utc(std::time_t epoch_seconds) {
+    std::tm tm{};
+#if defined(_WIN32)
+    gmtime_s(&tm, &epoch_seconds);
+#else
+    ::gmtime_r(&epoch_seconds, &tm);
+#endif
+    return tm;
+}
+
+Date Date::FromYMD(int year, int month, int day) {
+    std::tm tm{};
+    tm.tm_year = year - 1900;
+    tm.tm_mon = month - 1;
+    tm.tm_mday = day;
+    std::time_t epoch_seconds = timegm_utc(&tm);
+    return {static_cast<int32_t>(epoch_seconds / kSecondsPerDay)};
+}
+
+int Date::Year() const {
+    std::time_t epoch_seconds = static_cast<std::time_t>(days_since_epoch) * 
kSecondsPerDay;
+    std::tm tm = gmtime_utc(epoch_seconds);
+    return tm.tm_year + 1900;
+}
+
+int Date::Month() const {
+    std::time_t epoch_seconds = static_cast<std::time_t>(days_since_epoch) * 
kSecondsPerDay;
+    std::tm tm = gmtime_utc(epoch_seconds);
+    return tm.tm_mon + 1;
+}
+
+int Date::Day() const {
+    std::time_t epoch_seconds = static_cast<std::time_t>(days_since_epoch) * 
kSecondsPerDay;
+    std::tm tm = gmtime_utc(epoch_seconds);
+    return tm.tm_mday;
+}
+
 Table::Table() noexcept = default;
 
 Table::Table(ffi::Table* table) noexcept : table_(table) {}
diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
index 91d6e26..f546b68 100644
--- a/bindings/cpp/src/types.rs
+++ b/bindings/cpp/src/types.rs
@@ -52,6 +52,15 @@ pub const DATUM_TYPE_FLOAT32: i32 = 4;
 pub const DATUM_TYPE_FLOAT64: i32 = 5;
 pub const DATUM_TYPE_STRING: i32 = 6;
 pub const DATUM_TYPE_BYTES: i32 = 7;
+pub const DATUM_TYPE_DATE: i32 = 11;
+pub const DATUM_TYPE_TIME: i32 = 12;
+pub const DATUM_TYPE_TIMESTAMP_NTZ: i32 = 13;
+pub const DATUM_TYPE_TIMESTAMP_LTZ: i32 = 14;
+
+const MILLIS_PER_SECOND: i64 = 1_000;
+const MICROS_PER_MILLI: i64 = 1_000;
+const NANOS_PER_MICRO: i64 = 1_000;
+const NANOS_PER_MILLI: i64 = 1_000_000;
 
 fn ffi_data_type_to_core(dt: i32) -> Result<fcore::metadata::DataType> {
     match dt {
@@ -224,6 +233,16 @@ pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) -> 
fcore::row::GenericRow<'_> {
             DATUM_TYPE_FLOAT64 => Datum::Float64(field.f64_val.into()),
             DATUM_TYPE_STRING => 
Datum::String(Cow::Borrowed(field.string_val.as_str())),
             DATUM_TYPE_BYTES => 
Datum::Blob(Cow::Borrowed(field.bytes_val.as_slice())),
+            DATUM_TYPE_DATE => 
Datum::Date(fcore::row::Date::new(field.i32_val)),
+            DATUM_TYPE_TIME => 
Datum::Time(fcore::row::Time::new(field.i32_val)),
+            DATUM_TYPE_TIMESTAMP_NTZ => Datum::TimestampNtz(
+                fcore::row::TimestampNtz::from_millis_nanos(field.i64_val, 
field.i32_val)
+                    .unwrap_or_else(|_| 
fcore::row::TimestampNtz::new(field.i64_val)),
+            ),
+            DATUM_TYPE_TIMESTAMP_LTZ => Datum::TimestampLtz(
+                fcore::row::TimestampLtz::from_millis_nanos(field.i64_val, 
field.i32_val)
+                    .unwrap_or_else(|_| 
fcore::row::TimestampLtz::new(field.i64_val)),
+            ),
             _ => Datum::Null,
         };
         generic_row.set_field(idx, datum);
@@ -232,7 +251,10 @@ pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) -> 
fcore::row::GenericRow<'_> {
     generic_row
 }
 
-pub fn core_scan_records_to_ffi(records: &fcore::record::ScanRecords) -> 
ffi::FfiScanRecords {
+pub fn core_scan_records_to_ffi(
+    records: &fcore::record::ScanRecords,
+    columns: &[fcore::metadata::Column],
+) -> ffi::FfiScanRecords {
     let mut ffi_records = Vec::new();
 
     // Iterate over all buckets and their records
@@ -240,7 +262,7 @@ pub fn core_scan_records_to_ffi(records: 
&fcore::record::ScanRecords) -> ffi::Ff
         let bucket_id = table_bucket.bucket_id();
         for record in bucket_records {
             let row = record.row();
-            let fields = core_row_to_ffi_fields(row);
+            let fields = core_row_to_ffi_fields(row, columns);
 
             ffi_records.push(ffi::FfiScanRecord {
                 bucket_id,
@@ -256,7 +278,10 @@ pub fn core_scan_records_to_ffi(records: 
&fcore::record::ScanRecords) -> ffi::Ff
     }
 }
 
-fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> Vec<ffi::FfiDatum> 
{
+fn core_row_to_ffi_fields(
+    row: &fcore::row::ColumnarRow,
+    columns: &[fcore::metadata::Column],
+) -> Vec<ffi::FfiDatum> {
     fn new_datum(datum_type: i32) -> ffi::FfiDatum {
         ffi::FfiDatum {
             datum_type,
@@ -361,52 +386,59 @@ fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) 
-> Vec<ffi::FfiDatum> {
                     .as_any()
                     .downcast_ref::<Date32Array>()
                     .expect("Date32 column expected");
-                let mut datum = new_datum(DATUM_TYPE_INT32);
+                let mut datum = new_datum(DATUM_TYPE_DATE);
                 datum.i32_val = array.value(row_id);
                 datum
             }
-            ArrowDataType::Timestamp(unit, _) => match unit {
-                TimeUnit::Second => {
-                    let array = record_batch
-                        .column(i)
-                        .as_any()
-                        .downcast_ref::<TimestampSecondArray>()
-                        .expect("Timestamp(second) column expected");
-                    let mut datum = new_datum(DATUM_TYPE_INT64);
-                    datum.i64_val = array.value(row_id);
-                    datum
-                }
-                TimeUnit::Millisecond => {
-                    let array = record_batch
-                        .column(i)
-                        .as_any()
-                        .downcast_ref::<TimestampMillisecondArray>()
-                        .expect("Timestamp(millisecond) column expected");
-                    let mut datum = new_datum(DATUM_TYPE_INT64);
-                    datum.i64_val = array.value(row_id);
-                    datum
+            ArrowDataType::Timestamp(unit, _tz) => {
+                let datum_type = match columns.get(i).map(|c| c.data_type()) {
+                    Some(fcore::metadata::DataType::TimestampLTz(_)) => 
DATUM_TYPE_TIMESTAMP_LTZ,
+                    _ => DATUM_TYPE_TIMESTAMP_NTZ,
+                };
+                let mut datum = new_datum(datum_type);
+                match unit {
+                    TimeUnit::Second => {
+                        let array = record_batch
+                            .column(i)
+                            .as_any()
+                            .downcast_ref::<TimestampSecondArray>()
+                            .expect("Timestamp(second) column expected");
+                        datum.i64_val = array.value(row_id) * 
MILLIS_PER_SECOND;
+                        datum.i32_val = 0;
+                    }
+                    TimeUnit::Millisecond => {
+                        let array = record_batch
+                            .column(i)
+                            .as_any()
+                            .downcast_ref::<TimestampMillisecondArray>()
+                            .expect("Timestamp(millisecond) column expected");
+                        datum.i64_val = array.value(row_id);
+                        datum.i32_val = 0;
+                    }
+                    TimeUnit::Microsecond => {
+                        let array = record_batch
+                            .column(i)
+                            .as_any()
+                            .downcast_ref::<TimestampMicrosecondArray>()
+                            .expect("Timestamp(microsecond) column expected");
+                        let micros = array.value(row_id);
+                        datum.i64_val = micros.div_euclid(MICROS_PER_MILLI);
+                        datum.i32_val =
+                            (micros.rem_euclid(MICROS_PER_MILLI) * 
NANOS_PER_MICRO) as i32;
+                    }
+                    TimeUnit::Nanosecond => {
+                        let array = record_batch
+                            .column(i)
+                            .as_any()
+                            .downcast_ref::<TimestampNanosecondArray>()
+                            .expect("Timestamp(nanosecond) column expected");
+                        let nanos = array.value(row_id);
+                        datum.i64_val = nanos.div_euclid(NANOS_PER_MILLI);
+                        datum.i32_val = nanos.rem_euclid(NANOS_PER_MILLI) as 
i32;
+                    }
                 }
-                TimeUnit::Microsecond => {
-                    let array = record_batch
-                        .column(i)
-                        .as_any()
-                        .downcast_ref::<TimestampMicrosecondArray>()
-                        .expect("Timestamp(microsecond) column expected");
-                    let mut datum = new_datum(DATUM_TYPE_INT64);
-                    datum.i64_val = array.value(row_id);
-                    datum
-                }
-                TimeUnit::Nanosecond => {
-                    let array = record_batch
-                        .column(i)
-                        .as_any()
-                        .downcast_ref::<TimestampNanosecondArray>()
-                        .expect("Timestamp(nanosecond) column expected");
-                    let mut datum = new_datum(DATUM_TYPE_INT64);
-                    datum.i64_val = array.value(row_id);
-                    datum
-                }
-            },
+                datum
+            }
             ArrowDataType::Time32(unit) => match unit {
                 TimeUnit::Second => {
                     let array = record_batch
@@ -414,8 +446,8 @@ fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> 
Vec<ffi::FfiDatum> {
                         .as_any()
                         .downcast_ref::<Time32SecondArray>()
                         .expect("Time32(second) column expected");
-                    let mut datum = new_datum(DATUM_TYPE_INT32);
-                    datum.i32_val = array.value(row_id);
+                    let mut datum = new_datum(DATUM_TYPE_TIME);
+                    datum.i32_val = array.value(row_id) * MILLIS_PER_SECOND as 
i32;
                     datum
                 }
                 TimeUnit::Millisecond => {
@@ -424,7 +456,7 @@ fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> 
Vec<ffi::FfiDatum> {
                         .as_any()
                         .downcast_ref::<Time32MillisecondArray>()
                         .expect("Time32(millisecond) column expected");
-                    let mut datum = new_datum(DATUM_TYPE_INT32);
+                    let mut datum = new_datum(DATUM_TYPE_TIME);
                     datum.i32_val = array.value(row_id);
                     datum
                 }
@@ -437,8 +469,8 @@ fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> 
Vec<ffi::FfiDatum> {
                         .as_any()
                         .downcast_ref::<Time64MicrosecondArray>()
                         .expect("Time64(microsecond) column expected");
-                    let mut datum = new_datum(DATUM_TYPE_INT64);
-                    datum.i64_val = array.value(row_id);
+                    let mut datum = new_datum(DATUM_TYPE_TIME);
+                    datum.i32_val = (array.value(row_id) / MICROS_PER_MILLI) 
as i32;
                     datum
                 }
                 TimeUnit::Nanosecond => {
@@ -447,8 +479,8 @@ fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> 
Vec<ffi::FfiDatum> {
                         .as_any()
                         .downcast_ref::<Time64NanosecondArray>()
                         .expect("Time64(nanosecond) column expected");
-                    let mut datum = new_datum(DATUM_TYPE_INT64);
-                    datum.i64_val = array.value(row_id);
+                    let mut datum = new_datum(DATUM_TYPE_TIME);
+                    datum.i32_val = (array.value(row_id) / NANOS_PER_MILLI) as 
i32;
                     datum
                 }
                 _ => panic!("Will never come here. Unsupported Time64 unit for 
column {i}"),


Reply via email to