This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new e79251d  chore: cleanup after docs review (#313)
e79251d is described below

commit e79251d79b4950fc7cf36f0d16bc3e78c12eaa8d
Author: Anton Borisov <[email protected]>
AuthorDate: Mon Feb 16 00:26:31 2026 +0000

    chore: cleanup after docs review (#313)
---
 bindings/cpp/examples/example.cpp                  | 15 +++--
 bindings/cpp/include/fluss.hpp                     | 22 +++---
 bindings/cpp/src/admin.cpp                         | 45 +++++++++++--
 bindings/cpp/src/lib.rs                            | 78 +++++++++++++++-------
 bindings/python/example/example.py                 | 18 +++--
 bindings/python/fluss/__init__.pyi                 | 46 +++++++++----
 bindings/python/src/admin.rs                       | 54 +++++----------
 bindings/python/src/lib.rs                         | 52 ++++++++++++---
 website/docs/user-guide/cpp/api-reference.md       | 14 ++--
 website/docs/user-guide/cpp/data-types.md          |  2 +-
 .../user-guide/cpp/example/admin-operations.md     |  8 +--
 .../user-guide/cpp/example/partitioned-tables.md   |  2 +-
 website/docs/user-guide/python/api-reference.md    | 17 +++--
 .../user-guide/python/example/admin-operations.md  |  6 +-
 .../user-guide/python/example/configuration.md     |  4 +-
 .../docs/user-guide/python/example/log-tables.md   |  2 +-
 16 files changed, 248 insertions(+), 137 deletions(-)

diff --git a/bindings/cpp/examples/example.cpp 
b/bindings/cpp/examples/example.cpp
index 2c7f554..2b7f331 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -286,7 +286,7 @@ int main() {
 
     std::unordered_map<int32_t, int64_t> earliest_offsets;
     check("list_earliest_offsets",
-          admin.ListOffsets(table_path, all_bucket_ids, 
fluss::OffsetQuery::Earliest(),
+          admin.ListOffsets(table_path, all_bucket_ids, 
fluss::OffsetSpec::Earliest(),
                             earliest_offsets));
     std::cout << "Earliest offsets:" << std::endl;
     for (const auto& [bucket_id, offset] : earliest_offsets) {
@@ -295,7 +295,7 @@ int main() {
 
     std::unordered_map<int32_t, int64_t> latest_offsets;
     check("list_latest_offsets", admin.ListOffsets(table_path, all_bucket_ids,
-                                                   
fluss::OffsetQuery::Latest(), latest_offsets));
+                                                   
fluss::OffsetSpec::Latest(), latest_offsets));
     std::cout << "Latest offsets:" << std::endl;
     for (const auto& [bucket_id, offset] : latest_offsets) {
         std::cout << "  Bucket " << bucket_id << ": offset=" << offset << 
std::endl;
@@ -310,7 +310,7 @@ int main() {
     std::unordered_map<int32_t, int64_t> timestamp_offsets;
     check("list_timestamp_offsets",
           admin.ListOffsets(table_path, all_bucket_ids,
-                            fluss::OffsetQuery::FromTimestamp(timestamp_ms), 
timestamp_offsets));
+                            fluss::OffsetSpec::Timestamp(timestamp_ms), 
timestamp_offsets));
     std::cout << "Offsets for timestamp " << timestamp_ms << " (1 hour ago):" 
<< std::endl;
     for (const auto& [bucket_id, offset] : timestamp_offsets) {
         std::cout << "  Bucket " << bucket_id << ": offset=" << offset << 
std::endl;
@@ -507,7 +507,7 @@ int main() {
           admin.CreatePartition(partitioned_table_path, {{"region", "EU"}}, 
true));
     std::cout << "Created partitions: US, EU" << std::endl;
 
-    // List partitions
+    // List all partitions
     std::vector<fluss::PartitionInfo> partition_infos;
     check("list_partition_infos",
           admin.ListPartitionInfos(partitioned_table_path, partition_infos));
@@ -516,6 +516,13 @@ int main() {
                   << std::endl;
     }
 
+    // List partitions with partial spec filter
+    std::vector<fluss::PartitionInfo> us_partition_infos;
+    check("list_partition_infos_with_spec",
+          admin.ListPartitionInfos(partitioned_table_path, {{"region", "US"}}, 
us_partition_infos));
+    std::cout << "  Filtered (region=US): " << us_partition_infos.size() << " 
partition(s)"
+              << std::endl;
+
     // Write data to partitioned table
     fluss::Table partitioned_table;
     check("get_partitioned_table", conn.GetTable(partitioned_table_path, 
partitioned_table));
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index dd29882..30a8636 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -305,19 +305,19 @@ enum class DatumType {
 
 constexpr int64_t EARLIEST_OFFSET = -2;
 
-enum class OffsetSpec {
+enum class OffsetType {
     Earliest = 0,
     Latest = 1,
     Timestamp = 2,
 };
 
-struct OffsetQuery {
-    OffsetSpec spec;
+struct OffsetSpec {
+    OffsetType type;
     int64_t timestamp{0};
 
-    static OffsetQuery Earliest() { return {OffsetSpec::Earliest, 0}; }
-    static OffsetQuery Latest() { return {OffsetSpec::Latest, 0}; }
-    static OffsetQuery FromTimestamp(int64_t ts) { return 
{OffsetSpec::Timestamp, ts}; }
+    static OffsetSpec Earliest() { return {OffsetType::Earliest, 0}; }
+    static OffsetSpec Latest() { return {OffsetType::Latest, 0}; }
+    static OffsetSpec Timestamp(int64_t ts) { return {OffsetType::Timestamp, 
ts}; }
 };
 
 struct Result {
@@ -1000,15 +1000,19 @@ class Admin {
     Result GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& 
out);
 
     Result ListOffsets(const TablePath& table_path, const 
std::vector<int32_t>& bucket_ids,
-                       const OffsetQuery& offset_query, 
std::unordered_map<int32_t, int64_t>& out);
+                       const OffsetSpec& offset_spec, 
std::unordered_map<int32_t, int64_t>& out);
 
     Result ListPartitionOffsets(const TablePath& table_path, const 
std::string& partition_name,
                                 const std::vector<int32_t>& bucket_ids,
-                                const OffsetQuery& offset_query,
+                                const OffsetSpec& offset_spec,
                                 std::unordered_map<int32_t, int64_t>& out);
 
     Result ListPartitionInfos(const TablePath& table_path, 
std::vector<PartitionInfo>& out);
 
+    Result ListPartitionInfos(const TablePath& table_path,
+                              const std::unordered_map<std::string, 
std::string>& partition_spec,
+                              std::vector<PartitionInfo>& out);
+
     Result CreatePartition(const TablePath& table_path,
                            const std::unordered_map<std::string, std::string>& 
partition_spec,
                            bool ignore_if_exists = false);
@@ -1035,7 +1039,7 @@ class Admin {
 
    private:
     Result DoListOffsets(const TablePath& table_path, const 
std::vector<int32_t>& bucket_ids,
-                         const OffsetQuery& offset_query, 
std::unordered_map<int32_t, int64_t>& out,
+                         const OffsetSpec& offset_spec, 
std::unordered_map<int32_t, int64_t>& out,
                          const std::string* partition_name = nullptr);
 
     friend class Connection;
diff --git a/bindings/cpp/src/admin.cpp b/bindings/cpp/src/admin.cpp
index 0fb15b2..8deb182 100644
--- a/bindings/cpp/src/admin.cpp
+++ b/bindings/cpp/src/admin.cpp
@@ -107,7 +107,7 @@ Result Admin::GetLatestLakeSnapshot(const TablePath& 
table_path, LakeSnapshot& o
 
 // function for common list offsets functionality
 Result Admin::DoListOffsets(const TablePath& table_path, const 
std::vector<int32_t>& bucket_ids,
-                            const OffsetQuery& offset_query,
+                            const OffsetSpec& offset_spec,
                             std::unordered_map<int32_t, int64_t>& out,
                             const std::string* partition_name) {
     if (!Available()) {
@@ -122,8 +122,8 @@ Result Admin::DoListOffsets(const TablePath& table_path, 
const std::vector<int32
     }
 
     ffi::FfiOffsetQuery ffi_query;
-    ffi_query.offset_type = static_cast<int32_t>(offset_query.spec);
-    ffi_query.timestamp = offset_query.timestamp;
+    ffi_query.offset_type = static_cast<int32_t>(offset_spec.type);
+    ffi_query.timestamp = offset_spec.timestamp;
 
     ffi::FfiListOffsetsResult ffi_result;
     if (partition_name != nullptr) {
@@ -145,16 +145,16 @@ Result Admin::DoListOffsets(const TablePath& table_path, 
const std::vector<int32
 }
 
 Result Admin::ListOffsets(const TablePath& table_path, const 
std::vector<int32_t>& bucket_ids,
-                          const OffsetQuery& offset_query,
+                          const OffsetSpec& offset_spec,
                           std::unordered_map<int32_t, int64_t>& out) {
-    return DoListOffsets(table_path, bucket_ids, offset_query, out);
+    return DoListOffsets(table_path, bucket_ids, offset_spec, out);
 }
 
 Result Admin::ListPartitionOffsets(const TablePath& table_path, const 
std::string& partition_name,
                                    const std::vector<int32_t>& bucket_ids,
-                                   const OffsetQuery& offset_query,
+                                   const OffsetSpec& offset_spec,
                                    std::unordered_map<int32_t, int64_t>& out) {
-    return DoListOffsets(table_path, bucket_ids, offset_query, out, 
&partition_name);
+    return DoListOffsets(table_path, bucket_ids, offset_spec, out, 
&partition_name);
 }
 
 Result Admin::ListPartitionInfos(const TablePath& table_path, 
std::vector<PartitionInfo>& out) {
@@ -177,6 +177,37 @@ Result Admin::ListPartitionInfos(const TablePath& 
table_path, std::vector<Partit
     return result;
 }
 
+Result Admin::ListPartitionInfos(const TablePath& table_path,
+                                 const std::unordered_map<std::string, 
std::string>& partition_spec,
+                                 std::vector<PartitionInfo>& out) {
+    if (!Available()) {
+        return utils::make_client_error("Admin not available");
+    }
+
+    auto ffi_path = utils::to_ffi_table_path(table_path);
+
+    rust::Vec<ffi::FfiPartitionKeyValue> rust_spec;
+    for (const auto& [key, value] : partition_spec) {
+        ffi::FfiPartitionKeyValue kv;
+        kv.key = rust::String(key);
+        kv.value = rust::String(value);
+        rust_spec.push_back(std::move(kv));
+    }
+
+    auto ffi_result = admin_->list_partition_infos_with_spec(ffi_path, 
std::move(rust_spec));
+
+    auto result = utils::from_ffi_result(ffi_result.result);
+    if (result.Ok()) {
+        out.clear();
+        out.reserve(ffi_result.partition_infos.size());
+        for (const auto& pi : ffi_result.partition_infos) {
+            out.push_back({pi.partition_id, std::string(pi.partition_name)});
+        }
+    }
+
+    return result;
+}
+
 Result Admin::CreatePartition(const TablePath& table_path,
                               const std::unordered_map<std::string, 
std::string>& partition_spec,
                               bool ignore_if_exists) {
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 8a5bdfd..fab8edf 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -302,6 +302,11 @@ mod ffi {
             self: &Admin,
             table_path: &FfiTablePath,
         ) -> FfiListPartitionInfosResult;
+        fn list_partition_infos_with_spec(
+            self: &Admin,
+            table_path: &FfiTablePath,
+            partition_spec: Vec<FfiPartitionKeyValue>,
+        ) -> FfiListPartitionInfosResult;
         fn create_partition(
             self: &Admin,
             table_path: &FfiTablePath,
@@ -735,30 +740,20 @@ impl Admin {
         &self,
         table_path: &ffi::FfiTablePath,
     ) -> ffi::FfiListPartitionInfosResult {
-        let path = fcore::metadata::TablePath::new(
-            table_path.database_name.clone(),
-            table_path.table_name.clone(),
-        );
-        let result = RUNTIME.block_on(async { 
self.inner.list_partition_infos(&path).await });
-        match result {
-            Ok(infos) => {
-                let partition_infos: Vec<ffi::FfiPartitionInfo> = infos
-                    .into_iter()
-                    .map(|info| ffi::FfiPartitionInfo {
-                        partition_id: info.get_partition_id(),
-                        partition_name: info.get_partition_name(),
-                    })
-                    .collect();
-                ffi::FfiListPartitionInfosResult {
-                    result: ok_result(),
-                    partition_infos,
-                }
-            }
-            Err(e) => ffi::FfiListPartitionInfosResult {
-                result: err_from_core_error(&e),
-                partition_infos: vec![],
-            },
-        }
+        self.do_list_partition_infos(table_path, None)
+    }
+
+    fn list_partition_infos_with_spec(
+        &self,
+        table_path: &ffi::FfiTablePath,
+        partition_spec: Vec<ffi::FfiPartitionKeyValue>,
+    ) -> ffi::FfiListPartitionInfosResult {
+        let spec_map: std::collections::HashMap<String, String> = 
partition_spec
+            .into_iter()
+            .map(|kv| (kv.key, kv.value))
+            .collect();
+        let spec = fcore::metadata::PartitionSpec::new(spec_map);
+        self.do_list_partition_infos(table_path, Some(&spec))
     }
     fn create_partition(
         &self,
@@ -939,6 +934,41 @@ impl Admin {
             },
         }
     }
+
+    fn do_list_partition_infos(
+        &self,
+        table_path: &ffi::FfiTablePath,
+        partial_partition_spec: Option<&fcore::metadata::PartitionSpec>,
+    ) -> ffi::FfiListPartitionInfosResult {
+        let path = fcore::metadata::TablePath::new(
+            table_path.database_name.clone(),
+            table_path.table_name.clone(),
+        );
+        let result = RUNTIME.block_on(async {
+            self.inner
+                .list_partition_infos_with_spec(&path, partial_partition_spec)
+                .await
+        });
+        match result {
+            Ok(infos) => {
+                let partition_infos: Vec<ffi::FfiPartitionInfo> = infos
+                    .into_iter()
+                    .map(|info| ffi::FfiPartitionInfo {
+                        partition_id: info.get_partition_id(),
+                        partition_name: info.get_partition_name(),
+                    })
+                    .collect();
+                ffi::FfiListPartitionInfosResult {
+                    result: ok_result(),
+                    partition_infos,
+                }
+            }
+            Err(e) => ffi::FfiListPartitionInfosResult {
+                result: err_from_core_error(&e),
+                partition_infos: vec![],
+            },
+        }
+    }
 }
 
 // Table implementation
diff --git a/bindings/python/example/example.py 
b/bindings/python/example/example.py
index 4ea3bd6..9c2b7e3 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -90,11 +90,11 @@ async def main():
     # Demo: List offsets
     print("\n--- Testing list_offsets() ---")
     try:
-        # Query latest offsets using OffsetType constant (recommended for type 
safety)
+        # Query latest offsets using OffsetSpec factory method
         offsets = await admin.list_offsets(
             table_path,
             bucket_ids=[0],
-            offset_type=fluss.OffsetType.LATEST
+            offset_spec=fluss.OffsetSpec.latest()
         )
         print(f"Latest offsets for table (before writes): {offsets}")
     except Exception as e:
@@ -248,11 +248,10 @@ async def main():
         # Demo: Check offsets after writes
         print("\n--- Checking offsets after writes ---")
         try:
-            # Query with string constant (alternative API - both strings and 
constants are supported)
             offsets = await admin.list_offsets(
                 table_path,
                 bucket_ids=[0],
-                offset_type="latest"  # Can also use "earliest" or "timestamp"
+                offset_spec=fluss.OffsetSpec.latest()
             )
             print(f"Latest offsets after writing 7 records: {offsets}")
         except Exception as e:
@@ -734,6 +733,13 @@ async def main():
         await partitioned_writer.flush()
         print("\nWrote 4 records (2 to US, 2 to EU)")
 
+        # Demo: list_partition_infos with partial spec filter
+        print("\n--- Testing list_partition_infos with spec ---")
+        us_partitions = await admin.list_partition_infos(
+            partitioned_table_path, partition_spec={"region": "US"}
+        )
+        print(f"Filtered partitions (region=US): {us_partitions}")
+
         # Demo: list_partition_offsets
         print("\n--- Testing list_partition_offsets ---")
 
@@ -743,7 +749,7 @@ async def main():
             partitioned_table_path,
             partition_name="US",
             bucket_ids=[0],
-            offset_type="latest"
+            offset_spec=fluss.OffsetSpec.latest()
         )
         print(f"US partition latest offsets: {us_offsets}")
 
@@ -752,7 +758,7 @@ async def main():
             partitioned_table_path,
             partition_name="EU",
             bucket_ids=[0],
-            offset_type="latest"
+            offset_spec=fluss.OffsetSpec.latest()
         )
         print(f"EU partition latest offsets: {eu_offsets}")
 
diff --git a/bindings/python/fluss/__init__.pyi 
b/bindings/python/fluss/__init__.pyi
index daccca8..47eeb80 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -193,16 +193,15 @@ class FlussAdmin:
         self,
         table_path: TablePath,
         bucket_ids: List[int],
-        offset_type: str,
-        timestamp: Optional[int] = None,
+        offset_spec: "OffsetSpec",
     ) -> Dict[int, int]:
         """List offsets for the specified buckets.
 
         Args:
             table_path: Path to the table
             bucket_ids: List of bucket IDs to query
-            offset_type: "earliest", "latest", or "timestamp"
-            timestamp: Required when offset_type is "timestamp"
+            offset_spec: Offset specification (OffsetSpec.earliest(), 
OffsetSpec.latest(),
+                or OffsetSpec.timestamp(ts))
 
         Returns:
             Dict mapping bucket_id -> offset
@@ -213,8 +212,7 @@ class FlussAdmin:
         table_path: TablePath,
         partition_name: str,
         bucket_ids: List[int],
-        offset_type: str,
-        timestamp: Optional[int] = None,
+        offset_spec: "OffsetSpec",
     ) -> Dict[int, int]:
         """List offsets for buckets in a specific partition.
 
@@ -222,8 +220,8 @@ class FlussAdmin:
             table_path: Path to the table
             partition_name: Partition value (e.g., "US" not "region=US")
             bucket_ids: List of bucket IDs to query
-            offset_type: "earliest", "latest", or "timestamp"
-            timestamp: Required when offset_type is "timestamp"
+            offset_spec: Offset specification (OffsetSpec.earliest(), 
OffsetSpec.latest(),
+                or OffsetSpec.timestamp(ts))
 
         Returns:
             Dict mapping bucket_id -> offset
@@ -246,11 +244,15 @@ class FlussAdmin:
     async def list_partition_infos(
         self,
         table_path: TablePath,
+        partition_spec: Optional[Dict[str, str]] = None,
     ) -> List["PartitionInfo"]:
-        """List all partitions for a partitioned table.
+        """List partitions for a partitioned table.
 
         Args:
             table_path: Path to the table
+            partition_spec: Optional partial partition spec to filter results.
+                Dict mapping partition column name to value (e.g., {"region": 
"US"}).
+                If None, returns all partitions.
 
         Returns:
             List of PartitionInfo objects
@@ -839,12 +841,28 @@ class ErrorCode:
     INVALID_ALTER_TABLE_EXCEPTION: int
     DELETION_DISABLED_EXCEPTION: int
 
-class OffsetType:
-    """Offset type constants for list_offsets()."""
+class OffsetSpec:
+    """Offset specification for list_offsets(), matching Java's OffsetSpec.
 
-    EARLIEST: str
-    LATEST: str
-    TIMESTAMP: str
+    Use factory methods to create instances:
+        OffsetSpec.earliest()
+        OffsetSpec.latest()
+        OffsetSpec.timestamp(ts)
+    """
+
+    @staticmethod
+    def earliest() -> "OffsetSpec":
+        """Create an OffsetSpec for the earliest available offset."""
+        ...
+    @staticmethod
+    def latest() -> "OffsetSpec":
+        """Create an OffsetSpec for the latest available offset."""
+        ...
+    @staticmethod
+    def timestamp(ts: int) -> "OffsetSpec":
+        """Create an OffsetSpec for the offset at or after the given 
timestamp."""
+        ...
+    def __repr__(self) -> str: ...
 
 # Constant for earliest offset (-2)
 EARLIEST_OFFSET: int
diff --git a/bindings/python/src/admin.rs b/bindings/python/src/admin.rs
index d03ce7a..9a96bea 100644
--- a/bindings/python/src/admin.rs
+++ b/bindings/python/src/admin.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use crate::*;
-use fcore::rpc::message::OffsetSpec;
 use pyo3::conversion::IntoPyObject;
 use pyo3_async_runtimes::tokio::future_into_py;
 use std::sync::Arc;
@@ -27,23 +26,6 @@ pub struct FlussAdmin {
     __admin: Arc<fcore::client::FlussAdmin>,
 }
 
-/// Parse offset_type string into OffsetSpec
-fn parse_offset_spec(offset_type: &str, timestamp: Option<i64>) -> 
PyResult<OffsetSpec> {
-    match offset_type {
-        s if s.eq_ignore_ascii_case("earliest") => Ok(OffsetSpec::Earliest),
-        s if s.eq_ignore_ascii_case("latest") => Ok(OffsetSpec::Latest),
-        s if s.eq_ignore_ascii_case("timestamp") => {
-            let ts = timestamp.ok_or_else(|| {
-                FlussError::new_err("timestamp must be provided when 
offset_type='timestamp'")
-            })?;
-            Ok(OffsetSpec::Timestamp(ts))
-        }
-        _ => Err(FlussError::new_err(format!(
-            "Invalid offset_type: '{offset_type}'. Must be 'earliest', 
'latest', or 'timestamp'"
-        ))),
-    }
-}
-
 /// Validate bucket IDs are non-negative
 fn validate_bucket_ids(bucket_ids: &[i32]) -> PyResult<()> {
     for &bucket_id in bucket_ids {
@@ -374,25 +356,20 @@ impl FlussAdmin {
     /// Args:
     ///     table_path: Path to the table
     ///     bucket_ids: List of bucket IDs to query
-    ///     offset_type: Type of offset to retrieve:
-    ///         - "earliest" or OffsetType.EARLIEST: Start of the log
-    ///         - "latest" or OffsetType.LATEST: End of the log
-    ///         - "timestamp" or OffsetType.TIMESTAMP: Offset at given 
timestamp (requires timestamp arg)
-    ///     timestamp: Required when offset_type is "timestamp", ignored 
otherwise
+    ///     offset_spec: Offset specification (OffsetSpec.earliest(), 
OffsetSpec.latest(),
+    ///         or OffsetSpec.timestamp(ts))
     ///
     /// Returns:
     ///     dict[int, int]: Mapping of bucket_id -> offset
-    #[pyo3(signature = (table_path, bucket_ids, offset_type, timestamp=None))]
     pub fn list_offsets<'py>(
         &self,
         py: Python<'py>,
         table_path: &TablePath,
         bucket_ids: Vec<i32>,
-        offset_type: &str,
-        timestamp: Option<i64>,
+        offset_spec: &OffsetSpec,
     ) -> PyResult<Bound<'py, PyAny>> {
         validate_bucket_ids(&bucket_ids)?;
-        let offset_spec = parse_offset_spec(offset_type, timestamp)?;
+        let offset_spec = offset_spec.inner.clone();
 
         let core_table_path = table_path.to_core();
         let admin = self.__admin.clone();
@@ -419,26 +396,21 @@ impl FlussAdmin {
     ///     table_path: Path to the table
     ///     partition_name: Partition value (e.g., "US" not "region=US")
     ///     bucket_ids: List of bucket IDs to query
-    ///     offset_type: Type of offset to retrieve:
-    ///         - "earliest" or OffsetType.EARLIEST: Start of the log
-    ///         - "latest" or OffsetType.LATEST: End of the log
-    ///         - "timestamp" or OffsetType.TIMESTAMP: Offset at given 
timestamp (requires timestamp arg)
-    ///     timestamp: Required when offset_type is "timestamp", ignored 
otherwise
+    ///     offset_spec: Offset specification (OffsetSpec.earliest(), 
OffsetSpec.latest(),
+    ///         or OffsetSpec.timestamp(ts))
     ///
     /// Returns:
     ///     dict[int, int]: Mapping of bucket_id -> offset
-    #[pyo3(signature = (table_path, partition_name, bucket_ids, offset_type, 
timestamp=None))]
     pub fn list_partition_offsets<'py>(
         &self,
         py: Python<'py>,
         table_path: &TablePath,
         partition_name: &str,
         bucket_ids: Vec<i32>,
-        offset_type: &str,
-        timestamp: Option<i64>,
+        offset_spec: &OffsetSpec,
     ) -> PyResult<Bound<'py, PyAny>> {
         validate_bucket_ids(&bucket_ids)?;
-        let offset_spec = parse_offset_spec(offset_type, timestamp)?;
+        let offset_spec = offset_spec.inner.clone();
 
         let core_table_path = table_path.to_core();
         let admin = self.__admin.clone();
@@ -493,24 +465,30 @@ impl FlussAdmin {
         })
     }
 
-    /// List all partitions for a partitioned table.
+    /// List partitions for a partitioned table.
     ///
     /// Args:
     ///     table_path: Path to the table
+    ///     partition_spec: Optional partial partition spec to filter results.
+    ///         Dict mapping partition column name to value (e.g., {"region": 
"US"}).
+    ///         If None, returns all partitions.
     ///
     /// Returns:
     ///     List[PartitionInfo]: List of partition info objects
+    #[pyo3(signature = (table_path, partition_spec=None))]
     pub fn list_partition_infos<'py>(
         &self,
         py: Python<'py>,
         table_path: &TablePath,
+        partition_spec: Option<std::collections::HashMap<String, String>>,
     ) -> PyResult<Bound<'py, PyAny>> {
         let core_table_path = table_path.to_core();
         let admin = self.__admin.clone();
+        let core_partition_spec = 
partition_spec.map(fcore::metadata::PartitionSpec::new);
 
         future_into_py(py, async move {
             let partition_infos = admin
-                .list_partition_infos(&core_table_path)
+                .list_partition_infos_with_spec(&core_table_path, 
core_partition_spec.as_ref())
                 .await
                 .map_err(|e| FlussError::new_err(format!("Failed to list 
partitions: {e}")))?;
 
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index 801db2c..553c8a9 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -50,21 +50,53 @@ static TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
         .expect("Failed to create Tokio runtime")
 });
 
-/// Offset type constants for list_offsets()
+/// Offset specification for list_offsets(), matching Java's OffsetSpec.
+///
+/// Use factory methods to create instances:
+///   OffsetSpec.earliest()
+///   OffsetSpec.latest()
+///   OffsetSpec.timestamp(ts)
 #[pyclass]
 #[derive(Clone)]
-pub struct OffsetType;
+pub struct OffsetSpec {
+    pub(crate) inner: fcore::rpc::message::OffsetSpec,
+}
 
 #[pymethods]
-impl OffsetType {
-    #[classattr]
-    const EARLIEST: &'static str = "earliest";
+impl OffsetSpec {
+    /// Create an OffsetSpec for the earliest available offset.
+    #[staticmethod]
+    fn earliest() -> Self {
+        Self {
+            inner: fcore::rpc::message::OffsetSpec::Earliest,
+        }
+    }
+
+    /// Create an OffsetSpec for the latest available offset.
+    #[staticmethod]
+    fn latest() -> Self {
+        Self {
+            inner: fcore::rpc::message::OffsetSpec::Latest,
+        }
+    }
 
-    #[classattr]
-    const LATEST: &'static str = "latest";
+    /// Create an OffsetSpec for the offset at or after the given timestamp.
+    #[staticmethod]
+    fn timestamp(ts: i64) -> Self {
+        Self {
+            inner: fcore::rpc::message::OffsetSpec::Timestamp(ts),
+        }
+    }
 
-    #[classattr]
-    const TIMESTAMP: &'static str = "timestamp";
+    fn __repr__(&self) -> String {
+        match &self.inner {
+            fcore::rpc::message::OffsetSpec::Earliest => 
"OffsetSpec.earliest()".to_string(),
+            fcore::rpc::message::OffsetSpec::Latest => 
"OffsetSpec.latest()".to_string(),
+            fcore::rpc::message::OffsetSpec::Timestamp(ts) => {
+                format!("OffsetSpec.timestamp({ts})")
+            }
+        }
+    }
 }
 
 #[pymodule]
@@ -92,7 +124,7 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::<ScanRecord>()?;
     m.add_class::<RecordBatch>()?;
     m.add_class::<PartitionInfo>()?;
-    m.add_class::<OffsetType>()?;
+    m.add_class::<OffsetSpec>()?;
     m.add_class::<WriteResultHandle>()?;
     m.add_class::<DatabaseDescriptor>()?;
     m.add_class::<DatabaseInfo>()?;
diff --git a/website/docs/user-guide/cpp/api-reference.md 
b/website/docs/user-guide/cpp/api-reference.md
index 00ff808..47c9307 100644
--- a/website/docs/user-guide/cpp/api-reference.md
+++ b/website/docs/user-guide/cpp/api-reference.md
@@ -68,8 +68,8 @@ Complete API reference for the Fluss C++ client.
 
 | Method                                                                       
                                                                                
                                           | Description                        
     |
 
|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------|
-| `ListOffsets(const TablePath& path, const std::vector<int32_t>& bucket_ids, 
const OffsetQuery& query, std::unordered_map<int32_t, int64_t>& out) -> Result` 
                                            | Get offsets for buckets           
      |
-| `ListPartitionOffsets(const TablePath& path, const std::string& 
partition_name, const std::vector<int32_t>& bucket_ids, const OffsetQuery& 
query, std::unordered_map<int32_t, int64_t>& out) -> Result` | Get offsets for 
a partition's buckets   |
+| `ListOffsets(const TablePath& path, const std::vector<int32_t>& bucket_ids, 
const OffsetSpec& query, std::unordered_map<int32_t, int64_t>& out) -> Result`  
                                           | Get offsets for buckets            
     |
+| `ListPartitionOffsets(const TablePath& path, const std::string& 
partition_name, const std::vector<int32_t>& bucket_ids, const OffsetSpec& 
query, std::unordered_map<int32_t, int64_t>& out) -> Result` | Get offsets for 
a partition's buckets   |
 
 ### Lake Operations
 
@@ -423,13 +423,13 @@ When using `table.NewRow()`, the `Set()` method 
auto-routes to the correct type
 | `bucket_id`    | `int32_t` | Bucket ID    |
 | `offset`       | `int64_t` | Offset value |
 
-## `OffsetQuery`
+## `OffsetSpec`
 
 | Method                                             | Description             
                |
 
|----------------------------------------------------|-----------------------------------------|
-| `OffsetQuery::Earliest()`                          | Query for the earliest 
available offset |
-| `OffsetQuery::Latest()`                            | Query for the latest 
offset             |
-| `OffsetQuery::FromTimestamp(int64_t timestamp_ms)` | Query offset at a 
specific timestamp    |
+| `OffsetSpec::Earliest()`                          | Query for the earliest 
available offset |
+| `OffsetSpec::Latest()`                            | Query for the latest 
offset             |
+| `OffsetSpec::Timestamp(int64_t timestamp_ms)`     | Query offset at a 
specific timestamp    |
 
 ## Constants
 
@@ -441,7 +441,7 @@ To start reading from the latest offset (only new records), 
resolve the current
 
 ```cpp
 std::unordered_map<int32_t, int64_t> offsets;
-admin.ListOffsets(table_path, {0}, fluss::OffsetQuery::Latest(), offsets);
+admin.ListOffsets(table_path, {0}, fluss::OffsetSpec::Latest(), offsets);
 scanner.Subscribe(0, offsets[0]);
 ```
 
diff --git a/website/docs/user-guide/cpp/data-types.md 
b/website/docs/user-guide/cpp/data-types.md
index 11712fa..fb01ac2 100644
--- a/website/docs/user-guide/cpp/data-types.md
+++ b/website/docs/user-guide/cpp/data-types.md
@@ -105,6 +105,6 @@ To start reading from the latest offset, resolve the 
current offset via `ListOff
 
 ```cpp
 std::unordered_map<int32_t, int64_t> offsets;
-admin.ListOffsets(table_path, {0}, fluss::OffsetQuery::Latest(), offsets);
+admin.ListOffsets(table_path, {0}, fluss::OffsetSpec::Latest(), offsets);
 scanner.Subscribe(0, offsets[0]);
 ```
diff --git a/website/docs/user-guide/cpp/example/admin-operations.md 
b/website/docs/user-guide/cpp/example/admin-operations.md
index 1a33012..850660e 100644
--- a/website/docs/user-guide/cpp/example/admin-operations.md
+++ b/website/docs/user-guide/cpp/example/admin-operations.md
@@ -120,23 +120,23 @@ std::vector<int32_t> bucket_ids = {0, 1, 2};
 // Query earliest offsets
 std::unordered_map<int32_t, int64_t> earliest_offsets;
 admin.ListOffsets(table_path, bucket_ids,
-                  fluss::OffsetQuery::Earliest(), earliest_offsets);
+                  fluss::OffsetSpec::Earliest(), earliest_offsets);
 
 // Query latest offsets
 std::unordered_map<int32_t, int64_t> latest_offsets;
 admin.ListOffsets(table_path, bucket_ids,
-                  fluss::OffsetQuery::Latest(), latest_offsets);
+                  fluss::OffsetSpec::Latest(), latest_offsets);
 
 // Query offsets for a specific timestamp
 std::unordered_map<int32_t, int64_t> timestamp_offsets;
 admin.ListOffsets(table_path, bucket_ids,
-                  fluss::OffsetQuery::FromTimestamp(timestamp_ms),
+                  fluss::OffsetSpec::Timestamp(timestamp_ms),
                   timestamp_offsets);
 
 // Query partition offsets
 std::unordered_map<int32_t, int64_t> partition_offsets;
 admin.ListPartitionOffsets(table_path, "partition_name",
-                           bucket_ids, fluss::OffsetQuery::Latest(),
+                           bucket_ids, fluss::OffsetSpec::Latest(),
                            partition_offsets);
 ```
 
diff --git a/website/docs/user-guide/cpp/example/partitioned-tables.md 
b/website/docs/user-guide/cpp/example/partitioned-tables.md
index 6a6927f..371ee3e 100644
--- a/website/docs/user-guide/cpp/example/partitioned-tables.md
+++ b/website/docs/user-guide/cpp/example/partitioned-tables.md
@@ -103,7 +103,7 @@ admin.ListPartitionInfos(table_path, partition_infos);
 std::vector<int32_t> bucket_ids = {0, 1, 2};
 std::unordered_map<int32_t, int64_t> offsets;
 admin.ListPartitionOffsets(table_path, "2024-01-15$US",
-                           bucket_ids, fluss::OffsetQuery::Latest(), offsets);
+                           bucket_ids, fluss::OffsetSpec::Latest(), offsets);
 ```
 
 ## Partitioned Primary Key Tables
diff --git a/website/docs/user-guide/python/api-reference.md 
b/website/docs/user-guide/python/api-reference.md
index 321e25e..af03058 100644
--- a/website/docs/user-guide/python/api-reference.md
+++ b/website/docs/user-guide/python/api-reference.md
@@ -43,8 +43,8 @@ Supports `with` statement (context manager).
 | `await get_table_info(table_path) -> TableInfo`                              
                                         | Get table metadata                   
 |
 | `await list_tables(database_name) -> list[str]`                              
                                         | List tables in a database            
 |
 | `await table_exists(table_path) -> bool`                                     
                                         | Check if a table exists              
 |
-| `await list_offsets(table_path, bucket_ids, offset_type, timestamp=None) -> 
dict[int, int]`                           | Get offsets for buckets             
  |
-| `await list_partition_offsets(table_path, partition_name, bucket_ids, 
offset_type, timestamp=None) -> dict[int, int]` | Get offsets for a partition's 
buckets |
+| `await list_offsets(table_path, bucket_ids, offset_spec) -> dict[int, int]`  
                         | Get offsets for buckets               |
+| `await list_partition_offsets(table_path, partition_name, bucket_ids, 
offset_spec) -> dict[int, int]` | Get offsets for a partition's buckets |
 | `await create_partition(table_path, partition_spec, ignore_if_exists=False)` 
                                         | Create a partition                   
 |
 | `await drop_partition(table_path, partition_spec, 
ignore_if_not_exists=False)`                                        | Drop a 
partition                      |
 | `await list_partition_infos(table_path) -> list[PartitionInfo]`              
                                         | List partitions                      
 |
@@ -264,14 +264,19 @@ Raised for all Fluss-specific errors (connection 
failures, table not found, sche
 | Constant                     | Value         | Description                   
                      |
 
|------------------------------|---------------|-----------------------------------------------------|
 | `fluss.EARLIEST_OFFSET`      | `-2`          | Start reading from earliest 
available offset        |
-| `fluss.OffsetType.EARLIEST`  | `"earliest"`  | For `list_offsets()`          
                      |
-| `fluss.OffsetType.LATEST`    | `"latest"`    | For `list_offsets()`          
                      |
-| `fluss.OffsetType.TIMESTAMP` | `"timestamp"` | For `list_offsets()` with 
timestamp                 |
+
+## `OffsetSpec`
+
+| Method                      | Description                                    
  |
+|-----------------------------|--------------------------------------------------|
+| `OffsetSpec.earliest()`     | Earliest available offset                      
  |
+| `OffsetSpec.latest()`       | Latest offset                                  
  |
+| `OffsetSpec.timestamp(ts)`  | Offset at or after the given timestamp 
(millis)  |
 
 To start reading from the latest offset (only new records), resolve the 
current offset via `list_offsets` before subscribing:
 
 ```python
-offsets = await admin.list_offsets(table_path, [0], fluss.OffsetType.LATEST)
+offsets = await admin.list_offsets(table_path, [0], fluss.OffsetSpec.latest())
 scanner.subscribe(bucket_id=0, start_offset=offsets[0])
 ```
 
diff --git a/website/docs/user-guide/python/example/admin-operations.md 
b/website/docs/user-guide/python/example/admin-operations.md
index 8c62ee7..4561a3f 100644
--- a/website/docs/user-guide/python/example/admin-operations.md
+++ b/website/docs/user-guide/python/example/admin-operations.md
@@ -56,13 +56,13 @@ await admin.drop_table(table_path, 
ignore_if_not_exists=True)
 
 ```python
 # Latest offsets for buckets
-offsets = await admin.list_offsets(table_path, bucket_ids=[0, 1], 
offset_type="latest")
+offsets = await admin.list_offsets(table_path, bucket_ids=[0, 1], 
offset_spec=fluss.OffsetSpec.latest())
 
 # By timestamp
-offsets = await admin.list_offsets(table_path, bucket_ids=[0], 
offset_type="timestamp", timestamp=1704067200000)
+offsets = await admin.list_offsets(table_path, bucket_ids=[0], 
offset_spec=fluss.OffsetSpec.timestamp(1704067200000))
 
 # Per-partition offsets
-offsets = await admin.list_partition_offsets(table_path, partition_name="US", 
bucket_ids=[0], offset_type="latest")
+offsets = await admin.list_partition_offsets(table_path, partition_name="US", 
bucket_ids=[0], offset_spec=fluss.OffsetSpec.latest())
 ```
 
 ## Lake Snapshot
diff --git a/website/docs/user-guide/python/example/configuration.md 
b/website/docs/user-guide/python/example/configuration.md
index c4ef4f3..9686fc6 100644
--- a/website/docs/user-guide/python/example/configuration.md
+++ b/website/docs/user-guide/python/example/configuration.md
@@ -24,10 +24,10 @@ with await fluss.FlussConnection.create(config) as conn:
 | Key                 | Description                                           
| Default            |
 
|---------------------|-------------------------------------------------------|--------------------|
 | `bootstrap.servers` | Coordinator server address                            
| `127.0.0.1:9123`   |
-| `request.max.size`  | Maximum request size in bytes                         
| `10485760` (10 MB) |
+| `writer.request-max-size`  | Maximum request size in bytes                  
| `10485760` (10 MB) |
 | `writer.acks`       | Acknowledgment setting (`all` waits for all replicas) 
| `all`              |
 | `writer.retries`    | Number of retries on failure                          
| `2147483647`       |
-| `writer.batch.size` | Batch size for writes in bytes                        
| `2097152` (2 MB)   |
+| `writer.batch-size` | Batch size for writes in bytes                        
| `2097152` (2 MB)   |
 
 Remember to close the connection when done:
 
diff --git a/website/docs/user-guide/python/example/log-tables.md 
b/website/docs/user-guide/python/example/log-tables.md
index 63903a4..6e44e06 100644
--- a/website/docs/user-guide/python/example/log-tables.md
+++ b/website/docs/user-guide/python/example/log-tables.md
@@ -106,7 +106,7 @@ To only consume new records (skip existing data), first 
resolve the current late
 
 ```python
 admin = await conn.get_admin()
-offsets = await admin.list_offsets(table_path, [0], fluss.OffsetType.LATEST)
+offsets = await admin.list_offsets(table_path, [0], fluss.OffsetSpec.latest())
 latest = offsets[0]
 
 scanner = await table.new_scan().create_record_batch_log_scanner()


Reply via email to