This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 8ccd283 feat: support temporal types in CPP (#266)
8ccd283 is described below
commit 8ccd28384f5211211d20b2b302a462c104cc7b1e
Author: Anton Borisov <[email protected]>
AuthorDate: Sun Feb 8 01:06:03 2026 +0000
feat: support temporal types in CPP (#266)
---
bindings/cpp/examples/example.cpp | 229 +++++++++++++++++++++++---------------
bindings/cpp/include/fluss.hpp | 114 +++++++++++++++++++
bindings/cpp/src/lib.rs | 38 ++++++-
bindings/cpp/src/table.cpp | 48 ++++++++
bindings/cpp/src/types.rs | 136 +++++++++++++---------
5 files changed, 422 insertions(+), 143 deletions(-)
diff --git a/bindings/cpp/examples/example.cpp
b/bindings/cpp/examples/example.cpp
index 10266c7..92ebe9c 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -42,7 +42,7 @@ int main() {
check("get_admin", conn.GetAdmin(admin));
fluss::TablePath table_path("fluss", "sample_table_cpp_v1");
-
+
// 2.1) Drop table if exists
std::cout << "Dropping table if exists..." << std::endl;
auto drop_result = admin.DropTable(table_path, true);
@@ -52,12 +52,16 @@ int main() {
std::cout << "Table drop result: " << drop_result.error_message <<
std::endl;
}
- // 3) Schema & descriptor
+ // 3) Schema with scalar and temporal columns
auto schema = fluss::Schema::NewBuilder()
.AddColumn("id", fluss::DataType::Int)
.AddColumn("name", fluss::DataType::String)
.AddColumn("score", fluss::DataType::Float)
.AddColumn("age", fluss::DataType::Int)
+ .AddColumn("event_date", fluss::DataType::Date)
+ .AddColumn("event_time", fluss::DataType::Time)
+ .AddColumn("created_at", fluss::DataType::Timestamp)
+ .AddColumn("updated_at", fluss::DataType::TimestampLtz)
.Build();
auto descriptor = fluss::TableDescriptor::NewBuilder()
@@ -66,7 +70,6 @@ int main() {
.SetComment("cpp example table with 3 buckets")
.Build();
- // 3.1) Create table with 3 buckets
std::cout << "Creating table with 3 buckets..." << std::endl;
check("create_table", admin.CreateTable(table_path, descriptor, false));
@@ -74,7 +77,7 @@ int main() {
fluss::Table table;
check("get_table", conn.GetTable(table_path, table));
- // 5) Writer
+ // 5) Write rows with scalar and temporal values
fluss::AppendWriter writer;
check("new_append_writer", table.NewAppendWriter(writer));
@@ -83,12 +86,26 @@ int main() {
const char* name;
float score;
int age;
+ fluss::Date date;
+ fluss::Time time;
+ fluss::Timestamp ts_ntz;
+ fluss::Timestamp ts_ltz;
};
+ auto tp_now = std::chrono::system_clock::now();
std::vector<RowData> rows = {
- {1, "Alice", 95.2f, 25},
- {2, "Bob", 87.2f, 30},
- {3, "Charlie", 92.1f, 35},
+ {1, "Alice", 95.2f, 25,
+ fluss::Date::FromYMD(2024, 6, 15), fluss::Time::FromHMS(14, 30, 45),
+ fluss::Timestamp::FromTimePoint(tp_now),
+ fluss::Timestamp::FromMillis(1718467200000)},
+ {2, "Bob", 87.2f, 30,
+ fluss::Date::FromYMD(2025, 1, 1), fluss::Time::FromHMS(0, 0, 0),
+ fluss::Timestamp::FromMillis(1735689600000),
+ fluss::Timestamp::FromMillisNanos(1735689600000, 500000)},
+ {3, "Charlie", 92.1f, 35,
+ fluss::Date::FromYMD(1999, 12, 31), fluss::Time::FromHMS(23, 59, 59),
+ fluss::Timestamp::FromMillis(946684799999),
+ fluss::Timestamp::FromMillis(946684799999)},
};
// Fire-and-forget: queue rows, flush at end
@@ -98,6 +115,10 @@ int main() {
row.SetString(1, r.name);
row.SetFloat32(2, r.score);
row.SetInt32(3, r.age);
+ row.SetDate(4, r.date);
+ row.SetTime(5, r.time);
+ row.SetTimestampNtz(6, r.ts_ntz);
+ row.SetTimestampLtz(7, r.ts_ltz);
check("append", writer.Append(row));
}
check("flush", writer.Flush());
@@ -116,7 +137,7 @@ int main() {
std::cout << "Row acknowledged by server" << std::endl;
}
- // 6) Scan
+ // 6) Full scan — verify all column types including temporal
fluss::LogScanner scanner;
check("new_log_scanner", table.NewScan().CreateLogScanner(scanner));
@@ -129,188 +150,216 @@ int main() {
fluss::ScanRecords records;
check("poll", scanner.Poll(5000, records));
- std::cout << "Scanned records: " << records.records.size() << std::endl;
+ std::cout << "Scanned records: " << records.Size() << std::endl;
+ bool scan_ok = true;
for (const auto& rec : records.records) {
- std::cout << " offset=" << rec.offset << " id=" <<
rec.row.fields[0].i32_val
- << " name=" << rec.row.fields[1].string_val
- << " score=" << rec.row.fields[2].f32_val << " age=" <<
rec.row.fields[3].i32_val
- << " ts=" << rec.timestamp << std::endl;
+ const auto& f = rec.row.fields;
+
+ if (f[4].type != fluss::DatumType::Date) {
+ std::cerr << "ERROR: field 4 expected Date, got "
+ << static_cast<int>(f[4].type) << std::endl;
+ scan_ok = false;
+ }
+ if (f[5].type != fluss::DatumType::Time) {
+ std::cerr << "ERROR: field 5 expected Time, got "
+ << static_cast<int>(f[5].type) << std::endl;
+ scan_ok = false;
+ }
+ if (f[6].type != fluss::DatumType::TimestampNtz) {
+ std::cerr << "ERROR: field 6 expected TimestampNtz, got "
+ << static_cast<int>(f[6].type) << std::endl;
+ scan_ok = false;
+ }
+ if (f[7].type != fluss::DatumType::TimestampLtz) {
+ std::cerr << "ERROR: field 7 expected TimestampLtz, got "
+ << static_cast<int>(f[7].type) << std::endl;
+ scan_ok = false;
+ }
+
+ auto date = f[4].GetDate();
+ auto time = f[5].GetTime();
+ auto ts_ntz = f[6].GetTimestamp();
+ auto ts_ltz = f[7].GetTimestamp();
+
+ std::cout << " id=" << f[0].i32_val
+ << " name=" << f[1].string_val
+ << " score=" << f[2].f32_val
+ << " age=" << f[3].i32_val
+ << " date=" << date.Year() << "-" << date.Month() << "-" <<
date.Day()
+ << " time=" << time.Hour() << ":" << time.Minute() << ":" <<
time.Second()
+ << " ts_ntz=" << ts_ntz.epoch_millis
+ << " ts_ltz=" << ts_ltz.epoch_millis
+ << "+" << ts_ltz.nano_of_millisecond << "ns"
+ << std::endl;
+ }
+
+ if (!scan_ok) {
+ std::cerr << "Full scan type verification FAILED!" << std::endl;
+ std::exit(1);
}
-
- // 7) Project only id (0) and name (1) columns
- std::vector<size_t> projected_columns = {0, 1};
+
+ // 7) Projected scan — project [id, updated_at(TimestampLtz)] to verify
+ // NTZ/LTZ disambiguation works with column index remapping
+ std::vector<size_t> projected_columns = {0, 7};
fluss::LogScanner projected_scanner;
check("new_log_scanner_with_projection",
table.NewScan().Project(projected_columns).CreateLogScanner(projected_scanner));
-
+
for (int b = 0; b < buckets; ++b) {
check("subscribe_projected", projected_scanner.Subscribe(b, 0));
}
-
+
fluss::ScanRecords projected_records;
check("poll_projected", projected_scanner.Poll(5000, projected_records));
-
- std::cout << "Projected records: " << projected_records.records.size() <<
std::endl;
-
- bool projection_verified = true;
- for (size_t i = 0; i < projected_records.records.size(); ++i) {
- const auto& rec = projected_records.records[i];
- const auto& row = rec.row;
-
- if (row.fields.size() != projected_columns.size()) {
- std::cerr << "ERROR: Record " << i << " has " << row.fields.size()
- << " fields, expected " << projected_columns.size() <<
std::endl;
- projection_verified = false;
+
+ std::cout << "Projected records: " << projected_records.Size() <<
std::endl;
+ for (const auto& rec : projected_records.records) {
+ const auto& f = rec.row.fields;
+
+ if (f.size() != 2) {
+ std::cerr << "ERROR: expected 2 fields, got " << f.size() <<
std::endl;
+ scan_ok = false;
continue;
}
-
- // Verify field types match expected columns
- // Column 0 (id) should be Int32, Column 1 (name) should be String
- if (row.fields[0].type != fluss::DatumType::Int32) {
- std::cerr << "ERROR: Record " << i << " field 0 type mismatch,
expected Int32" << std::endl;
- projection_verified = false;
+ if (f[0].type != fluss::DatumType::Int32) {
+ std::cerr << "ERROR: projected field 0 expected Int32, got "
+ << static_cast<int>(f[0].type) << std::endl;
+ scan_ok = false;
}
- if (row.fields[1].type != fluss::DatumType::String) {
- std::cerr << "ERROR: Record " << i << " field 1 type mismatch,
expected String" << std::endl;
- projection_verified = false;
- }
-
- // Print projected data
- if (row.fields[0].type == fluss::DatumType::Int32 &&
- row.fields[1].type == fluss::DatumType::String) {
- std::cout << " Record " << i << ": id=" << row.fields[0].i32_val
- << ", name=" << row.fields[1].string_val << std::endl;
+ if (f[1].type != fluss::DatumType::TimestampLtz) {
+ std::cerr << "ERROR: projected field 1 expected TimestampLtz, got "
+ << static_cast<int>(f[1].type) << std::endl;
+ scan_ok = false;
}
+
+ auto ts = f[1].GetTimestamp();
+ std::cout << " id=" << f[0].i32_val
+ << " updated_at=" << ts.epoch_millis
+ << "+" << ts.nano_of_millisecond << "ns" << std::endl;
}
-
- if (projection_verified) {
- std::cout << "Column pruning verification passed!" << std::endl;
+
+ if (scan_ok) {
+ std::cout << "Scan verification passed!" << std::endl;
} else {
- std::cerr << "Column pruning verification failed!" << std::endl;
+ std::cerr << "Scan verification FAILED!" << std::endl;
std::exit(1);
}
// 8) List offsets examples
std::cout << "\n=== List Offsets Examples ===" << std::endl;
-
- // 8.1) Query earliest offsets for all buckets
+
std::vector<int32_t> all_bucket_ids;
all_bucket_ids.reserve(buckets);
for (int b = 0; b < buckets; ++b) {
all_bucket_ids.push_back(b);
}
-
+
std::unordered_map<int32_t, int64_t> earliest_offsets;
- check("list_earliest_offsets",
- admin.ListOffsets(table_path, all_bucket_ids,
- fluss::OffsetQuery::Earliest(),
+ check("list_earliest_offsets",
+ admin.ListOffsets(table_path, all_bucket_ids,
+ fluss::OffsetQuery::Earliest(),
earliest_offsets));
std::cout << "Earliest offsets:" << std::endl;
for (const auto& [bucket_id, offset] : earliest_offsets) {
std::cout << " Bucket " << bucket_id << ": offset=" << offset <<
std::endl;
}
-
- // 8.2) Query latest offsets for all buckets
+
std::unordered_map<int32_t, int64_t> latest_offsets;
- check("list_latest_offsets",
- admin.ListOffsets(table_path, all_bucket_ids,
- fluss::OffsetQuery::Latest(),
+ check("list_latest_offsets",
+ admin.ListOffsets(table_path, all_bucket_ids,
+ fluss::OffsetQuery::Latest(),
latest_offsets));
std::cout << "Latest offsets:" << std::endl;
for (const auto& [bucket_id, offset] : latest_offsets) {
std::cout << " Bucket " << bucket_id << ": offset=" << offset <<
std::endl;
}
-
- // 8.3) Query offsets for a specific timestamp (current time - 1 hour)
+
auto now = std::chrono::system_clock::now();
auto one_hour_ago = now - std::chrono::hours(1);
auto timestamp_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
one_hour_ago.time_since_epoch()).count();
-
+
std::unordered_map<int32_t, int64_t> timestamp_offsets;
- check("list_timestamp_offsets",
- admin.ListOffsets(table_path, all_bucket_ids,
- fluss::OffsetQuery::FromTimestamp(timestamp_ms),
+ check("list_timestamp_offsets",
+ admin.ListOffsets(table_path, all_bucket_ids,
+ fluss::OffsetQuery::FromTimestamp(timestamp_ms),
timestamp_offsets));
std::cout << "Offsets for timestamp " << timestamp_ms << " (1 hour ago):"
<< std::endl;
for (const auto& [bucket_id, offset] : timestamp_offsets) {
std::cout << " Bucket " << bucket_id << ": offset=" << offset <<
std::endl;
}
-
- // 8.4) Use batch subscribe with offsets from list_offsets
+
+ // 9) Batch subscribe
std::cout << "\n=== Batch Subscribe Example ===" << std::endl;
fluss::LogScanner batch_scanner;
check("new_log_scanner_for_batch",
table.NewScan().CreateLogScanner(batch_scanner));
-
+
std::vector<fluss::BucketSubscription> subscriptions;
for (const auto& [bucket_id, offset] : earliest_offsets) {
subscriptions.push_back({bucket_id, offset});
- std::cout << "Preparing subscription: bucket=" << bucket_id
+ std::cout << "Preparing subscription: bucket=" << bucket_id
<< ", offset=" << offset << std::endl;
}
-
+
check("subscribe_buckets", batch_scanner.Subscribe(subscriptions));
std::cout << "Batch subscribed to " << subscriptions.size() << " buckets"
<< std::endl;
-
- // 8.5) Poll and verify bucket_id in records
+
fluss::ScanRecords batch_records;
check("poll_batch", batch_scanner.Poll(5000, batch_records));
-
+
std::cout << "Scanned " << batch_records.Size() << " records from batch
subscription" << std::endl;
for (size_t i = 0; i < batch_records.Size() && i < 5; ++i) {
const auto& rec = batch_records[i];
- std::cout << " Record " << i << ": bucket_id=" << rec.bucket_id
- << ", offset=" << rec.offset
+ std::cout << " Record " << i << ": bucket_id=" << rec.bucket_id
+ << ", offset=" << rec.offset
<< ", timestamp=" << rec.timestamp << std::endl;
}
if (batch_records.Size() > 5) {
std::cout << " ... and " << (batch_records.Size() - 5) << " more
records" << std::endl;
}
- // 9) Test the new Arrow record batch polling functionality
+ // 10) Arrow record batch polling
std::cout << "\n=== Testing Arrow Record Batch Polling ===" << std::endl;
fluss::LogScanner arrow_scanner;
check("new_record_batch_log_scanner",
table.NewScan().CreateRecordBatchScanner(arrow_scanner));
-
- // Subscribe to all buckets starting from offset 0
+
for (int b = 0; b < buckets; ++b) {
check("subscribe_arrow", arrow_scanner.Subscribe(b, 0));
}
-
+
fluss::ArrowRecordBatches arrow_batches;
check("poll_record_batch", arrow_scanner.PollRecordBatch(5000,
arrow_batches));
-
+
std::cout << "Polled " << arrow_batches.Size() << " Arrow record batches"
<< std::endl;
for (size_t i = 0; i < arrow_batches.Size(); ++i) {
const auto& batch = arrow_batches[i];
if (batch->Available()) {
- std::cout << " Batch " << i << ": " <<
batch->GetArrowRecordBatch()->num_rows() << " rows. " << std::endl;
+ std::cout << " Batch " << i << ": " <<
batch->GetArrowRecordBatch()->num_rows() << " rows" << std::endl;
} else {
std::cout << " Batch " << i << ": not available" << std::endl;
}
}
-
- // 10) Test the new Arrow record batch polling with projection
+
+ // 11) Arrow record batch polling with projection
std::cout << "\n=== Testing Arrow Record Batch Polling with Projection
===" << std::endl;
fluss::LogScanner projected_arrow_scanner;
check("new_record_batch_log_scanner_with_projection",
table.NewScan().Project(projected_columns).CreateRecordBatchScanner(projected_arrow_scanner));
-
- // Subscribe to all buckets starting from offset 0
+
for (int b = 0; b < buckets; ++b) {
check("subscribe_projected_arrow",
projected_arrow_scanner.Subscribe(b, 0));
}
-
+
fluss::ArrowRecordBatches projected_arrow_batches;
check("poll_projected_record_batch",
projected_arrow_scanner.PollRecordBatch(5000, projected_arrow_batches));
-
+
std::cout << "Polled " << projected_arrow_batches.Size() << " projected
Arrow record batches" << std::endl;
for (size_t i = 0; i < projected_arrow_batches.Size(); ++i) {
const auto& batch = projected_arrow_batches[i];
if (batch->Available()) {
- std::cout << " Batch " << i << ": " <<
batch->GetArrowRecordBatch()->num_rows() << " rows " << std::endl;
+ std::cout << " Batch " << i << ": " <<
batch->GetArrowRecordBatch()->num_rows() << " rows" << std::endl;
} else {
std::cout << " Batch " << i << ": not available" << std::endl;
}
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 6c20717..239d9a4 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -19,6 +19,7 @@
#pragma once
+#include <chrono>
#include <memory>
#include <optional>
#include <string>
@@ -41,6 +42,64 @@ namespace ffi {
struct LogScanner;
} // namespace ffi
+/// Calendar date stored as a signed day count relative to 1970-01-01 (UTC).
+/// This raw count is what Datum::Date carries in i32_val.
+struct Date {
+ int32_t days_since_epoch{0};
+
+ /// Wraps a raw day count without any conversion.
+ static Date FromDays(int32_t days) { return {days}; }
+ /// Builds a date from calendar fields (month and day are 1-based);
+ /// defined out of line in table.cpp.
+ static Date FromYMD(int year, int month, int day);
+
+ /// Calendar fields of this date (Month() and Day() are 1-based);
+ /// defined out of line in table.cpp.
+ int Year() const;
+ int Month() const;
+ int Day() const;
+};
+
+/// Time of day stored as milliseconds since midnight. This raw count is what
+/// Datum::Time carries in i32_val.
+struct Time {
+ static constexpr int32_t kMillisPerSecond = 1000;
+ static constexpr int32_t kMillisPerMinute = 60 * kMillisPerSecond;
+ static constexpr int32_t kMillisPerHour = 60 * kMillisPerMinute;
+
+ int32_t millis_since_midnight{0};
+
+ /// Wraps a raw millisecond count without any conversion.
+ static Time FromMillis(int32_t ms) { return {ms}; }
+ /// Combines clock fields into milliseconds since midnight.
+ /// No range validation is performed on the inputs.
+ static Time FromHMS(int hour, int minute, int second, int millis = 0) {
+ return {hour * kMillisPerHour + minute * kMillisPerMinute +
+ second * kMillisPerSecond + millis};
+ }
+
+ /// Decompose millis_since_midnight back into its clock fields.
+ int Hour() const { return millis_since_midnight / kMillisPerHour; }
+ int Minute() const { return (millis_since_midnight % kMillisPerHour) /
kMillisPerMinute; }
+ int Second() const { return (millis_since_midnight % kMillisPerMinute) /
kMillisPerSecond; }
+ int Millis() const { return millis_since_midnight % kMillisPerSecond; }
+};
+
+/// Instant split into whole milliseconds since the Unix epoch plus a
+/// sub-millisecond nanosecond part in [0, kMaxNanoOfMillisecond].
+/// The same struct backs both TimestampNtz and TimestampLtz datums
+/// (epoch_millis -> i64_val, nano_of_millisecond -> i32_val).
+struct Timestamp {
+ static constexpr int32_t kMaxNanoOfMillisecond = 999999;
+ static constexpr int64_t kNanosPerMilli = 1000000;
+
+ int64_t epoch_millis{0};
+ int32_t nano_of_millisecond{0};
+
+ /// Whole-millisecond timestamp (nano_of_millisecond = 0).
+ static Timestamp FromMillis(int64_t ms) { return {ms, 0}; }
+ /// Millis plus nanos. Out-of-range nanos are clamped into
+ /// [0, kMaxNanoOfMillisecond]; they are NOT carried into ms.
+ static Timestamp FromMillisNanos(int64_t ms, int32_t nanos) {
+ if (nanos < 0) nanos = 0;
+ if (nanos > kMaxNanoOfMillisecond) nanos = kMaxNanoOfMillisecond;
+ return {ms, nanos};
+ }
+ /// Converts a system_clock time point, keeping nanosecond precision.
+ static Timestamp FromTimePoint(std::chrono::system_clock::time_point tp) {
+ auto duration = tp.time_since_epoch();
+ auto ns =
+ std::chrono::duration_cast<std::chrono::nanoseconds>(duration)
+ .count();
+ auto ms = ns / kNanosPerMilli;
+ auto nano_of_ms = static_cast<int32_t>(ns % kNanosPerMilli);
+ // '/' and '%' truncate toward zero; for pre-epoch instants borrow one
+ // millisecond so ms is floored and nano_of_ms stays non-negative.
+ if (nano_of_ms < 0) {
+ nano_of_ms += kNanosPerMilli;
+ ms -= 1;
+ }
+ return {ms, nano_of_ms};
+ }
+};
+
enum class DataType {
Boolean = 1,
TinyInt = 2,
@@ -66,6 +125,11 @@ enum class DatumType {
Float64 = 5,
String = 6,
Bytes = 7,
+ // 8-10 reserved for decimal types
+ Date = 11,
+ Time = 12,
+ TimestampNtz = 13,
+ TimestampLtz = 14,
};
constexpr int64_t EARLIEST_OFFSET = -2;
@@ -270,6 +334,36 @@ struct Datum {
d.bytes_val = std::move(v);
return d;
}
+ /// Date datum: days since epoch stored in i32_val.
+ static Datum Date(fluss::Date d) {
+ Datum dat;
+ dat.type = DatumType::Date;
+ dat.i32_val = d.days_since_epoch;
+ return dat;
+ }
+ /// Time datum: milliseconds since midnight stored in i32_val.
+ static Datum Time(fluss::Time t) {
+ Datum dat;
+ dat.type = DatumType::Time;
+ dat.i32_val = t.millis_since_midnight;
+ return dat;
+ }
+ /// Timestamp datums: epoch millis in i64_val, sub-millisecond nanos in
+ /// i32_val. NTZ and LTZ share this layout and differ only in the type tag.
+ static Datum TimestampNtz(fluss::Timestamp ts) {
+ Datum dat;
+ dat.type = DatumType::TimestampNtz;
+ dat.i64_val = ts.epoch_millis;
+ dat.i32_val = ts.nano_of_millisecond;
+ return dat;
+ }
+ static Datum TimestampLtz(fluss::Timestamp ts) {
+ Datum dat;
+ dat.type = DatumType::TimestampLtz;
+ dat.i64_val = ts.epoch_millis;
+ dat.i32_val = ts.nano_of_millisecond;
+ return dat;
+ }
+
+ /// Typed views of the raw fields. These do not check `type`; callers
+ /// should verify the datum type before using them.
+ fluss::Date GetDate() const { return {i32_val}; }
+ fluss::Time GetTime() const { return {i32_val}; }
+ fluss::Timestamp GetTimestamp() const { return {i64_val, i32_val}; }
};
struct GenericRow {
@@ -315,6 +409,26 @@ struct GenericRow {
fields[idx] = Datum::Bytes(std::move(v));
}
+ /// Temporal setters: store the value at column idx. EnsureSize(idx) is
+ /// called first to handle idx >= fields.size().
+ void SetDate(size_t idx, fluss::Date d) {
+ EnsureSize(idx);
+ fields[idx] = Datum::Date(d);
+ }
+
+ void SetTime(size_t idx, fluss::Time t) {
+ EnsureSize(idx);
+ fields[idx] = Datum::Time(t);
+ }
+
+ // NTZ vs LTZ only changes the datum type tag; the payload is identical.
+ void SetTimestampNtz(size_t idx, fluss::Timestamp ts) {
+ EnsureSize(idx);
+ fields[idx] = Datum::TimestampNtz(ts);
+ }
+
+ void SetTimestampLtz(size_t idx, fluss::Timestamp ts) {
+ EnsureSize(idx);
+ fields[idx] = Datum::TimestampLtz(ts);
+ }
+
private:
void EnsureSize(size_t idx) {
if (fields.size() <= idx) {
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 4957c99..5a26613 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -310,6 +310,9 @@ pub struct WriteResult {
pub struct LogScanner {
inner: Option<fcore::client::LogScanner>,
inner_batch: Option<fcore::client::RecordBatchLogScanner>,
+ /// Fluss columns matching the projected Arrow fields (1:1 by index).
+ /// For non-projected scanners this is the full table schema columns.
+ projected_columns: Vec<fcore::metadata::Column>,
}
fn ok_result() -> ffi::FfiResult {
@@ -653,6 +656,7 @@ impl Table {
let scanner_ptr = Box::into_raw(Box::new(LogScanner {
inner: Some(scanner),
inner_batch: None,
+ projected_columns:
self.table_info.get_schema().columns().to_vec(),
}));
Ok(scanner_ptr)
@@ -670,6 +674,19 @@ impl Table {
self.table_info.clone(),
);
+ let all_columns = self.table_info.get_schema().columns();
+ let projected_columns: Vec<_> = column_indices
+ .iter()
+ .map(|&i| {
+ all_columns.get(i).cloned().ok_or_else(|| {
+ format!(
+ "Invalid column index {i}: schema has {} columns",
+ all_columns.len()
+ )
+ })
+ })
+ .collect::<Result<_, String>>()?;
+
let log_scanner = fluss_table
.new_scan()
.project(&column_indices)
@@ -680,6 +697,7 @@ impl Table {
let scanner = Box::into_raw(Box::new(LogScanner {
inner: Some(log_scanner),
inner_batch: None,
+ projected_columns,
}));
Ok(scanner)
})
@@ -701,6 +719,7 @@ impl Table {
let scanner = Box::into_raw(Box::new(LogScanner {
inner: None,
inner_batch: Some(batch_scanner),
+ projected_columns:
self.table_info.get_schema().columns().to_vec(),
}));
Ok(scanner)
})
@@ -717,6 +736,19 @@ impl Table {
self.table_info.clone(),
);
+ let all_columns = self.table_info.get_schema().columns();
+ let projected_columns: Vec<_> = column_indices
+ .iter()
+ .map(|&i| {
+ all_columns.get(i).cloned().ok_or_else(|| {
+ format!(
+ "Invalid column index {i}: schema has {} columns",
+ all_columns.len()
+ )
+ })
+ })
+ .collect::<Result<_, String>>()?;
+
let batch_scanner = fluss_table
.new_scan()
.project(&column_indices)
@@ -727,6 +759,7 @@ impl Table {
let scanner = Box::into_raw(Box::new(LogScanner {
inner: None,
inner_batch: Some(batch_scanner),
+ projected_columns,
}));
Ok(scanner)
})
@@ -925,7 +958,10 @@ impl LogScanner {
match result {
Ok(records) => ffi::FfiScanRecordsResult {
result: ok_result(),
- scan_records: types::core_scan_records_to_ffi(&records),
+ scan_records: types::core_scan_records_to_ffi(
+ &records,
+ &self.projected_columns,
+ ),
},
Err(e) => ffi::FfiScanRecordsResult {
result: err_result(1, e.to_string()),
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index 24be8d4..04d1846 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -22,12 +22,60 @@
#include "ffi_converter.hpp"
#include "rust/cxx.h"
#include <arrow/c/bridge.h>
+#include <ctime>
// todo: bindings/cpp/BUILD.bazel still doesn’t declare Arrow include/link
dependencies.
// In environments where Bazel does not already have Arrow available, this
will fail at compile/link time.
#include <arrow/record_batch.h>
namespace fluss {
+// Calendar conversions for fluss::Date, using Howard Hinnant's
+// days_from_civil / civil_from_days algorithms on the proleptic Gregorian
+// calendar. This is pure integer arithmetic: unlike timegm/_mkgmtime and
+// gmtime_r/gmtime_s it behaves identically on every platform, handles dates
+// before 1970 (which _mkgmtime and gmtime_s reject on Windows), and does not
+// depend on the width of std::time_t (no year-2038 limit on 32-bit time_t).
+
+// Floor division for a positive divisor (C++ '/' truncates toward zero).
+static int64_t civil_floor_div(int64_t a, int64_t b) {
+    return (a >= 0 ? a : a - (b - 1)) / b;
+}
+
+// Days since 1970-01-01 for the given calendar date. Out-of-range month and
+// day values are normalized the same way timegm does (e.g. month 13 ->
+// January of the next year, day 0 -> last day of the previous month).
+static int64_t days_from_civil(int64_t year, int64_t month, int64_t day) {
+    // Normalize month into [1, 12], carrying whole years.
+    const int64_t month0 = month - 1;
+    const int64_t year_carry = civil_floor_div(month0, 12);
+    year += year_carry;
+    month = month0 - year_carry * 12 + 1;
+
+    // Use a March-based year so the leap day is the last day of the year.
+    year -= month <= 2 ? 1 : 0;
+    const int64_t era = civil_floor_div(year, 400);
+    const int64_t yoe = year - era * 400;                       // [0, 399]
+    const int64_t mp = (month + 9) % 12;                        // Mar=0 .. Feb=11
+    const int64_t doy = (153 * mp + 2) / 5 + day - 1;           // day of year
+    const int64_t doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;  // day of era
+    return era * 146097 + doe - 719468;  // 719468 = days 0000-03-01..1970-01-01
+}
+
+// Calendar fields (month in [1, 12], day in [1, 31]) for a day count
+// relative to 1970-01-01.
+static void civil_from_days(int64_t days, int& year, int& month, int& day) {
+    const int64_t z = days + 719468;
+    const int64_t era = civil_floor_div(z, 146097);
+    const int64_t doe = z - era * 146097;                                       // [0, 146096]
+    const int64_t yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365;  // [0, 399]
+    const int64_t doy = doe - (365 * yoe + yoe / 4 - yoe / 100);                // [0, 365]
+    const int64_t mp = (5 * doy + 2) / 153;                                     // [0, 11]
+    const int64_t d = doy - (153 * mp + 2) / 5 + 1;                             // [1, 31]
+    const int64_t m = mp < 10 ? mp + 3 : mp - 9;                                // [1, 12]
+    year = static_cast<int>(yoe + era * 400 + (m <= 2 ? 1 : 0));
+    month = static_cast<int>(m);
+    day = static_cast<int>(d);
+}
+
+Date Date::FromYMD(int year, int month, int day) {
+    return {static_cast<int32_t>(days_from_civil(year, month, day))};
+}
+
+int Date::Year() const {
+    int year = 0, month = 0, day = 0;
+    civil_from_days(days_since_epoch, year, month, day);
+    return year;
+}
+
+int Date::Month() const {
+    int year = 0, month = 0, day = 0;
+    civil_from_days(days_since_epoch, year, month, day);
+    return month;
+}
+
+int Date::Day() const {
+    int year = 0, month = 0, day = 0;
+    civil_from_days(days_since_epoch, year, month, day);
+    return day;
+}
+
Table::Table() noexcept = default;
Table::Table(ffi::Table* table) noexcept : table_(table) {}
diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
index 91d6e26..f546b68 100644
--- a/bindings/cpp/src/types.rs
+++ b/bindings/cpp/src/types.rs
@@ -52,6 +52,15 @@ pub const DATUM_TYPE_FLOAT32: i32 = 4;
pub const DATUM_TYPE_FLOAT64: i32 = 5;
pub const DATUM_TYPE_STRING: i32 = 6;
pub const DATUM_TYPE_BYTES: i32 = 7;
+/// Temporal datum tags. These must match `fluss::DatumType` in fluss.hpp,
+/// where values 8-10 are reserved for decimal types.
+pub const DATUM_TYPE_DATE: i32 = 11;
+pub const DATUM_TYPE_TIME: i32 = 12;
+pub const DATUM_TYPE_TIMESTAMP_NTZ: i32 = 13;
+pub const DATUM_TYPE_TIMESTAMP_LTZ: i32 = 14;
+
+// Unit-conversion factors used to normalize Arrow Time32/Time64/Timestamp
+// values into the millisecond (+ nanos-of-millisecond) FFI representation.
+const MILLIS_PER_SECOND: i64 = 1_000;
+const MICROS_PER_MILLI: i64 = 1_000;
+const NANOS_PER_MICRO: i64 = 1_000;
+const NANOS_PER_MILLI: i64 = 1_000_000;
fn ffi_data_type_to_core(dt: i32) -> Result<fcore::metadata::DataType> {
match dt {
@@ -224,6 +233,16 @@ pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) ->
fcore::row::GenericRow<'_> {
DATUM_TYPE_FLOAT64 => Datum::Float64(field.f64_val.into()),
DATUM_TYPE_STRING =>
Datum::String(Cow::Borrowed(field.string_val.as_str())),
DATUM_TYPE_BYTES =>
Datum::Blob(Cow::Borrowed(field.bytes_val.as_slice())),
+ DATUM_TYPE_DATE =>
Datum::Date(fcore::row::Date::new(field.i32_val)),
+ DATUM_TYPE_TIME =>
Datum::Time(fcore::row::Time::new(field.i32_val)),
+ DATUM_TYPE_TIMESTAMP_NTZ => Datum::TimestampNtz(
+ fcore::row::TimestampNtz::from_millis_nanos(field.i64_val,
field.i32_val)
+ .unwrap_or_else(|_|
fcore::row::TimestampNtz::new(field.i64_val)),
+ ),
+ DATUM_TYPE_TIMESTAMP_LTZ => Datum::TimestampLtz(
+ fcore::row::TimestampLtz::from_millis_nanos(field.i64_val,
field.i32_val)
+ .unwrap_or_else(|_|
fcore::row::TimestampLtz::new(field.i64_val)),
+ ),
_ => Datum::Null,
};
generic_row.set_field(idx, datum);
@@ -232,7 +251,10 @@ pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) ->
fcore::row::GenericRow<'_> {
generic_row
}
-pub fn core_scan_records_to_ffi(records: &fcore::record::ScanRecords) ->
ffi::FfiScanRecords {
+pub fn core_scan_records_to_ffi(
+ records: &fcore::record::ScanRecords,
+ columns: &[fcore::metadata::Column],
+) -> ffi::FfiScanRecords {
let mut ffi_records = Vec::new();
// Iterate over all buckets and their records
@@ -240,7 +262,7 @@ pub fn core_scan_records_to_ffi(records:
&fcore::record::ScanRecords) -> ffi::Ff
let bucket_id = table_bucket.bucket_id();
for record in bucket_records {
let row = record.row();
- let fields = core_row_to_ffi_fields(row);
+ let fields = core_row_to_ffi_fields(row, columns);
ffi_records.push(ffi::FfiScanRecord {
bucket_id,
@@ -256,7 +278,10 @@ pub fn core_scan_records_to_ffi(records:
&fcore::record::ScanRecords) -> ffi::Ff
}
}
-fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> Vec<ffi::FfiDatum>
{
+fn core_row_to_ffi_fields(
+ row: &fcore::row::ColumnarRow,
+ columns: &[fcore::metadata::Column],
+) -> Vec<ffi::FfiDatum> {
fn new_datum(datum_type: i32) -> ffi::FfiDatum {
ffi::FfiDatum {
datum_type,
@@ -361,52 +386,59 @@ fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow)
-> Vec<ffi::FfiDatum> {
.as_any()
.downcast_ref::<Date32Array>()
.expect("Date32 column expected");
- let mut datum = new_datum(DATUM_TYPE_INT32);
+ let mut datum = new_datum(DATUM_TYPE_DATE);
datum.i32_val = array.value(row_id);
datum
}
- ArrowDataType::Timestamp(unit, _) => match unit {
- TimeUnit::Second => {
- let array = record_batch
- .column(i)
- .as_any()
- .downcast_ref::<TimestampSecondArray>()
- .expect("Timestamp(second) column expected");
- let mut datum = new_datum(DATUM_TYPE_INT64);
- datum.i64_val = array.value(row_id);
- datum
- }
- TimeUnit::Millisecond => {
- let array = record_batch
- .column(i)
- .as_any()
- .downcast_ref::<TimestampMillisecondArray>()
- .expect("Timestamp(millisecond) column expected");
- let mut datum = new_datum(DATUM_TYPE_INT64);
- datum.i64_val = array.value(row_id);
- datum
+ ArrowDataType::Timestamp(unit, _tz) => {
+ let datum_type = match columns.get(i).map(|c| c.data_type()) {
+ Some(fcore::metadata::DataType::TimestampLTz(_)) =>
DATUM_TYPE_TIMESTAMP_LTZ,
+ _ => DATUM_TYPE_TIMESTAMP_NTZ,
+ };
+ let mut datum = new_datum(datum_type);
+ match unit {
+ TimeUnit::Second => {
+ let array = record_batch
+ .column(i)
+ .as_any()
+ .downcast_ref::<TimestampSecondArray>()
+ .expect("Timestamp(second) column expected");
+ datum.i64_val = array.value(row_id) *
MILLIS_PER_SECOND;
+ datum.i32_val = 0;
+ }
+ TimeUnit::Millisecond => {
+ let array = record_batch
+ .column(i)
+ .as_any()
+ .downcast_ref::<TimestampMillisecondArray>()
+ .expect("Timestamp(millisecond) column expected");
+ datum.i64_val = array.value(row_id);
+ datum.i32_val = 0;
+ }
+ TimeUnit::Microsecond => {
+ let array = record_batch
+ .column(i)
+ .as_any()
+ .downcast_ref::<TimestampMicrosecondArray>()
+ .expect("Timestamp(microsecond) column expected");
+ let micros = array.value(row_id);
+ datum.i64_val = micros.div_euclid(MICROS_PER_MILLI);
+ datum.i32_val =
+ (micros.rem_euclid(MICROS_PER_MILLI) *
NANOS_PER_MICRO) as i32;
+ }
+ TimeUnit::Nanosecond => {
+ let array = record_batch
+ .column(i)
+ .as_any()
+ .downcast_ref::<TimestampNanosecondArray>()
+ .expect("Timestamp(nanosecond) column expected");
+ let nanos = array.value(row_id);
+ datum.i64_val = nanos.div_euclid(NANOS_PER_MILLI);
+ datum.i32_val = nanos.rem_euclid(NANOS_PER_MILLI) as
i32;
+ }
}
- TimeUnit::Microsecond => {
- let array = record_batch
- .column(i)
- .as_any()
- .downcast_ref::<TimestampMicrosecondArray>()
- .expect("Timestamp(microsecond) column expected");
- let mut datum = new_datum(DATUM_TYPE_INT64);
- datum.i64_val = array.value(row_id);
- datum
- }
- TimeUnit::Nanosecond => {
- let array = record_batch
- .column(i)
- .as_any()
- .downcast_ref::<TimestampNanosecondArray>()
- .expect("Timestamp(nanosecond) column expected");
- let mut datum = new_datum(DATUM_TYPE_INT64);
- datum.i64_val = array.value(row_id);
- datum
- }
- },
+ datum
+ }
ArrowDataType::Time32(unit) => match unit {
TimeUnit::Second => {
let array = record_batch
@@ -414,8 +446,8 @@ fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) ->
Vec<ffi::FfiDatum> {
.as_any()
.downcast_ref::<Time32SecondArray>()
.expect("Time32(second) column expected");
- let mut datum = new_datum(DATUM_TYPE_INT32);
- datum.i32_val = array.value(row_id);
+ let mut datum = new_datum(DATUM_TYPE_TIME);
+ datum.i32_val = array.value(row_id) * MILLIS_PER_SECOND as
i32;
datum
}
TimeUnit::Millisecond => {
@@ -424,7 +456,7 @@ fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) ->
Vec<ffi::FfiDatum> {
.as_any()
.downcast_ref::<Time32MillisecondArray>()
.expect("Time32(millisecond) column expected");
- let mut datum = new_datum(DATUM_TYPE_INT32);
+ let mut datum = new_datum(DATUM_TYPE_TIME);
datum.i32_val = array.value(row_id);
datum
}
@@ -437,8 +469,8 @@ fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) ->
Vec<ffi::FfiDatum> {
.as_any()
.downcast_ref::<Time64MicrosecondArray>()
.expect("Time64(microsecond) column expected");
- let mut datum = new_datum(DATUM_TYPE_INT64);
- datum.i64_val = array.value(row_id);
+ let mut datum = new_datum(DATUM_TYPE_TIME);
+ datum.i32_val = (array.value(row_id) / MICROS_PER_MILLI)
as i32;
datum
}
TimeUnit::Nanosecond => {
@@ -447,8 +479,8 @@ fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) ->
Vec<ffi::FfiDatum> {
.as_any()
.downcast_ref::<Time64NanosecondArray>()
.expect("Time64(nanosecond) column expected");
- let mut datum = new_datum(DATUM_TYPE_INT64);
- datum.i64_val = array.value(row_id);
+ let mut datum = new_datum(DATUM_TYPE_TIME);
+ datum.i32_val = (array.value(row_id) / NANOS_PER_MILLI) as
i32;
datum
}
_ => panic!("Will never come here. Unsupported Time64 unit for
column {i}"),