This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new ec4eb74 chore: Scan results returned per bucket python/cpp (#351)
ec4eb74 is described below
commit ec4eb749216ff40fd0c45eb7a786e2602d19c418
Author: Anton Borisov <[email protected]>
AuthorDate: Fri Feb 20 16:02:40 2026 +0000
chore: Scan results returned per bucket python/cpp (#351)
---
bindings/cpp/examples/example.cpp | 300 +++++++++--------
bindings/cpp/include/fluss.hpp | 139 ++++++--
bindings/cpp/src/lib.rs | 254 +++++++++-----
bindings/cpp/src/table.cpp | 190 +++++++----
bindings/python/Cargo.toml | 1 +
bindings/python/example/example.py | 31 +-
bindings/python/fluss/__init__.pyi | 62 +++-
bindings/python/src/lib.rs | 1 +
bindings/python/src/table.rs | 374 ++++++++++++++++-----
bindings/python/test/test_log_table.py | 83 ++++-
website/docs/user-guide/cpp/api-reference.md | 65 +++-
website/docs/user-guide/cpp/example/log-tables.md | 12 +
website/docs/user-guide/python/api-reference.md | 56 ++-
.../docs/user-guide/python/example/log-tables.md | 11 +-
website/docs/user-guide/rust/example/log-tables.md | 15 +
15 files changed, 1167 insertions(+), 427 deletions(-)
diff --git a/bindings/cpp/examples/example.cpp
b/bindings/cpp/examples/example.cpp
index ea966d8..d86ee5c 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -168,58 +168,75 @@ int main() {
fluss::ScanRecords records;
check("poll", scanner.Poll(5000, records));
- std::cout << "Scanned records: " << records.Size() << std::endl;
+ // Flat iteration over all records (regardless of bucket)
+ std::cout << "Scanned records: " << records.Count() << " across " <<
records.BucketCount()
+ << " buckets" << std::endl;
+ for (const auto& rec : records) {
+ std::cout << " offset=" << rec.offset << " timestamp=" <<
rec.timestamp << std::endl;
+ }
+
+ // Per-bucket access (with type verification)
bool scan_ok = true;
bool found_null_row = false;
- for (const auto& rec : records) {
- // Check if this is the all-null row (matches Rust: is_null_at for
every column)
- if (rec.row.IsNull(0)) {
- found_null_row = true;
- for (size_t i = 0; i < rec.row.FieldCount(); ++i) {
- if (!rec.row.IsNull(i)) {
- std::cerr << "ERROR: column " << i << " should be null" <<
std::endl;
- scan_ok = false;
+ for (const auto& tb : records.Buckets()) {
+ auto view = records.Records(tb);
+ std::cout << " Bucket " << tb.bucket_id;
+ if (tb.partition_id.has_value()) {
+ std::cout << " (partition=" << *tb.partition_id << ")";
+ }
+ std::cout << ": " << view.Size() << " records" << std::endl;
+ for (const auto& rec : view) {
+ // Check if this is the all-null row
+ if (rec.row.IsNull(0)) {
+ found_null_row = true;
+ for (size_t i = 0; i < rec.row.FieldCount(); ++i) {
+ if (!rec.row.IsNull(i)) {
+ std::cerr << "ERROR: column " << i << " should be
null" << std::endl;
+ scan_ok = false;
+ }
}
+ std::cout << " [null row] all " << rec.row.FieldCount() <<
" fields are null"
+ << std::endl;
+ continue;
}
- std::cout << " [null row] all " << rec.row.FieldCount() << "
fields are null"
- << std::endl;
- continue;
- }
-
- // Non-null rows: verify types
- if (rec.row.GetType(4) != fluss::TypeId::Date) {
- std::cerr << "ERROR: field 4 expected Date, got "
- << static_cast<int>(rec.row.GetType(4)) << std::endl;
- scan_ok = false;
- }
- if (rec.row.GetType(5) != fluss::TypeId::Time) {
- std::cerr << "ERROR: field 5 expected Time, got "
- << static_cast<int>(rec.row.GetType(5)) << std::endl;
- scan_ok = false;
- }
- if (rec.row.GetType(6) != fluss::TypeId::Timestamp) {
- std::cerr << "ERROR: field 6 expected Timestamp, got "
- << static_cast<int>(rec.row.GetType(6)) << std::endl;
- scan_ok = false;
- }
- if (rec.row.GetType(7) != fluss::TypeId::TimestampLtz) {
- std::cerr << "ERROR: field 7 expected TimestampLtz, got "
- << static_cast<int>(rec.row.GetType(7)) << std::endl;
- scan_ok = false;
- }
- // Name-based getters (equivalent to index-based above)
- auto date = rec.row.GetDate("event_date");
- auto time = rec.row.GetTime("event_time");
- auto ts_ntz = rec.row.GetTimestamp("created_at");
- auto ts_ltz = rec.row.GetTimestamp("updated_at");
+ // Non-null rows: verify types
+ if (rec.row.GetType(4) != fluss::TypeId::Date) {
+ std::cerr << "ERROR: field 4 expected Date, got "
+ << static_cast<int>(rec.row.GetType(4)) << std::endl;
+ scan_ok = false;
+ }
+ if (rec.row.GetType(5) != fluss::TypeId::Time) {
+ std::cerr << "ERROR: field 5 expected Time, got "
+ << static_cast<int>(rec.row.GetType(5)) << std::endl;
+ scan_ok = false;
+ }
+ if (rec.row.GetType(6) != fluss::TypeId::Timestamp) {
+ std::cerr << "ERROR: field 6 expected Timestamp, got "
+ << static_cast<int>(rec.row.GetType(6)) << std::endl;
+ scan_ok = false;
+ }
+ if (rec.row.GetType(7) != fluss::TypeId::TimestampLtz) {
+ std::cerr << "ERROR: field 7 expected TimestampLtz, got "
+ << static_cast<int>(rec.row.GetType(7)) << std::endl;
+ scan_ok = false;
+ }
- std::cout << " id=" << rec.row.GetInt32("id") << " name=" <<
rec.row.GetString("name")
- << " score=" << rec.row.GetFloat32("score") << " age=" <<
rec.row.GetInt32("age")
- << " date=" << date.Year() << "-" << date.Month() << "-" <<
date.Day()
- << " time=" << time.Hour() << ":" << time.Minute() << ":" <<
time.Second()
- << " ts_ntz=" << ts_ntz.epoch_millis << " ts_ltz=" <<
ts_ltz.epoch_millis << "+"
- << ts_ltz.nano_of_millisecond << "ns" << std::endl;
+ // Name-based getters
+ auto date = rec.row.GetDate("event_date");
+ auto time = rec.row.GetTime("event_time");
+ auto ts_ntz = rec.row.GetTimestamp("created_at");
+ auto ts_ltz = rec.row.GetTimestamp("updated_at");
+
+ std::cout << " id=" << rec.row.GetInt32("id")
+ << " name=" << rec.row.GetString("name")
+ << " score=" << rec.row.GetFloat32("score")
+ << " age=" << rec.row.GetInt32("age") << " date=" <<
date.Year() << "-"
+ << date.Month() << "-" << date.Day() << " time=" <<
time.Hour() << ":"
+ << time.Minute() << ":" << time.Second() << " ts_ntz="
<< ts_ntz.epoch_millis
+ << " ts_ltz=" << ts_ltz.epoch_millis << "+" <<
ts_ltz.nano_of_millisecond
+ << "ns" << std::endl;
+ }
}
if (!found_null_row) {
@@ -246,32 +263,34 @@ int main() {
fluss::ScanRecords projected_records;
check("poll_projected", projected_scanner.Poll(5000, projected_records));
- std::cout << "Projected records: " << projected_records.Size() <<
std::endl;
- for (const auto& rec : projected_records) {
- if (rec.row.FieldCount() != 2) {
- std::cerr << "ERROR: expected 2 fields, got " <<
rec.row.FieldCount() << std::endl;
- scan_ok = false;
- continue;
- }
- // Skip the all-null row
- if (rec.row.IsNull(0)) {
- std::cout << " [null row] skipped" << std::endl;
- continue;
- }
- if (rec.row.GetType(0) != fluss::TypeId::Int) {
- std::cerr << "ERROR: projected field 0 expected Int, got "
- << static_cast<int>(rec.row.GetType(0)) << std::endl;
- scan_ok = false;
- }
- if (rec.row.GetType(1) != fluss::TypeId::TimestampLtz) {
- std::cerr << "ERROR: projected field 1 expected TimestampLtz, got "
- << static_cast<int>(rec.row.GetType(1)) << std::endl;
- scan_ok = false;
- }
+ std::cout << "Projected records: " << projected_records.Count() <<
std::endl;
+ for (const auto& tb : projected_records.Buckets()) {
+ for (const auto& rec : projected_records.Records(tb)) {
+ if (rec.row.FieldCount() != 2) {
+ std::cerr << "ERROR: expected 2 fields, got " <<
rec.row.FieldCount() << std::endl;
+ scan_ok = false;
+ continue;
+ }
+ // Skip the all-null row
+ if (rec.row.IsNull(0)) {
+ std::cout << " [null row] skipped" << std::endl;
+ continue;
+ }
+ if (rec.row.GetType(0) != fluss::TypeId::Int) {
+ std::cerr << "ERROR: projected field 0 expected Int, got "
+ << static_cast<int>(rec.row.GetType(0)) << std::endl;
+ scan_ok = false;
+ }
+ if (rec.row.GetType(1) != fluss::TypeId::TimestampLtz) {
+ std::cerr << "ERROR: projected field 1 expected TimestampLtz,
got "
+ << static_cast<int>(rec.row.GetType(1)) << std::endl;
+ scan_ok = false;
+ }
- auto ts = rec.row.GetTimestamp(1);
- std::cout << " id=" << rec.row.GetInt32(0) << " updated_at=" <<
ts.epoch_millis << "+"
- << ts.nano_of_millisecond << "ns" << std::endl;
+ auto ts = rec.row.GetTimestamp(1);
+ std::cout << " id=" << rec.row.GetInt32(0) << " updated_at=" <<
ts.epoch_millis << "+"
+ << ts.nano_of_millisecond << "ns" << std::endl;
+ }
}
// 7b) Projected scan by column names — same columns as above but using
names
@@ -287,32 +306,34 @@ int main() {
fluss::ScanRecords name_projected_records;
check("poll_name_projected", name_projected_scanner.Poll(5000,
name_projected_records));
- std::cout << "Name-projected records: " << name_projected_records.Size()
<< std::endl;
- for (const auto& rec : name_projected_records) {
- if (rec.row.FieldCount() != 2) {
- std::cerr << "ERROR: expected 2 fields, got " <<
rec.row.FieldCount() << std::endl;
- scan_ok = false;
- continue;
- }
- // Skip the all-null row
- if (rec.row.IsNull(0)) {
- std::cout << " [null row] skipped" << std::endl;
- continue;
- }
- if (rec.row.GetType(0) != fluss::TypeId::Int) {
- std::cerr << "ERROR: name-projected field 0 expected Int, got "
- << static_cast<int>(rec.row.GetType(0)) << std::endl;
- scan_ok = false;
- }
- if (rec.row.GetType(1) != fluss::TypeId::TimestampLtz) {
- std::cerr << "ERROR: name-projected field 1 expected TimestampLtz,
got "
- << static_cast<int>(rec.row.GetType(1)) << std::endl;
- scan_ok = false;
- }
+ std::cout << "Name-projected records: " << name_projected_records.Count()
<< std::endl;
+ for (const auto& tb : name_projected_records.Buckets()) {
+ for (const auto& rec : name_projected_records.Records(tb)) {
+ if (rec.row.FieldCount() != 2) {
+ std::cerr << "ERROR: expected 2 fields, got " <<
rec.row.FieldCount() << std::endl;
+ scan_ok = false;
+ continue;
+ }
+ // Skip the all-null row
+ if (rec.row.IsNull(0)) {
+ std::cout << " [null row] skipped" << std::endl;
+ continue;
+ }
+ if (rec.row.GetType(0) != fluss::TypeId::Int) {
+ std::cerr << "ERROR: name-projected field 0 expected Int, got "
+ << static_cast<int>(rec.row.GetType(0)) << std::endl;
+ scan_ok = false;
+ }
+ if (rec.row.GetType(1) != fluss::TypeId::TimestampLtz) {
+ std::cerr << "ERROR: name-projected field 1 expected
TimestampLtz, got "
+ << static_cast<int>(rec.row.GetType(1)) << std::endl;
+ scan_ok = false;
+ }
- auto ts = rec.row.GetTimestamp(1);
- std::cout << " id=" << rec.row.GetInt32(0) << " updated_at=" <<
ts.epoch_millis << "+"
- << ts.nano_of_millisecond << "ns" << std::endl;
+ auto ts = rec.row.GetTimestamp(1);
+ std::cout << " id=" << rec.row.GetInt32(0) << " updated_at=" <<
ts.epoch_millis << "+"
+ << ts.nano_of_millisecond << "ns" << std::endl;
+ }
}
if (scan_ok) {
@@ -356,8 +377,8 @@ int main() {
std::unordered_map<int32_t, int64_t> timestamp_offsets;
check("list_timestamp_offsets",
- admin.ListOffsets(table_path, all_bucket_ids,
- fluss::OffsetSpec::Timestamp(timestamp_ms),
timestamp_offsets));
+ admin.ListOffsets(table_path, all_bucket_ids,
fluss::OffsetSpec::Timestamp(timestamp_ms),
+ timestamp_offsets));
std::cout << "Offsets for timestamp " << timestamp_ms << " (1 hour ago):"
<< std::endl;
for (const auto& [bucket_id, offset] : timestamp_offsets) {
std::cout << " Bucket " << bucket_id << ": offset=" << offset <<
std::endl;
@@ -381,15 +402,21 @@ int main() {
fluss::ScanRecords batch_records;
check("poll_batch", batch_scanner.Poll(5000, batch_records));
- std::cout << "Scanned " << batch_records.Size() << " records from batch
subscription"
+ std::cout << "Scanned " << batch_records.Count() << " records from batch
subscription"
<< std::endl;
- for (size_t i = 0; i < batch_records.Size() && i < 5; ++i) {
- const auto& rec = batch_records[i];
- std::cout << " Record " << i << ": bucket_id=" << rec.bucket_id
- << ", offset=" << rec.offset << ", timestamp=" <<
rec.timestamp << std::endl;
- }
- if (batch_records.Size() > 5) {
- std::cout << " ... and " << (batch_records.Size() - 5) << " more
records" << std::endl;
+ for (const auto& tb : batch_records.Buckets()) {
+ size_t shown = 0;
+ for (const auto& rec : batch_records.Records(tb)) {
+ if (shown < 5) {
+ std::cout << " bucket_id=" << tb.bucket_id << ", offset=" <<
rec.offset
+ << ", timestamp=" << rec.timestamp << std::endl;
+ }
+ ++shown;
+ }
+ if (shown > 5) {
+ std::cout << " ... and " << (shown - 5) << " more records in
bucket " << tb.bucket_id
+ << std::endl;
+ }
}
// 9.1) Unsubscribe from a bucket
@@ -520,11 +547,13 @@ int main() {
fluss::ScanRecords arrow_write_records;
check("poll_arrow_write", arrow_write_scanner.Poll(5000,
arrow_write_records));
- std::cout << "Scanned " << arrow_write_records.Size()
+ std::cout << "Scanned " << arrow_write_records.Count()
<< " records written via AppendArrowBatch:" << std::endl;
- for (const auto& rec : arrow_write_records) {
- std::cout << " id=" << rec.row.GetInt32(0) << " name=" <<
rec.row.GetString(1)
- << " score=" << rec.row.GetFloat32(2) << std::endl;
+ for (const auto& tb : arrow_write_records.Buckets()) {
+ for (const auto& rec : arrow_write_records.Records(tb)) {
+ std::cout << " id=" << rec.row.GetInt32(0) << " name=" <<
rec.row.GetString(1)
+ << " score=" << rec.row.GetFloat32(2) << std::endl;
+ }
}
}
@@ -591,11 +620,13 @@ int main() {
fluss::ScanRecords decimal_records;
check("poll_decimal", decimal_scanner.Poll(5000, decimal_records));
- std::cout << "Scanned decimal records: " << decimal_records.Size() <<
std::endl;
- for (const auto& rec : decimal_records) {
- std::cout << " id=" << rec.row.GetInt32(0) << " price=" <<
rec.row.GetDecimalString(1)
- << " amount=" << rec.row.GetDecimalString(2)
- << " is_decimal=" << rec.row.IsDecimal(1) << std::endl;
+ std::cout << "Scanned decimal records: " << decimal_records.Count() <<
std::endl;
+ for (const auto& tb : decimal_records.Buckets()) {
+ for (const auto& rec : decimal_records.Records(tb)) {
+ std::cout << " id=" << rec.row.GetInt32(0) << " price=" <<
rec.row.GetDecimalString(1)
+ << " amount=" << rec.row.GetDecimalString(2)
+ << " is_decimal=" << rec.row.IsDecimal(1) << std::endl;
+ }
}
// 14) Partitioned table example
@@ -690,14 +721,15 @@ int main() {
fluss::ScanRecords partition_records;
check("poll_partitioned", partition_scanner.Poll(5000, partition_records));
- std::cout << "Scanned " << partition_records.Size() << " records from
partitioned table"
+ std::cout << "Scanned " << partition_records.Count() << " records from
partitioned table"
<< std::endl;
- for (size_t i = 0; i < partition_records.Size(); ++i) {
- const auto& rec = partition_records[i];
- std::cout << " Record " << i << ": partition_id="
- << (rec.partition_id.has_value() ?
std::to_string(*rec.partition_id) : "none")
- << ", id=" << rec.row.GetInt32(0) << ", region=" <<
rec.row.GetString(1)
- << ", value=" << rec.row.GetInt64(2) << std::endl;
+ for (const auto& tb : partition_records.Buckets()) {
+ for (const auto& rec : partition_records.Records(tb)) {
+ std::cout << " partition_id="
+ << (tb.partition_id.has_value() ?
std::to_string(*tb.partition_id) : "none")
+ << ", id=" << rec.row.GetInt32(0) << ", region=" <<
rec.row.GetString(1)
+ << ", value=" << rec.row.GetInt64(2) << std::endl;
+ }
}
// 14.2) subscribe_partition_buckets: batch subscribe to all partitions at
once
@@ -717,13 +749,13 @@ int main() {
fluss::ScanRecords partition_batch_records;
check("poll_partition_batch", partition_batch_scanner.Poll(5000,
partition_batch_records));
- std::cout << "Scanned " << partition_batch_records.Size()
+ std::cout << "Scanned " << partition_batch_records.Count()
<< " records from batch partition subscription" << std::endl;
- for (size_t i = 0; i < partition_batch_records.Size(); ++i) {
- const auto& rec = partition_batch_records[i];
- std::cout << " Record " << i << ": id=" << rec.row.GetInt32(0)
- << ", region=" << rec.row.GetString(1) << ", value=" <<
rec.row.GetInt64(2)
- << std::endl;
+ for (const auto& tb : partition_batch_records.Buckets()) {
+ for (const auto& rec : partition_batch_records.Records(tb)) {
+ std::cout << " id=" << rec.row.GetInt32(0) << ", region=" <<
rec.row.GetString(1)
+ << ", value=" << rec.row.GetInt64(2) << std::endl;
+ }
}
// 14.3) UnsubscribePartition: unsubscribe from one partition, verify
remaining
@@ -743,12 +775,12 @@ int main() {
fluss::ScanRecords unsub_records;
check("poll_after_unsub", unsub_partition_scanner.Poll(5000,
unsub_records));
- std::cout << "After unsubscribe, scanned " << unsub_records.Size() << "
records" << std::endl;
- for (size_t i = 0; i < unsub_records.Size(); ++i) {
- const auto& rec = unsub_records[i];
- std::cout << " Record " << i << ": id=" << rec.row.GetInt32(0)
- << ", region=" << rec.row.GetString(1) << ", value=" <<
rec.row.GetInt64(2)
- << std::endl;
+ std::cout << "After unsubscribe, scanned " << unsub_records.Count() << "
records" << std::endl;
+ for (const auto& tb : unsub_records.Buckets()) {
+ for (const auto& rec : unsub_records.Records(tb)) {
+ std::cout << " id=" << rec.row.GetInt32(0) << ", region=" <<
rec.row.GetString(1)
+ << ", value=" << rec.row.GetInt64(2) << std::endl;
+ }
}
// Cleanup
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 27d1fcb..9ea7e41 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -507,6 +507,17 @@ struct NamedGetters {
private:
const Derived& Self() const { return static_cast<const Derived&>(*this); }
};
+
+struct ScanData {
+ ffi::ScanResultInner* raw;
+ ColumnMap columns;
+
+ ScanData(ffi::ScanResultInner* r, ColumnMap cols) : raw(r),
columns(std::move(cols)) {}
+ ~ScanData();
+
+ ScanData(const ScanData&) = delete;
+ ScanData& operator=(const ScanData&) = delete;
+};
} // namespace detail
class GenericRow {
@@ -623,9 +634,8 @@ class RowView : public detail::NamedGetters<RowView> {
friend struct detail::NamedGetters<RowView>;
public:
- RowView(std::shared_ptr<const ffi::ScanResultInner> inner, size_t
record_idx,
- std::shared_ptr<const detail::ColumnMap> column_map)
- : inner_(std::move(inner)), record_idx_(record_idx),
column_map_(std::move(column_map)) {}
+ RowView(std::shared_ptr<const detail::ScanData> data, size_t bucket_idx,
size_t rec_idx)
+ : data_(std::move(data)), bucket_idx_(bucket_idx), rec_idx_(rec_idx) {}
// ── Index-based getters ──────────────────────────────────────────
size_t FieldCount() const;
@@ -660,14 +670,28 @@ class RowView : public detail::NamedGetters<RowView> {
private:
size_t Resolve(const std::string& name) const {
- if (!column_map_) {
+ if (!data_) {
throw std::runtime_error("RowView: name-based access not
available");
}
- return detail::ResolveColumn(*column_map_, name);
+ return detail::ResolveColumn(data_->columns, name);
+ }
+ std::shared_ptr<const detail::ScanData> data_;
+ size_t bucket_idx_;
+ size_t rec_idx_;
+};
+
+/// Identifies a specific bucket, optionally within a partition.
+struct TableBucket {
+ int64_t table_id;
+ int32_t bucket_id;
+ std::optional<int64_t> partition_id;
+
+ bool operator==(const TableBucket& other) const {
+ return table_id == other.table_id && bucket_id == other.bucket_id &&
+ partition_id == other.partition_id;
}
- std::shared_ptr<const ffi::ScanResultInner> inner_;
- size_t record_idx_;
- std::shared_ptr<const detail::ColumnMap> column_map_;
+
+ bool operator!=(const TableBucket& other) const { return !(*this ==
other); }
};
/// A single scan record. Contains metadata and a RowView for field access.
@@ -675,14 +699,61 @@ class RowView : public detail::NamedGetters<RowView> {
/// ScanRecord is a value type that can be freely copied, stored, and
/// accumulated across multiple Poll() calls.
struct ScanRecord {
- int32_t bucket_id;
- std::optional<int64_t> partition_id;
int64_t offset;
int64_t timestamp;
ChangeType change_type;
RowView row;
};
+/// A view into a subset of scan results for a single bucket.
+///
+/// BucketView is a value type — it shares ownership of the underlying scan
data
+/// via reference counting, so it can safely outlive the ScanRecords that
produced it.
+class BucketView {
+ public:
+ BucketView(std::shared_ptr<const detail::ScanData> data, TableBucket
bucket, size_t bucket_idx,
+ size_t count)
+ : data_(std::move(data)),
+ bucket_(std::move(bucket)),
+ bucket_idx_(bucket_idx),
+ count_(count) {}
+
+ /// The bucket these records belong to.
+ const TableBucket& Bucket() const { return bucket_; }
+
+ /// Number of records in this bucket.
+ size_t Size() const { return count_; }
+ bool Empty() const { return count_ == 0; }
+
+ /// Access a record by its position within this bucket (0-based).
+ ScanRecord operator[](size_t idx) const;
+
+ class Iterator {
+ public:
+ ScanRecord operator*() const;
+ Iterator& operator++() {
+ ++idx_;
+ return *this;
+ }
+ bool operator!=(const Iterator& other) const { return idx_ !=
other.idx_; }
+
+ private:
+ friend class BucketView;
+ Iterator(const BucketView* owner, size_t idx) : owner_(owner),
idx_(idx) {}
+ const BucketView* owner_;
+ size_t idx_;
+ };
+
+ Iterator begin() const { return Iterator(this, 0); }
+ Iterator end() const { return Iterator(this, count_); }
+
+ private:
+ std::shared_ptr<const detail::ScanData> data_;
+ TableBucket bucket_;
+ size_t bucket_idx_;
+ size_t count_;
+};
+
class ScanRecords {
public:
ScanRecords() noexcept = default;
@@ -693,36 +764,52 @@ class ScanRecords {
ScanRecords(ScanRecords&&) noexcept = default;
ScanRecords& operator=(ScanRecords&&) noexcept = default;
- size_t Size() const;
- bool Empty() const;
- ScanRecord operator[](size_t idx) const;
+ /// Total number of records across all buckets.
+ size_t Count() const;
+ bool IsEmpty() const;
+
+ /// Number of distinct buckets with records.
+ size_t BucketCount() const;
+
+ /// List of distinct buckets that have records.
+ std::vector<TableBucket> Buckets() const;
+
+ /// Get a view of records for a specific bucket.
+ ///
+ /// Returns an empty BucketView if the bucket is not present (matches
Rust/Java).
+ /// Note: O(B) linear scan. For iteration over all buckets, prefer
BucketAt(idx).
+ BucketView Records(const TableBucket& bucket) const;
+ /// Get a view of records by bucket index (0-based). O(1).
+ ///
+ /// Throws std::out_of_range if idx >= BucketCount().
+ BucketView BucketAt(size_t idx) const;
+
+ /// Flat iterator over all records across all buckets (matches Java
Iterable<ScanRecord>).
class Iterator {
public:
ScanRecord operator*() const;
- Iterator& operator++() {
- ++idx_;
- return *this;
+ Iterator& operator++();
+ bool operator!=(const Iterator& other) const {
+ return bucket_idx_ != other.bucket_idx_ || rec_idx_ !=
other.rec_idx_;
}
- bool operator!=(const Iterator& other) const { return idx_ !=
other.idx_; }
private:
friend class ScanRecords;
- Iterator(const ScanRecords* owner, size_t idx) : owner_(owner),
idx_(idx) {}
+ Iterator(const ScanRecords* owner, size_t bucket_idx, size_t rec_idx)
+ : owner_(owner), bucket_idx_(bucket_idx), rec_idx_(rec_idx) {}
const ScanRecords* owner_;
- size_t idx_;
+ size_t bucket_idx_;
+ size_t rec_idx_;
};
- Iterator begin() const { return Iterator(this, 0); }
- Iterator end() const { return Iterator(this, Size()); }
+ Iterator begin() const;
+ Iterator end() const { return Iterator(this, BucketCount(), 0); }
private:
- /// Returns the column name-to-index map (lazy-built, cached).
- const std::shared_ptr<detail::ColumnMap>& GetColumnMap() const;
friend class LogScanner;
- void BuildColumnMap() const;
- std::shared_ptr<ffi::ScanResultInner> inner_;
- mutable std::shared_ptr<detail::ColumnMap> column_map_;
+ ScanRecord RecordAt(size_t bucket, size_t rec_idx) const;
+ std::shared_ptr<const detail::ScanData> data_;
};
class ArrowRecordBatch {
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index cb29882..9f987b9 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -141,6 +141,14 @@ mod ffi {
timestamp: i64,
}
+ struct FfiBucketInfo {
+ table_id: i64,
+ bucket_id: i32,
+ has_partition_id: bool,
+ partition_id: i64,
+ record_count: usize,
+ }
+
struct FfiBucketSubscription {
bucket_id: i32,
offset: i64,
@@ -420,27 +428,96 @@ mod ffi {
fn sv_column_count(self: &ScanResultInner) -> usize;
fn sv_column_name(self: &ScanResultInner, field: usize) ->
Result<&str>;
fn sv_column_type(self: &ScanResultInner, field: usize) -> Result<i32>;
- fn sv_bucket_id(self: &ScanResultInner, rec: usize) -> i32;
- fn sv_has_partition_id(self: &ScanResultInner, rec: usize) -> bool;
- fn sv_partition_id(self: &ScanResultInner, rec: usize) -> i64;
- fn sv_offset(self: &ScanResultInner, rec: usize) -> i64;
- fn sv_timestamp(self: &ScanResultInner, rec: usize) -> i64;
- fn sv_change_type(self: &ScanResultInner, rec: usize) -> i32;
+ fn sv_offset(self: &ScanResultInner, bucket: usize, rec: usize) -> i64;
+ fn sv_timestamp(self: &ScanResultInner, bucket: usize, rec: usize) ->
i64;
+ fn sv_change_type(self: &ScanResultInner, bucket: usize, rec: usize)
-> i32;
fn sv_field_count(self: &ScanResultInner) -> usize;
- fn sv_is_null(self: &ScanResultInner, rec: usize, field: usize) ->
Result<bool>;
- fn sv_get_bool(self: &ScanResultInner, rec: usize, field: usize) ->
Result<bool>;
- fn sv_get_i32(self: &ScanResultInner, rec: usize, field: usize) ->
Result<i32>;
- fn sv_get_i64(self: &ScanResultInner, rec: usize, field: usize) ->
Result<i64>;
- fn sv_get_f32(self: &ScanResultInner, rec: usize, field: usize) ->
Result<f32>;
- fn sv_get_f64(self: &ScanResultInner, rec: usize, field: usize) ->
Result<f64>;
- fn sv_get_str(self: &ScanResultInner, rec: usize, field: usize) ->
Result<&str>;
- fn sv_get_bytes(self: &ScanResultInner, rec: usize, field: usize) ->
Result<&[u8]>;
- fn sv_get_date_days(self: &ScanResultInner, rec: usize, field: usize)
-> Result<i32>;
- fn sv_get_time_millis(self: &ScanResultInner, rec: usize, field:
usize) -> Result<i32>;
- fn sv_get_ts_millis(self: &ScanResultInner, rec: usize, field: usize)
-> Result<i64>;
- fn sv_get_ts_nanos(self: &ScanResultInner, rec: usize, field: usize)
-> Result<i32>;
- fn sv_is_ts_ltz(self: &ScanResultInner, rec: usize, field: usize) ->
Result<bool>;
- fn sv_get_decimal_str(self: &ScanResultInner, rec: usize, field:
usize) -> Result<String>;
+ fn sv_is_null(
+ self: &ScanResultInner,
+ bucket: usize,
+ rec: usize,
+ field: usize,
+ ) -> Result<bool>;
+ fn sv_get_bool(
+ self: &ScanResultInner,
+ bucket: usize,
+ rec: usize,
+ field: usize,
+ ) -> Result<bool>;
+ fn sv_get_i32(
+ self: &ScanResultInner,
+ bucket: usize,
+ rec: usize,
+ field: usize,
+ ) -> Result<i32>;
+ fn sv_get_i64(
+ self: &ScanResultInner,
+ bucket: usize,
+ rec: usize,
+ field: usize,
+ ) -> Result<i64>;
+ fn sv_get_f32(
+ self: &ScanResultInner,
+ bucket: usize,
+ rec: usize,
+ field: usize,
+ ) -> Result<f32>;
+ fn sv_get_f64(
+ self: &ScanResultInner,
+ bucket: usize,
+ rec: usize,
+ field: usize,
+ ) -> Result<f64>;
+ fn sv_get_str(
+ self: &ScanResultInner,
+ bucket: usize,
+ rec: usize,
+ field: usize,
+ ) -> Result<&str>;
+ fn sv_get_bytes(
+ self: &ScanResultInner,
+ bucket: usize,
+ rec: usize,
+ field: usize,
+ ) -> Result<&[u8]>;
+ fn sv_get_date_days(
+ self: &ScanResultInner,
+ bucket: usize,
+ rec: usize,
+ field: usize,
+ ) -> Result<i32>;
+ fn sv_get_time_millis(
+ self: &ScanResultInner,
+ bucket: usize,
+ rec: usize,
+ field: usize,
+ ) -> Result<i32>;
+ fn sv_get_ts_millis(
+ self: &ScanResultInner,
+ bucket: usize,
+ rec: usize,
+ field: usize,
+ ) -> Result<i64>;
+ fn sv_get_ts_nanos(
+ self: &ScanResultInner,
+ bucket: usize,
+ rec: usize,
+ field: usize,
+ ) -> Result<i32>;
+ fn sv_is_ts_ltz(
+ self: &ScanResultInner,
+ bucket: usize,
+ rec: usize,
+ field: usize,
+ ) -> Result<bool>;
+ fn sv_get_decimal_str(
+ self: &ScanResultInner,
+ bucket: usize,
+ rec: usize,
+ field: usize,
+ ) -> Result<String>;
+
+ fn sv_bucket_infos(self: &ScanResultInner) -> &Vec<FfiBucketInfo>;
}
}
@@ -1487,24 +1564,27 @@ impl LogScanner {
match result {
Ok(records) => {
let columns = self.projected_columns.clone();
- // Flatten ScanRecords into a Vec<FlatScanRecord> — moves
Arc<RecordBatch>, zero copy
- let mut flat = Vec::with_capacity(records.count());
+ let mut total_count = 0usize;
+ let mut buckets = Vec::new();
+ let mut bucket_infos = Vec::new();
for (table_bucket, bucket_records) in
records.into_records_by_buckets() {
- let bucket_id = table_bucket.bucket_id();
- let partition = table_bucket.partition_id();
- for record in bucket_records {
- flat.push(FlatScanRecord {
- bucket_id,
- has_partition_id: partition.is_some(),
- partition_id: partition.unwrap_or(0),
- record,
- });
- }
+ let count = bucket_records.len();
+ total_count += count;
+ bucket_infos.push(ffi::FfiBucketInfo {
+ table_id: table_bucket.table_id(),
+ bucket_id: table_bucket.bucket_id(),
+ has_partition_id:
table_bucket.partition_id().is_some(),
+ partition_id: table_bucket.partition_id().unwrap_or(0),
+ record_count: count,
+ });
+ buckets.push((table_bucket, bucket_records));
}
Box::new(ScanResultInner {
error: None,
- records: flat,
+ buckets,
columns,
+ bucket_infos,
+ total_count,
})
}
Err(e) => {
@@ -1917,28 +1997,29 @@ mod row_reader {
// Opaque types: ScanResultInner (scan read path)
// ============================================================================
-struct FlatScanRecord {
- bucket_id: i32,
- has_partition_id: bool,
- partition_id: i64,
- record: fcore::record::ScanRecord,
-}
-
pub struct ScanResultInner {
error: Option<(i32, String)>,
- records: Vec<FlatScanRecord>,
+ buckets: Vec<(fcore::metadata::TableBucket,
Vec<fcore::record::ScanRecord>)>,
columns: Vec<fcore::metadata::Column>,
+ bucket_infos: Vec<ffi::FfiBucketInfo>,
+ total_count: usize,
}
impl ScanResultInner {
fn from_error(code: i32, msg: String) -> Self {
Self {
error: Some((code, msg)),
- records: Vec::new(),
+ buckets: Vec::new(),
columns: Vec::new(),
+ bucket_infos: Vec::new(),
+ total_count: 0,
}
}
+ fn resolve(&self, bucket: usize, rec: usize) -> &fcore::record::ScanRecord
{
+ &self.buckets[bucket].1[rec]
+ }
+
fn sv_has_error(&self) -> bool {
self.error.is_some()
}
@@ -1952,7 +2033,7 @@ impl ScanResultInner {
}
fn sv_record_count(&self) -> usize {
- self.records.len()
+ self.total_count
}
fn sv_column_count(&self) -> usize {
@@ -1965,71 +2046,70 @@ impl ScanResultInner {
row_reader::column_type(&self.columns, field)
}
- // Metadata accessors — C++ validates rec in operator[] before calling
these.
- fn sv_bucket_id(&self, rec: usize) -> i32 {
- self.records[rec].bucket_id
+ fn sv_offset(&self, bucket: usize, rec: usize) -> i64 {
+ self.resolve(bucket, rec).offset()
}
- fn sv_has_partition_id(&self, rec: usize) -> bool {
- self.records[rec].has_partition_id
+ fn sv_timestamp(&self, bucket: usize, rec: usize) -> i64 {
+ self.resolve(bucket, rec).timestamp()
}
- fn sv_partition_id(&self, rec: usize) -> i64 {
- self.records[rec].partition_id
- }
- fn sv_offset(&self, rec: usize) -> i64 {
- self.records[rec].record.offset()
- }
- fn sv_timestamp(&self, rec: usize) -> i64 {
- self.records[rec].record.timestamp()
- }
- fn sv_change_type(&self, rec: usize) -> i32 {
- self.records[rec].record.change_type().to_byte_value() as i32
+ fn sv_change_type(&self, bucket: usize, rec: usize) -> i32 {
+ self.resolve(bucket, rec).change_type().to_byte_value() as i32
}
fn sv_field_count(&self) -> usize {
self.columns.len()
}
- // Field accessors — C++ validates rec in operator[], validate() checks
field.
- fn sv_is_null(&self, rec: usize, field: usize) -> Result<bool, String> {
- row_reader::is_null(self.records[rec].record.row(), &self.columns,
field)
+ // Field accessors — C++ validates bounds in BucketView/RecordAt,
validate() checks field.
+ fn sv_is_null(&self, bucket: usize, rec: usize, field: usize) ->
Result<bool, String> {
+ row_reader::is_null(self.resolve(bucket, rec).row(), &self.columns,
field)
}
- fn sv_get_bool(&self, rec: usize, field: usize) -> Result<bool, String> {
- row_reader::get_bool(self.records[rec].record.row(), &self.columns,
field)
+ fn sv_get_bool(&self, bucket: usize, rec: usize, field: usize) ->
Result<bool, String> {
+ row_reader::get_bool(self.resolve(bucket, rec).row(), &self.columns,
field)
}
- fn sv_get_i32(&self, rec: usize, field: usize) -> Result<i32, String> {
- row_reader::get_i32(self.records[rec].record.row(), &self.columns,
field)
+ fn sv_get_i32(&self, bucket: usize, rec: usize, field: usize) ->
Result<i32, String> {
+ row_reader::get_i32(self.resolve(bucket, rec).row(), &self.columns,
field)
}
- fn sv_get_i64(&self, rec: usize, field: usize) -> Result<i64, String> {
- row_reader::get_i64(self.records[rec].record.row(), &self.columns,
field)
+ fn sv_get_i64(&self, bucket: usize, rec: usize, field: usize) ->
Result<i64, String> {
+ row_reader::get_i64(self.resolve(bucket, rec).row(), &self.columns,
field)
}
- fn sv_get_f32(&self, rec: usize, field: usize) -> Result<f32, String> {
- row_reader::get_f32(self.records[rec].record.row(), &self.columns,
field)
+ fn sv_get_f32(&self, bucket: usize, rec: usize, field: usize) ->
Result<f32, String> {
+ row_reader::get_f32(self.resolve(bucket, rec).row(), &self.columns,
field)
}
- fn sv_get_f64(&self, rec: usize, field: usize) -> Result<f64, String> {
- row_reader::get_f64(self.records[rec].record.row(), &self.columns,
field)
+ fn sv_get_f64(&self, bucket: usize, rec: usize, field: usize) ->
Result<f64, String> {
+ row_reader::get_f64(self.resolve(bucket, rec).row(), &self.columns,
field)
}
- fn sv_get_str(&self, rec: usize, field: usize) -> Result<&str, String> {
- row_reader::get_str(self.records[rec].record.row(), &self.columns,
field)
+ fn sv_get_str(&self, bucket: usize, rec: usize, field: usize) ->
Result<&str, String> {
+ row_reader::get_str(self.resolve(bucket, rec).row(), &self.columns,
field)
}
- fn sv_get_bytes(&self, rec: usize, field: usize) -> Result<&[u8], String> {
- row_reader::get_bytes(self.records[rec].record.row(), &self.columns,
field)
+ fn sv_get_bytes(&self, bucket: usize, rec: usize, field: usize) ->
Result<&[u8], String> {
+ row_reader::get_bytes(self.resolve(bucket, rec).row(), &self.columns,
field)
}
- fn sv_get_date_days(&self, rec: usize, field: usize) -> Result<i32,
String> {
- row_reader::get_date_days(self.records[rec].record.row(),
&self.columns, field)
+ fn sv_get_date_days(&self, bucket: usize, rec: usize, field: usize) ->
Result<i32, String> {
+ row_reader::get_date_days(self.resolve(bucket, rec).row(),
&self.columns, field)
}
- fn sv_get_time_millis(&self, rec: usize, field: usize) -> Result<i32,
String> {
- row_reader::get_time_millis(self.records[rec].record.row(),
&self.columns, field)
+ fn sv_get_time_millis(&self, bucket: usize, rec: usize, field: usize) ->
Result<i32, String> {
+ row_reader::get_time_millis(self.resolve(bucket, rec).row(),
&self.columns, field)
}
- fn sv_get_ts_millis(&self, rec: usize, field: usize) -> Result<i64,
String> {
- row_reader::get_ts_millis(self.records[rec].record.row(),
&self.columns, field)
+ fn sv_get_ts_millis(&self, bucket: usize, rec: usize, field: usize) ->
Result<i64, String> {
+ row_reader::get_ts_millis(self.resolve(bucket, rec).row(),
&self.columns, field)
}
- fn sv_get_ts_nanos(&self, rec: usize, field: usize) -> Result<i32, String>
{
- row_reader::get_ts_nanos(self.records[rec].record.row(),
&self.columns, field)
+ fn sv_get_ts_nanos(&self, bucket: usize, rec: usize, field: usize) ->
Result<i32, String> {
+ row_reader::get_ts_nanos(self.resolve(bucket, rec).row(),
&self.columns, field)
}
- fn sv_is_ts_ltz(&self, _rec: usize, field: usize) -> Result<bool, String> {
+ fn sv_is_ts_ltz(&self, _bucket: usize, _rec: usize, field: usize) ->
Result<bool, String> {
row_reader::is_ts_ltz(&self.columns, field)
}
- fn sv_get_decimal_str(&self, rec: usize, field: usize) -> Result<String,
String> {
- row_reader::get_decimal_str(self.records[rec].record.row(),
&self.columns, field)
+ fn sv_get_decimal_str(
+ &self,
+ bucket: usize,
+ rec: usize,
+ field: usize,
+ ) -> Result<String, String> {
+ row_reader::get_decimal_str(self.resolve(bucket, rec).row(),
&self.columns, field)
+ }
+
+ fn sv_bucket_infos(&self) -> &Vec<ffi::FfiBucketInfo> {
+ &self.bucket_infos
}
}
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index a697cea..73035bb 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -191,75 +191,91 @@ void GenericRow::SetDecimal(size_t idx, const
std::string& value) {
inner_->gr_set_decimal_str(idx, value);
}
+// ============================================================================
+// ScanData — destructor must live in .cpp where rust::Box is visible
+// ============================================================================
+
+detail::ScanData::~ScanData() {
+ if (raw) {
+ rust::Box<ffi::ScanResultInner>::from_raw(raw);
+ }
+}
+
// ============================================================================
// RowView — zero-copy read-only row view for scan results
// ============================================================================
-size_t RowView::FieldCount() const { return inner_ ? inner_->sv_field_count()
: 0; }
+// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
+#define CHECK_DATA(name)
\
+ do {
\
+ if (!data_) throw std::logic_error(name ": not available (moved-from
or null)"); \
+ } while (0)
+
+size_t RowView::FieldCount() const { return data_ ?
data_->raw->sv_field_count() : 0; }
TypeId RowView::GetType(size_t idx) const {
- CHECK_INNER("RowView");
- return static_cast<TypeId>(inner_->sv_column_type(idx));
+ CHECK_DATA("RowView");
+ return static_cast<TypeId>(data_->raw->sv_column_type(idx));
}
bool RowView::IsNull(size_t idx) const {
- CHECK_INNER("RowView");
- return inner_->sv_is_null(record_idx_, idx);
+ CHECK_DATA("RowView");
+ return data_->raw->sv_is_null(bucket_idx_, rec_idx_, idx);
}
bool RowView::GetBool(size_t idx) const {
- CHECK_INNER("RowView");
- return inner_->sv_get_bool(record_idx_, idx);
+ CHECK_DATA("RowView");
+ return data_->raw->sv_get_bool(bucket_idx_, rec_idx_, idx);
}
int32_t RowView::GetInt32(size_t idx) const {
- CHECK_INNER("RowView");
- return inner_->sv_get_i32(record_idx_, idx);
+ CHECK_DATA("RowView");
+ return data_->raw->sv_get_i32(bucket_idx_, rec_idx_, idx);
}
int64_t RowView::GetInt64(size_t idx) const {
- CHECK_INNER("RowView");
- return inner_->sv_get_i64(record_idx_, idx);
+ CHECK_DATA("RowView");
+ return data_->raw->sv_get_i64(bucket_idx_, rec_idx_, idx);
}
float RowView::GetFloat32(size_t idx) const {
- CHECK_INNER("RowView");
- return inner_->sv_get_f32(record_idx_, idx);
+ CHECK_DATA("RowView");
+ return data_->raw->sv_get_f32(bucket_idx_, rec_idx_, idx);
}
double RowView::GetFloat64(size_t idx) const {
- CHECK_INNER("RowView");
- return inner_->sv_get_f64(record_idx_, idx);
+ CHECK_DATA("RowView");
+ return data_->raw->sv_get_f64(bucket_idx_, rec_idx_, idx);
}
std::string_view RowView::GetString(size_t idx) const {
- CHECK_INNER("RowView");
- auto s = inner_->sv_get_str(record_idx_, idx);
+ CHECK_DATA("RowView");
+ auto s = data_->raw->sv_get_str(bucket_idx_, rec_idx_, idx);
return std::string_view(s.data(), s.size());
}
std::pair<const uint8_t*, size_t> RowView::GetBytes(size_t idx) const {
- CHECK_INNER("RowView");
- auto bytes = inner_->sv_get_bytes(record_idx_, idx);
+ CHECK_DATA("RowView");
+ auto bytes = data_->raw->sv_get_bytes(bucket_idx_, rec_idx_, idx);
return {bytes.data(), bytes.size()};
}
Date RowView::GetDate(size_t idx) const {
- CHECK_INNER("RowView");
- return Date{inner_->sv_get_date_days(record_idx_, idx)};
+ CHECK_DATA("RowView");
+ return Date{data_->raw->sv_get_date_days(bucket_idx_, rec_idx_, idx)};
}
Time RowView::GetTime(size_t idx) const {
- CHECK_INNER("RowView");
- return Time{inner_->sv_get_time_millis(record_idx_, idx)};
+ CHECK_DATA("RowView");
+ return Time{data_->raw->sv_get_time_millis(bucket_idx_, rec_idx_, idx)};
}
Timestamp RowView::GetTimestamp(size_t idx) const {
- CHECK_INNER("RowView");
- return Timestamp{inner_->sv_get_ts_millis(record_idx_, idx),
- inner_->sv_get_ts_nanos(record_idx_, idx)};
+ CHECK_DATA("RowView");
+ return Timestamp{data_->raw->sv_get_ts_millis(bucket_idx_, rec_idx_, idx),
+ data_->raw->sv_get_ts_nanos(bucket_idx_, rec_idx_, idx)};
}
bool RowView::IsDecimal(size_t idx) const { return GetType(idx) ==
TypeId::Decimal; }
std::string RowView::GetDecimalString(size_t idx) const {
- CHECK_INNER("RowView");
- return std::string(inner_->sv_get_decimal_str(record_idx_, idx));
+ CHECK_DATA("RowView");
+ return std::string(data_->raw->sv_get_decimal_str(bucket_idx_, rec_idx_,
idx));
}
// ============================================================================
@@ -268,48 +284,94 @@ std::string RowView::GetDecimalString(size_t idx) const {
// ScanRecords constructor, destructor, move operations are all defaulted in
the header.
-size_t ScanRecords::Size() const { return inner_ ? inner_->sv_record_count() :
0; }
+size_t ScanRecords::Count() const { return data_ ?
data_->raw->sv_record_count() : 0; }
-bool ScanRecords::Empty() const { return Size() == 0; }
+bool ScanRecords::IsEmpty() const { return Count() == 0; }
-void ScanRecords::BuildColumnMap() const {
- if (!inner_) return;
- auto map = std::make_shared<detail::ColumnMap>();
- auto count = inner_->sv_column_count();
- for (size_t i = 0; i < count; ++i) {
- auto name = inner_->sv_column_name(i);
- (*map)[std::string(name.data(), name.size())] = {
- i, static_cast<TypeId>(inner_->sv_column_type(i))};
+ScanRecord ScanRecords::RecordAt(size_t bucket, size_t rec_idx) const {
+ if (!data_) {
+ throw std::logic_error("ScanRecords: not available (moved-from or
null)");
}
- column_map_ = std::move(map);
+ return ScanRecord{data_->raw->sv_offset(bucket, rec_idx),
+ data_->raw->sv_timestamp(bucket, rec_idx),
+
static_cast<ChangeType>(data_->raw->sv_change_type(bucket, rec_idx)),
+ RowView(data_, bucket, rec_idx)};
+}
+
+static TableBucket to_table_bucket(const ffi::FfiBucketInfo& g) {
+ return TableBucket{g.table_id, g.bucket_id,
+ g.has_partition_id ?
std::optional<int64_t>(g.partition_id) : std::nullopt};
+}
+
+size_t ScanRecords::BucketCount() const { return data_ ?
data_->raw->sv_bucket_infos().size() : 0; }
+
+ScanRecord ScanRecords::Iterator::operator*() const {
+ return owner_->RecordAt(bucket_idx_, rec_idx_);
}
-const std::shared_ptr<detail::ColumnMap>& ScanRecords::GetColumnMap() const {
- if (!column_map_) {
- BuildColumnMap();
+ScanRecords::Iterator ScanRecords::begin() const { return Iterator(this, 0,
0); }
+
+ScanRecords::Iterator& ScanRecords::Iterator::operator++() {
+ ++rec_idx_;
+ if (owner_->data_) {
+ const auto& infos = owner_->data_->raw->sv_bucket_infos();
+ while (bucket_idx_ < infos.size() && rec_idx_ >=
infos[bucket_idx_].record_count) {
+ rec_idx_ = 0;
+ ++bucket_idx_;
+ }
}
- return column_map_;
+ return *this;
+}
+
+std::vector<TableBucket> ScanRecords::Buckets() const {
+ std::vector<TableBucket> result;
+ if (!data_) return result;
+ const auto& infos = data_->raw->sv_bucket_infos();
+ result.reserve(infos.size());
+ for (const auto& g : infos) {
+ result.push_back(to_table_bucket(g));
+ }
+ return result;
}
-ScanRecord ScanRecords::operator[](size_t idx) const {
- if (!inner_) {
+BucketView ScanRecords::Records(const TableBucket& bucket) const {
+ if (!data_) {
+ return BucketView({}, bucket, 0, 0);
+ }
+ const auto& infos = data_->raw->sv_bucket_infos();
+ for (size_t i = 0; i < infos.size(); ++i) {
+ TableBucket tb = to_table_bucket(infos[i]);
+ if (tb == bucket) {
+ return BucketView(data_, std::move(tb), i, infos[i].record_count);
+ }
+ }
+ return BucketView({}, bucket, 0, 0);
+}
+
+BucketView ScanRecords::BucketAt(size_t idx) const {
+ if (!data_) {
throw std::logic_error("ScanRecords: not available (moved-from or
null)");
}
- if (idx >= inner_->sv_record_count()) {
- throw std::out_of_range("ScanRecords: index " + std::to_string(idx) +
" out of range (" +
- std::to_string(inner_->sv_record_count()) + "
records)");
+ const auto& infos = data_->raw->sv_bucket_infos();
+ if (idx >= infos.size()) {
+ throw std::out_of_range("ScanRecords::BucketAt: index " +
std::to_string(idx) +
+ " out of range (" +
std::to_string(infos.size()) + " buckets)");
}
- return ScanRecord{inner_->sv_bucket_id(idx),
- inner_->sv_has_partition_id(idx)
- ?
std::optional<int64_t>(inner_->sv_partition_id(idx))
- : std::nullopt,
- inner_->sv_offset(idx),
- inner_->sv_timestamp(idx),
- static_cast<ChangeType>(inner_->sv_change_type(idx)),
- RowView(inner_, idx, GetColumnMap())};
+ return BucketView(data_, to_table_bucket(infos[idx]), idx,
infos[idx].record_count);
}
-ScanRecord ScanRecords::Iterator::operator*() const { return
owner_->operator[](idx_); }
+ScanRecord BucketView::operator[](size_t idx) const {
+ if (idx >= count_) {
+ throw std::out_of_range("BucketView: index " + std::to_string(idx) + "
out of range (" +
+ std::to_string(count_) + " records)");
+ }
+ return ScanRecord{data_->raw->sv_offset(bucket_idx_, idx),
+ data_->raw->sv_timestamp(bucket_idx_, idx),
+
static_cast<ChangeType>(data_->raw->sv_change_type(bucket_idx_, idx)),
+ RowView(data_, bucket_idx_, idx)};
+}
+
+ScanRecord BucketView::Iterator::operator*() const { return
owner_->operator[](idx_); }
// ============================================================================
// LookupResult — backed by opaque Rust LookupResultInner
@@ -1082,10 +1144,16 @@ Result LogScanner::Poll(int64_t timeout_ms,
ScanRecords& out) {
std::string(result_box->sv_error_message()));
}
- out.column_map_.reset();
- out.inner_ = std::shared_ptr<ffi::ScanResultInner>(
- result_box.into_raw(),
- [](ffi::ScanResultInner* p) {
rust::Box<ffi::ScanResultInner>::from_raw(p); });
+ // Wrap raw pointer in ScanData immediately so it's never leaked on
exception.
+ auto data = std::make_shared<detail::ScanData>(result_box.into_raw(),
detail::ColumnMap{});
+ // Build column map eagerly — shared by all RowViews/BucketViews.
+ auto col_count = data->raw->sv_column_count();
+ for (size_t i = 0; i < col_count; ++i) {
+ auto name = data->raw->sv_column_name(i);
+ data->columns[std::string(name.data(), name.size())] = {
+ i, static_cast<TypeId>(data->raw->sv_column_type(i))};
+ }
+ out.data_ = std::move(data);
return utils::make_ok();
}
diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml
index 804e1bb..9cf20e3 100644
--- a/bindings/python/Cargo.toml
+++ b/bindings/python/Cargo.toml
@@ -37,3 +37,4 @@ arrow-array = "57.0.0"
pyo3-async-runtimes = { version = "0.26.0", features = ["tokio-runtime"] }
jiff = { workspace = true }
bigdecimal = "0.4"
+indexmap = "2"
diff --git a/bindings/python/example/example.py
b/bindings/python/example/example.py
index 9c2b7e3..3564d91 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -351,21 +351,26 @@ async def main():
record_scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in
range(num_buckets)})
- # Poll returns List[ScanRecord] with per-record metadata
+ # Poll returns ScanRecords — records grouped by bucket
print("\n--- Testing poll() method (record-by-record) ---")
try:
- records = record_scanner.poll(5000)
- print(f"Number of records: {len(records)}")
-
- # Show first few records with metadata
- for i, record in enumerate(records[:5]):
- print(f" Record {i}: offset={record.offset}, "
- f"timestamp={record.timestamp}, "
- f"change_type={record.change_type}, "
- f"row={record.row}")
-
- if len(records) > 5:
- print(f" ... and {len(records) - 5} more records")
+ scan_records = record_scanner.poll(5000)
+ print(f"Total records: {scan_records.count()}, buckets:
{len(scan_records.buckets())}")
+
+ # Flat iteration over all records (regardless of bucket)
+ print(f" Flat iteration: {scan_records.count()} records")
+ for record in scan_records:
+ print(f" offset={record.offset},
timestamp={record.timestamp}")
+
+ # Per-bucket access
+ for bucket in scan_records.buckets():
+ bucket_recs = scan_records.records(bucket)
+ print(f" Bucket {bucket}: {len(bucket_recs)} records")
+ for record in bucket_recs[:3]:
+ print(f" offset={record.offset}, "
+ f"timestamp={record.timestamp}, "
+ f"change_type={record.change_type}, "
+ f"row={record.row}")
except Exception as e:
print(f"Error during poll: {e}")
diff --git a/bindings/python/fluss/__init__.pyi
b/bindings/python/fluss/__init__.pyi
index 47eeb80..4b7fa4e 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -19,7 +19,7 @@
from enum import IntEnum
from types import TracebackType
-from typing import Dict, List, Optional, Tuple
+from typing import Dict, Iterator, List, Optional, Tuple, Union, overload
import pandas as pd
import pyarrow as pa
@@ -43,12 +43,12 @@ class ChangeType(IntEnum):
...
class ScanRecord:
- """Represents a single scan record with metadata."""
+ """Represents a single scan record with metadata.
+
+ The bucket is the key in ScanRecords, not on the individual record
+ (matches Rust/Java).
+ """
- @property
- def bucket(self) -> TableBucket:
- """The bucket this record belongs to."""
- ...
@property
def offset(self) -> int:
"""The position of this record in the log."""
@@ -90,6 +90,47 @@ class RecordBatch:
def __str__(self) -> str: ...
def __repr__(self) -> str: ...
+class ScanRecords:
+ """A collection of scan records grouped by bucket.
+
+ Returned by ``LogScanner.poll()``. Supports flat iteration
+    (``for rec in records``) and per-bucket access (``records.records(bucket)``).
+ """
+
+ def buckets(self) -> List[TableBucket]:
+ """List of distinct buckets that have records."""
+ ...
+ def records(self, bucket: TableBucket) -> List[ScanRecord]:
+ """Get records for a specific bucket. Returns empty list if bucket not
present."""
+ ...
+ def count(self) -> int:
+ """Total number of records across all buckets."""
+ ...
+ def is_empty(self) -> bool:
+ """Whether the result set is empty."""
+ ...
+ def keys(self) -> List[TableBucket]:
+ """Mapping protocol: alias for ``buckets()``."""
+ ...
+ def values(self) -> Iterator[List[ScanRecord]]:
+        """Mapping protocol: lazy iterator over record lists, one per bucket."""
+ ...
+ def items(self) -> Iterator[Tuple[TableBucket, List[ScanRecord]]]:
+ """Mapping protocol: lazy iterator over ``(bucket, records)`` pairs."""
+ ...
+ def __len__(self) -> int: ...
+ @overload
+ def __getitem__(self, index: int) -> ScanRecord: ...
+ @overload
+ def __getitem__(self, index: slice) -> List[ScanRecord]: ...
+ @overload
+ def __getitem__(self, bucket: TableBucket) -> List[ScanRecord]: ...
+ def __getitem__(self, key: Union[int, slice, TableBucket]) ->
Union[ScanRecord, List[ScanRecord]]: ...
+ def __contains__(self, bucket: TableBucket) -> bool: ...
+ def __iter__(self) -> Iterator[ScanRecord]: ...
+ def __str__(self) -> str: ...
+ def __repr__(self) -> str: ...
+
class Config:
def __init__(self, properties: Optional[Dict[str, str]] = None) -> None:
...
@property
@@ -590,7 +631,7 @@ class LogScanner:
bucket_id: The bucket ID within the partition
"""
...
- def poll(self, timeout_ms: int) -> List[ScanRecord]:
+ def poll(self, timeout_ms: int) -> ScanRecords:
"""Poll for individual records with metadata.
Requires a record-based scanner (created with
new_scan().create_log_scanner()).
@@ -599,11 +640,12 @@ class LogScanner:
timeout_ms: Timeout in milliseconds to wait for records.
Returns:
- List of ScanRecord objects, each containing bucket, offset,
timestamp,
- change_type, and row data as a dictionary.
+ ScanRecords grouped by bucket. Supports flat iteration
+ (``for rec in records``) and per-bucket access
+ (``records.buckets()``, ``records.records(bucket)``).
Note:
- Returns an empty list if no records are available or timeout
expires.
+ Returns an empty ScanRecords if no records are available or
timeout expires.
"""
...
def poll_record_batch(self, timeout_ms: int) -> List[RecordBatch]:
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index 553c8a9..ebc0d54 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -122,6 +122,7 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<TableBucket>()?;
m.add_class::<ChangeType>()?;
m.add_class::<ScanRecord>()?;
+ m.add_class::<ScanRecords>()?;
m.add_class::<RecordBatch>()?;
m.add_class::<PartitionInfo>()?;
m.add_class::<OffsetSpec>()?;
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index c3ea248..bc2e956 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -22,7 +22,14 @@ use arrow_pyarrow::{FromPyArrow, ToPyArrow};
use arrow_schema::SchemaRef;
use fluss::record::to_arrow_schema;
use fluss::rpc::message::OffsetSpec;
-use pyo3::types::IntoPyDict;
+use indexmap::IndexMap;
+use pyo3::exceptions::{PyIndexError, PyRuntimeError, PyTypeError};
+use pyo3::sync::PyOnceLock;
+use pyo3::types::{
+ IntoPyDict, PyBool, PyByteArray, PyBytes, PyDate, PyDateAccess,
PyDateTime, PyDelta,
+ PyDeltaAccess, PyDict, PyList, PySequence, PySlice, PyTime, PyTimeAccess,
PyTuple, PyType,
+ PyTzInfo,
+};
use pyo3_async_runtimes::tokio::future_into_py;
use std::collections::HashMap;
use std::sync::Arc;
@@ -38,11 +45,12 @@ const MICROS_PER_DAY: i64 = 86_400_000_000;
const NANOS_PER_MILLI: i64 = 1_000_000;
const NANOS_PER_MICRO: i64 = 1_000;
-/// Represents a single scan record with metadata
+/// Represents a single scan record with metadata.
+///
+/// Matches Rust/Java: offset, timestamp, change_type, row.
+/// The bucket is the key in ScanRecords, not on the individual record.
#[pyclass]
pub struct ScanRecord {
- #[pyo3(get)]
- bucket: TableBucket,
#[pyo3(get)]
offset: i64,
#[pyo3(get)]
@@ -50,21 +58,20 @@ pub struct ScanRecord {
#[pyo3(get)]
change_type: ChangeType,
/// Store row as a Python dict directly
- row_dict: Py<pyo3::types::PyDict>,
+ row_dict: Py<PyDict>,
}
#[pymethods]
impl ScanRecord {
/// Get the row data as a dictionary
#[getter]
- pub fn row(&self, py: Python) -> Py<pyo3::types::PyDict> {
+ pub fn row(&self, py: Python) -> Py<PyDict> {
self.row_dict.clone_ref(py)
}
fn __str__(&self) -> String {
format!(
- "ScanRecord(bucket={}, offset={}, timestamp={}, change_type={})",
- self.bucket.__str__(),
+ "ScanRecord(offset={}, timestamp={}, change_type={})",
self.offset,
self.timestamp,
self.change_type.short_string()
@@ -80,13 +87,12 @@ impl ScanRecord {
/// Create a ScanRecord from core types
pub fn from_core(
py: Python,
- bucket: &fcore::metadata::TableBucket,
record: &fcore::record::ScanRecord,
row_type: &fcore::metadata::RowType,
) -> PyResult<Self> {
let fields = row_type.fields();
let row = record.row();
- let dict = pyo3::types::PyDict::new(py);
+ let dict = PyDict::new(py);
for (pos, field) in fields.iter().enumerate() {
let value = datum_to_python_value(py, row, pos,
field.data_type())?;
@@ -94,7 +100,6 @@ impl ScanRecord {
}
Ok(ScanRecord {
- bucket: TableBucket::from_core(bucket.clone()),
offset: record.offset(),
timestamp: record.timestamp(),
change_type: ChangeType::from_core(*record.change_type()),
@@ -155,6 +160,247 @@ impl RecordBatch {
}
}
+/// A collection of scan records grouped by bucket.
+///
+/// Returned by `LogScanner.poll()`. Records are grouped by `TableBucket`.
+#[pyclass]
+pub struct ScanRecords {
+ records_by_bucket: IndexMap<TableBucket, Vec<Py<ScanRecord>>>,
+ total_count: usize,
+}
+
+#[pymethods]
+impl ScanRecords {
+ /// List of distinct buckets that have records in this result.
+ pub fn buckets(&self) -> Vec<TableBucket> {
+ self.records_by_bucket.keys().cloned().collect()
+ }
+
+ /// Get records for a specific bucket.
+ ///
+ /// Returns an empty list if the bucket is not present (matches Rust/Java
behavior).
+ pub fn records(&self, py: Python, bucket: &TableBucket) ->
Vec<Py<ScanRecord>> {
+ self.records_by_bucket
+ .get(bucket)
+ .map(|recs| recs.iter().map(|r| r.clone_ref(py)).collect())
+ .unwrap_or_default()
+ }
+
+ /// Total number of records across all buckets.
+ pub fn count(&self) -> usize {
+ self.total_count
+ }
+
+ /// Whether the result set is empty.
+ pub fn is_empty(&self) -> bool {
+ self.total_count == 0
+ }
+
+ fn __len__(&self) -> usize {
+ self.total_count
+ }
+
+ /// Type-dispatched indexing:
+ /// records[0] → ScanRecord (flat index)
+ /// records[-1] → ScanRecord (negative index)
+ /// records[1:3] → list[ScanRecord] (slice)
+ /// records[bucket] → list[ScanRecord] (by bucket)
+ fn __getitem__(&self, py: Python, key: &Bound<'_, PyAny>) ->
PyResult<Py<PyAny>> {
+ // Try integer index first
+ if let Ok(mut idx) = key.extract::<isize>() {
+ let len = self.total_count as isize;
+ if idx < 0 {
+ idx += len;
+ }
+ if idx < 0 || idx >= len {
+ return Err(PyIndexError::new_err(format!(
+ "index {idx} out of range for ScanRecords of size {len}"
+ )));
+ }
+ let idx = idx as usize;
+ let mut offset = 0;
+ for recs in self.records_by_bucket.values() {
+ if idx < offset + recs.len() {
+ return Ok(recs[idx - offset].clone_ref(py).into_any());
+ }
+ offset += recs.len();
+ }
+ return Err(PyRuntimeError::new_err(
+ "internal error: total_count out of sync with records",
+ ));
+ }
+ // Try slice
+ if let Ok(slice) = key.downcast::<PySlice>() {
+ let indices = slice.indices(self.total_count as isize)?;
+ let mut result: Vec<Py<ScanRecord>> = Vec::new();
+ let mut i = indices.start;
+ while (indices.step > 0 && i < indices.stop) || (indices.step < 0
&& i > indices.stop) {
+ let idx = i as usize;
+ let mut offset = 0;
+ for recs in self.records_by_bucket.values() {
+ if idx < offset + recs.len() {
+ result.push(recs[idx - offset].clone_ref(py));
+ break;
+ }
+ offset += recs.len();
+ }
+ i += indices.step;
+ }
+ return Ok(result.into_pyobject(py).unwrap().into_any().unbind());
+ }
+ // Try TableBucket
+ if let Ok(bucket) = key.extract::<TableBucket>() {
+ let recs = self.records(py, &bucket);
+ return Ok(recs.into_pyobject(py).unwrap().into_any().unbind());
+ }
+ Err(PyTypeError::new_err(
+ "index must be int, slice, or TableBucket",
+ ))
+ }
+
+ /// Support `bucket in records`.
+ fn __contains__(&self, bucket: &TableBucket) -> bool {
+ self.records_by_bucket.contains_key(bucket)
+ }
+
+ /// Mapping protocol: alias for `buckets()`.
+ pub fn keys(&self) -> Vec<TableBucket> {
+ self.buckets()
+ }
+
+ /// Mapping protocol: lazy iterator over record lists, one per bucket.
+ pub fn values(slf: Bound<'_, Self>) -> ScanRecordsBucketIter {
+ let this = slf.borrow();
+ let bucket_keys: Vec<TableBucket> =
this.records_by_bucket.keys().cloned().collect();
+ drop(this);
+ ScanRecordsBucketIter {
+ owner: slf.unbind(),
+ bucket_keys,
+ bucket_idx: 0,
+ with_keys: false,
+ }
+ }
+
+ /// Mapping protocol: lazy iterator over `(TableBucket, list[ScanRecord])`
pairs.
+ pub fn items(slf: Bound<'_, Self>) -> ScanRecordsBucketIter {
+ let this = slf.borrow();
+ let bucket_keys: Vec<TableBucket> =
this.records_by_bucket.keys().cloned().collect();
+ drop(this);
+ ScanRecordsBucketIter {
+ owner: slf.unbind(),
+ bucket_keys,
+ bucket_idx: 0,
+ with_keys: true,
+ }
+ }
+
+ fn __str__(&self) -> String {
+ format!(
+ "ScanRecords(records={}, buckets={})",
+ self.total_count,
+ self.records_by_bucket.len()
+ )
+ }
+
+ fn __repr__(&self) -> String {
+ self.__str__()
+ }
+
+ /// Flat iterator over all records across all buckets (matches Java/Rust).
+ fn __iter__(slf: Bound<'_, Self>) -> ScanRecordsIter {
+ let this = slf.borrow();
+ let bucket_keys: Vec<TableBucket> =
this.records_by_bucket.keys().cloned().collect();
+ drop(this);
+ ScanRecordsIter {
+ owner: slf.unbind(),
+ bucket_keys,
+ bucket_idx: 0,
+ rec_idx: 0,
+ }
+ }
+}
+
+#[pyclass]
+struct ScanRecordsIter {
+ owner: Py<ScanRecords>,
+ bucket_keys: Vec<TableBucket>,
+ bucket_idx: usize,
+ rec_idx: usize,
+}
+
+#[pymethods]
+impl ScanRecordsIter {
+ fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
+ slf
+ }
+
+ fn __next__(&mut self, py: Python) -> Option<Py<ScanRecord>> {
+ let owner = self.owner.borrow(py);
+ loop {
+ if self.bucket_idx >= self.bucket_keys.len() {
+ return None;
+ }
+ let bucket = &self.bucket_keys[self.bucket_idx];
+ if let Some(recs) = owner.records_by_bucket.get(bucket) {
+ if self.rec_idx < recs.len() {
+ let rec = recs[self.rec_idx].clone_ref(py);
+ self.rec_idx += 1;
+ return Some(rec);
+ }
+ }
+ self.bucket_idx += 1;
+ self.rec_idx = 0;
+ }
+ }
+}
+
+/// Lazy iterator for `ScanRecords.items()` and `ScanRecords.values()`.
+///
+/// Yields one bucket at a time: `(TableBucket, list[ScanRecord])` for items,
+/// or `list[ScanRecord]` for values. Only materializes records for the
+/// current bucket on each `__next__` call.
+#[pyclass]
+pub struct ScanRecordsBucketIter {
+ owner: Py<ScanRecords>,
+ bucket_keys: Vec<TableBucket>,
+ bucket_idx: usize,
+ with_keys: bool,
+}
+
+#[pymethods]
+impl ScanRecordsBucketIter {
+ fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
+ slf
+ }
+
+ fn __next__(&mut self, py: Python) -> Option<Py<PyAny>> {
+ if self.bucket_idx >= self.bucket_keys.len() {
+ return None;
+ }
+ let bucket = &self.bucket_keys[self.bucket_idx];
+ let owner = self.owner.borrow(py);
+ let recs = owner
+ .records_by_bucket
+ .get(bucket)
+ .map(|recs| recs.iter().map(|r|
r.clone_ref(py)).collect::<Vec<_>>())
+ .unwrap_or_default();
+ let bucket = bucket.clone();
+ self.bucket_idx += 1;
+
+ if self.with_keys {
+ Some(
+ (bucket, recs)
+ .into_pyobject(py)
+ .unwrap()
+ .into_any()
+ .unbind(),
+ )
+ } else {
+ Some(recs.into_pyobject(py).unwrap().into_any().unbind())
+ }
+ }
+}
+
/// Represents a Fluss table for data operations
#[pyclass]
pub struct FlussTable {
@@ -763,9 +1009,9 @@ impl AppendWriter {
/// Represents different input shapes for a row
#[derive(FromPyObject)]
enum RowInput<'py> {
- Dict(Bound<'py, pyo3::types::PyDict>),
- Tuple(Bound<'py, pyo3::types::PyTuple>),
- List(Bound<'py, pyo3::types::PyList>),
+ Dict(Bound<'py, PyDict>),
+ Tuple(Bound<'py, PyTuple>),
+ List(Bound<'py, PyList>),
}
/// Convert Python row (dict/list/tuple) to GenericRow requiring all schema
columns.
@@ -779,7 +1025,7 @@ pub fn python_to_generic_row(
/// Process a Python sequence (list or tuple) into datums at the target column
positions.
fn process_sequence(
- seq: &Bound<pyo3::types::PySequence>,
+ seq: &Bound<PySequence>,
target_indices: &[usize],
fields: &[fcore::metadata::DataField],
datums: &mut [fcore::row::Datum<'static>],
@@ -924,7 +1170,7 @@ fn python_value_to_datum(
}
fcore::metadata::DataType::TinyInt(_) => {
// Strict type checking: reject bool for int columns
- if value.is_instance_of::<pyo3::types::PyBool>() {
+ if value.is_instance_of::<PyBool>() {
return Err(FlussError::new_err(
"Expected int for TinyInt column, got bool. Use 0 or 1
explicitly.".to_string(),
));
@@ -933,7 +1179,7 @@ fn python_value_to_datum(
Ok(Datum::Int8(v))
}
fcore::metadata::DataType::SmallInt(_) => {
- if value.is_instance_of::<pyo3::types::PyBool>() {
+ if value.is_instance_of::<PyBool>() {
return Err(FlussError::new_err(
"Expected int for SmallInt column, got bool. Use 0 or 1
explicitly."
.to_string(),
@@ -943,7 +1189,7 @@ fn python_value_to_datum(
Ok(Datum::Int16(v))
}
fcore::metadata::DataType::Int(_) => {
- if value.is_instance_of::<pyo3::types::PyBool>() {
+ if value.is_instance_of::<PyBool>() {
return Err(FlussError::new_err(
"Expected int for Int column, got bool. Use 0 or 1
explicitly.".to_string(),
));
@@ -952,7 +1198,7 @@ fn python_value_to_datum(
Ok(Datum::Int32(v))
}
fcore::metadata::DataType::BigInt(_) => {
- if value.is_instance_of::<pyo3::types::PyBool>() {
+ if value.is_instance_of::<PyBool>() {
return Err(FlussError::new_err(
"Expected int for BigInt column, got bool. Use 0 or 1
explicitly.".to_string(),
));
@@ -975,9 +1221,9 @@ fn python_value_to_datum(
fcore::metadata::DataType::Bytes(_) |
fcore::metadata::DataType::Binary(_) => {
// Efficient extraction: downcast to specific type and use bulk
copy.
// PyBytes::as_bytes() and PyByteArray::to_vec() are O(n) bulk
copies of the underlying data.
- if let Ok(bytes) = value.downcast::<pyo3::types::PyBytes>() {
+ if let Ok(bytes) = value.downcast::<PyBytes>() {
Ok(bytes.as_bytes().to_vec().into())
- } else if let Ok(bytearray) =
value.downcast::<pyo3::types::PyByteArray>() {
+ } else if let Ok(bytearray) = value.downcast::<PyByteArray>() {
Ok(bytearray.to_vec().into())
} else {
Err(FlussError::new_err(format!(
@@ -1067,11 +1313,11 @@ pub fn datum_to_python_value(
}
DataType::Bytes(_) => {
let b = row.get_bytes(pos);
- Ok(pyo3::types::PyBytes::new(py, b).into_any().unbind())
+ Ok(PyBytes::new(py, b).into_any().unbind())
}
DataType::Binary(binary_type) => {
let b = row.get_binary(pos, binary_type.length());
- Ok(pyo3::types::PyBytes::new(py, b).into_any().unbind())
+ Ok(PyBytes::new(py, b).into_any().unbind())
}
DataType::Decimal(decimal_type) => {
let decimal = row.get_decimal(
@@ -1113,8 +1359,6 @@ fn rust_decimal_to_python(py: Python, decimal:
&fcore::row::Decimal) -> PyResult
/// Convert Rust Date (days since epoch) to Python datetime.date
fn rust_date_to_python(py: Python, date: fcore::row::Date) ->
PyResult<Py<PyAny>> {
- use pyo3::types::PyDate;
-
let days_since_epoch = date.get_inner();
let epoch = jiff::civil::date(1970, 1, 1);
let civil_date = epoch + jiff::Span::new().days(days_since_epoch as i64);
@@ -1130,8 +1374,6 @@ fn rust_date_to_python(py: Python, date:
fcore::row::Date) -> PyResult<Py<PyAny>
/// Convert Rust Time (millis since midnight) to Python datetime.time
fn rust_time_to_python(py: Python, time: fcore::row::Time) ->
PyResult<Py<PyAny>> {
- use pyo3::types::PyTime;
-
let millis = time.get_inner() as i64;
let hours = millis / MILLIS_PER_HOUR;
let minutes = (millis % MILLIS_PER_HOUR) / MILLIS_PER_MINUTE;
@@ -1151,8 +1393,6 @@ fn rust_time_to_python(py: Python, time:
fcore::row::Time) -> PyResult<Py<PyAny>
/// Convert Rust TimestampNtz to Python naive datetime
fn rust_timestamp_ntz_to_python(py: Python, ts: fcore::row::TimestampNtz) ->
PyResult<Py<PyAny>> {
- use pyo3::types::PyDateTime;
-
let millis = ts.get_millisecond();
let nanos = ts.get_nano_of_millisecond();
let total_micros = millis * MICROS_PER_MILLI + (nanos as i64 /
NANOS_PER_MICRO);
@@ -1178,8 +1418,6 @@ fn rust_timestamp_ntz_to_python(py: Python, ts:
fcore::row::TimestampNtz) -> PyR
/// Convert Rust TimestampLtz to Python timezone-aware datetime (UTC)
fn rust_timestamp_ltz_to_python(py: Python, ts: fcore::row::TimestampLtz) ->
PyResult<Py<PyAny>> {
- use pyo3::types::PyDateTime;
-
let millis = ts.get_epoch_millisecond();
let nanos = ts.get_nano_of_millisecond();
let total_micros = millis * MICROS_PER_MILLI + (nanos as i64 /
NANOS_PER_MICRO);
@@ -1212,7 +1450,7 @@ pub fn internal_row_to_dict(
) -> PyResult<Py<PyAny>> {
let row_type = table_info.row_type();
let fields = row_type.fields();
- let dict = pyo3::types::PyDict::new(py);
+ let dict = PyDict::new(py);
for (pos, field) in fields.iter().enumerate() {
let value = datum_to_python_value(py, row, pos, field.data_type())?;
@@ -1224,29 +1462,26 @@ pub fn internal_row_to_dict(
/// Cached decimal.Decimal type
/// Uses PyOnceLock for thread-safety and subinterpreter compatibility.
-static DECIMAL_TYPE: pyo3::sync::PyOnceLock<Py<pyo3::types::PyType>> =
- pyo3::sync::PyOnceLock::new();
+static DECIMAL_TYPE: PyOnceLock<Py<PyType>> = PyOnceLock::new();
/// Cached UTC timezone
-static UTC_TIMEZONE: pyo3::sync::PyOnceLock<Py<PyAny>> =
pyo3::sync::PyOnceLock::new();
+static UTC_TIMEZONE: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
/// Cached UTC epoch type
-static UTC_EPOCH: pyo3::sync::PyOnceLock<Py<PyAny>> =
pyo3::sync::PyOnceLock::new();
+static UTC_EPOCH: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
/// Get the cached decimal.Decimal type, importing it once per interpreter.
-fn get_decimal_type(py: Python) -> PyResult<Bound<pyo3::types::PyType>> {
+fn get_decimal_type(py: Python) -> PyResult<Bound<PyType>> {
let ty = DECIMAL_TYPE.get_or_try_init(py, || -> PyResult<_> {
let decimal_mod = py.import("decimal")?;
- let decimal_ty = decimal_mod
- .getattr("Decimal")?
- .downcast_into::<pyo3::types::PyType>()?;
+ let decimal_ty =
decimal_mod.getattr("Decimal")?.downcast_into::<PyType>()?;
Ok(decimal_ty.unbind())
})?;
Ok(ty.bind(py).clone())
}
/// Get the cached UTC timezone (datetime.timezone.utc), creating it once per
interpreter.
-fn get_utc_timezone(py: Python) -> PyResult<Bound<pyo3::types::PyTzInfo>> {
+fn get_utc_timezone(py: Python) -> PyResult<Bound<PyTzInfo>> {
let tz = UTC_TIMEZONE.get_or_try_init(py, || -> PyResult<_> {
let datetime_mod = py.import("datetime")?;
let timezone = datetime_mod.getattr("timezone")?;
@@ -1254,10 +1489,7 @@ fn get_utc_timezone(py: Python) ->
PyResult<Bound<pyo3::types::PyTzInfo>> {
Ok(utc.unbind())
})?;
// Downcast to PyTzInfo for use with PyDateTime::new()
- Ok(tz
- .bind(py)
- .clone()
- .downcast_into::<pyo3::types::PyTzInfo>()?)
+ Ok(tz.bind(py).clone().downcast_into::<PyTzInfo>()?)
}
/// Get the cached UTC epoch datetime, creating it once per interpreter.
@@ -1313,8 +1545,6 @@ fn python_decimal_to_datum(
/// Convert Python datetime.date to Datum::Date.
fn python_date_to_datum(value: &Bound<PyAny>) ->
PyResult<fcore::row::Datum<'static>> {
- use pyo3::types::{PyDate, PyDateAccess, PyDateTime};
-
// Reject datetime.datetime (subclass of date) - use timestamp columns for
those
if value.downcast::<PyDateTime>().is_ok() {
return Err(FlussError::new_err(
@@ -1351,8 +1581,6 @@ fn python_date_to_datum(value: &Bound<PyAny>) ->
PyResult<fcore::row::Datum<'sta
/// Sub-millisecond precision (microseconds not divisible by 1000) will raise
an error
/// to prevent silent data loss and ensure fail-fast behavior.
fn python_time_to_datum(value: &Bound<PyAny>) ->
PyResult<fcore::row::Datum<'static>> {
- use pyo3::types::{PyTime, PyTimeAccess};
-
let time = value.downcast::<PyTime>().map_err(|_| {
FlussError::new_err(format!(
"Expected datetime.time, got {}",
@@ -1411,8 +1639,6 @@ fn python_datetime_to_timestamp_ltz(value: &Bound<PyAny>)
-> PyResult<fcore::row
/// Uses integer arithmetic to avoid float precision issues.
/// For clarity, tz-aware datetimes are rejected - use TimestampLtz for those.
fn extract_datetime_components_ntz(value: &Bound<PyAny>) -> PyResult<(i64,
i32)> {
- use pyo3::types::PyDateTime;
-
// Try PyDateTime first
if let Ok(dt) = value.downcast::<PyDateTime>() {
// Reject tz-aware datetime for NTZ - it's ambiguous what the user
wants
@@ -1465,8 +1691,6 @@ fn extract_datetime_components_ntz(value: &Bound<PyAny>)
-> PyResult<(i64, i32)>
/// Extract epoch milliseconds for TimestampLtz (instant in time, UTC-based).
/// For naive datetimes, assumes UTC. For aware datetimes, converts to UTC.
fn extract_datetime_components_ltz(value: &Bound<PyAny>) -> PyResult<(i64,
i32)> {
- use pyo3::types::PyDateTime;
-
// Try PyDateTime first
if let Ok(dt) = value.downcast::<PyDateTime>() {
// Check if timezone-aware
@@ -1506,11 +1730,7 @@ fn extract_datetime_components_ltz(value: &Bound<PyAny>)
-> PyResult<(i64, i32)>
}
/// Convert datetime components to epoch milliseconds treating them as UTC
-fn datetime_to_epoch_millis_as_utc(
- dt: &pyo3::Bound<'_, pyo3::types::PyDateTime>,
-) -> PyResult<(i64, i32)> {
- use pyo3::types::{PyDateAccess, PyTimeAccess};
-
+fn datetime_to_epoch_millis_as_utc(dt: &Bound<'_, PyDateTime>) ->
PyResult<(i64, i32)> {
let year = dt.get_year();
let month = dt.get_month();
let day = dt.get_day();
@@ -1541,11 +1761,7 @@ fn datetime_to_epoch_millis_as_utc(
/// Convert timezone-aware datetime to epoch milliseconds using Python's
timedelta.
/// This correctly handles timezone conversions by computing (dt - UTC_EPOCH).
/// The UTC epoch is cached for performance.
-fn datetime_to_epoch_millis_utc_aware(
- dt: &pyo3::Bound<'_, pyo3::types::PyDateTime>,
-) -> PyResult<(i64, i32)> {
- use pyo3::types::{PyDelta, PyDeltaAccess};
-
+fn datetime_to_epoch_millis_utc_aware(dt: &Bound<'_, PyDateTime>) ->
PyResult<(i64, i32)> {
let py = dt.py();
let epoch = get_utc_epoch(py)?;
@@ -1777,14 +1993,15 @@ impl LogScanner {
/// timeout_ms: Timeout in milliseconds to wait for records
///
/// Returns:
- /// List of ScanRecord objects, each containing bucket, offset,
timestamp,
- /// change_type, and row data as a dictionary.
+ /// ScanRecords grouped by bucket. Supports flat iteration
+ /// (`for rec in records`) and per-bucket access (`records.buckets()`,
+ /// `records.records(bucket)`, `records[bucket]`).
///
/// Note:
/// - Requires a record-based scanner (created with
new_scan().create_log_scanner())
- /// - Returns an empty list if no records are available
- /// - When timeout expires, returns an empty list (NOT an error)
- fn poll(&self, py: Python, timeout_ms: i64) -> PyResult<Vec<ScanRecord>> {
+ /// - Returns an empty ScanRecords if no records are available
+ /// - When timeout expires, returns an empty ScanRecords (NOT an error)
+ fn poll(&self, py: Python, timeout_ms: i64) -> PyResult<ScanRecords> {
let scanner = self.scanner.as_record()?;
if timeout_ms < 0 {
@@ -1798,19 +2015,26 @@ impl LogScanner {
.detach(|| TOKIO_RUNTIME.block_on(async {
scanner.poll(timeout).await }))
.map_err(|e| FlussError::from_core_error(&e))?;
- // Convert ScanRecords to Python ScanRecord list
- // Use projected_row_type to handle column projection correctly
+ // Convert core ScanRecords to Python ScanRecords grouped by bucket
let row_type = &self.projected_row_type;
- let mut result = Vec::new();
+ let mut records_by_bucket = IndexMap::new();
+ let mut total_count = 0usize;
for (bucket, records) in scan_records.into_records_by_buckets() {
- for record in records {
- let scan_record = ScanRecord::from_core(py, &bucket, &record,
row_type)?;
- result.push(scan_record);
+ let py_bucket = TableBucket::from_core(bucket);
+ let mut py_records = Vec::with_capacity(records.len());
+ for record in &records {
+ let scan_record = ScanRecord::from_core(py, record, row_type)?;
+ py_records.push(Py::new(py, scan_record)?);
+ total_count += 1;
}
+ records_by_bucket.insert(py_bucket, py_records);
}
- Ok(result)
+ Ok(ScanRecords {
+ records_by_bucket,
+ total_count,
+ })
}
/// Poll for batches with metadata.
diff --git a/bindings/python/test/test_log_table.py
b/bindings/python/test/test_log_table.py
index 09586aa..bfa9789 100644
--- a/bindings/python/test/test_log_table.py
+++ b/bindings/python/test/test_log_table.py
@@ -492,11 +492,22 @@ async def test_partitioned_table_append_scan(connection,
admin):
(8, "EU", 800),
]
- records = _poll_records(scanner, expected_count=8)
- assert len(records) == 8
+ # Poll and verify per-bucket grouping
+ all_records = []
+ deadline = time.monotonic() + 10
+ while len(all_records) < 8 and time.monotonic() < deadline:
+ scan_records = scanner.poll(5000)
+ for bucket, bucket_records in scan_records.items():
+ assert bucket.partition_id is not None, "Partitioned table should
have partition_id"
+ # All records in a bucket should belong to the same partition
+ regions = {r.row["region"] for r in bucket_records}
+ assert len(regions) == 1, f"Bucket has mixed regions: {regions}"
+ all_records.extend(bucket_records)
+
+ assert len(all_records) == 8
collected = sorted(
- [(r.row["id"], r.row["region"], r.row["value"]) for r in records],
+ [(r.row["id"], r.row["region"], r.row["value"]) for r in all_records],
key=lambda x: x[0],
)
assert collected == expected
@@ -652,6 +663,70 @@ async def test_partitioned_table_to_arrow(connection,
admin):
await admin.drop_table(table_path, ignore_if_not_exists=False)
+async def test_scan_records_indexing_and_slicing(connection, admin):
+ """Test ScanRecords indexing, slicing (incl. negative steps), and
iteration consistency."""
+ table_path = fluss.TablePath("fluss", "py_test_scan_records_indexing")
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+ schema = fluss.Schema(
+ pa.schema([pa.field("id", pa.int32()), pa.field("val", pa.string())])
+ )
+ await admin.create_table(table_path, fluss.TableDescriptor(schema))
+
+ table = await connection.get_table(table_path)
+ writer = table.new_append().create_writer()
+ writer.write_arrow_batch(
+ pa.RecordBatch.from_arrays(
+ [pa.array(list(range(1, 9)), type=pa.int32()),
+ pa.array([f"v{i}" for i in range(1, 9)])],
+ schema=pa.schema([pa.field("id", pa.int32()), pa.field("val",
pa.string())]),
+ )
+ )
+ await writer.flush()
+
+ scanner = await table.new_scan().create_log_scanner()
+ num_buckets = (await admin.get_table_info(table_path)).num_buckets
+ scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in
range(num_buckets)})
+
+ # Poll until we get a non-empty ScanRecords (need ≥2 records for slice
tests)
+ sr = None
+ deadline = time.monotonic() + 10
+ while time.monotonic() < deadline:
+ sr = scanner.poll(5000)
+ if len(sr) >= 2:
+ break
+ assert sr is not None and len(sr) >= 2, "Expected at least 2 records"
+ n = len(sr)
+ offsets = [sr[i].offset for i in range(n)]
+
+ # Iteration and indexing must produce the same order
+ assert [r.offset for r in sr] == offsets
+
+ # Negative indexing
+ assert sr[-1].offset == offsets[-1]
+ assert sr[-n].offset == offsets[0]
+
+ # Verify slices match the same operation on the offsets reference list
+ test_slices = [
+ slice(1, n - 1), # forward subrange
+ slice(None, None, -1), # [::-1] full reverse
+ slice(n - 2, 0, -1), # reverse with bounds
+ slice(n - 1, 0, -2), # reverse with step
+ slice(None, None, 2), # [::2]
+ slice(1, None, 3), # [1::3]
+ slice(2, 2), # empty
+ ]
+ for s in test_slices:
+ result = [r.offset for r in sr[s]]
+ assert result == offsets[s], f"slice {s}: got {result}, expected
{offsets[s]}"
+
+ # Bucket-based indexing
+ for bucket in sr.buckets():
+ assert len(sr[bucket]) > 0
+
+ await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
@@ -667,6 +742,8 @@ def _poll_records(scanner, expected_count, timeout_s=10):
return collected
+
+
def _poll_arrow_ids(scanner, expected_count, timeout_s=10):
"""Poll a batch scanner and extract 'id' column values."""
all_ids = []
diff --git a/website/docs/user-guide/cpp/api-reference.md
b/website/docs/user-guide/cpp/api-reference.md
index a07dd6c..433c5da 100644
--- a/website/docs/user-guide/cpp/api-reference.md
+++ b/website/docs/user-guide/cpp/api-reference.md
@@ -250,23 +250,60 @@ Read-only row view for scan results. Provides zero-copy
access to string and byt
`ScanRecord` is a value type that can be freely copied, stored, and
accumulated across multiple `Poll()` calls. It shares ownership of the
underlying scan data via reference counting.
-| Field | Type | Description |
-|----------------|-------------------------|----------------------------------|
-| `bucket_id` | `int32_t` | Bucket this record belongs to |
-| `partition_id` | `std::optional<int64_t>`| Partition ID (if partitioned) |
-| `offset` | `int64_t` | Record offset in the log |
-| `timestamp` | `int64_t` | Record timestamp |
-| `change_type` | `ChangeType` | Type of change (see `ChangeType`)|
-| `row` | `RowView` | Read-only row view for field
access |
+| Field | Type | Description
|
+|---------------|--------------|---------------------------------------------------------------------|
+| `offset` | `int64_t` | Record offset in the log
|
+| `timestamp` | `int64_t` | Record timestamp
|
+| `change_type` | `ChangeType` | Change type (AppendOnly, Insert,
UpdateBefore, UpdateAfter, Delete) |
+| `row` | `RowView` | Row data (value type, shares ownership via
reference counting) |
## `ScanRecords`
-| Method | Description
|
-|----------------------------------------|--------------------------------------------|
-| `Size() -> size_t` | Number of records
|
-| `Empty() -> bool` | Check if empty
|
-| `operator[](size_t idx) -> ScanRecord` | Access record by index
|
-| `begin() / end()` | Iterator support for range-based
for loops |
+### Flat Access
+
+| Method | Description
|
+|-----------------------------------------|--------------------------------------------|
+| `Count() -> size_t` | Total number of records across all
buckets |
+| `IsEmpty() -> bool` | Check if empty
|
+| `begin() / end()` | Iterator support for range-based
for loops |
+
+Flat iteration over all records (regardless of bucket):
+
+```cpp
+for (const auto& rec : records) {
+ std::cout << "offset=" << rec.offset << std::endl;
+}
+```
+
+### Per-Bucket Access
+
+| Method |
Description |
+|-----------------------------------------------------------------|-----------------------------------------------------------------------|
+| `BucketCount() -> size_t` | Number of
distinct buckets |
+| `Buckets() -> std::vector<TableBucket>` | List of
distinct buckets |
+| `Records(const TableBucket& bucket) -> BucketView` | Records
for a specific bucket (empty view if bucket not present) |
+| `BucketAt(size_t idx) -> BucketView` | Records by
bucket index (0-based, O(1)) |
+
+## `BucketView`
+
+A view of records within a single bucket. Obtained from
`ScanRecords::Records()` or `ScanRecords::BucketAt()`. `BucketView` is a value
type — it shares ownership of the underlying scan data via reference counting,
so it can safely outlive the `ScanRecords` that produced it.
+
+| Method | Description
|
+|------------------------------------------------|--------------------------------------------|
+| `Size() -> size_t` | Number of records in this
bucket |
+| `Empty() -> bool` | Check if empty
|
+| `Bucket() -> const TableBucket&` | Get the bucket
|
+| `operator[](size_t idx) -> ScanRecord` | Access record by index within
this bucket |
+| `begin() / end()` | Iterator support for
range-based for loops |
+
+## `TableBucket`
+
+| Field / Method | Description
|
+|---------------------------------------|-------------------------------------------------|
+| `table_id -> int64_t` | Table ID
|
+| `bucket_id -> int32_t` | Bucket ID
|
+| `partition_id -> std::optional<int64_t>` | Partition ID (empty if
non-partitioned) |
+| `operator==(const TableBucket&) -> bool` | Equality comparison
|
## `LookupResult`
diff --git a/website/docs/user-guide/cpp/example/log-tables.md
b/website/docs/user-guide/cpp/example/log-tables.md
index 3a862c1..0125a4c 100644
--- a/website/docs/user-guide/cpp/example/log-tables.md
+++ b/website/docs/user-guide/cpp/example/log-tables.md
@@ -60,6 +60,18 @@ for (const auto& rec : records) {
<< " timestamp=" << rec.row.GetInt64(2)
<< " @ offset=" << rec.offset << std::endl;
}
+
+// Or per-bucket access
+for (const auto& bucket : records.Buckets()) {
+ auto view = records.Records(bucket);
+ std::cout << "Bucket " << bucket.bucket_id << ": "
+ << view.Size() << " records" << std::endl;
+ for (const auto& rec : view) {
+ std::cout << " event_id=" << rec.row.GetInt32(0)
+ << " event_type=" << rec.row.GetString(1)
+ << " @ offset=" << rec.offset << std::endl;
+ }
+}
```
**Continuous polling:**
diff --git a/website/docs/user-guide/python/api-reference.md
b/website/docs/user-guide/python/api-reference.md
index af03058..27a57dc 100644
--- a/website/docs/user-guide/python/api-reference.md
+++ b/website/docs/user-guide/python/api-reference.md
@@ -137,17 +137,69 @@ Builder for creating a `Lookuper`. Obtain via
`FlussTable.new_lookup()`.
| `.subscribe_partition_buckets(partition_bucket_offsets)` | Subscribe to
multiple partition+bucket combos (`{(part_id, bucket_id): offset}`) |
| `.unsubscribe(bucket_id)` | Unsubscribe
from a bucket (non-partitioned tables) |
| `.unsubscribe_partition(partition_id, bucket_id)` | Unsubscribe
from a partition bucket |
-| `.poll(timeout_ms) -> list[ScanRecord]` | Poll
individual records (record scanner only) |
+| `.poll(timeout_ms) -> ScanRecords` | Poll
individual records (record scanner only) |
| `.poll_arrow(timeout_ms) -> pa.Table` | Poll as
Arrow Table (batch scanner only) |
| `.poll_record_batch(timeout_ms) -> list[RecordBatch]` | Poll batches
with metadata (batch scanner only) |
| `.to_arrow() -> pa.Table` | Read all
subscribed data as Arrow Table (batch scanner only) |
| `.to_pandas() -> pd.DataFrame` | Read all
subscribed data as DataFrame (batch scanner only) |
+## `ScanRecords`
+
+Returned by `LogScanner.poll()`. Records are grouped by bucket.
+
+> **Note:** Flat iteration and integer indexing traverse buckets in an
arbitrary order that is consistent within a single `ScanRecords` instance but
may differ between `poll()` calls. Use per-bucket access (`.items()`,
`.records(bucket)`) when bucket ordering matters.
+
+```python
+scan_records = scanner.poll(timeout_ms=5000)
+
+# Sequence access
+scan_records[0] # first record
+scan_records[-1] # last record
+scan_records[:5] # first 5 records
+
+# Per-bucket access
+for bucket, records in scan_records.items():
+ for record in records:
+ print(f"bucket={bucket.bucket_id}, offset={record.offset},
row={record.row}")
+
+# Flat iteration
+for record in scan_records:
+ print(record.row)
+```
+
+### Methods
+
+| Method | Description
|
+|----------------------------------------|------------------------------------------------------------------|
+| `.buckets() -> list[TableBucket]` | List of distinct buckets
|
+| `.records(bucket) -> list[ScanRecord]` | Records for a specific bucket
(empty list if bucket not present) |
+| `.count() -> int` | Total record count across all
buckets |
+| `.is_empty() -> bool` | Check if empty
|
+
+### Indexing
+
+| Expression | Returns | Description
|
+|------------------------------|----------------------|-----------------------------------|
+| `scan_records[0]` | `ScanRecord` | Record by flat index
|
+| `scan_records[-1]` | `ScanRecord` | Negative indexing
|
+| `scan_records[1:5]` | `list[ScanRecord]` | Slice
|
+| `scan_records[bucket]` | `list[ScanRecord]` | Records for a bucket
|
+
+### Mapping Protocol
+
+| Method / Protocol | Description
|
+|--------------------------------|-------------------------------------------------|
+| `.keys()` | Same as `.buckets()`
|
+| `.values()` | Lazy iterator over record lists, one per
bucket |
+| `.items()` | Lazy iterator over `(bucket, records)`
pairs |
+| `len(scan_records)` | Same as `.count()`
|
+| `bucket in scan_records` | Membership test
|
+| `for record in scan_records` | Flat iteration over all records
|
+
## `ScanRecord`
| Property | Description
|
|------------------------------|---------------------------------------------------------------------|
-| `.bucket -> TableBucket` | Bucket this record belongs to
|
| `.offset -> int` | Record offset in the log
|
| `.timestamp -> int` | Record timestamp
|
| `.change_type -> ChangeType` | Change type (AppendOnly, Insert,
UpdateBefore, UpdateAfter, Delete) |
diff --git a/website/docs/user-guide/python/example/log-tables.md
b/website/docs/user-guide/python/example/log-tables.md
index 6e44e06..adaa162 100644
--- a/website/docs/user-guide/python/example/log-tables.md
+++ b/website/docs/user-guide/python/example/log-tables.md
@@ -83,13 +83,20 @@ while True:
if result.num_rows > 0:
print(result.to_pandas())
-# Record scanner: poll individual records with metadata
+# Record scanner: poll individual records
scanner = await table.new_scan().create_log_scanner()
scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in
range(num_buckets)})
while True:
- for record in scanner.poll(timeout_ms=5000):
+ scan_records = scanner.poll(timeout_ms=5000)
+
+ for record in scan_records:
print(f"offset={record.offset},
change={record.change_type.short_string()}, row={record.row}")
+
+ # Or per-bucket access (dict-like)
+ for bucket, records in scan_records.items():
+ for record in records:
+ print(f"bucket={bucket.bucket_id}, offset={record.offset},
row={record.row}")
```
### Unsubscribing
diff --git a/website/docs/user-guide/rust/example/log-tables.md
b/website/docs/user-guide/rust/example/log-tables.md
index 3ba3354..f5a4d0e 100644
--- a/website/docs/user-guide/rust/example/log-tables.md
+++ b/website/docs/user-guide/rust/example/log-tables.md
@@ -63,6 +63,21 @@ log_scanner.subscribe(0, 0).await?;
// Poll for records
let records = log_scanner.poll(Duration::from_secs(10)).await?;
+// Per-bucket access
+for (bucket, bucket_records) in records.records_by_buckets() {
+ println!("Bucket {}: {} records", bucket.bucket_id(),
bucket_records.len());
+ for record in bucket_records {
+ let row = record.row();
+ println!(
+ " event_id={}, event_type={} @ offset={}",
+ row.get_int(0),
+ row.get_string(1),
+ record.offset()
+ );
+ }
+}
+
+// Or flat iteration (consumes ScanRecords)
for record in records {
let row = record.row();
println!(