fresh-borzoni commented on code in PR #229:
URL: https://github.com/apache/fluss-rust/pull/229#discussion_r2749163615
##########
crates/fluss/src/client/metadata.rs:
##########
@@ -142,6 +142,27 @@ impl Metadata {
.await
}
+ pub async fn update_physical_table_metadata(
+ &self,
+ physical_table_paths: &[Arc<PhysicalTablePath>],
+ ) -> Result<()> {
+ let mut update_table_paths = HashSet::new();
+ let mut update_partition_paths = HashSet::new();
+ for physical_table_path in physical_table_paths {
+ match physical_table_path.get_partition_name() {
+ None => {
+ update_partition_paths.insert(physical_table_path);
+ }
+ Some(_) => {
+
update_table_paths.insert(physical_table_path.get_table_path());
Review Comment:
Probably arms are inverted, as this would strip partition info.
##########
crates/fluss/src/client/admin.rs:
##########
@@ -294,23 +294,69 @@ impl FlussAdmin {
buckets_id: &[BucketId],
offset_spec: OffsetSpec,
) -> Result<HashMap<i32, i64>> {
- self.metadata
- .check_and_update_table_metadata(from_ref(table_path))
- .await?;
+ self.do_list_offsets(table_path, None, buckets_id, offset_spec)
+ .await
+ }
+
+ /// List offset for the specified buckets in a partition. This operation
enables to find
+ /// the beginning offset, end offset as well as the offset matching a
timestamp in buckets.
+ pub async fn list_partition_offsets(
+ &self,
+ table_path: &TablePath,
+ partition_name: &str,
+ buckets_id: &[BucketId],
+ offset_spec: OffsetSpec,
+ ) -> Result<HashMap<i32, i64>> {
+ self.do_list_offsets(table_path, Some(partition_name), buckets_id,
offset_spec)
+ .await
+ }
+ async fn do_list_offsets(
+ &self,
+ table_path: &TablePath,
+ partition_name: Option<&str>,
+ buckets_id: &[BucketId],
+ offset_spec: OffsetSpec,
+ ) -> Result<HashMap<i32, i64>> {
if buckets_id.is_empty() {
- return Err(Error::UnexpectedError {
+ return Err(Error::IllegalArgument {
message: "Buckets are empty.".to_string(),
- source: None,
});
}
+ // force to update table metadata like java side
+ self.metadata.update_table_metadata(table_path).await?;
+
let cluster = self.metadata.get_cluster();
let table_id = cluster.get_table(table_path)?.table_id;
+ // Resolve partition_id from partition_name if provided
+ let partition_id = if let Some(name) = partition_name {
+ let physical_table_path =
Arc::new(PhysicalTablePath::of_partitioned(
+ Arc::new(table_path.clone()),
+ Some(name.to_string()),
+ ));
+
+ // Update partition metadata like java side
+ self.metadata
+ .update_physical_table_metadata(from_ref(&physical_table_path))
Review Comment:
+1
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]