This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new e79251d chore: cleanup after docs review (#313)
e79251d is described below
commit e79251d79b4950fc7cf36f0d16bc3e78c12eaa8d
Author: Anton Borisov <[email protected]>
AuthorDate: Mon Feb 16 00:26:31 2026 +0000
chore: cleanup after docs review (#313)
---
bindings/cpp/examples/example.cpp | 15 +++--
bindings/cpp/include/fluss.hpp | 22 +++---
bindings/cpp/src/admin.cpp | 45 +++++++++++--
bindings/cpp/src/lib.rs | 78 +++++++++++++++-------
bindings/python/example/example.py | 18 +++--
bindings/python/fluss/__init__.pyi | 46 +++++++++----
bindings/python/src/admin.rs | 54 +++++----------
bindings/python/src/lib.rs | 52 ++++++++++++---
website/docs/user-guide/cpp/api-reference.md | 14 ++--
website/docs/user-guide/cpp/data-types.md | 2 +-
.../user-guide/cpp/example/admin-operations.md | 8 +--
.../user-guide/cpp/example/partitioned-tables.md | 2 +-
website/docs/user-guide/python/api-reference.md | 17 +++--
.../user-guide/python/example/admin-operations.md | 6 +-
.../user-guide/python/example/configuration.md | 4 +-
.../docs/user-guide/python/example/log-tables.md | 2 +-
16 files changed, 248 insertions(+), 137 deletions(-)
diff --git a/bindings/cpp/examples/example.cpp b/bindings/cpp/examples/example.cpp
index 2c7f554..2b7f331 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -286,7 +286,7 @@ int main() {
std::unordered_map<int32_t, int64_t> earliest_offsets;
check("list_earliest_offsets",
- admin.ListOffsets(table_path, all_bucket_ids,
fluss::OffsetQuery::Earliest(),
+ admin.ListOffsets(table_path, all_bucket_ids,
fluss::OffsetSpec::Earliest(),
earliest_offsets));
std::cout << "Earliest offsets:" << std::endl;
for (const auto& [bucket_id, offset] : earliest_offsets) {
@@ -295,7 +295,7 @@ int main() {
std::unordered_map<int32_t, int64_t> latest_offsets;
check("list_latest_offsets", admin.ListOffsets(table_path, all_bucket_ids,
-
fluss::OffsetQuery::Latest(), latest_offsets));
+
fluss::OffsetSpec::Latest(), latest_offsets));
std::cout << "Latest offsets:" << std::endl;
for (const auto& [bucket_id, offset] : latest_offsets) {
std::cout << " Bucket " << bucket_id << ": offset=" << offset <<
std::endl;
@@ -310,7 +310,7 @@ int main() {
std::unordered_map<int32_t, int64_t> timestamp_offsets;
check("list_timestamp_offsets",
admin.ListOffsets(table_path, all_bucket_ids,
- fluss::OffsetQuery::FromTimestamp(timestamp_ms),
timestamp_offsets));
+ fluss::OffsetSpec::Timestamp(timestamp_ms),
timestamp_offsets));
std::cout << "Offsets for timestamp " << timestamp_ms << " (1 hour ago):"
<< std::endl;
for (const auto& [bucket_id, offset] : timestamp_offsets) {
std::cout << " Bucket " << bucket_id << ": offset=" << offset <<
std::endl;
@@ -507,7 +507,7 @@ int main() {
admin.CreatePartition(partitioned_table_path, {{"region", "EU"}},
true));
std::cout << "Created partitions: US, EU" << std::endl;
- // List partitions
+ // List all partitions
std::vector<fluss::PartitionInfo> partition_infos;
check("list_partition_infos",
admin.ListPartitionInfos(partitioned_table_path, partition_infos));
@@ -516,6 +516,13 @@ int main() {
<< std::endl;
}
+ // List partitions with partial spec filter
+ std::vector<fluss::PartitionInfo> us_partition_infos;
+ check("list_partition_infos_with_spec",
+ admin.ListPartitionInfos(partitioned_table_path, {{"region", "US"}},
us_partition_infos));
+ std::cout << " Filtered (region=US): " << us_partition_infos.size() << "
partition(s)"
+ << std::endl;
+
// Write data to partitioned table
fluss::Table partitioned_table;
check("get_partitioned_table", conn.GetTable(partitioned_table_path,
partitioned_table));
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index dd29882..30a8636 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -305,19 +305,19 @@ enum class DatumType {
constexpr int64_t EARLIEST_OFFSET = -2;
-enum class OffsetSpec {
+enum class OffsetType {
Earliest = 0,
Latest = 1,
Timestamp = 2,
};
-struct OffsetQuery {
- OffsetSpec spec;
+struct OffsetSpec {
+ OffsetType type;
int64_t timestamp{0};
- static OffsetQuery Earliest() { return {OffsetSpec::Earliest, 0}; }
- static OffsetQuery Latest() { return {OffsetSpec::Latest, 0}; }
- static OffsetQuery FromTimestamp(int64_t ts) { return
{OffsetSpec::Timestamp, ts}; }
+ static OffsetSpec Earliest() { return {OffsetType::Earliest, 0}; }
+ static OffsetSpec Latest() { return {OffsetType::Latest, 0}; }
+ static OffsetSpec Timestamp(int64_t ts) { return {OffsetType::Timestamp,
ts}; }
};
struct Result {
@@ -1000,15 +1000,19 @@ class Admin {
Result GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot&
out);
Result ListOffsets(const TablePath& table_path, const
std::vector<int32_t>& bucket_ids,
- const OffsetQuery& offset_query,
std::unordered_map<int32_t, int64_t>& out);
+ const OffsetSpec& offset_spec,
std::unordered_map<int32_t, int64_t>& out);
Result ListPartitionOffsets(const TablePath& table_path, const
std::string& partition_name,
const std::vector<int32_t>& bucket_ids,
- const OffsetQuery& offset_query,
+ const OffsetSpec& offset_spec,
std::unordered_map<int32_t, int64_t>& out);
Result ListPartitionInfos(const TablePath& table_path,
std::vector<PartitionInfo>& out);
+ Result ListPartitionInfos(const TablePath& table_path,
+ const std::unordered_map<std::string,
std::string>& partition_spec,
+ std::vector<PartitionInfo>& out);
+
Result CreatePartition(const TablePath& table_path,
const std::unordered_map<std::string, std::string>&
partition_spec,
bool ignore_if_exists = false);
@@ -1035,7 +1039,7 @@ class Admin {
private:
Result DoListOffsets(const TablePath& table_path, const
std::vector<int32_t>& bucket_ids,
- const OffsetQuery& offset_query,
std::unordered_map<int32_t, int64_t>& out,
+ const OffsetSpec& offset_spec,
std::unordered_map<int32_t, int64_t>& out,
const std::string* partition_name = nullptr);
friend class Connection;
diff --git a/bindings/cpp/src/admin.cpp b/bindings/cpp/src/admin.cpp
index 0fb15b2..8deb182 100644
--- a/bindings/cpp/src/admin.cpp
+++ b/bindings/cpp/src/admin.cpp
@@ -107,7 +107,7 @@ Result Admin::GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& o
// function for common list offsets functionality
Result Admin::DoListOffsets(const TablePath& table_path, const
std::vector<int32_t>& bucket_ids,
- const OffsetQuery& offset_query,
+ const OffsetSpec& offset_spec,
std::unordered_map<int32_t, int64_t>& out,
const std::string* partition_name) {
if (!Available()) {
@@ -122,8 +122,8 @@ Result Admin::DoListOffsets(const TablePath& table_path, const std::vector<int32
}
ffi::FfiOffsetQuery ffi_query;
- ffi_query.offset_type = static_cast<int32_t>(offset_query.spec);
- ffi_query.timestamp = offset_query.timestamp;
+ ffi_query.offset_type = static_cast<int32_t>(offset_spec.type);
+ ffi_query.timestamp = offset_spec.timestamp;
ffi::FfiListOffsetsResult ffi_result;
if (partition_name != nullptr) {
@@ -145,16 +145,16 @@ Result Admin::DoListOffsets(const TablePath& table_path, const std::vector<int32
}
Result Admin::ListOffsets(const TablePath& table_path, const
std::vector<int32_t>& bucket_ids,
- const OffsetQuery& offset_query,
+ const OffsetSpec& offset_spec,
std::unordered_map<int32_t, int64_t>& out) {
- return DoListOffsets(table_path, bucket_ids, offset_query, out);
+ return DoListOffsets(table_path, bucket_ids, offset_spec, out);
}
Result Admin::ListPartitionOffsets(const TablePath& table_path, const
std::string& partition_name,
const std::vector<int32_t>& bucket_ids,
- const OffsetQuery& offset_query,
+ const OffsetSpec& offset_spec,
std::unordered_map<int32_t, int64_t>& out) {
- return DoListOffsets(table_path, bucket_ids, offset_query, out,
&partition_name);
+ return DoListOffsets(table_path, bucket_ids, offset_spec, out,
&partition_name);
}
Result Admin::ListPartitionInfos(const TablePath& table_path,
std::vector<PartitionInfo>& out) {
@@ -177,6 +177,37 @@ Result Admin::ListPartitionInfos(const TablePath& table_path, std::vector<Partit
return result;
}
+Result Admin::ListPartitionInfos(const TablePath& table_path,
+ const std::unordered_map<std::string,
std::string>& partition_spec,
+ std::vector<PartitionInfo>& out) {
+ if (!Available()) {
+ return utils::make_client_error("Admin not available");
+ }
+
+ auto ffi_path = utils::to_ffi_table_path(table_path);
+
+ rust::Vec<ffi::FfiPartitionKeyValue> rust_spec;
+ for (const auto& [key, value] : partition_spec) {
+ ffi::FfiPartitionKeyValue kv;
+ kv.key = rust::String(key);
+ kv.value = rust::String(value);
+ rust_spec.push_back(std::move(kv));
+ }
+
+ auto ffi_result = admin_->list_partition_infos_with_spec(ffi_path,
std::move(rust_spec));
+
+ auto result = utils::from_ffi_result(ffi_result.result);
+ if (result.Ok()) {
+ out.clear();
+ out.reserve(ffi_result.partition_infos.size());
+ for (const auto& pi : ffi_result.partition_infos) {
+ out.push_back({pi.partition_id, std::string(pi.partition_name)});
+ }
+ }
+
+ return result;
+}
+
Result Admin::CreatePartition(const TablePath& table_path,
const std::unordered_map<std::string,
std::string>& partition_spec,
bool ignore_if_exists) {
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 8a5bdfd..fab8edf 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -302,6 +302,11 @@ mod ffi {
self: &Admin,
table_path: &FfiTablePath,
) -> FfiListPartitionInfosResult;
+ fn list_partition_infos_with_spec(
+ self: &Admin,
+ table_path: &FfiTablePath,
+ partition_spec: Vec<FfiPartitionKeyValue>,
+ ) -> FfiListPartitionInfosResult;
fn create_partition(
self: &Admin,
table_path: &FfiTablePath,
@@ -735,30 +740,20 @@ impl Admin {
&self,
table_path: &ffi::FfiTablePath,
) -> ffi::FfiListPartitionInfosResult {
- let path = fcore::metadata::TablePath::new(
- table_path.database_name.clone(),
- table_path.table_name.clone(),
- );
- let result = RUNTIME.block_on(async {
self.inner.list_partition_infos(&path).await });
- match result {
- Ok(infos) => {
- let partition_infos: Vec<ffi::FfiPartitionInfo> = infos
- .into_iter()
- .map(|info| ffi::FfiPartitionInfo {
- partition_id: info.get_partition_id(),
- partition_name: info.get_partition_name(),
- })
- .collect();
- ffi::FfiListPartitionInfosResult {
- result: ok_result(),
- partition_infos,
- }
- }
- Err(e) => ffi::FfiListPartitionInfosResult {
- result: err_from_core_error(&e),
- partition_infos: vec![],
- },
- }
+ self.do_list_partition_infos(table_path, None)
+ }
+
+ fn list_partition_infos_with_spec(
+ &self,
+ table_path: &ffi::FfiTablePath,
+ partition_spec: Vec<ffi::FfiPartitionKeyValue>,
+ ) -> ffi::FfiListPartitionInfosResult {
+ let spec_map: std::collections::HashMap<String, String> =
partition_spec
+ .into_iter()
+ .map(|kv| (kv.key, kv.value))
+ .collect();
+ let spec = fcore::metadata::PartitionSpec::new(spec_map);
+ self.do_list_partition_infos(table_path, Some(&spec))
}
fn create_partition(
&self,
@@ -939,6 +934,41 @@ impl Admin {
},
}
}
+
+ fn do_list_partition_infos(
+ &self,
+ table_path: &ffi::FfiTablePath,
+ partial_partition_spec: Option<&fcore::metadata::PartitionSpec>,
+ ) -> ffi::FfiListPartitionInfosResult {
+ let path = fcore::metadata::TablePath::new(
+ table_path.database_name.clone(),
+ table_path.table_name.clone(),
+ );
+ let result = RUNTIME.block_on(async {
+ self.inner
+ .list_partition_infos_with_spec(&path, partial_partition_spec)
+ .await
+ });
+ match result {
+ Ok(infos) => {
+ let partition_infos: Vec<ffi::FfiPartitionInfo> = infos
+ .into_iter()
+ .map(|info| ffi::FfiPartitionInfo {
+ partition_id: info.get_partition_id(),
+ partition_name: info.get_partition_name(),
+ })
+ .collect();
+ ffi::FfiListPartitionInfosResult {
+ result: ok_result(),
+ partition_infos,
+ }
+ }
+ Err(e) => ffi::FfiListPartitionInfosResult {
+ result: err_from_core_error(&e),
+ partition_infos: vec![],
+ },
+ }
+ }
}
// Table implementation
diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py
index 4ea3bd6..9c2b7e3 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -90,11 +90,11 @@ async def main():
# Demo: List offsets
print("\n--- Testing list_offsets() ---")
try:
- # Query latest offsets using OffsetType constant (recommended for type
safety)
+ # Query latest offsets using OffsetSpec factory method
offsets = await admin.list_offsets(
table_path,
bucket_ids=[0],
- offset_type=fluss.OffsetType.LATEST
+ offset_spec=fluss.OffsetSpec.latest()
)
print(f"Latest offsets for table (before writes): {offsets}")
except Exception as e:
@@ -248,11 +248,10 @@ async def main():
# Demo: Check offsets after writes
print("\n--- Checking offsets after writes ---")
try:
- # Query with string constant (alternative API - both strings and
constants are supported)
offsets = await admin.list_offsets(
table_path,
bucket_ids=[0],
- offset_type="latest" # Can also use "earliest" or "timestamp"
+ offset_spec=fluss.OffsetSpec.latest()
)
print(f"Latest offsets after writing 7 records: {offsets}")
except Exception as e:
@@ -734,6 +733,13 @@ async def main():
await partitioned_writer.flush()
print("\nWrote 4 records (2 to US, 2 to EU)")
+ # Demo: list_partition_infos with partial spec filter
+ print("\n--- Testing list_partition_infos with spec ---")
+ us_partitions = await admin.list_partition_infos(
+ partitioned_table_path, partition_spec={"region": "US"}
+ )
+ print(f"Filtered partitions (region=US): {us_partitions}")
+
# Demo: list_partition_offsets
print("\n--- Testing list_partition_offsets ---")
@@ -743,7 +749,7 @@ async def main():
partitioned_table_path,
partition_name="US",
bucket_ids=[0],
- offset_type="latest"
+ offset_spec=fluss.OffsetSpec.latest()
)
print(f"US partition latest offsets: {us_offsets}")
@@ -752,7 +758,7 @@ async def main():
partitioned_table_path,
partition_name="EU",
bucket_ids=[0],
- offset_type="latest"
+ offset_spec=fluss.OffsetSpec.latest()
)
print(f"EU partition latest offsets: {eu_offsets}")
diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi
index daccca8..47eeb80 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -193,16 +193,15 @@ class FlussAdmin:
self,
table_path: TablePath,
bucket_ids: List[int],
- offset_type: str,
- timestamp: Optional[int] = None,
+ offset_spec: "OffsetSpec",
) -> Dict[int, int]:
"""List offsets for the specified buckets.
Args:
table_path: Path to the table
bucket_ids: List of bucket IDs to query
- offset_type: "earliest", "latest", or "timestamp"
- timestamp: Required when offset_type is "timestamp"
+ offset_spec: Offset specification (OffsetSpec.earliest(),
OffsetSpec.latest(),
+ or OffsetSpec.timestamp(ts))
Returns:
Dict mapping bucket_id -> offset
@@ -213,8 +212,7 @@ class FlussAdmin:
table_path: TablePath,
partition_name: str,
bucket_ids: List[int],
- offset_type: str,
- timestamp: Optional[int] = None,
+ offset_spec: "OffsetSpec",
) -> Dict[int, int]:
"""List offsets for buckets in a specific partition.
@@ -222,8 +220,8 @@ class FlussAdmin:
table_path: Path to the table
partition_name: Partition value (e.g., "US" not "region=US")
bucket_ids: List of bucket IDs to query
- offset_type: "earliest", "latest", or "timestamp"
- timestamp: Required when offset_type is "timestamp"
+ offset_spec: Offset specification (OffsetSpec.earliest(),
OffsetSpec.latest(),
+ or OffsetSpec.timestamp(ts))
Returns:
Dict mapping bucket_id -> offset
@@ -246,11 +244,15 @@ class FlussAdmin:
async def list_partition_infos(
self,
table_path: TablePath,
+ partition_spec: Optional[Dict[str, str]] = None,
) -> List["PartitionInfo"]:
- """List all partitions for a partitioned table.
+ """List partitions for a partitioned table.
Args:
table_path: Path to the table
+ partition_spec: Optional partial partition spec to filter results.
+ Dict mapping partition column name to value (e.g., {"region":
"US"}).
+ If None, returns all partitions.
Returns:
List of PartitionInfo objects
@@ -839,12 +841,28 @@ class ErrorCode:
INVALID_ALTER_TABLE_EXCEPTION: int
DELETION_DISABLED_EXCEPTION: int
-class OffsetType:
- """Offset type constants for list_offsets()."""
+class OffsetSpec:
+ """Offset specification for list_offsets(), matching Java's OffsetSpec.
- EARLIEST: str
- LATEST: str
- TIMESTAMP: str
+ Use factory methods to create instances:
+ OffsetSpec.earliest()
+ OffsetSpec.latest()
+ OffsetSpec.timestamp(ts)
+ """
+
+ @staticmethod
+ def earliest() -> "OffsetSpec":
+ """Create an OffsetSpec for the earliest available offset."""
+ ...
+ @staticmethod
+ def latest() -> "OffsetSpec":
+ """Create an OffsetSpec for the latest available offset."""
+ ...
+ @staticmethod
+ def timestamp(ts: int) -> "OffsetSpec":
+ """Create an OffsetSpec for the offset at or after the given
timestamp."""
+ ...
+ def __repr__(self) -> str: ...
# Constant for earliest offset (-2)
EARLIEST_OFFSET: int
diff --git a/bindings/python/src/admin.rs b/bindings/python/src/admin.rs
index d03ce7a..9a96bea 100644
--- a/bindings/python/src/admin.rs
+++ b/bindings/python/src/admin.rs
@@ -16,7 +16,6 @@
// under the License.
use crate::*;
-use fcore::rpc::message::OffsetSpec;
use pyo3::conversion::IntoPyObject;
use pyo3_async_runtimes::tokio::future_into_py;
use std::sync::Arc;
@@ -27,23 +26,6 @@ pub struct FlussAdmin {
__admin: Arc<fcore::client::FlussAdmin>,
}
-/// Parse offset_type string into OffsetSpec
-fn parse_offset_spec(offset_type: &str, timestamp: Option<i64>) ->
PyResult<OffsetSpec> {
- match offset_type {
- s if s.eq_ignore_ascii_case("earliest") => Ok(OffsetSpec::Earliest),
- s if s.eq_ignore_ascii_case("latest") => Ok(OffsetSpec::Latest),
- s if s.eq_ignore_ascii_case("timestamp") => {
- let ts = timestamp.ok_or_else(|| {
- FlussError::new_err("timestamp must be provided when
offset_type='timestamp'")
- })?;
- Ok(OffsetSpec::Timestamp(ts))
- }
- _ => Err(FlussError::new_err(format!(
- "Invalid offset_type: '{offset_type}'. Must be 'earliest',
'latest', or 'timestamp'"
- ))),
- }
-}
-
/// Validate bucket IDs are non-negative
fn validate_bucket_ids(bucket_ids: &[i32]) -> PyResult<()> {
for &bucket_id in bucket_ids {
@@ -374,25 +356,20 @@ impl FlussAdmin {
/// Args:
/// table_path: Path to the table
/// bucket_ids: List of bucket IDs to query
- /// offset_type: Type of offset to retrieve:
- /// - "earliest" or OffsetType.EARLIEST: Start of the log
- /// - "latest" or OffsetType.LATEST: End of the log
- /// - "timestamp" or OffsetType.TIMESTAMP: Offset at given
timestamp (requires timestamp arg)
- /// timestamp: Required when offset_type is "timestamp", ignored
otherwise
+ /// offset_spec: Offset specification (OffsetSpec.earliest(),
OffsetSpec.latest(),
+ /// or OffsetSpec.timestamp(ts))
///
/// Returns:
/// dict[int, int]: Mapping of bucket_id -> offset
- #[pyo3(signature = (table_path, bucket_ids, offset_type, timestamp=None))]
pub fn list_offsets<'py>(
&self,
py: Python<'py>,
table_path: &TablePath,
bucket_ids: Vec<i32>,
- offset_type: &str,
- timestamp: Option<i64>,
+ offset_spec: &OffsetSpec,
) -> PyResult<Bound<'py, PyAny>> {
validate_bucket_ids(&bucket_ids)?;
- let offset_spec = parse_offset_spec(offset_type, timestamp)?;
+ let offset_spec = offset_spec.inner.clone();
let core_table_path = table_path.to_core();
let admin = self.__admin.clone();
@@ -419,26 +396,21 @@ impl FlussAdmin {
/// table_path: Path to the table
/// partition_name: Partition value (e.g., "US" not "region=US")
/// bucket_ids: List of bucket IDs to query
- /// offset_type: Type of offset to retrieve:
- /// - "earliest" or OffsetType.EARLIEST: Start of the log
- /// - "latest" or OffsetType.LATEST: End of the log
- /// - "timestamp" or OffsetType.TIMESTAMP: Offset at given
timestamp (requires timestamp arg)
- /// timestamp: Required when offset_type is "timestamp", ignored
otherwise
+ /// offset_spec: Offset specification (OffsetSpec.earliest(),
OffsetSpec.latest(),
+ /// or OffsetSpec.timestamp(ts))
///
/// Returns:
/// dict[int, int]: Mapping of bucket_id -> offset
- #[pyo3(signature = (table_path, partition_name, bucket_ids, offset_type,
timestamp=None))]
pub fn list_partition_offsets<'py>(
&self,
py: Python<'py>,
table_path: &TablePath,
partition_name: &str,
bucket_ids: Vec<i32>,
- offset_type: &str,
- timestamp: Option<i64>,
+ offset_spec: &OffsetSpec,
) -> PyResult<Bound<'py, PyAny>> {
validate_bucket_ids(&bucket_ids)?;
- let offset_spec = parse_offset_spec(offset_type, timestamp)?;
+ let offset_spec = offset_spec.inner.clone();
let core_table_path = table_path.to_core();
let admin = self.__admin.clone();
@@ -493,24 +465,30 @@ impl FlussAdmin {
})
}
- /// List all partitions for a partitioned table.
+ /// List partitions for a partitioned table.
///
/// Args:
/// table_path: Path to the table
+ /// partition_spec: Optional partial partition spec to filter results.
+ /// Dict mapping partition column name to value (e.g., {"region":
"US"}).
+ /// If None, returns all partitions.
///
/// Returns:
/// List[PartitionInfo]: List of partition info objects
+ #[pyo3(signature = (table_path, partition_spec=None))]
pub fn list_partition_infos<'py>(
&self,
py: Python<'py>,
table_path: &TablePath,
+ partition_spec: Option<std::collections::HashMap<String, String>>,
) -> PyResult<Bound<'py, PyAny>> {
let core_table_path = table_path.to_core();
let admin = self.__admin.clone();
+ let core_partition_spec =
partition_spec.map(fcore::metadata::PartitionSpec::new);
future_into_py(py, async move {
let partition_infos = admin
- .list_partition_infos(&core_table_path)
+ .list_partition_infos_with_spec(&core_table_path,
core_partition_spec.as_ref())
.await
.map_err(|e| FlussError::new_err(format!("Failed to list
partitions: {e}")))?;
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index 801db2c..553c8a9 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -50,21 +50,53 @@ static TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
.expect("Failed to create Tokio runtime")
});
-/// Offset type constants for list_offsets()
+/// Offset specification for list_offsets(), matching Java's OffsetSpec.
+///
+/// Use factory methods to create instances:
+/// OffsetSpec.earliest()
+/// OffsetSpec.latest()
+/// OffsetSpec.timestamp(ts)
#[pyclass]
#[derive(Clone)]
-pub struct OffsetType;
+pub struct OffsetSpec {
+ pub(crate) inner: fcore::rpc::message::OffsetSpec,
+}
#[pymethods]
-impl OffsetType {
- #[classattr]
- const EARLIEST: &'static str = "earliest";
+impl OffsetSpec {
+ /// Create an OffsetSpec for the earliest available offset.
+ #[staticmethod]
+ fn earliest() -> Self {
+ Self {
+ inner: fcore::rpc::message::OffsetSpec::Earliest,
+ }
+ }
+
+ /// Create an OffsetSpec for the latest available offset.
+ #[staticmethod]
+ fn latest() -> Self {
+ Self {
+ inner: fcore::rpc::message::OffsetSpec::Latest,
+ }
+ }
- #[classattr]
- const LATEST: &'static str = "latest";
+ /// Create an OffsetSpec for the offset at or after the given timestamp.
+ #[staticmethod]
+ fn timestamp(ts: i64) -> Self {
+ Self {
+ inner: fcore::rpc::message::OffsetSpec::Timestamp(ts),
+ }
+ }
- #[classattr]
- const TIMESTAMP: &'static str = "timestamp";
+ fn __repr__(&self) -> String {
+ match &self.inner {
+ fcore::rpc::message::OffsetSpec::Earliest =>
"OffsetSpec.earliest()".to_string(),
+ fcore::rpc::message::OffsetSpec::Latest =>
"OffsetSpec.latest()".to_string(),
+ fcore::rpc::message::OffsetSpec::Timestamp(ts) => {
+ format!("OffsetSpec.timestamp({ts})")
+ }
+ }
+ }
}
#[pymodule]
@@ -92,7 +124,7 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<ScanRecord>()?;
m.add_class::<RecordBatch>()?;
m.add_class::<PartitionInfo>()?;
- m.add_class::<OffsetType>()?;
+ m.add_class::<OffsetSpec>()?;
m.add_class::<WriteResultHandle>()?;
m.add_class::<DatabaseDescriptor>()?;
m.add_class::<DatabaseInfo>()?;
diff --git a/website/docs/user-guide/cpp/api-reference.md b/website/docs/user-guide/cpp/api-reference.md
index 00ff808..47c9307 100644
--- a/website/docs/user-guide/cpp/api-reference.md
+++ b/website/docs/user-guide/cpp/api-reference.md
@@ -68,8 +68,8 @@ Complete API reference for the Fluss C++ client.
| Method
| Description
|
|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------|
-| `ListOffsets(const TablePath& path, const std::vector<int32_t>& bucket_ids,
const OffsetQuery& query, std::unordered_map<int32_t, int64_t>& out) -> Result`
| Get offsets for buckets
|
-| `ListPartitionOffsets(const TablePath& path, const std::string&
partition_name, const std::vector<int32_t>& bucket_ids, const OffsetQuery&
query, std::unordered_map<int32_t, int64_t>& out) -> Result` | Get offsets for
a partition's buckets |
+| `ListOffsets(const TablePath& path, const std::vector<int32_t>& bucket_ids,
const OffsetSpec& query, std::unordered_map<int32_t, int64_t>& out) -> Result`
| Get offsets for buckets
|
+| `ListPartitionOffsets(const TablePath& path, const std::string&
partition_name, const std::vector<int32_t>& bucket_ids, const OffsetSpec&
query, std::unordered_map<int32_t, int64_t>& out) -> Result` | Get offsets for
a partition's buckets |
### Lake Operations
@@ -423,13 +423,13 @@ When using `table.NewRow()`, the `Set()` method auto-routes to the correct type
| `bucket_id` | `int32_t` | Bucket ID |
| `offset` | `int64_t` | Offset value |
-## `OffsetQuery`
+## `OffsetSpec`
| Method | Description
|
|----------------------------------------------------|-----------------------------------------|
-| `OffsetQuery::Earliest()` | Query for the earliest
available offset |
-| `OffsetQuery::Latest()` | Query for the latest
offset |
-| `OffsetQuery::FromTimestamp(int64_t timestamp_ms)` | Query offset at a
specific timestamp |
+| `OffsetSpec::Earliest()` | Query for the earliest
available offset |
+| `OffsetSpec::Latest()` | Query for the latest
offset |
+| `OffsetSpec::Timestamp(int64_t timestamp_ms)` | Query offset at a
specific timestamp |
## Constants
@@ -441,7 +441,7 @@ To start reading from the latest offset (only new records), resolve the current
```cpp
std::unordered_map<int32_t, int64_t> offsets;
-admin.ListOffsets(table_path, {0}, fluss::OffsetQuery::Latest(), offsets);
+admin.ListOffsets(table_path, {0}, fluss::OffsetSpec::Latest(), offsets);
scanner.Subscribe(0, offsets[0]);
```
diff --git a/website/docs/user-guide/cpp/data-types.md b/website/docs/user-guide/cpp/data-types.md
index 11712fa..fb01ac2 100644
--- a/website/docs/user-guide/cpp/data-types.md
+++ b/website/docs/user-guide/cpp/data-types.md
@@ -105,6 +105,6 @@ To start reading from the latest offset, resolve the current offset via `ListOff
```cpp
std::unordered_map<int32_t, int64_t> offsets;
-admin.ListOffsets(table_path, {0}, fluss::OffsetQuery::Latest(), offsets);
+admin.ListOffsets(table_path, {0}, fluss::OffsetSpec::Latest(), offsets);
scanner.Subscribe(0, offsets[0]);
```
diff --git a/website/docs/user-guide/cpp/example/admin-operations.md b/website/docs/user-guide/cpp/example/admin-operations.md
index 1a33012..850660e 100644
--- a/website/docs/user-guide/cpp/example/admin-operations.md
+++ b/website/docs/user-guide/cpp/example/admin-operations.md
@@ -120,23 +120,23 @@ std::vector<int32_t> bucket_ids = {0, 1, 2};
// Query earliest offsets
std::unordered_map<int32_t, int64_t> earliest_offsets;
admin.ListOffsets(table_path, bucket_ids,
- fluss::OffsetQuery::Earliest(), earliest_offsets);
+ fluss::OffsetSpec::Earliest(), earliest_offsets);
// Query latest offsets
std::unordered_map<int32_t, int64_t> latest_offsets;
admin.ListOffsets(table_path, bucket_ids,
- fluss::OffsetQuery::Latest(), latest_offsets);
+ fluss::OffsetSpec::Latest(), latest_offsets);
// Query offsets for a specific timestamp
std::unordered_map<int32_t, int64_t> timestamp_offsets;
admin.ListOffsets(table_path, bucket_ids,
- fluss::OffsetQuery::FromTimestamp(timestamp_ms),
+ fluss::OffsetSpec::Timestamp(timestamp_ms),
timestamp_offsets);
// Query partition offsets
std::unordered_map<int32_t, int64_t> partition_offsets;
admin.ListPartitionOffsets(table_path, "partition_name",
- bucket_ids, fluss::OffsetQuery::Latest(),
+ bucket_ids, fluss::OffsetSpec::Latest(),
partition_offsets);
```
diff --git a/website/docs/user-guide/cpp/example/partitioned-tables.md b/website/docs/user-guide/cpp/example/partitioned-tables.md
index 6a6927f..371ee3e 100644
--- a/website/docs/user-guide/cpp/example/partitioned-tables.md
+++ b/website/docs/user-guide/cpp/example/partitioned-tables.md
@@ -103,7 +103,7 @@ admin.ListPartitionInfos(table_path, partition_infos);
std::vector<int32_t> bucket_ids = {0, 1, 2};
std::unordered_map<int32_t, int64_t> offsets;
admin.ListPartitionOffsets(table_path, "2024-01-15$US",
- bucket_ids, fluss::OffsetQuery::Latest(), offsets);
+ bucket_ids, fluss::OffsetSpec::Latest(), offsets);
```
## Partitioned Primary Key Tables
diff --git a/website/docs/user-guide/python/api-reference.md b/website/docs/user-guide/python/api-reference.md
index 321e25e..af03058 100644
--- a/website/docs/user-guide/python/api-reference.md
+++ b/website/docs/user-guide/python/api-reference.md
@@ -43,8 +43,8 @@ Supports `with` statement (context manager).
| `await get_table_info(table_path) -> TableInfo`
| Get table metadata
|
| `await list_tables(database_name) -> list[str]`
| List tables in a database
|
| `await table_exists(table_path) -> bool`
| Check if a table exists
|
-| `await list_offsets(table_path, bucket_ids, offset_type, timestamp=None) ->
dict[int, int]` | Get offsets for buckets
|
-| `await list_partition_offsets(table_path, partition_name, bucket_ids,
offset_type, timestamp=None) -> dict[int, int]` | Get offsets for a partition's
buckets |
+| `await list_offsets(table_path, bucket_ids, offset_spec) -> dict[int, int]`
| Get offsets for buckets |
+| `await list_partition_offsets(table_path, partition_name, bucket_ids,
offset_spec) -> dict[int, int]` | Get offsets for a partition's buckets |
| `await create_partition(table_path, partition_spec, ignore_if_exists=False)`
| Create a partition
|
| `await drop_partition(table_path, partition_spec,
ignore_if_not_exists=False)` | Drop a
partition |
| `await list_partition_infos(table_path) -> list[PartitionInfo]`
| List partitions
|
@@ -264,14 +264,19 @@ Raised for all Fluss-specific errors (connection failures, table not found, sche
| Constant | Value | Description
|
|------------------------------|---------------|-----------------------------------------------------|
| `fluss.EARLIEST_OFFSET` | `-2` | Start reading from earliest
available offset |
-| `fluss.OffsetType.EARLIEST` | `"earliest"` | For `list_offsets()`
|
-| `fluss.OffsetType.LATEST` | `"latest"` | For `list_offsets()`
|
-| `fluss.OffsetType.TIMESTAMP` | `"timestamp"` | For `list_offsets()` with
timestamp |
+
+## `OffsetSpec`
+
+| Method | Description
|
+|-----------------------------|--------------------------------------------------|
+| `OffsetSpec.earliest()` | Earliest available offset
|
+| `OffsetSpec.latest()` | Latest offset
|
+| `OffsetSpec.timestamp(ts)` | Offset at or after the given timestamp
(millis) |
To start reading from the latest offset (only new records), resolve the
current offset via `list_offsets` before subscribing:
```python
-offsets = await admin.list_offsets(table_path, [0], fluss.OffsetType.LATEST)
+offsets = await admin.list_offsets(table_path, [0], fluss.OffsetSpec.latest())
scanner.subscribe(bucket_id=0, start_offset=offsets[0])
```
diff --git a/website/docs/user-guide/python/example/admin-operations.md b/website/docs/user-guide/python/example/admin-operations.md
index 8c62ee7..4561a3f 100644
--- a/website/docs/user-guide/python/example/admin-operations.md
+++ b/website/docs/user-guide/python/example/admin-operations.md
@@ -56,13 +56,13 @@ await admin.drop_table(table_path,
ignore_if_not_exists=True)
```python
# Latest offsets for buckets
-offsets = await admin.list_offsets(table_path, bucket_ids=[0, 1],
offset_type="latest")
+offsets = await admin.list_offsets(table_path, bucket_ids=[0, 1],
offset_spec=fluss.OffsetSpec.latest())
# By timestamp
-offsets = await admin.list_offsets(table_path, bucket_ids=[0],
offset_type="timestamp", timestamp=1704067200000)
+offsets = await admin.list_offsets(table_path, bucket_ids=[0],
offset_spec=fluss.OffsetSpec.timestamp(1704067200000))
# Per-partition offsets
-offsets = await admin.list_partition_offsets(table_path, partition_name="US",
bucket_ids=[0], offset_type="latest")
+offsets = await admin.list_partition_offsets(table_path, partition_name="US",
bucket_ids=[0], offset_spec=fluss.OffsetSpec.latest())
```
## Lake Snapshot
diff --git a/website/docs/user-guide/python/example/configuration.md b/website/docs/user-guide/python/example/configuration.md
index c4ef4f3..9686fc6 100644
--- a/website/docs/user-guide/python/example/configuration.md
+++ b/website/docs/user-guide/python/example/configuration.md
@@ -24,10 +24,10 @@ with await fluss.FlussConnection.create(config) as conn:
| Key | Description
| Default |
|---------------------|-------------------------------------------------------|--------------------|
| `bootstrap.servers` | Coordinator server address
| `127.0.0.1:9123` |
-| `request.max.size` | Maximum request size in bytes
| `10485760` (10 MB) |
+| `writer.request-max-size` | Maximum request size in bytes
| `10485760` (10 MB) |
| `writer.acks` | Acknowledgment setting (`all` waits for all replicas)
| `all` |
| `writer.retries` | Number of retries on failure
| `2147483647` |
-| `writer.batch.size` | Batch size for writes in bytes
| `2097152` (2 MB) |
+| `writer.batch-size` | Batch size for writes in bytes
| `2097152` (2 MB) |
Remember to close the connection when done:
diff --git a/website/docs/user-guide/python/example/log-tables.md b/website/docs/user-guide/python/example/log-tables.md
index 63903a4..6e44e06 100644
--- a/website/docs/user-guide/python/example/log-tables.md
+++ b/website/docs/user-guide/python/example/log-tables.md
@@ -106,7 +106,7 @@ To only consume new records (skip existing data), first resolve the current late
```python
admin = await conn.get_admin()
-offsets = await admin.list_offsets(table_path, [0], fluss.OffsetType.LATEST)
+offsets = await admin.list_offsets(table_path, [0], fluss.OffsetSpec.latest())
latest = offsets[0]
scanner = await table.new_scan().create_record_batch_log_scanner()