This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new ec4eb74  chore: Scan results returned per bucket python/cpp (#351)
ec4eb74 is described below

commit ec4eb749216ff40fd0c45eb7a786e2602d19c418
Author: Anton Borisov <[email protected]>
AuthorDate: Fri Feb 20 16:02:40 2026 +0000

    chore: Scan results returned per bucket python/cpp (#351)
---
 bindings/cpp/examples/example.cpp                  | 300 +++++++++--------
 bindings/cpp/include/fluss.hpp                     | 139 ++++++--
 bindings/cpp/src/lib.rs                            | 254 +++++++++-----
 bindings/cpp/src/table.cpp                         | 190 +++++++----
 bindings/python/Cargo.toml                         |   1 +
 bindings/python/example/example.py                 |  31 +-
 bindings/python/fluss/__init__.pyi                 |  62 +++-
 bindings/python/src/lib.rs                         |   1 +
 bindings/python/src/table.rs                       | 374 ++++++++++++++++-----
 bindings/python/test/test_log_table.py             |  83 ++++-
 website/docs/user-guide/cpp/api-reference.md       |  65 +++-
 website/docs/user-guide/cpp/example/log-tables.md  |  12 +
 website/docs/user-guide/python/api-reference.md    |  56 ++-
 .../docs/user-guide/python/example/log-tables.md   |  11 +-
 website/docs/user-guide/rust/example/log-tables.md |  15 +
 15 files changed, 1167 insertions(+), 427 deletions(-)

diff --git a/bindings/cpp/examples/example.cpp 
b/bindings/cpp/examples/example.cpp
index ea966d8..d86ee5c 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -168,58 +168,75 @@ int main() {
     fluss::ScanRecords records;
     check("poll", scanner.Poll(5000, records));
 
-    std::cout << "Scanned records: " << records.Size() << std::endl;
+    // Flat iteration over all records (regardless of bucket)
+    std::cout << "Scanned records: " << records.Count() << " across " << 
records.BucketCount()
+              << " buckets" << std::endl;
+    for (const auto& rec : records) {
+        std::cout << "  offset=" << rec.offset << " timestamp=" << 
rec.timestamp << std::endl;
+    }
+
+    // Per-bucket access (with type verification)
     bool scan_ok = true;
     bool found_null_row = false;
-    for (const auto& rec : records) {
-        // Check if this is the all-null row (matches Rust: is_null_at for 
every column)
-        if (rec.row.IsNull(0)) {
-            found_null_row = true;
-            for (size_t i = 0; i < rec.row.FieldCount(); ++i) {
-                if (!rec.row.IsNull(i)) {
-                    std::cerr << "ERROR: column " << i << " should be null" << 
std::endl;
-                    scan_ok = false;
+    for (const auto& tb : records.Buckets()) {
+        auto view = records.Records(tb);
+        std::cout << "  Bucket " << tb.bucket_id;
+        if (tb.partition_id.has_value()) {
+            std::cout << " (partition=" << *tb.partition_id << ")";
+        }
+        std::cout << ": " << view.Size() << " records" << std::endl;
+        for (const auto& rec : view) {
+            // Check if this is the all-null row
+            if (rec.row.IsNull(0)) {
+                found_null_row = true;
+                for (size_t i = 0; i < rec.row.FieldCount(); ++i) {
+                    if (!rec.row.IsNull(i)) {
+                        std::cerr << "ERROR: column " << i << " should be 
null" << std::endl;
+                        scan_ok = false;
+                    }
                 }
+                std::cout << "    [null row] all " << rec.row.FieldCount() << 
" fields are null"
+                          << std::endl;
+                continue;
             }
-            std::cout << "  [null row] all " << rec.row.FieldCount() << " 
fields are null"
-                      << std::endl;
-            continue;
-        }
-
-        // Non-null rows: verify types
-        if (rec.row.GetType(4) != fluss::TypeId::Date) {
-            std::cerr << "ERROR: field 4 expected Date, got "
-                      << static_cast<int>(rec.row.GetType(4)) << std::endl;
-            scan_ok = false;
-        }
-        if (rec.row.GetType(5) != fluss::TypeId::Time) {
-            std::cerr << "ERROR: field 5 expected Time, got "
-                      << static_cast<int>(rec.row.GetType(5)) << std::endl;
-            scan_ok = false;
-        }
-        if (rec.row.GetType(6) != fluss::TypeId::Timestamp) {
-            std::cerr << "ERROR: field 6 expected Timestamp, got "
-                      << static_cast<int>(rec.row.GetType(6)) << std::endl;
-            scan_ok = false;
-        }
-        if (rec.row.GetType(7) != fluss::TypeId::TimestampLtz) {
-            std::cerr << "ERROR: field 7 expected TimestampLtz, got "
-                      << static_cast<int>(rec.row.GetType(7)) << std::endl;
-            scan_ok = false;
-        }
 
-        // Name-based getters (equivalent to index-based above)
-        auto date = rec.row.GetDate("event_date");
-        auto time = rec.row.GetTime("event_time");
-        auto ts_ntz = rec.row.GetTimestamp("created_at");
-        auto ts_ltz = rec.row.GetTimestamp("updated_at");
+            // Non-null rows: verify types
+            if (rec.row.GetType(4) != fluss::TypeId::Date) {
+                std::cerr << "ERROR: field 4 expected Date, got "
+                          << static_cast<int>(rec.row.GetType(4)) << std::endl;
+                scan_ok = false;
+            }
+            if (rec.row.GetType(5) != fluss::TypeId::Time) {
+                std::cerr << "ERROR: field 5 expected Time, got "
+                          << static_cast<int>(rec.row.GetType(5)) << std::endl;
+                scan_ok = false;
+            }
+            if (rec.row.GetType(6) != fluss::TypeId::Timestamp) {
+                std::cerr << "ERROR: field 6 expected Timestamp, got "
+                          << static_cast<int>(rec.row.GetType(6)) << std::endl;
+                scan_ok = false;
+            }
+            if (rec.row.GetType(7) != fluss::TypeId::TimestampLtz) {
+                std::cerr << "ERROR: field 7 expected TimestampLtz, got "
+                          << static_cast<int>(rec.row.GetType(7)) << std::endl;
+                scan_ok = false;
+            }
 
-        std::cout << "  id=" << rec.row.GetInt32("id") << " name=" << 
rec.row.GetString("name")
-                  << " score=" << rec.row.GetFloat32("score") << " age=" << 
rec.row.GetInt32("age")
-                  << " date=" << date.Year() << "-" << date.Month() << "-" << 
date.Day()
-                  << " time=" << time.Hour() << ":" << time.Minute() << ":" << 
time.Second()
-                  << " ts_ntz=" << ts_ntz.epoch_millis << " ts_ltz=" << 
ts_ltz.epoch_millis << "+"
-                  << ts_ltz.nano_of_millisecond << "ns" << std::endl;
+            // Name-based getters
+            auto date = rec.row.GetDate("event_date");
+            auto time = rec.row.GetTime("event_time");
+            auto ts_ntz = rec.row.GetTimestamp("created_at");
+            auto ts_ltz = rec.row.GetTimestamp("updated_at");
+
+            std::cout << "    id=" << rec.row.GetInt32("id")
+                      << " name=" << rec.row.GetString("name")
+                      << " score=" << rec.row.GetFloat32("score")
+                      << " age=" << rec.row.GetInt32("age") << " date=" << 
date.Year() << "-"
+                      << date.Month() << "-" << date.Day() << " time=" << 
time.Hour() << ":"
+                      << time.Minute() << ":" << time.Second() << " ts_ntz=" 
<< ts_ntz.epoch_millis
+                      << " ts_ltz=" << ts_ltz.epoch_millis << "+" << 
ts_ltz.nano_of_millisecond
+                      << "ns" << std::endl;
+        }
     }
 
     if (!found_null_row) {
@@ -246,32 +263,34 @@ int main() {
     fluss::ScanRecords projected_records;
     check("poll_projected", projected_scanner.Poll(5000, projected_records));
 
-    std::cout << "Projected records: " << projected_records.Size() << 
std::endl;
-    for (const auto& rec : projected_records) {
-        if (rec.row.FieldCount() != 2) {
-            std::cerr << "ERROR: expected 2 fields, got " << 
rec.row.FieldCount() << std::endl;
-            scan_ok = false;
-            continue;
-        }
-        // Skip the all-null row
-        if (rec.row.IsNull(0)) {
-            std::cout << "  [null row] skipped" << std::endl;
-            continue;
-        }
-        if (rec.row.GetType(0) != fluss::TypeId::Int) {
-            std::cerr << "ERROR: projected field 0 expected Int, got "
-                      << static_cast<int>(rec.row.GetType(0)) << std::endl;
-            scan_ok = false;
-        }
-        if (rec.row.GetType(1) != fluss::TypeId::TimestampLtz) {
-            std::cerr << "ERROR: projected field 1 expected TimestampLtz, got "
-                      << static_cast<int>(rec.row.GetType(1)) << std::endl;
-            scan_ok = false;
-        }
+    std::cout << "Projected records: " << projected_records.Count() << 
std::endl;
+    for (const auto& tb : projected_records.Buckets()) {
+        for (const auto& rec : projected_records.Records(tb)) {
+            if (rec.row.FieldCount() != 2) {
+                std::cerr << "ERROR: expected 2 fields, got " << 
rec.row.FieldCount() << std::endl;
+                scan_ok = false;
+                continue;
+            }
+            // Skip the all-null row
+            if (rec.row.IsNull(0)) {
+                std::cout << "  [null row] skipped" << std::endl;
+                continue;
+            }
+            if (rec.row.GetType(0) != fluss::TypeId::Int) {
+                std::cerr << "ERROR: projected field 0 expected Int, got "
+                          << static_cast<int>(rec.row.GetType(0)) << std::endl;
+                scan_ok = false;
+            }
+            if (rec.row.GetType(1) != fluss::TypeId::TimestampLtz) {
+                std::cerr << "ERROR: projected field 1 expected TimestampLtz, 
got "
+                          << static_cast<int>(rec.row.GetType(1)) << std::endl;
+                scan_ok = false;
+            }
 
-        auto ts = rec.row.GetTimestamp(1);
-        std::cout << "  id=" << rec.row.GetInt32(0) << " updated_at=" << 
ts.epoch_millis << "+"
-                  << ts.nano_of_millisecond << "ns" << std::endl;
+            auto ts = rec.row.GetTimestamp(1);
+            std::cout << "  id=" << rec.row.GetInt32(0) << " updated_at=" << 
ts.epoch_millis << "+"
+                      << ts.nano_of_millisecond << "ns" << std::endl;
+        }
     }
 
     // 7b) Projected scan by column names — same columns as above but using 
names
@@ -287,32 +306,34 @@ int main() {
     fluss::ScanRecords name_projected_records;
     check("poll_name_projected", name_projected_scanner.Poll(5000, 
name_projected_records));
 
-    std::cout << "Name-projected records: " << name_projected_records.Size() 
<< std::endl;
-    for (const auto& rec : name_projected_records) {
-        if (rec.row.FieldCount() != 2) {
-            std::cerr << "ERROR: expected 2 fields, got " << 
rec.row.FieldCount() << std::endl;
-            scan_ok = false;
-            continue;
-        }
-        // Skip the all-null row
-        if (rec.row.IsNull(0)) {
-            std::cout << "  [null row] skipped" << std::endl;
-            continue;
-        }
-        if (rec.row.GetType(0) != fluss::TypeId::Int) {
-            std::cerr << "ERROR: name-projected field 0 expected Int, got "
-                      << static_cast<int>(rec.row.GetType(0)) << std::endl;
-            scan_ok = false;
-        }
-        if (rec.row.GetType(1) != fluss::TypeId::TimestampLtz) {
-            std::cerr << "ERROR: name-projected field 1 expected TimestampLtz, 
got "
-                      << static_cast<int>(rec.row.GetType(1)) << std::endl;
-            scan_ok = false;
-        }
+    std::cout << "Name-projected records: " << name_projected_records.Count() 
<< std::endl;
+    for (const auto& tb : name_projected_records.Buckets()) {
+        for (const auto& rec : name_projected_records.Records(tb)) {
+            if (rec.row.FieldCount() != 2) {
+                std::cerr << "ERROR: expected 2 fields, got " << 
rec.row.FieldCount() << std::endl;
+                scan_ok = false;
+                continue;
+            }
+            // Skip the all-null row
+            if (rec.row.IsNull(0)) {
+                std::cout << "  [null row] skipped" << std::endl;
+                continue;
+            }
+            if (rec.row.GetType(0) != fluss::TypeId::Int) {
+                std::cerr << "ERROR: name-projected field 0 expected Int, got "
+                          << static_cast<int>(rec.row.GetType(0)) << std::endl;
+                scan_ok = false;
+            }
+            if (rec.row.GetType(1) != fluss::TypeId::TimestampLtz) {
+                std::cerr << "ERROR: name-projected field 1 expected 
TimestampLtz, got "
+                          << static_cast<int>(rec.row.GetType(1)) << std::endl;
+                scan_ok = false;
+            }
 
-        auto ts = rec.row.GetTimestamp(1);
-        std::cout << "  id=" << rec.row.GetInt32(0) << " updated_at=" << 
ts.epoch_millis << "+"
-                  << ts.nano_of_millisecond << "ns" << std::endl;
+            auto ts = rec.row.GetTimestamp(1);
+            std::cout << "  id=" << rec.row.GetInt32(0) << " updated_at=" << 
ts.epoch_millis << "+"
+                      << ts.nano_of_millisecond << "ns" << std::endl;
+        }
     }
 
     if (scan_ok) {
@@ -356,8 +377,8 @@ int main() {
 
     std::unordered_map<int32_t, int64_t> timestamp_offsets;
     check("list_timestamp_offsets",
-          admin.ListOffsets(table_path, all_bucket_ids,
-                            fluss::OffsetSpec::Timestamp(timestamp_ms), 
timestamp_offsets));
+          admin.ListOffsets(table_path, all_bucket_ids, 
fluss::OffsetSpec::Timestamp(timestamp_ms),
+                            timestamp_offsets));
     std::cout << "Offsets for timestamp " << timestamp_ms << " (1 hour ago):" 
<< std::endl;
     for (const auto& [bucket_id, offset] : timestamp_offsets) {
         std::cout << "  Bucket " << bucket_id << ": offset=" << offset << 
std::endl;
@@ -381,15 +402,21 @@ int main() {
     fluss::ScanRecords batch_records;
     check("poll_batch", batch_scanner.Poll(5000, batch_records));
 
-    std::cout << "Scanned " << batch_records.Size() << " records from batch 
subscription"
+    std::cout << "Scanned " << batch_records.Count() << " records from batch 
subscription"
               << std::endl;
-    for (size_t i = 0; i < batch_records.Size() && i < 5; ++i) {
-        const auto& rec = batch_records[i];
-        std::cout << "  Record " << i << ": bucket_id=" << rec.bucket_id
-                  << ", offset=" << rec.offset << ", timestamp=" << 
rec.timestamp << std::endl;
-    }
-    if (batch_records.Size() > 5) {
-        std::cout << "  ... and " << (batch_records.Size() - 5) << " more 
records" << std::endl;
+    for (const auto& tb : batch_records.Buckets()) {
+        size_t shown = 0;
+        for (const auto& rec : batch_records.Records(tb)) {
+            if (shown < 5) {
+                std::cout << "  bucket_id=" << tb.bucket_id << ", offset=" << 
rec.offset
+                          << ", timestamp=" << rec.timestamp << std::endl;
+            }
+            ++shown;
+        }
+        if (shown > 5) {
+            std::cout << "  ... and " << (shown - 5) << " more records in 
bucket " << tb.bucket_id
+                      << std::endl;
+        }
     }
 
     // 9.1) Unsubscribe from a bucket
@@ -520,11 +547,13 @@ int main() {
 
         fluss::ScanRecords arrow_write_records;
         check("poll_arrow_write", arrow_write_scanner.Poll(5000, 
arrow_write_records));
-        std::cout << "Scanned " << arrow_write_records.Size()
+        std::cout << "Scanned " << arrow_write_records.Count()
                   << " records written via AppendArrowBatch:" << std::endl;
-        for (const auto& rec : arrow_write_records) {
-            std::cout << "  id=" << rec.row.GetInt32(0) << " name=" << 
rec.row.GetString(1)
-                      << " score=" << rec.row.GetFloat32(2) << std::endl;
+        for (const auto& tb : arrow_write_records.Buckets()) {
+            for (const auto& rec : arrow_write_records.Records(tb)) {
+                std::cout << "  id=" << rec.row.GetInt32(0) << " name=" << 
rec.row.GetString(1)
+                          << " score=" << rec.row.GetFloat32(2) << std::endl;
+            }
         }
     }
 
@@ -591,11 +620,13 @@ int main() {
     fluss::ScanRecords decimal_records;
     check("poll_decimal", decimal_scanner.Poll(5000, decimal_records));
 
-    std::cout << "Scanned decimal records: " << decimal_records.Size() << 
std::endl;
-    for (const auto& rec : decimal_records) {
-        std::cout << "  id=" << rec.row.GetInt32(0) << " price=" << 
rec.row.GetDecimalString(1)
-                  << " amount=" << rec.row.GetDecimalString(2)
-                  << " is_decimal=" << rec.row.IsDecimal(1) << std::endl;
+    std::cout << "Scanned decimal records: " << decimal_records.Count() << 
std::endl;
+    for (const auto& tb : decimal_records.Buckets()) {
+        for (const auto& rec : decimal_records.Records(tb)) {
+            std::cout << "  id=" << rec.row.GetInt32(0) << " price=" << 
rec.row.GetDecimalString(1)
+                      << " amount=" << rec.row.GetDecimalString(2)
+                      << " is_decimal=" << rec.row.IsDecimal(1) << std::endl;
+        }
     }
 
     // 14) Partitioned table example
@@ -690,14 +721,15 @@ int main() {
 
     fluss::ScanRecords partition_records;
     check("poll_partitioned", partition_scanner.Poll(5000, partition_records));
-    std::cout << "Scanned " << partition_records.Size() << " records from 
partitioned table"
+    std::cout << "Scanned " << partition_records.Count() << " records from 
partitioned table"
               << std::endl;
-    for (size_t i = 0; i < partition_records.Size(); ++i) {
-        const auto& rec = partition_records[i];
-        std::cout << "  Record " << i << ": partition_id="
-                  << (rec.partition_id.has_value() ? 
std::to_string(*rec.partition_id) : "none")
-                  << ", id=" << rec.row.GetInt32(0) << ", region=" << 
rec.row.GetString(1)
-                  << ", value=" << rec.row.GetInt64(2) << std::endl;
+    for (const auto& tb : partition_records.Buckets()) {
+        for (const auto& rec : partition_records.Records(tb)) {
+            std::cout << "  partition_id="
+                      << (tb.partition_id.has_value() ? 
std::to_string(*tb.partition_id) : "none")
+                      << ", id=" << rec.row.GetInt32(0) << ", region=" << 
rec.row.GetString(1)
+                      << ", value=" << rec.row.GetInt64(2) << std::endl;
+        }
     }
 
     // 14.2) subscribe_partition_buckets: batch subscribe to all partitions at 
once
@@ -717,13 +749,13 @@ int main() {
 
     fluss::ScanRecords partition_batch_records;
     check("poll_partition_batch", partition_batch_scanner.Poll(5000, 
partition_batch_records));
-    std::cout << "Scanned " << partition_batch_records.Size()
+    std::cout << "Scanned " << partition_batch_records.Count()
               << " records from batch partition subscription" << std::endl;
-    for (size_t i = 0; i < partition_batch_records.Size(); ++i) {
-        const auto& rec = partition_batch_records[i];
-        std::cout << "  Record " << i << ": id=" << rec.row.GetInt32(0)
-                  << ", region=" << rec.row.GetString(1) << ", value=" << 
rec.row.GetInt64(2)
-                  << std::endl;
+    for (const auto& tb : partition_batch_records.Buckets()) {
+        for (const auto& rec : partition_batch_records.Records(tb)) {
+            std::cout << "  id=" << rec.row.GetInt32(0) << ", region=" << 
rec.row.GetString(1)
+                      << ", value=" << rec.row.GetInt64(2) << std::endl;
+        }
     }
 
     // 14.3) UnsubscribePartition: unsubscribe from one partition, verify 
remaining
@@ -743,12 +775,12 @@ int main() {
 
     fluss::ScanRecords unsub_records;
     check("poll_after_unsub", unsub_partition_scanner.Poll(5000, 
unsub_records));
-    std::cout << "After unsubscribe, scanned " << unsub_records.Size() << " 
records" << std::endl;
-    for (size_t i = 0; i < unsub_records.Size(); ++i) {
-        const auto& rec = unsub_records[i];
-        std::cout << "  Record " << i << ": id=" << rec.row.GetInt32(0)
-                  << ", region=" << rec.row.GetString(1) << ", value=" << 
rec.row.GetInt64(2)
-                  << std::endl;
+    std::cout << "After unsubscribe, scanned " << unsub_records.Count() << " 
records" << std::endl;
+    for (const auto& tb : unsub_records.Buckets()) {
+        for (const auto& rec : unsub_records.Records(tb)) {
+            std::cout << "  id=" << rec.row.GetInt32(0) << ", region=" << 
rec.row.GetString(1)
+                      << ", value=" << rec.row.GetInt64(2) << std::endl;
+        }
     }
 
     // Cleanup
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 27d1fcb..9ea7e41 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -507,6 +507,17 @@ struct NamedGetters {
    private:
     const Derived& Self() const { return static_cast<const Derived&>(*this); }
 };
+
+struct ScanData {
+    ffi::ScanResultInner* raw;
+    ColumnMap columns;
+
+    ScanData(ffi::ScanResultInner* r, ColumnMap cols) : raw(r), 
columns(std::move(cols)) {}
+    ~ScanData();
+
+    ScanData(const ScanData&) = delete;
+    ScanData& operator=(const ScanData&) = delete;
+};
 }  // namespace detail
 
 class GenericRow {
@@ -623,9 +634,8 @@ class RowView : public detail::NamedGetters<RowView> {
     friend struct detail::NamedGetters<RowView>;
 
    public:
-    RowView(std::shared_ptr<const ffi::ScanResultInner> inner, size_t 
record_idx,
-            std::shared_ptr<const detail::ColumnMap> column_map)
-        : inner_(std::move(inner)), record_idx_(record_idx), 
column_map_(std::move(column_map)) {}
+    RowView(std::shared_ptr<const detail::ScanData> data, size_t bucket_idx, 
size_t rec_idx)
+        : data_(std::move(data)), bucket_idx_(bucket_idx), rec_idx_(rec_idx) {}
 
     // ── Index-based getters ──────────────────────────────────────────
     size_t FieldCount() const;
@@ -660,14 +670,28 @@ class RowView : public detail::NamedGetters<RowView> {
 
    private:
     size_t Resolve(const std::string& name) const {
-        if (!column_map_) {
+        if (!data_) {
             throw std::runtime_error("RowView: name-based access not 
available");
         }
-        return detail::ResolveColumn(*column_map_, name);
+        return detail::ResolveColumn(data_->columns, name);
+    }
+    std::shared_ptr<const detail::ScanData> data_;
+    size_t bucket_idx_;
+    size_t rec_idx_;
+};
+
+/// Identifies a specific bucket, optionally within a partition.
+struct TableBucket {
+    int64_t table_id;
+    int32_t bucket_id;
+    std::optional<int64_t> partition_id;
+
+    bool operator==(const TableBucket& other) const {
+        return table_id == other.table_id && bucket_id == other.bucket_id &&
+               partition_id == other.partition_id;
     }
-    std::shared_ptr<const ffi::ScanResultInner> inner_;
-    size_t record_idx_;
-    std::shared_ptr<const detail::ColumnMap> column_map_;
+
+    bool operator!=(const TableBucket& other) const { return !(*this == 
other); }
 };
 
 /// A single scan record. Contains metadata and a RowView for field access.
@@ -675,14 +699,61 @@ class RowView : public detail::NamedGetters<RowView> {
 /// ScanRecord is a value type that can be freely copied, stored, and
 /// accumulated across multiple Poll() calls.
 struct ScanRecord {
-    int32_t bucket_id;
-    std::optional<int64_t> partition_id;
     int64_t offset;
     int64_t timestamp;
     ChangeType change_type;
     RowView row;
 };
 
+/// A view into a subset of scan results for a single bucket.
+///
+/// BucketView is a value type — it shares ownership of the underlying scan 
data
+/// via reference counting, so it can safely outlive the ScanRecords that 
produced it.
+class BucketView {
+   public:
+    BucketView(std::shared_ptr<const detail::ScanData> data, TableBucket 
bucket, size_t bucket_idx,
+               size_t count)
+        : data_(std::move(data)),
+          bucket_(std::move(bucket)),
+          bucket_idx_(bucket_idx),
+          count_(count) {}
+
+    /// The bucket these records belong to.
+    const TableBucket& Bucket() const { return bucket_; }
+
+    /// Number of records in this bucket.
+    size_t Size() const { return count_; }
+    bool Empty() const { return count_ == 0; }
+
+    /// Access a record by its position within this bucket (0-based).
+    ScanRecord operator[](size_t idx) const;
+
+    class Iterator {
+       public:
+        ScanRecord operator*() const;
+        Iterator& operator++() {
+            ++idx_;
+            return *this;
+        }
+        bool operator!=(const Iterator& other) const { return idx_ != 
other.idx_; }
+
+       private:
+        friend class BucketView;
+        Iterator(const BucketView* owner, size_t idx) : owner_(owner), 
idx_(idx) {}
+        const BucketView* owner_;
+        size_t idx_;
+    };
+
+    Iterator begin() const { return Iterator(this, 0); }
+    Iterator end() const { return Iterator(this, count_); }
+
+   private:
+    std::shared_ptr<const detail::ScanData> data_;
+    TableBucket bucket_;
+    size_t bucket_idx_;
+    size_t count_;
+};
+
 class ScanRecords {
    public:
     ScanRecords() noexcept = default;
@@ -693,36 +764,52 @@ class ScanRecords {
     ScanRecords(ScanRecords&&) noexcept = default;
     ScanRecords& operator=(ScanRecords&&) noexcept = default;
 
-    size_t Size() const;
-    bool Empty() const;
-    ScanRecord operator[](size_t idx) const;
+    /// Total number of records across all buckets.
+    size_t Count() const;
+    bool IsEmpty() const;
+
+    /// Number of distinct buckets with records.
+    size_t BucketCount() const;
+
+    /// List of distinct buckets that have records.
+    std::vector<TableBucket> Buckets() const;
+
+    /// Get a view of records for a specific bucket.
+    ///
+    /// Returns an empty BucketView if the bucket is not present (matches 
Rust/Java).
+    /// Note: O(B) linear scan. For iteration over all buckets, prefer 
BucketAt(idx).
+    BucketView Records(const TableBucket& bucket) const;
 
+    /// Get a view of records by bucket index (0-based). O(1).
+    ///
+    /// Throws std::out_of_range if idx >= BucketCount().
+    BucketView BucketAt(size_t idx) const;
+
+    /// Flat iterator over all records across all buckets (matches Java 
Iterable<ScanRecord>).
     class Iterator {
        public:
         ScanRecord operator*() const;
-        Iterator& operator++() {
-            ++idx_;
-            return *this;
+        Iterator& operator++();
+        bool operator!=(const Iterator& other) const {
+            return bucket_idx_ != other.bucket_idx_ || rec_idx_ != 
other.rec_idx_;
         }
-        bool operator!=(const Iterator& other) const { return idx_ != 
other.idx_; }
 
        private:
         friend class ScanRecords;
-        Iterator(const ScanRecords* owner, size_t idx) : owner_(owner), 
idx_(idx) {}
+        Iterator(const ScanRecords* owner, size_t bucket_idx, size_t rec_idx)
+            : owner_(owner), bucket_idx_(bucket_idx), rec_idx_(rec_idx) {}
         const ScanRecords* owner_;
-        size_t idx_;
+        size_t bucket_idx_;
+        size_t rec_idx_;
     };
 
-    Iterator begin() const { return Iterator(this, 0); }
-    Iterator end() const { return Iterator(this, Size()); }
+    Iterator begin() const;
+    Iterator end() const { return Iterator(this, BucketCount(), 0); }
 
    private:
-    /// Returns the column name-to-index map (lazy-built, cached).
-    const std::shared_ptr<detail::ColumnMap>& GetColumnMap() const;
     friend class LogScanner;
-    void BuildColumnMap() const;
-    std::shared_ptr<ffi::ScanResultInner> inner_;
-    mutable std::shared_ptr<detail::ColumnMap> column_map_;
+    ScanRecord RecordAt(size_t bucket, size_t rec_idx) const;
+    std::shared_ptr<const detail::ScanData> data_;
 };
 
 class ArrowRecordBatch {
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index cb29882..9f987b9 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -141,6 +141,14 @@ mod ffi {
         timestamp: i64,
     }
 
+    struct FfiBucketInfo {
+        table_id: i64,
+        bucket_id: i32,
+        has_partition_id: bool,
+        partition_id: i64,
+        record_count: usize,
+    }
+
     struct FfiBucketSubscription {
         bucket_id: i32,
         offset: i64,
@@ -420,27 +428,96 @@ mod ffi {
         fn sv_column_count(self: &ScanResultInner) -> usize;
         fn sv_column_name(self: &ScanResultInner, field: usize) -> 
Result<&str>;
         fn sv_column_type(self: &ScanResultInner, field: usize) -> Result<i32>;
-        fn sv_bucket_id(self: &ScanResultInner, rec: usize) -> i32;
-        fn sv_has_partition_id(self: &ScanResultInner, rec: usize) -> bool;
-        fn sv_partition_id(self: &ScanResultInner, rec: usize) -> i64;
-        fn sv_offset(self: &ScanResultInner, rec: usize) -> i64;
-        fn sv_timestamp(self: &ScanResultInner, rec: usize) -> i64;
-        fn sv_change_type(self: &ScanResultInner, rec: usize) -> i32;
+        fn sv_offset(self: &ScanResultInner, bucket: usize, rec: usize) -> i64;
+        fn sv_timestamp(self: &ScanResultInner, bucket: usize, rec: usize) -> 
i64;
+        fn sv_change_type(self: &ScanResultInner, bucket: usize, rec: usize) 
-> i32;
         fn sv_field_count(self: &ScanResultInner) -> usize;
-        fn sv_is_null(self: &ScanResultInner, rec: usize, field: usize) -> 
Result<bool>;
-        fn sv_get_bool(self: &ScanResultInner, rec: usize, field: usize) -> 
Result<bool>;
-        fn sv_get_i32(self: &ScanResultInner, rec: usize, field: usize) -> 
Result<i32>;
-        fn sv_get_i64(self: &ScanResultInner, rec: usize, field: usize) -> 
Result<i64>;
-        fn sv_get_f32(self: &ScanResultInner, rec: usize, field: usize) -> 
Result<f32>;
-        fn sv_get_f64(self: &ScanResultInner, rec: usize, field: usize) -> 
Result<f64>;
-        fn sv_get_str(self: &ScanResultInner, rec: usize, field: usize) -> 
Result<&str>;
-        fn sv_get_bytes(self: &ScanResultInner, rec: usize, field: usize) -> 
Result<&[u8]>;
-        fn sv_get_date_days(self: &ScanResultInner, rec: usize, field: usize) 
-> Result<i32>;
-        fn sv_get_time_millis(self: &ScanResultInner, rec: usize, field: 
usize) -> Result<i32>;
-        fn sv_get_ts_millis(self: &ScanResultInner, rec: usize, field: usize) 
-> Result<i64>;
-        fn sv_get_ts_nanos(self: &ScanResultInner, rec: usize, field: usize) 
-> Result<i32>;
-        fn sv_is_ts_ltz(self: &ScanResultInner, rec: usize, field: usize) -> 
Result<bool>;
-        fn sv_get_decimal_str(self: &ScanResultInner, rec: usize, field: 
usize) -> Result<String>;
+        fn sv_is_null(
+            self: &ScanResultInner,
+            bucket: usize,
+            rec: usize,
+            field: usize,
+        ) -> Result<bool>;
+        fn sv_get_bool(
+            self: &ScanResultInner,
+            bucket: usize,
+            rec: usize,
+            field: usize,
+        ) -> Result<bool>;
+        fn sv_get_i32(
+            self: &ScanResultInner,
+            bucket: usize,
+            rec: usize,
+            field: usize,
+        ) -> Result<i32>;
+        fn sv_get_i64(
+            self: &ScanResultInner,
+            bucket: usize,
+            rec: usize,
+            field: usize,
+        ) -> Result<i64>;
+        fn sv_get_f32(
+            self: &ScanResultInner,
+            bucket: usize,
+            rec: usize,
+            field: usize,
+        ) -> Result<f32>;
+        fn sv_get_f64(
+            self: &ScanResultInner,
+            bucket: usize,
+            rec: usize,
+            field: usize,
+        ) -> Result<f64>;
+        fn sv_get_str(
+            self: &ScanResultInner,
+            bucket: usize,
+            rec: usize,
+            field: usize,
+        ) -> Result<&str>;
+        fn sv_get_bytes(
+            self: &ScanResultInner,
+            bucket: usize,
+            rec: usize,
+            field: usize,
+        ) -> Result<&[u8]>;
+        fn sv_get_date_days(
+            self: &ScanResultInner,
+            bucket: usize,
+            rec: usize,
+            field: usize,
+        ) -> Result<i32>;
+        fn sv_get_time_millis(
+            self: &ScanResultInner,
+            bucket: usize,
+            rec: usize,
+            field: usize,
+        ) -> Result<i32>;
+        fn sv_get_ts_millis(
+            self: &ScanResultInner,
+            bucket: usize,
+            rec: usize,
+            field: usize,
+        ) -> Result<i64>;
+        fn sv_get_ts_nanos(
+            self: &ScanResultInner,
+            bucket: usize,
+            rec: usize,
+            field: usize,
+        ) -> Result<i32>;
+        fn sv_is_ts_ltz(
+            self: &ScanResultInner,
+            bucket: usize,
+            rec: usize,
+            field: usize,
+        ) -> Result<bool>;
+        fn sv_get_decimal_str(
+            self: &ScanResultInner,
+            bucket: usize,
+            rec: usize,
+            field: usize,
+        ) -> Result<String>;
+
+        fn sv_bucket_infos(self: &ScanResultInner) -> &Vec<FfiBucketInfo>;
     }
 }
 
@@ -1487,24 +1564,27 @@ impl LogScanner {
         match result {
             Ok(records) => {
                 let columns = self.projected_columns.clone();
-                // Flatten ScanRecords into a Vec<FlatScanRecord> — moves 
Arc<RecordBatch>, zero copy
-                let mut flat = Vec::with_capacity(records.count());
+                let mut total_count = 0usize;
+                let mut buckets = Vec::new();
+                let mut bucket_infos = Vec::new();
                 for (table_bucket, bucket_records) in 
records.into_records_by_buckets() {
-                    let bucket_id = table_bucket.bucket_id();
-                    let partition = table_bucket.partition_id();
-                    for record in bucket_records {
-                        flat.push(FlatScanRecord {
-                            bucket_id,
-                            has_partition_id: partition.is_some(),
-                            partition_id: partition.unwrap_or(0),
-                            record,
-                        });
-                    }
+                    let count = bucket_records.len();
+                    total_count += count;
+                    bucket_infos.push(ffi::FfiBucketInfo {
+                        table_id: table_bucket.table_id(),
+                        bucket_id: table_bucket.bucket_id(),
+                        has_partition_id: 
table_bucket.partition_id().is_some(),
+                        partition_id: table_bucket.partition_id().unwrap_or(0),
+                        record_count: count,
+                    });
+                    buckets.push((table_bucket, bucket_records));
                 }
                 Box::new(ScanResultInner {
                     error: None,
-                    records: flat,
+                    buckets,
                     columns,
+                    bucket_infos,
+                    total_count,
                 })
             }
             Err(e) => {
@@ -1917,28 +1997,29 @@ mod row_reader {
 // Opaque types: ScanResultInner (scan read path)
 // ============================================================================
 
-struct FlatScanRecord {
-    bucket_id: i32,
-    has_partition_id: bool,
-    partition_id: i64,
-    record: fcore::record::ScanRecord,
-}
-
 pub struct ScanResultInner {
     error: Option<(i32, String)>,
-    records: Vec<FlatScanRecord>,
+    buckets: Vec<(fcore::metadata::TableBucket, 
Vec<fcore::record::ScanRecord>)>,
     columns: Vec<fcore::metadata::Column>,
+    bucket_infos: Vec<ffi::FfiBucketInfo>,
+    total_count: usize,
 }
 
 impl ScanResultInner {
     fn from_error(code: i32, msg: String) -> Self {
         Self {
             error: Some((code, msg)),
-            records: Vec::new(),
+            buckets: Vec::new(),
             columns: Vec::new(),
+            bucket_infos: Vec::new(),
+            total_count: 0,
         }
     }
 
+    fn resolve(&self, bucket: usize, rec: usize) -> &fcore::record::ScanRecord 
{
+        &self.buckets[bucket].1[rec]
+    }
+
     fn sv_has_error(&self) -> bool {
         self.error.is_some()
     }
@@ -1952,7 +2033,7 @@ impl ScanResultInner {
     }
 
     fn sv_record_count(&self) -> usize {
-        self.records.len()
+        self.total_count
     }
 
     fn sv_column_count(&self) -> usize {
@@ -1965,71 +2046,70 @@ impl ScanResultInner {
         row_reader::column_type(&self.columns, field)
     }
 
-    // Metadata accessors — C++ validates rec in operator[] before calling 
these.
-    fn sv_bucket_id(&self, rec: usize) -> i32 {
-        self.records[rec].bucket_id
+    fn sv_offset(&self, bucket: usize, rec: usize) -> i64 {
+        self.resolve(bucket, rec).offset()
     }
-    fn sv_has_partition_id(&self, rec: usize) -> bool {
-        self.records[rec].has_partition_id
+    fn sv_timestamp(&self, bucket: usize, rec: usize) -> i64 {
+        self.resolve(bucket, rec).timestamp()
     }
-    fn sv_partition_id(&self, rec: usize) -> i64 {
-        self.records[rec].partition_id
-    }
-    fn sv_offset(&self, rec: usize) -> i64 {
-        self.records[rec].record.offset()
-    }
-    fn sv_timestamp(&self, rec: usize) -> i64 {
-        self.records[rec].record.timestamp()
-    }
-    fn sv_change_type(&self, rec: usize) -> i32 {
-        self.records[rec].record.change_type().to_byte_value() as i32
+    fn sv_change_type(&self, bucket: usize, rec: usize) -> i32 {
+        self.resolve(bucket, rec).change_type().to_byte_value() as i32
     }
     fn sv_field_count(&self) -> usize {
         self.columns.len()
     }
 
-    // Field accessors — C++ validates rec in operator[], validate() checks 
field.
-    fn sv_is_null(&self, rec: usize, field: usize) -> Result<bool, String> {
-        row_reader::is_null(self.records[rec].record.row(), &self.columns, 
field)
+    // Field accessors — C++ validates bounds in BucketView/RecordAt, 
validate() checks field.
+    fn sv_is_null(&self, bucket: usize, rec: usize, field: usize) -> 
Result<bool, String> {
+        row_reader::is_null(self.resolve(bucket, rec).row(), &self.columns, 
field)
     }
-    fn sv_get_bool(&self, rec: usize, field: usize) -> Result<bool, String> {
-        row_reader::get_bool(self.records[rec].record.row(), &self.columns, 
field)
+    fn sv_get_bool(&self, bucket: usize, rec: usize, field: usize) -> 
Result<bool, String> {
+        row_reader::get_bool(self.resolve(bucket, rec).row(), &self.columns, 
field)
     }
-    fn sv_get_i32(&self, rec: usize, field: usize) -> Result<i32, String> {
-        row_reader::get_i32(self.records[rec].record.row(), &self.columns, 
field)
+    fn sv_get_i32(&self, bucket: usize, rec: usize, field: usize) -> 
Result<i32, String> {
+        row_reader::get_i32(self.resolve(bucket, rec).row(), &self.columns, 
field)
     }
-    fn sv_get_i64(&self, rec: usize, field: usize) -> Result<i64, String> {
-        row_reader::get_i64(self.records[rec].record.row(), &self.columns, 
field)
+    fn sv_get_i64(&self, bucket: usize, rec: usize, field: usize) -> 
Result<i64, String> {
+        row_reader::get_i64(self.resolve(bucket, rec).row(), &self.columns, 
field)
     }
-    fn sv_get_f32(&self, rec: usize, field: usize) -> Result<f32, String> {
-        row_reader::get_f32(self.records[rec].record.row(), &self.columns, 
field)
+    fn sv_get_f32(&self, bucket: usize, rec: usize, field: usize) -> 
Result<f32, String> {
+        row_reader::get_f32(self.resolve(bucket, rec).row(), &self.columns, 
field)
     }
-    fn sv_get_f64(&self, rec: usize, field: usize) -> Result<f64, String> {
-        row_reader::get_f64(self.records[rec].record.row(), &self.columns, 
field)
+    fn sv_get_f64(&self, bucket: usize, rec: usize, field: usize) -> 
Result<f64, String> {
+        row_reader::get_f64(self.resolve(bucket, rec).row(), &self.columns, 
field)
     }
-    fn sv_get_str(&self, rec: usize, field: usize) -> Result<&str, String> {
-        row_reader::get_str(self.records[rec].record.row(), &self.columns, 
field)
+    fn sv_get_str(&self, bucket: usize, rec: usize, field: usize) -> 
Result<&str, String> {
+        row_reader::get_str(self.resolve(bucket, rec).row(), &self.columns, 
field)
     }
-    fn sv_get_bytes(&self, rec: usize, field: usize) -> Result<&[u8], String> {
-        row_reader::get_bytes(self.records[rec].record.row(), &self.columns, 
field)
+    fn sv_get_bytes(&self, bucket: usize, rec: usize, field: usize) -> 
Result<&[u8], String> {
+        row_reader::get_bytes(self.resolve(bucket, rec).row(), &self.columns, 
field)
     }
-    fn sv_get_date_days(&self, rec: usize, field: usize) -> Result<i32, 
String> {
-        row_reader::get_date_days(self.records[rec].record.row(), 
&self.columns, field)
+    fn sv_get_date_days(&self, bucket: usize, rec: usize, field: usize) -> 
Result<i32, String> {
+        row_reader::get_date_days(self.resolve(bucket, rec).row(), 
&self.columns, field)
     }
-    fn sv_get_time_millis(&self, rec: usize, field: usize) -> Result<i32, 
String> {
-        row_reader::get_time_millis(self.records[rec].record.row(), 
&self.columns, field)
+    fn sv_get_time_millis(&self, bucket: usize, rec: usize, field: usize) -> 
Result<i32, String> {
+        row_reader::get_time_millis(self.resolve(bucket, rec).row(), 
&self.columns, field)
     }
-    fn sv_get_ts_millis(&self, rec: usize, field: usize) -> Result<i64, 
String> {
-        row_reader::get_ts_millis(self.records[rec].record.row(), 
&self.columns, field)
+    fn sv_get_ts_millis(&self, bucket: usize, rec: usize, field: usize) -> 
Result<i64, String> {
+        row_reader::get_ts_millis(self.resolve(bucket, rec).row(), 
&self.columns, field)
     }
-    fn sv_get_ts_nanos(&self, rec: usize, field: usize) -> Result<i32, String> 
{
-        row_reader::get_ts_nanos(self.records[rec].record.row(), 
&self.columns, field)
+    fn sv_get_ts_nanos(&self, bucket: usize, rec: usize, field: usize) -> 
Result<i32, String> {
+        row_reader::get_ts_nanos(self.resolve(bucket, rec).row(), 
&self.columns, field)
     }
-    fn sv_is_ts_ltz(&self, _rec: usize, field: usize) -> Result<bool, String> {
+    fn sv_is_ts_ltz(&self, _bucket: usize, _rec: usize, field: usize) -> 
Result<bool, String> {
         row_reader::is_ts_ltz(&self.columns, field)
     }
-    fn sv_get_decimal_str(&self, rec: usize, field: usize) -> Result<String, 
String> {
-        row_reader::get_decimal_str(self.records[rec].record.row(), 
&self.columns, field)
+    fn sv_get_decimal_str(
+        &self,
+        bucket: usize,
+        rec: usize,
+        field: usize,
+    ) -> Result<String, String> {
+        row_reader::get_decimal_str(self.resolve(bucket, rec).row(), 
&self.columns, field)
+    }
+
+    fn sv_bucket_infos(&self) -> &Vec<ffi::FfiBucketInfo> {
+        &self.bucket_infos
     }
 }
 
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index a697cea..73035bb 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -191,75 +191,91 @@ void GenericRow::SetDecimal(size_t idx, const 
std::string& value) {
     inner_->gr_set_decimal_str(idx, value);
 }
 
+// ============================================================================
+// ScanData — destructor must live in .cpp where rust::Box is visible
+// ============================================================================
+
+detail::ScanData::~ScanData() {
+    if (raw) {
+        rust::Box<ffi::ScanResultInner>::from_raw(raw);
+    }
+}
+
 // ============================================================================
 // RowView — zero-copy read-only row view for scan results
 // ============================================================================
 
-size_t RowView::FieldCount() const { return inner_ ? inner_->sv_field_count() 
: 0; }
+// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
+#define CHECK_DATA(name)                                                       
          \
+    do {                                                                       
          \
+        if (!data_) throw std::logic_error(name ": not available (moved-from 
or null)"); \
+    } while (0)
+
+size_t RowView::FieldCount() const { return data_ ? 
data_->raw->sv_field_count() : 0; }
 
 TypeId RowView::GetType(size_t idx) const {
-    CHECK_INNER("RowView");
-    return static_cast<TypeId>(inner_->sv_column_type(idx));
+    CHECK_DATA("RowView");
+    return static_cast<TypeId>(data_->raw->sv_column_type(idx));
 }
 
 bool RowView::IsNull(size_t idx) const {
-    CHECK_INNER("RowView");
-    return inner_->sv_is_null(record_idx_, idx);
+    CHECK_DATA("RowView");
+    return data_->raw->sv_is_null(bucket_idx_, rec_idx_, idx);
 }
 bool RowView::GetBool(size_t idx) const {
-    CHECK_INNER("RowView");
-    return inner_->sv_get_bool(record_idx_, idx);
+    CHECK_DATA("RowView");
+    return data_->raw->sv_get_bool(bucket_idx_, rec_idx_, idx);
 }
 int32_t RowView::GetInt32(size_t idx) const {
-    CHECK_INNER("RowView");
-    return inner_->sv_get_i32(record_idx_, idx);
+    CHECK_DATA("RowView");
+    return data_->raw->sv_get_i32(bucket_idx_, rec_idx_, idx);
 }
 int64_t RowView::GetInt64(size_t idx) const {
-    CHECK_INNER("RowView");
-    return inner_->sv_get_i64(record_idx_, idx);
+    CHECK_DATA("RowView");
+    return data_->raw->sv_get_i64(bucket_idx_, rec_idx_, idx);
 }
 float RowView::GetFloat32(size_t idx) const {
-    CHECK_INNER("RowView");
-    return inner_->sv_get_f32(record_idx_, idx);
+    CHECK_DATA("RowView");
+    return data_->raw->sv_get_f32(bucket_idx_, rec_idx_, idx);
 }
 double RowView::GetFloat64(size_t idx) const {
-    CHECK_INNER("RowView");
-    return inner_->sv_get_f64(record_idx_, idx);
+    CHECK_DATA("RowView");
+    return data_->raw->sv_get_f64(bucket_idx_, rec_idx_, idx);
 }
 
 std::string_view RowView::GetString(size_t idx) const {
-    CHECK_INNER("RowView");
-    auto s = inner_->sv_get_str(record_idx_, idx);
+    CHECK_DATA("RowView");
+    auto s = data_->raw->sv_get_str(bucket_idx_, rec_idx_, idx);
     return std::string_view(s.data(), s.size());
 }
 
 std::pair<const uint8_t*, size_t> RowView::GetBytes(size_t idx) const {
-    CHECK_INNER("RowView");
-    auto bytes = inner_->sv_get_bytes(record_idx_, idx);
+    CHECK_DATA("RowView");
+    auto bytes = data_->raw->sv_get_bytes(bucket_idx_, rec_idx_, idx);
     return {bytes.data(), bytes.size()};
 }
 
 Date RowView::GetDate(size_t idx) const {
-    CHECK_INNER("RowView");
-    return Date{inner_->sv_get_date_days(record_idx_, idx)};
+    CHECK_DATA("RowView");
+    return Date{data_->raw->sv_get_date_days(bucket_idx_, rec_idx_, idx)};
 }
 
 Time RowView::GetTime(size_t idx) const {
-    CHECK_INNER("RowView");
-    return Time{inner_->sv_get_time_millis(record_idx_, idx)};
+    CHECK_DATA("RowView");
+    return Time{data_->raw->sv_get_time_millis(bucket_idx_, rec_idx_, idx)};
 }
 
 Timestamp RowView::GetTimestamp(size_t idx) const {
-    CHECK_INNER("RowView");
-    return Timestamp{inner_->sv_get_ts_millis(record_idx_, idx),
-                     inner_->sv_get_ts_nanos(record_idx_, idx)};
+    CHECK_DATA("RowView");
+    return Timestamp{data_->raw->sv_get_ts_millis(bucket_idx_, rec_idx_, idx),
+                     data_->raw->sv_get_ts_nanos(bucket_idx_, rec_idx_, idx)};
 }
 
 bool RowView::IsDecimal(size_t idx) const { return GetType(idx) == 
TypeId::Decimal; }
 
 std::string RowView::GetDecimalString(size_t idx) const {
-    CHECK_INNER("RowView");
-    return std::string(inner_->sv_get_decimal_str(record_idx_, idx));
+    CHECK_DATA("RowView");
+    return std::string(data_->raw->sv_get_decimal_str(bucket_idx_, rec_idx_, 
idx));
 }
 
 // ============================================================================
@@ -268,48 +284,94 @@ std::string RowView::GetDecimalString(size_t idx) const {
 
 // ScanRecords constructor, destructor, move operations are all defaulted in 
the header.
 
-size_t ScanRecords::Size() const { return inner_ ? inner_->sv_record_count() : 
0; }
+size_t ScanRecords::Count() const { return data_ ? 
data_->raw->sv_record_count() : 0; }
 
-bool ScanRecords::Empty() const { return Size() == 0; }
+bool ScanRecords::IsEmpty() const { return Count() == 0; }
 
-void ScanRecords::BuildColumnMap() const {
-    if (!inner_) return;
-    auto map = std::make_shared<detail::ColumnMap>();
-    auto count = inner_->sv_column_count();
-    for (size_t i = 0; i < count; ++i) {
-        auto name = inner_->sv_column_name(i);
-        (*map)[std::string(name.data(), name.size())] = {
-            i, static_cast<TypeId>(inner_->sv_column_type(i))};
+ScanRecord ScanRecords::RecordAt(size_t bucket, size_t rec_idx) const {
+    if (!data_) {
+        throw std::logic_error("ScanRecords: not available (moved-from or 
null)");
     }
-    column_map_ = std::move(map);
+    return ScanRecord{data_->raw->sv_offset(bucket, rec_idx),
+                      data_->raw->sv_timestamp(bucket, rec_idx),
+                      
static_cast<ChangeType>(data_->raw->sv_change_type(bucket, rec_idx)),
+                      RowView(data_, bucket, rec_idx)};
+}
+
+static TableBucket to_table_bucket(const ffi::FfiBucketInfo& g) {
+    return TableBucket{g.table_id, g.bucket_id,
+                       g.has_partition_id ? 
std::optional<int64_t>(g.partition_id) : std::nullopt};
+}
+
+size_t ScanRecords::BucketCount() const { return data_ ? 
data_->raw->sv_bucket_infos().size() : 0; }
+
+ScanRecord ScanRecords::Iterator::operator*() const {
+    return owner_->RecordAt(bucket_idx_, rec_idx_);
 }
 
-const std::shared_ptr<detail::ColumnMap>& ScanRecords::GetColumnMap() const {
-    if (!column_map_) {
-        BuildColumnMap();
+ScanRecords::Iterator ScanRecords::begin() const { return Iterator(this, 0, 
0); }
+
+ScanRecords::Iterator& ScanRecords::Iterator::operator++() {
+    ++rec_idx_;
+    if (owner_->data_) {
+        const auto& infos = owner_->data_->raw->sv_bucket_infos();
+        while (bucket_idx_ < infos.size() && rec_idx_ >= 
infos[bucket_idx_].record_count) {
+            rec_idx_ = 0;
+            ++bucket_idx_;
+        }
     }
-    return column_map_;
+    return *this;
+}
+
+std::vector<TableBucket> ScanRecords::Buckets() const {
+    std::vector<TableBucket> result;
+    if (!data_) return result;
+    const auto& infos = data_->raw->sv_bucket_infos();
+    result.reserve(infos.size());
+    for (const auto& g : infos) {
+        result.push_back(to_table_bucket(g));
+    }
+    return result;
 }
 
-ScanRecord ScanRecords::operator[](size_t idx) const {
-    if (!inner_) {
+BucketView ScanRecords::Records(const TableBucket& bucket) const {
+    if (!data_) {
+        return BucketView({}, bucket, 0, 0);
+    }
+    const auto& infos = data_->raw->sv_bucket_infos();
+    for (size_t i = 0; i < infos.size(); ++i) {
+        TableBucket tb = to_table_bucket(infos[i]);
+        if (tb == bucket) {
+            return BucketView(data_, std::move(tb), i, infos[i].record_count);
+        }
+    }
+    return BucketView({}, bucket, 0, 0);
+}
+
+BucketView ScanRecords::BucketAt(size_t idx) const {
+    if (!data_) {
         throw std::logic_error("ScanRecords: not available (moved-from or 
null)");
     }
-    if (idx >= inner_->sv_record_count()) {
-        throw std::out_of_range("ScanRecords: index " + std::to_string(idx) + 
" out of range (" +
-                                std::to_string(inner_->sv_record_count()) + " 
records)");
+    const auto& infos = data_->raw->sv_bucket_infos();
+    if (idx >= infos.size()) {
+        throw std::out_of_range("ScanRecords::BucketAt: index " + 
std::to_string(idx) +
+                                " out of range (" + 
std::to_string(infos.size()) + " buckets)");
     }
-    return ScanRecord{inner_->sv_bucket_id(idx),
-                      inner_->sv_has_partition_id(idx)
-                          ? 
std::optional<int64_t>(inner_->sv_partition_id(idx))
-                          : std::nullopt,
-                      inner_->sv_offset(idx),
-                      inner_->sv_timestamp(idx),
-                      static_cast<ChangeType>(inner_->sv_change_type(idx)),
-                      RowView(inner_, idx, GetColumnMap())};
+    return BucketView(data_, to_table_bucket(infos[idx]), idx, 
infos[idx].record_count);
 }
 
-ScanRecord ScanRecords::Iterator::operator*() const { return 
owner_->operator[](idx_); }
+ScanRecord BucketView::operator[](size_t idx) const {
+    if (idx >= count_) {
+        throw std::out_of_range("BucketView: index " + std::to_string(idx) + " 
out of range (" +
+                                std::to_string(count_) + " records)");
+    }
+    return ScanRecord{data_->raw->sv_offset(bucket_idx_, idx),
+                      data_->raw->sv_timestamp(bucket_idx_, idx),
+                      
static_cast<ChangeType>(data_->raw->sv_change_type(bucket_idx_, idx)),
+                      RowView(data_, bucket_idx_, idx)};
+}
+
+ScanRecord BucketView::Iterator::operator*() const { return 
owner_->operator[](idx_); }
 
 // ============================================================================
 // LookupResult — backed by opaque Rust LookupResultInner
@@ -1082,10 +1144,16 @@ Result LogScanner::Poll(int64_t timeout_ms, 
ScanRecords& out) {
                                  std::string(result_box->sv_error_message()));
     }
 
-    out.column_map_.reset();
-    out.inner_ = std::shared_ptr<ffi::ScanResultInner>(
-        result_box.into_raw(),
-        [](ffi::ScanResultInner* p) { 
rust::Box<ffi::ScanResultInner>::from_raw(p); });
+    // Wrap raw pointer in ScanData immediately so it's never leaked on 
exception.
+    auto data = std::make_shared<detail::ScanData>(result_box.into_raw(), 
detail::ColumnMap{});
+    // Build column map eagerly — shared by all RowViews/BucketViews.
+    auto col_count = data->raw->sv_column_count();
+    for (size_t i = 0; i < col_count; ++i) {
+        auto name = data->raw->sv_column_name(i);
+        data->columns[std::string(name.data(), name.size())] = {
+            i, static_cast<TypeId>(data->raw->sv_column_type(i))};
+    }
+    out.data_ = std::move(data);
     return utils::make_ok();
 }
 
diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml
index 804e1bb..9cf20e3 100644
--- a/bindings/python/Cargo.toml
+++ b/bindings/python/Cargo.toml
@@ -37,3 +37,4 @@ arrow-array = "57.0.0"
 pyo3-async-runtimes = { version = "0.26.0", features = ["tokio-runtime"] }
 jiff = { workspace = true }
 bigdecimal = "0.4"
+indexmap = "2"
diff --git a/bindings/python/example/example.py 
b/bindings/python/example/example.py
index 9c2b7e3..3564d91 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -351,21 +351,26 @@ async def main():
 
         record_scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in 
range(num_buckets)})
 
-        # Poll returns List[ScanRecord] with per-record metadata
+        # Poll returns ScanRecords — records grouped by bucket
         print("\n--- Testing poll() method (record-by-record) ---")
         try:
-            records = record_scanner.poll(5000)
-            print(f"Number of records: {len(records)}")
-
-            # Show first few records with metadata
-            for i, record in enumerate(records[:5]):
-                print(f"  Record {i}: offset={record.offset}, "
-                      f"timestamp={record.timestamp}, "
-                      f"change_type={record.change_type}, "
-                      f"row={record.row}")
-
-            if len(records) > 5:
-                print(f"  ... and {len(records) - 5} more records")
+            scan_records = record_scanner.poll(5000)
+            print(f"Total records: {scan_records.count()}, buckets: 
{len(scan_records.buckets())}")
+
+            # Flat iteration over all records (regardless of bucket)
+            print(f"  Flat iteration: {scan_records.count()} records")
+            for record in scan_records:
+                print(f"    offset={record.offset}, 
timestamp={record.timestamp}")
+
+            # Per-bucket access
+            for bucket in scan_records.buckets():
+                bucket_recs = scan_records.records(bucket)
+                print(f"  Bucket {bucket}: {len(bucket_recs)} records")
+                for record in bucket_recs[:3]:
+                    print(f"    offset={record.offset}, "
+                          f"timestamp={record.timestamp}, "
+                          f"change_type={record.change_type}, "
+                          f"row={record.row}")
 
         except Exception as e:
             print(f"Error during poll: {e}")
diff --git a/bindings/python/fluss/__init__.pyi 
b/bindings/python/fluss/__init__.pyi
index 47eeb80..4b7fa4e 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -19,7 +19,7 @@
 
 from enum import IntEnum
 from types import TracebackType
-from typing import Dict, List, Optional, Tuple
+from typing import Dict, Iterator, List, Optional, Tuple, Union, overload
 
 import pandas as pd
 import pyarrow as pa
@@ -43,12 +43,12 @@ class ChangeType(IntEnum):
         ...
 
 class ScanRecord:
-    """Represents a single scan record with metadata."""
+    """Represents a single scan record with metadata.
+
+    The bucket is the key in ScanRecords, not on the individual record
+    (matches Rust/Java).
+    """
 
-    @property
-    def bucket(self) -> TableBucket:
-        """The bucket this record belongs to."""
-        ...
     @property
     def offset(self) -> int:
         """The position of this record in the log."""
@@ -90,6 +90,47 @@ class RecordBatch:
     def __str__(self) -> str: ...
     def __repr__(self) -> str: ...
 
+class ScanRecords:
+    """A collection of scan records grouped by bucket.
+
+    Returned by ``LogScanner.poll()``. Supports flat iteration
+    (``for rec in records``) and per-bucket access 
(``records.records(bucket)``).
+    """
+
+    def buckets(self) -> List[TableBucket]:
+        """List of distinct buckets that have records."""
+        ...
+    def records(self, bucket: TableBucket) -> List[ScanRecord]:
+        """Get records for a specific bucket. Returns empty list if bucket not 
present."""
+        ...
+    def count(self) -> int:
+        """Total number of records across all buckets."""
+        ...
+    def is_empty(self) -> bool:
+        """Whether the result set is empty."""
+        ...
+    def keys(self) -> List[TableBucket]:
+        """Mapping protocol: alias for ``buckets()``."""
+        ...
+    def values(self) -> Iterator[List[ScanRecord]]:
+        """Mapping protocol: lazy iterator over record lists, one per 
bucket."""
+        ...
+    def items(self) -> Iterator[Tuple[TableBucket, List[ScanRecord]]]:
+        """Mapping protocol: lazy iterator over ``(bucket, records)`` pairs."""
+        ...
+    def __len__(self) -> int: ...
+    @overload
+    def __getitem__(self, index: int) -> ScanRecord: ...
+    @overload
+    def __getitem__(self, index: slice) -> List[ScanRecord]: ...
+    @overload
+    def __getitem__(self, bucket: TableBucket) -> List[ScanRecord]: ...
+    def __getitem__(self, key: Union[int, slice, TableBucket]) -> 
Union[ScanRecord, List[ScanRecord]]: ...
+    def __contains__(self, bucket: TableBucket) -> bool: ...
+    def __iter__(self) -> Iterator[ScanRecord]: ...
+    def __str__(self) -> str: ...
+    def __repr__(self) -> str: ...
+
 class Config:
     def __init__(self, properties: Optional[Dict[str, str]] = None) -> None: 
...
     @property
@@ -590,7 +631,7 @@ class LogScanner:
             bucket_id: The bucket ID within the partition
         """
         ...
-    def poll(self, timeout_ms: int) -> List[ScanRecord]:
+    def poll(self, timeout_ms: int) -> ScanRecords:
         """Poll for individual records with metadata.
 
         Requires a record-based scanner (created with 
new_scan().create_log_scanner()).
@@ -599,11 +640,12 @@ class LogScanner:
             timeout_ms: Timeout in milliseconds to wait for records.
 
         Returns:
-            List of ScanRecord objects, each containing bucket, offset, 
timestamp,
-            change_type, and row data as a dictionary.
+            ScanRecords grouped by bucket. Supports flat iteration
+            (``for rec in records``) and per-bucket access
+            (``records.buckets()``, ``records.records(bucket)``).
 
         Note:
-            Returns an empty list if no records are available or timeout 
expires.
+            Returns an empty ScanRecords if no records are available or 
timeout expires.
         """
         ...
     def poll_record_batch(self, timeout_ms: int) -> List[RecordBatch]:
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index 553c8a9..ebc0d54 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -122,6 +122,7 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::<TableBucket>()?;
     m.add_class::<ChangeType>()?;
     m.add_class::<ScanRecord>()?;
+    m.add_class::<ScanRecords>()?;
     m.add_class::<RecordBatch>()?;
     m.add_class::<PartitionInfo>()?;
     m.add_class::<OffsetSpec>()?;
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index c3ea248..bc2e956 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -22,7 +22,14 @@ use arrow_pyarrow::{FromPyArrow, ToPyArrow};
 use arrow_schema::SchemaRef;
 use fluss::record::to_arrow_schema;
 use fluss::rpc::message::OffsetSpec;
-use pyo3::types::IntoPyDict;
+use indexmap::IndexMap;
+use pyo3::exceptions::{PyIndexError, PyRuntimeError, PyTypeError};
+use pyo3::sync::PyOnceLock;
+use pyo3::types::{
+    IntoPyDict, PyBool, PyByteArray, PyBytes, PyDate, PyDateAccess, 
PyDateTime, PyDelta,
+    PyDeltaAccess, PyDict, PyList, PySequence, PySlice, PyTime, PyTimeAccess, 
PyTuple, PyType,
+    PyTzInfo,
+};
 use pyo3_async_runtimes::tokio::future_into_py;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -38,11 +45,12 @@ const MICROS_PER_DAY: i64 = 86_400_000_000;
 const NANOS_PER_MILLI: i64 = 1_000_000;
 const NANOS_PER_MICRO: i64 = 1_000;
 
-/// Represents a single scan record with metadata
+/// Represents a single scan record with metadata.
+///
+/// Matches Rust/Java: offset, timestamp, change_type, row.
+/// The bucket is the key in ScanRecords, not on the individual record.
 #[pyclass]
 pub struct ScanRecord {
-    #[pyo3(get)]
-    bucket: TableBucket,
     #[pyo3(get)]
     offset: i64,
     #[pyo3(get)]
@@ -50,21 +58,20 @@ pub struct ScanRecord {
     #[pyo3(get)]
     change_type: ChangeType,
     /// Store row as a Python dict directly
-    row_dict: Py<pyo3::types::PyDict>,
+    row_dict: Py<PyDict>,
 }
 
 #[pymethods]
 impl ScanRecord {
     /// Get the row data as a dictionary
     #[getter]
-    pub fn row(&self, py: Python) -> Py<pyo3::types::PyDict> {
+    pub fn row(&self, py: Python) -> Py<PyDict> {
         self.row_dict.clone_ref(py)
     }
 
     fn __str__(&self) -> String {
         format!(
-            "ScanRecord(bucket={}, offset={}, timestamp={}, change_type={})",
-            self.bucket.__str__(),
+            "ScanRecord(offset={}, timestamp={}, change_type={})",
             self.offset,
             self.timestamp,
             self.change_type.short_string()
@@ -80,13 +87,12 @@ impl ScanRecord {
     /// Create a ScanRecord from core types
     pub fn from_core(
         py: Python,
-        bucket: &fcore::metadata::TableBucket,
         record: &fcore::record::ScanRecord,
         row_type: &fcore::metadata::RowType,
     ) -> PyResult<Self> {
         let fields = row_type.fields();
         let row = record.row();
-        let dict = pyo3::types::PyDict::new(py);
+        let dict = PyDict::new(py);
 
         for (pos, field) in fields.iter().enumerate() {
             let value = datum_to_python_value(py, row, pos, 
field.data_type())?;
@@ -94,7 +100,6 @@ impl ScanRecord {
         }
 
         Ok(ScanRecord {
-            bucket: TableBucket::from_core(bucket.clone()),
             offset: record.offset(),
             timestamp: record.timestamp(),
             change_type: ChangeType::from_core(*record.change_type()),
@@ -155,6 +160,247 @@ impl RecordBatch {
     }
 }
 
+/// A collection of scan records grouped by bucket.
+///
+/// Returned by `LogScanner.poll()`. Records are grouped by `TableBucket`.
+#[pyclass]
+pub struct ScanRecords {
+    /// Records per bucket. IndexMap preserves insertion order, so flat
+    /// iteration and integer indexing are deterministic for a given poll.
+    records_by_bucket: IndexMap<TableBucket, Vec<Py<ScanRecord>>>,
+    /// Cached sum of all per-bucket record counts; kept in sync by the
+    /// builder (see LogScanner::poll).
+    total_count: usize,
+}
+
+#[pymethods]
+impl ScanRecords {
+    /// List of distinct buckets that have records in this result.
+    pub fn buckets(&self) -> Vec<TableBucket> {
+        self.records_by_bucket.keys().cloned().collect()
+    }
+
+    /// Get records for a specific bucket.
+    ///
+    /// Returns an empty list if the bucket is not present (matches Rust/Java 
behavior).
+    pub fn records(&self, py: Python, bucket: &TableBucket) -> 
Vec<Py<ScanRecord>> {
+        self.records_by_bucket
+            .get(bucket)
+            // clone_ref is a cheap Python refcount bump, not a deep copy.
+            .map(|recs| recs.iter().map(|r| r.clone_ref(py)).collect())
+            .unwrap_or_default()
+    }
+
+    /// Total number of records across all buckets.
+    pub fn count(&self) -> usize {
+        self.total_count
+    }
+
+    /// Whether the result set is empty.
+    pub fn is_empty(&self) -> bool {
+        self.total_count == 0
+    }
+
+    fn __len__(&self) -> usize {
+        self.total_count
+    }
+
+    /// Type-dispatched indexing:
+    ///   records[0]       → ScanRecord (flat index)
+    ///   records[-1]      → ScanRecord (negative index)
+    ///   records[1:3]     → list[ScanRecord] (slice)
+    ///   records[bucket]  → list[ScanRecord] (by bucket)
+    fn __getitem__(&self, py: Python, key: &Bound<'_, PyAny>) -> 
PyResult<Py<PyAny>> {
+        // Try integer index first
+        if let Ok(mut idx) = key.extract::<isize>() {
+            let len = self.total_count as isize;
+            if idx < 0 {
+                idx += len;
+            }
+            // NOTE(review): idx has been normalized above, so for a negative
+            // input the message reports idx + len, not the caller's value.
+            if idx < 0 || idx >= len {
+                return Err(PyIndexError::new_err(format!(
+                    "index {idx} out of range for ScanRecords of size {len}"
+                )));
+            }
+            let idx = idx as usize;
+            // Walk buckets in insertion order, treating the per-bucket lists
+            // as one logical concatenated sequence.
+            let mut offset = 0;
+            for recs in self.records_by_bucket.values() {
+                if idx < offset + recs.len() {
+                    return Ok(recs[idx - offset].clone_ref(py).into_any());
+                }
+                offset += recs.len();
+            }
+            // Unreachable unless total_count disagrees with the map contents.
+            return Err(PyRuntimeError::new_err(
+                "internal error: total_count out of sync with records",
+            ));
+        }
+        // Try slice
+        if let Ok(slice) = key.downcast::<PySlice>() {
+            // PySlice::indices clamps start/stop/step to the given length,
+            // matching Python's slice.indices() semantics.
+            let indices = slice.indices(self.total_count as isize)?;
+            let mut result: Vec<Py<ScanRecord>> = Vec::new();
+            let mut i = indices.start;
+            // Each element rescans the bucket lists from the front, so cost
+            // is O(result_len * buckets) — fine for poll-sized results.
+            while (indices.step > 0 && i < indices.stop) || (indices.step < 0 
&& i > indices.stop) {
+                let idx = i as usize;
+                let mut offset = 0;
+                for recs in self.records_by_bucket.values() {
+                    if idx < offset + recs.len() {
+                        result.push(recs[idx - offset].clone_ref(py));
+                        break;
+                    }
+                    offset += recs.len();
+                }
+                i += indices.step;
+            }
+            return Ok(result.into_pyobject(py).unwrap().into_any().unbind());
+        }
+        // Try TableBucket
+        if let Ok(bucket) = key.extract::<TableBucket>() {
+            let recs = self.records(py, &bucket);
+            return Ok(recs.into_pyobject(py).unwrap().into_any().unbind());
+        }
+        Err(PyTypeError::new_err(
+            "index must be int, slice, or TableBucket",
+        ))
+    }
+
+    /// Support `bucket in records`.
+    fn __contains__(&self, bucket: &TableBucket) -> bool {
+        self.records_by_bucket.contains_key(bucket)
+    }
+
+    /// Mapping protocol: alias for `buckets()`.
+    pub fn keys(&self) -> Vec<TableBucket> {
+        self.buckets()
+    }
+
+    /// Mapping protocol: lazy iterator over record lists, one per bucket.
+    pub fn values(slf: Bound<'_, Self>) -> ScanRecordsBucketIter {
+        // Snapshot the bucket keys so the iterator does not hold a borrow of
+        // self; records are materialized lazily per bucket in __next__.
+        let this = slf.borrow();
+        let bucket_keys: Vec<TableBucket> = 
this.records_by_bucket.keys().cloned().collect();
+        drop(this);
+        ScanRecordsBucketIter {
+            owner: slf.unbind(),
+            bucket_keys,
+            bucket_idx: 0,
+            with_keys: false,
+        }
+    }
+
+    /// Mapping protocol: lazy iterator over `(TableBucket, list[ScanRecord])` 
pairs.
+    pub fn items(slf: Bound<'_, Self>) -> ScanRecordsBucketIter {
+        // Same key-snapshot pattern as values(); with_keys switches the
+        // iterator to yield (bucket, records) tuples.
+        let this = slf.borrow();
+        let bucket_keys: Vec<TableBucket> = 
this.records_by_bucket.keys().cloned().collect();
+        drop(this);
+        ScanRecordsBucketIter {
+            owner: slf.unbind(),
+            bucket_keys,
+            bucket_idx: 0,
+            with_keys: true,
+        }
+    }
+
+    fn __str__(&self) -> String {
+        format!(
+            "ScanRecords(records={}, buckets={})",
+            self.total_count,
+            self.records_by_bucket.len()
+        )
+    }
+
+    fn __repr__(&self) -> String {
+        self.__str__()
+    }
+
+    /// Flat iterator over all records across all buckets (matches Java/Rust).
+    fn __iter__(slf: Bound<'_, Self>) -> ScanRecordsIter {
+        // Snapshot keys; the iterator re-borrows the owner on each __next__.
+        let this = slf.borrow();
+        let bucket_keys: Vec<TableBucket> = 
this.records_by_bucket.keys().cloned().collect();
+        drop(this);
+        ScanRecordsIter {
+            owner: slf.unbind(),
+            bucket_keys,
+            bucket_idx: 0,
+            rec_idx: 0,
+        }
+    }
+}
+
+/// Flat record iterator returned by `ScanRecords.__iter__`.
+#[pyclass]
+struct ScanRecordsIter {
+    /// Owning handle to the ScanRecords; keeps the records alive while
+    /// iteration is in progress.
+    owner: Py<ScanRecords>,
+    /// Snapshot of bucket keys taken at iterator creation time.
+    bucket_keys: Vec<TableBucket>,
+    /// Index into bucket_keys of the bucket currently being drained.
+    bucket_idx: usize,
+    /// Index of the next record within the current bucket.
+    rec_idx: usize,
+}
+
+#[pymethods]
+impl ScanRecordsIter {
+    /// Python iterator protocol: an iterator is its own iterable.
+    fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
+        slf
+    }
+
+    /// Yield the next record, advancing across buckets in snapshot order.
+    fn __next__(&mut self, py: Python) -> Option<Py<ScanRecord>> {
+        let owner = self.owner.borrow(py);
+        loop {
+            if self.bucket_idx >= self.bucket_keys.len() {
+                // All buckets exhausted — end of iteration.
+                return None;
+            }
+            let bucket = &self.bucket_keys[self.bucket_idx];
+            if let Some(recs) = owner.records_by_bucket.get(bucket) {
+                if self.rec_idx < recs.len() {
+                    // clone_ref: cheap refcount bump of the shared record.
+                    let rec = recs[self.rec_idx].clone_ref(py);
+                    self.rec_idx += 1;
+                    return Some(rec);
+                }
+            }
+            // Current bucket drained (or key missing); move to the next one.
+            self.bucket_idx += 1;
+            self.rec_idx = 0;
+        }
+    }
+}
+
+/// Lazy iterator for `ScanRecords.items()` and `ScanRecords.values()`.
+///
+/// Yields one bucket at a time: `(TableBucket, list[ScanRecord])` for items,
+/// or `list[ScanRecord]` for values. Only materializes records for the
+/// current bucket on each `__next__` call.
+#[pyclass]
+pub struct ScanRecordsBucketIter {
+    /// Owning handle to the ScanRecords being iterated.
+    owner: Py<ScanRecords>,
+    /// Snapshot of bucket keys taken at iterator creation time.
+    bucket_keys: Vec<TableBucket>,
+    /// Index of the next bucket to yield.
+    bucket_idx: usize,
+    /// true → items() mode (yield (bucket, records) pairs);
+    /// false → values() mode (yield record lists only).
+    with_keys: bool,
+}
+
+#[pymethods]
+impl ScanRecordsBucketIter {
+    /// Python iterator protocol: an iterator is its own iterable.
+    fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
+        slf
+    }
+
+    /// Yield the next bucket's records (plus the bucket itself in items mode).
+    fn __next__(&mut self, py: Python) -> Option<Py<PyAny>> {
+        if self.bucket_idx >= self.bucket_keys.len() {
+            return None;
+        }
+        let bucket = &self.bucket_keys[self.bucket_idx];
+        let owner = self.owner.borrow(py);
+        // Defensive: keys were snapshotted from the same map, so get() should
+        // always hit; a miss just yields an empty list.
+        let recs = owner
+            .records_by_bucket
+            .get(bucket)
+            .map(|recs| recs.iter().map(|r| 
r.clone_ref(py)).collect::<Vec<_>>())
+            .unwrap_or_default()
+        let bucket = bucket.clone();
+        self.bucket_idx += 1;
+
+        if self.with_keys {
+            // items() mode: yield a (TableBucket, list[ScanRecord]) tuple.
+            Some(
+                (bucket, recs)
+                    .into_pyobject(py)
+                    .unwrap()
+                    .into_any()
+                    .unbind(),
+            )
+        } else {
+            // values() mode: yield only the record list.
+            Some(recs.into_pyobject(py).unwrap().into_any().unbind())
+        }
+    }
+}
+
 /// Represents a Fluss table for data operations
 #[pyclass]
 pub struct FlussTable {
@@ -763,9 +1009,9 @@ impl AppendWriter {
 /// Represents different input shapes for a row
 #[derive(FromPyObject)]
 enum RowInput<'py> {
-    Dict(Bound<'py, pyo3::types::PyDict>),
-    Tuple(Bound<'py, pyo3::types::PyTuple>),
-    List(Bound<'py, pyo3::types::PyList>),
+    Dict(Bound<'py, PyDict>),
+    Tuple(Bound<'py, PyTuple>),
+    List(Bound<'py, PyList>),
 }
 
 /// Convert Python row (dict/list/tuple) to GenericRow requiring all schema 
columns.
@@ -779,7 +1025,7 @@ pub fn python_to_generic_row(
 
 /// Process a Python sequence (list or tuple) into datums at the target column 
positions.
 fn process_sequence(
-    seq: &Bound<pyo3::types::PySequence>,
+    seq: &Bound<PySequence>,
     target_indices: &[usize],
     fields: &[fcore::metadata::DataField],
     datums: &mut [fcore::row::Datum<'static>],
@@ -924,7 +1170,7 @@ fn python_value_to_datum(
         }
         fcore::metadata::DataType::TinyInt(_) => {
             // Strict type checking: reject bool for int columns
-            if value.is_instance_of::<pyo3::types::PyBool>() {
+            if value.is_instance_of::<PyBool>() {
                 return Err(FlussError::new_err(
                     "Expected int for TinyInt column, got bool. Use 0 or 1 
explicitly.".to_string(),
                 ));
@@ -933,7 +1179,7 @@ fn python_value_to_datum(
             Ok(Datum::Int8(v))
         }
         fcore::metadata::DataType::SmallInt(_) => {
-            if value.is_instance_of::<pyo3::types::PyBool>() {
+            if value.is_instance_of::<PyBool>() {
                 return Err(FlussError::new_err(
                     "Expected int for SmallInt column, got bool. Use 0 or 1 
explicitly."
                         .to_string(),
@@ -943,7 +1189,7 @@ fn python_value_to_datum(
             Ok(Datum::Int16(v))
         }
         fcore::metadata::DataType::Int(_) => {
-            if value.is_instance_of::<pyo3::types::PyBool>() {
+            if value.is_instance_of::<PyBool>() {
                 return Err(FlussError::new_err(
                     "Expected int for Int column, got bool. Use 0 or 1 
explicitly.".to_string(),
                 ));
@@ -952,7 +1198,7 @@ fn python_value_to_datum(
             Ok(Datum::Int32(v))
         }
         fcore::metadata::DataType::BigInt(_) => {
-            if value.is_instance_of::<pyo3::types::PyBool>() {
+            if value.is_instance_of::<PyBool>() {
                 return Err(FlussError::new_err(
                     "Expected int for BigInt column, got bool. Use 0 or 1 
explicitly.".to_string(),
                 ));
@@ -975,9 +1221,9 @@ fn python_value_to_datum(
         fcore::metadata::DataType::Bytes(_) | 
fcore::metadata::DataType::Binary(_) => {
             // Efficient extraction: downcast to specific type and use bulk 
copy.
             // PyBytes::as_bytes() and PyByteArray::to_vec() are O(n) bulk 
copies of the underlying data.
-            if let Ok(bytes) = value.downcast::<pyo3::types::PyBytes>() {
+            if let Ok(bytes) = value.downcast::<PyBytes>() {
                 Ok(bytes.as_bytes().to_vec().into())
-            } else if let Ok(bytearray) = 
value.downcast::<pyo3::types::PyByteArray>() {
+            } else if let Ok(bytearray) = value.downcast::<PyByteArray>() {
                 Ok(bytearray.to_vec().into())
             } else {
                 Err(FlussError::new_err(format!(
@@ -1067,11 +1313,11 @@ pub fn datum_to_python_value(
         }
         DataType::Bytes(_) => {
             let b = row.get_bytes(pos);
-            Ok(pyo3::types::PyBytes::new(py, b).into_any().unbind())
+            Ok(PyBytes::new(py, b).into_any().unbind())
         }
         DataType::Binary(binary_type) => {
             let b = row.get_binary(pos, binary_type.length());
-            Ok(pyo3::types::PyBytes::new(py, b).into_any().unbind())
+            Ok(PyBytes::new(py, b).into_any().unbind())
         }
         DataType::Decimal(decimal_type) => {
             let decimal = row.get_decimal(
@@ -1113,8 +1359,6 @@ fn rust_decimal_to_python(py: Python, decimal: 
&fcore::row::Decimal) -> PyResult
 
 /// Convert Rust Date (days since epoch) to Python datetime.date
 fn rust_date_to_python(py: Python, date: fcore::row::Date) -> 
PyResult<Py<PyAny>> {
-    use pyo3::types::PyDate;
-
     let days_since_epoch = date.get_inner();
     let epoch = jiff::civil::date(1970, 1, 1);
     let civil_date = epoch + jiff::Span::new().days(days_since_epoch as i64);
@@ -1130,8 +1374,6 @@ fn rust_date_to_python(py: Python, date: 
fcore::row::Date) -> PyResult<Py<PyAny>
 
 /// Convert Rust Time (millis since midnight) to Python datetime.time
 fn rust_time_to_python(py: Python, time: fcore::row::Time) -> 
PyResult<Py<PyAny>> {
-    use pyo3::types::PyTime;
-
     let millis = time.get_inner() as i64;
     let hours = millis / MILLIS_PER_HOUR;
     let minutes = (millis % MILLIS_PER_HOUR) / MILLIS_PER_MINUTE;
@@ -1151,8 +1393,6 @@ fn rust_time_to_python(py: Python, time: 
fcore::row::Time) -> PyResult<Py<PyAny>
 
 /// Convert Rust TimestampNtz to Python naive datetime
 fn rust_timestamp_ntz_to_python(py: Python, ts: fcore::row::TimestampNtz) -> 
PyResult<Py<PyAny>> {
-    use pyo3::types::PyDateTime;
-
     let millis = ts.get_millisecond();
     let nanos = ts.get_nano_of_millisecond();
     let total_micros = millis * MICROS_PER_MILLI + (nanos as i64 / 
NANOS_PER_MICRO);
@@ -1178,8 +1418,6 @@ fn rust_timestamp_ntz_to_python(py: Python, ts: 
fcore::row::TimestampNtz) -> PyR
 
 /// Convert Rust TimestampLtz to Python timezone-aware datetime (UTC)
 fn rust_timestamp_ltz_to_python(py: Python, ts: fcore::row::TimestampLtz) -> 
PyResult<Py<PyAny>> {
-    use pyo3::types::PyDateTime;
-
     let millis = ts.get_epoch_millisecond();
     let nanos = ts.get_nano_of_millisecond();
     let total_micros = millis * MICROS_PER_MILLI + (nanos as i64 / 
NANOS_PER_MICRO);
@@ -1212,7 +1450,7 @@ pub fn internal_row_to_dict(
 ) -> PyResult<Py<PyAny>> {
     let row_type = table_info.row_type();
     let fields = row_type.fields();
-    let dict = pyo3::types::PyDict::new(py);
+    let dict = PyDict::new(py);
 
     for (pos, field) in fields.iter().enumerate() {
         let value = datum_to_python_value(py, row, pos, field.data_type())?;
@@ -1224,29 +1462,26 @@ pub fn internal_row_to_dict(
 
 /// Cached decimal.Decimal type
 /// Uses PyOnceLock for thread-safety and subinterpreter compatibility.
-static DECIMAL_TYPE: pyo3::sync::PyOnceLock<Py<pyo3::types::PyType>> =
-    pyo3::sync::PyOnceLock::new();
+static DECIMAL_TYPE: PyOnceLock<Py<PyType>> = PyOnceLock::new();
 
 /// Cached UTC timezone
-static UTC_TIMEZONE: pyo3::sync::PyOnceLock<Py<PyAny>> = 
pyo3::sync::PyOnceLock::new();
+static UTC_TIMEZONE: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
 
 /// Cached UTC epoch type
-static UTC_EPOCH: pyo3::sync::PyOnceLock<Py<PyAny>> = 
pyo3::sync::PyOnceLock::new();
+static UTC_EPOCH: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
 
 /// Get the cached decimal.Decimal type, importing it once per interpreter.
-fn get_decimal_type(py: Python) -> PyResult<Bound<pyo3::types::PyType>> {
+fn get_decimal_type(py: Python) -> PyResult<Bound<PyType>> {
     let ty = DECIMAL_TYPE.get_or_try_init(py, || -> PyResult<_> {
         let decimal_mod = py.import("decimal")?;
-        let decimal_ty = decimal_mod
-            .getattr("Decimal")?
-            .downcast_into::<pyo3::types::PyType>()?;
+        let decimal_ty = 
decimal_mod.getattr("Decimal")?.downcast_into::<PyType>()?;
         Ok(decimal_ty.unbind())
     })?;
     Ok(ty.bind(py).clone())
 }
 
 /// Get the cached UTC timezone (datetime.timezone.utc), creating it once per 
interpreter.
-fn get_utc_timezone(py: Python) -> PyResult<Bound<pyo3::types::PyTzInfo>> {
+fn get_utc_timezone(py: Python) -> PyResult<Bound<PyTzInfo>> {
     let tz = UTC_TIMEZONE.get_or_try_init(py, || -> PyResult<_> {
         let datetime_mod = py.import("datetime")?;
         let timezone = datetime_mod.getattr("timezone")?;
@@ -1254,10 +1489,7 @@ fn get_utc_timezone(py: Python) -> 
PyResult<Bound<pyo3::types::PyTzInfo>> {
         Ok(utc.unbind())
     })?;
     // Downcast to PyTzInfo for use with PyDateTime::new()
-    Ok(tz
-        .bind(py)
-        .clone()
-        .downcast_into::<pyo3::types::PyTzInfo>()?)
+    Ok(tz.bind(py).clone().downcast_into::<PyTzInfo>()?)
 }
 
 /// Get the cached UTC epoch datetime, creating it once per interpreter.
@@ -1313,8 +1545,6 @@ fn python_decimal_to_datum(
 
 /// Convert Python datetime.date to Datum::Date.
 fn python_date_to_datum(value: &Bound<PyAny>) -> 
PyResult<fcore::row::Datum<'static>> {
-    use pyo3::types::{PyDate, PyDateAccess, PyDateTime};
-
     // Reject datetime.datetime (subclass of date) - use timestamp columns for 
those
     if value.downcast::<PyDateTime>().is_ok() {
         return Err(FlussError::new_err(
@@ -1351,8 +1581,6 @@ fn python_date_to_datum(value: &Bound<PyAny>) -> 
PyResult<fcore::row::Datum<'sta
 /// Sub-millisecond precision (microseconds not divisible by 1000) will raise 
an error
 /// to prevent silent data loss and ensure fail-fast behavior.
 fn python_time_to_datum(value: &Bound<PyAny>) -> 
PyResult<fcore::row::Datum<'static>> {
-    use pyo3::types::{PyTime, PyTimeAccess};
-
     let time = value.downcast::<PyTime>().map_err(|_| {
         FlussError::new_err(format!(
             "Expected datetime.time, got {}",
@@ -1411,8 +1639,6 @@ fn python_datetime_to_timestamp_ltz(value: &Bound<PyAny>) 
-> PyResult<fcore::row
 /// Uses integer arithmetic to avoid float precision issues.
 /// For clarity, tz-aware datetimes are rejected - use TimestampLtz for those.
 fn extract_datetime_components_ntz(value: &Bound<PyAny>) -> PyResult<(i64, 
i32)> {
-    use pyo3::types::PyDateTime;
-
     // Try PyDateTime first
     if let Ok(dt) = value.downcast::<PyDateTime>() {
         // Reject tz-aware datetime for NTZ - it's ambiguous what the user 
wants
@@ -1465,8 +1691,6 @@ fn extract_datetime_components_ntz(value: &Bound<PyAny>) 
-> PyResult<(i64, i32)>
 /// Extract epoch milliseconds for TimestampLtz (instant in time, UTC-based).
 /// For naive datetimes, assumes UTC. For aware datetimes, converts to UTC.
 fn extract_datetime_components_ltz(value: &Bound<PyAny>) -> PyResult<(i64, 
i32)> {
-    use pyo3::types::PyDateTime;
-
     // Try PyDateTime first
     if let Ok(dt) = value.downcast::<PyDateTime>() {
         // Check if timezone-aware
@@ -1506,11 +1730,7 @@ fn extract_datetime_components_ltz(value: &Bound<PyAny>) 
-> PyResult<(i64, i32)>
 }
 
 /// Convert datetime components to epoch milliseconds treating them as UTC
-fn datetime_to_epoch_millis_as_utc(
-    dt: &pyo3::Bound<'_, pyo3::types::PyDateTime>,
-) -> PyResult<(i64, i32)> {
-    use pyo3::types::{PyDateAccess, PyTimeAccess};
-
+fn datetime_to_epoch_millis_as_utc(dt: &Bound<'_, PyDateTime>) -> 
PyResult<(i64, i32)> {
     let year = dt.get_year();
     let month = dt.get_month();
     let day = dt.get_day();
@@ -1541,11 +1761,7 @@ fn datetime_to_epoch_millis_as_utc(
 /// Convert timezone-aware datetime to epoch milliseconds using Python's 
timedelta.
 /// This correctly handles timezone conversions by computing (dt - UTC_EPOCH).
 /// The UTC epoch is cached for performance.
-fn datetime_to_epoch_millis_utc_aware(
-    dt: &pyo3::Bound<'_, pyo3::types::PyDateTime>,
-) -> PyResult<(i64, i32)> {
-    use pyo3::types::{PyDelta, PyDeltaAccess};
-
+fn datetime_to_epoch_millis_utc_aware(dt: &Bound<'_, PyDateTime>) -> 
PyResult<(i64, i32)> {
     let py = dt.py();
     let epoch = get_utc_epoch(py)?;
 
@@ -1777,14 +1993,15 @@ impl LogScanner {
     ///     timeout_ms: Timeout in milliseconds to wait for records
     ///
     /// Returns:
-    ///     List of ScanRecord objects, each containing bucket, offset, 
timestamp,
-    ///     change_type, and row data as a dictionary.
+    ///     ScanRecords grouped by bucket. Supports flat iteration
+    ///     (`for rec in records`) and per-bucket access (`records.buckets()`,
+    ///     `records.records(bucket)`, `records[bucket]`).
     ///
     /// Note:
     ///     - Requires a record-based scanner (created with 
new_scan().create_log_scanner())
-    ///     - Returns an empty list if no records are available
-    ///     - When timeout expires, returns an empty list (NOT an error)
-    fn poll(&self, py: Python, timeout_ms: i64) -> PyResult<Vec<ScanRecord>> {
+    ///     - Returns an empty ScanRecords if no records are available
+    ///     - When timeout expires, returns an empty ScanRecords (NOT an error)
+    fn poll(&self, py: Python, timeout_ms: i64) -> PyResult<ScanRecords> {
         let scanner = self.scanner.as_record()?;
 
         if timeout_ms < 0 {
@@ -1798,19 +2015,26 @@ impl LogScanner {
             .detach(|| TOKIO_RUNTIME.block_on(async { 
scanner.poll(timeout).await }))
             .map_err(|e| FlussError::from_core_error(&e))?;
 
-        // Convert ScanRecords to Python ScanRecord list
-        // Use projected_row_type to handle column projection correctly
+        // Convert core ScanRecords to Python ScanRecords grouped by bucket
         let row_type = &self.projected_row_type;
-        let mut result = Vec::new();
+        let mut records_by_bucket = IndexMap::new();
+        let mut total_count = 0usize;
 
         for (bucket, records) in scan_records.into_records_by_buckets() {
-            for record in records {
-                let scan_record = ScanRecord::from_core(py, &bucket, &record, 
row_type)?;
-                result.push(scan_record);
+            let py_bucket = TableBucket::from_core(bucket);
+            let mut py_records = Vec::with_capacity(records.len());
+            for record in &records {
+                let scan_record = ScanRecord::from_core(py, record, row_type)?;
+                py_records.push(Py::new(py, scan_record)?);
+                total_count += 1;
             }
+            records_by_bucket.insert(py_bucket, py_records);
         }
 
-        Ok(result)
+        Ok(ScanRecords {
+            records_by_bucket,
+            total_count,
+        })
     }
 
     /// Poll for batches with metadata.
diff --git a/bindings/python/test/test_log_table.py 
b/bindings/python/test/test_log_table.py
index 09586aa..bfa9789 100644
--- a/bindings/python/test/test_log_table.py
+++ b/bindings/python/test/test_log_table.py
@@ -492,11 +492,22 @@ async def test_partitioned_table_append_scan(connection, 
admin):
         (8, "EU", 800),
     ]
 
-    records = _poll_records(scanner, expected_count=8)
-    assert len(records) == 8
+    # Poll and verify per-bucket grouping
+    all_records = []
+    deadline = time.monotonic() + 10
+    while len(all_records) < 8 and time.monotonic() < deadline:
+        scan_records = scanner.poll(5000)
+        for bucket, bucket_records in scan_records.items():
+            assert bucket.partition_id is not None, "Partitioned table should 
have partition_id"
+            # All records in a bucket should belong to the same partition
+            regions = {r.row["region"] for r in bucket_records}
+            assert len(regions) == 1, f"Bucket has mixed regions: {regions}"
+            all_records.extend(bucket_records)
+
+    assert len(all_records) == 8
 
     collected = sorted(
-        [(r.row["id"], r.row["region"], r.row["value"]) for r in records],
+        [(r.row["id"], r.row["region"], r.row["value"]) for r in all_records],
         key=lambda x: x[0],
     )
     assert collected == expected
@@ -652,6 +663,70 @@ async def test_partitioned_table_to_arrow(connection, 
admin):
     await admin.drop_table(table_path, ignore_if_not_exists=False)
 
 
+async def test_scan_records_indexing_and_slicing(connection, admin):
+    """Test ScanRecords indexing, slicing (incl. negative steps), and 
iteration consistency."""
+    table_path = fluss.TablePath("fluss", "py_test_scan_records_indexing")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+    schema = fluss.Schema(
+        pa.schema([pa.field("id", pa.int32()), pa.field("val", pa.string())])
+    )
+    await admin.create_table(table_path, fluss.TableDescriptor(schema))
+
+    # Write 8 rows in a single Arrow batch so there is data to scan.
+    table = await connection.get_table(table_path)
+    writer = table.new_append().create_writer()
+    writer.write_arrow_batch(
+        pa.RecordBatch.from_arrays(
+            [pa.array(list(range(1, 9)), type=pa.int32()),
+             pa.array([f"v{i}" for i in range(1, 9)])],
+            schema=pa.schema([pa.field("id", pa.int32()), pa.field("val", 
pa.string())]),
+        )
+    )
+    await writer.flush()
+
+    scanner = await table.new_scan().create_log_scanner()
+    num_buckets = (await admin.get_table_info(table_path)).num_buckets
+    scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in 
range(num_buckets)})
+
+    # Poll until we get a non-empty ScanRecords (need ≥2 records for slice 
tests)
+    sr = None
+    deadline = time.monotonic() + 10
+    while time.monotonic() < deadline:
+        sr = scanner.poll(5000)
+        if len(sr) >= 2:
+            break
+    assert sr is not None and len(sr) >= 2, "Expected at least 2 records"
+    n = len(sr)
+    # Flat reference ordering used by all subsequent checks.
+    offsets = [sr[i].offset for i in range(n)]
+
+    # Iteration and indexing must produce the same order
+    assert [r.offset for r in sr] == offsets
+
+    # Negative indexing
+    assert sr[-1].offset == offsets[-1]
+    assert sr[-n].offset == offsets[0]
+
+    # Verify slices match the same operation on the offsets reference list
+    test_slices = [
+        slice(1, n - 1),          # forward subrange
+        slice(None, None, -1),    # [::-1] full reverse
+        slice(n - 2, 0, -1),      # reverse with bounds
+        slice(n - 1, 0, -2),      # reverse with step
+        slice(None, None, 2),     # [::2]
+        slice(1, None, 3),        # [1::3]
+        slice(2, 2),              # empty
+    ]
+    for s in test_slices:
+        # sr[s] materializes a list[ScanRecord] for the slice.
+        result = [r.offset for r in sr[s]]
+        assert result == offsets[s], f"slice {s}: got {result}, expected 
{offsets[s]}"
+
+    # Bucket-based indexing: sr[bucket] returns the per-bucket record list.
+    for bucket in sr.buckets():
+        assert len(sr[bucket]) > 0
+
+    await admin.drop_table(table_path, ignore_if_not_exists=False)
+
+
 # ---------------------------------------------------------------------------
 # Helpers
 # ---------------------------------------------------------------------------
@@ -667,6 +742,8 @@ def _poll_records(scanner, expected_count, timeout_s=10):
     return collected
 
 
+
+
 def _poll_arrow_ids(scanner, expected_count, timeout_s=10):
     """Poll a batch scanner and extract 'id' column values."""
     all_ids = []
diff --git a/website/docs/user-guide/cpp/api-reference.md 
b/website/docs/user-guide/cpp/api-reference.md
index a07dd6c..433c5da 100644
--- a/website/docs/user-guide/cpp/api-reference.md
+++ b/website/docs/user-guide/cpp/api-reference.md
@@ -250,23 +250,60 @@ Read-only row view for scan results. Provides zero-copy 
access to string and byt
 
 `ScanRecord` is a value type that can be freely copied, stored, and 
accumulated across multiple `Poll()` calls. It shares ownership of the 
underlying scan data via reference counting.
 
-| Field          | Type                    |  Description                     |
-|----------------|-------------------------|----------------------------------|
-| `bucket_id`    | `int32_t`               | Bucket this record belongs to    |
-| `partition_id` | `std::optional<int64_t>`| Partition ID (if partitioned)    |
-| `offset`       | `int64_t`               | Record offset in the log         |
-| `timestamp`    | `int64_t`               | Record timestamp                 |
-| `change_type`  | `ChangeType`            | Type of change (see `ChangeType`)|
-| `row`          | `RowView`               | Read-only row view for field 
access |
+| Field         | Type         |  Description                                  
                      |
+|---------------|--------------|---------------------------------------------------------------------|
+| `offset`      | `int64_t`    | Record offset in the log                      
                      |
+| `timestamp`   | `int64_t`    | Record timestamp                              
                      |
+| `change_type` | `ChangeType` | Change type (AppendOnly, Insert, 
UpdateBefore, UpdateAfter, Delete) |
+| `row`         | `RowView`    | Row data (value type, shares ownership via 
reference counting)      |
 
 ## `ScanRecords`
 
-| Method                                 |  Description                        
       |
-|----------------------------------------|--------------------------------------------|
-| `Size() -> size_t`                     | Number of records                   
       |
-| `Empty() -> bool`                      | Check if empty                      
       |
-| `operator[](size_t idx) -> ScanRecord` | Access record by index              
       |
-| `begin() / end()`                      | Iterator support for range-based 
for loops |
+### Flat Access
+
+| Method                                  |  Description                       
        |
+|-----------------------------------------|--------------------------------------------|
+| `Count() -> size_t`                     | Total number of records across all 
buckets |
+| `IsEmpty() -> bool`                     | Check if empty                     
        |
+| `begin() / end()`                       | Iterator support for range-based 
for loops |
+
+Flat iteration over all records (regardless of bucket):
+
+```cpp
+for (const auto& rec : records) {
+    std::cout << "offset=" << rec.offset << std::endl;
+}
+```
+
+### Per-Bucket Access
+
+| Method                                                          |  
Description                                                          |
+|-----------------------------------------------------------------|-----------------------------------------------------------------------|
+| `BucketCount() -> size_t`                                       | Number of 
distinct buckets                                            |
+| `Buckets() -> std::vector<TableBucket>`                         | List of 
distinct buckets                                              |
+| `Records(const TableBucket& bucket) -> BucketView`              | Records 
for a specific bucket (empty view if bucket not present)      |
+| `BucketAt(size_t idx) -> BucketView`                            | Records by 
bucket index (0-based, O(1))                               |
+
+## `BucketView`
+
+A view of records within a single bucket. Obtained from 
`ScanRecords::Records()` or `ScanRecords::BucketAt()`. `BucketView` is a value 
type — it shares ownership of the underlying scan data via reference counting, 
so it can safely outlive the `ScanRecords` that produced it.
+
+| Method                                     |  Description                               |
+|--------------------------------------------|--------------------------------------------|
+| `Size() -> size_t`                         | Number of records in this bucket           |
+| `Empty() -> bool`                          | Check if empty                             |
+| `Bucket() -> const TableBucket&`           | Get the bucket                             |
+| `operator[](size_t idx) -> ScanRecord`     | Access record by index within this bucket  |
+| `begin() / end()`                          | Iterator support for range-based for loops |
+
+## `TableBucket`
+
+| Field / Method                           |  Description                            |
+|------------------------------------------|-----------------------------------------|
+| `table_id -> int64_t`                    | Table ID                                |
+| `bucket_id -> int32_t`                   | Bucket ID                               |
+| `partition_id -> std::optional<int64_t>` | Partition ID (empty if non-partitioned) |
+| `operator==(const TableBucket&) -> bool` | Equality comparison                     |
 
 ## `LookupResult`
 
diff --git a/website/docs/user-guide/cpp/example/log-tables.md 
b/website/docs/user-guide/cpp/example/log-tables.md
index 3a862c1..0125a4c 100644
--- a/website/docs/user-guide/cpp/example/log-tables.md
+++ b/website/docs/user-guide/cpp/example/log-tables.md
@@ -60,6 +60,18 @@ for (const auto& rec : records) {
               << " timestamp=" << rec.row.GetInt64(2)
               << " @ offset=" << rec.offset << std::endl;
 }
+
+// Or per-bucket access
+for (const auto& bucket : records.Buckets()) {
+    auto view = records.Records(bucket);
+    std::cout << "Bucket " << bucket.bucket_id << ": "
+              << view.Size() << " records" << std::endl;
+    for (const auto& rec : view) {
+        std::cout << "  event_id=" << rec.row.GetInt32(0)
+                  << " event_type=" << rec.row.GetString(1)
+                  << " @ offset=" << rec.offset << std::endl;
+    }
+}
 ```
 
 **Continuous polling:**
diff --git a/website/docs/user-guide/python/api-reference.md b/website/docs/user-guide/python/api-reference.md
index af03058..27a57dc 100644
--- a/website/docs/user-guide/python/api-reference.md
+++ b/website/docs/user-guide/python/api-reference.md
@@ -137,17 +137,69 @@ Builder for creating a `Lookuper`. Obtain via `FlussTable.new_lookup()`.
 | `.subscribe_partition_buckets(partition_bucket_offsets)`      | Subscribe to multiple partition+bucket combos (`{(part_id, bucket_id): offset}`) |
 | `.unsubscribe(bucket_id)`                                     | Unsubscribe from a bucket (non-partitioned tables)                               |
 | `.unsubscribe_partition(partition_id, bucket_id)`             | Unsubscribe from a partition bucket                                              |
-| `.poll(timeout_ms) -> list[ScanRecord]`                       | Poll individual records (record scanner only)                                    |
+| `.poll(timeout_ms) -> ScanRecords`                            | Poll individual records (record scanner only)                                    |
 | `.poll_arrow(timeout_ms) -> pa.Table`                         | Poll as Arrow Table (batch scanner only)                                         |
 | `.poll_record_batch(timeout_ms) -> list[RecordBatch]`         | Poll batches with metadata (batch scanner only)                                  |
 | `.to_arrow() -> pa.Table`                                     | Read all subscribed data as Arrow Table (batch scanner only)                     |
 | `.to_pandas() -> pd.DataFrame`                                | Read all subscribed data as DataFrame (batch scanner only)                       |
 
+## `ScanRecords`
+
+Returned by `LogScanner.poll()`. Records are grouped by bucket.
+
+> **Note:** Flat iteration and integer indexing traverse buckets in an arbitrary order that is consistent within a single `ScanRecords` instance but may differ between `poll()` calls. Use per-bucket access (`.items()`, `.records(bucket)`) when bucket ordering matters.
+
+```python
+scan_records = scanner.poll(timeout_ms=5000)
+
+# Sequence access
+scan_records[0]                              # first record
+scan_records[-1]                             # last record
+scan_records[:5]                             # first 5 records
+
+# Per-bucket access
+for bucket, records in scan_records.items():
+    for record in records:
+        print(f"bucket={bucket.bucket_id}, offset={record.offset}, row={record.row}")
+
+# Flat iteration
+for record in scan_records:
+    print(record.row)
+```
+
+### Methods
+
+| Method                                 | Description                                                      |
+|----------------------------------------|------------------------------------------------------------------|
+| `.buckets() -> list[TableBucket]`      | List of distinct buckets                                         |
+| `.records(bucket) -> list[ScanRecord]` | Records for a specific bucket (empty list if bucket not present) |
+| `.count() -> int`                      | Total record count across all buckets                            |
+| `.is_empty() -> bool`                  | Check if empty                                                   |
+
+### Indexing
+
+| Expression                   | Returns              | Description                       |
+|------------------------------|----------------------|-----------------------------------|
+| `scan_records[0]`            | `ScanRecord`         | Record by flat index              |
+| `scan_records[-1]`           | `ScanRecord`         | Negative indexing                 |
+| `scan_records[1:5]`          | `list[ScanRecord]`   | Slice                             |
+| `scan_records[bucket]`       | `list[ScanRecord]`   | Records for a bucket              |
+
+### Mapping Protocol
+
+| Method / Protocol              | Description                                     |
+|--------------------------------|-------------------------------------------------|
+| `.keys()`                      | Same as `.buckets()`                            |
+| `.values()`                    | Lazy iterator over record lists, one per bucket |
+| `.items()`                     | Lazy iterator over `(bucket, records)` pairs    |
+| `len(scan_records)`            | Same as `.count()`                              |
+| `bucket in scan_records`       | Membership test                                 |
+| `for record in scan_records`   | Flat iteration over all records                 |
+
 ## `ScanRecord`
 
 | Property                     | Description                                                         |
 |------------------------------|---------------------------------------------------------------------|
-| `.bucket -> TableBucket`     | Bucket this record belongs to                                       |
 | `.offset -> int`             | Record offset in the log                                            |
 | `.timestamp -> int`          | Record timestamp                                                    |
 | `.change_type -> ChangeType` | Change type (AppendOnly, Insert, UpdateBefore, UpdateAfter, Delete) |
diff --git a/website/docs/user-guide/python/example/log-tables.md b/website/docs/user-guide/python/example/log-tables.md
index 6e44e06..adaa162 100644
--- a/website/docs/user-guide/python/example/log-tables.md
+++ b/website/docs/user-guide/python/example/log-tables.md
@@ -83,13 +83,20 @@ while True:
     if result.num_rows > 0:
         print(result.to_pandas())
 
-# Record scanner: poll individual records with metadata
+# Record scanner: poll individual records
 scanner = await table.new_scan().create_log_scanner()
 scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})
 
 while True:
-    for record in scanner.poll(timeout_ms=5000):
+    scan_records = scanner.poll(timeout_ms=5000)
+
+    for record in scan_records:
         print(f"offset={record.offset}, change={record.change_type.short_string()}, row={record.row}")
+
+    # Or per-bucket access (dict-like)
+    for bucket, records in scan_records.items():
+        for record in records:
+            print(f"bucket={bucket.bucket_id}, offset={record.offset}, row={record.row}")
 ```
 
 ### Unsubscribing
diff --git a/website/docs/user-guide/rust/example/log-tables.md b/website/docs/user-guide/rust/example/log-tables.md
index 3ba3354..f5a4d0e 100644
--- a/website/docs/user-guide/rust/example/log-tables.md
+++ b/website/docs/user-guide/rust/example/log-tables.md
@@ -63,6 +63,21 @@ log_scanner.subscribe(0, 0).await?;
 // Poll for records
 let records = log_scanner.poll(Duration::from_secs(10)).await?;
 
+// Per-bucket access
+for (bucket, bucket_records) in records.records_by_buckets() {
+    println!("Bucket {}: {} records", bucket.bucket_id(), bucket_records.len());
+    for record in bucket_records {
+        let row = record.row();
+        println!(
+            "  event_id={}, event_type={} @ offset={}",
+            row.get_int(0),
+            row.get_string(1),
+            record.offset()
+        );
+    }
+}
+
+// Or flat iteration (consumes ScanRecords)
 for record in records {
     let row = record.row();
     println!(

Reply via email to