This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new fdd5296 feat: Implement Admin APIs for partitioning (#208)
fdd5296 is described below
commit fdd52968d031ed5b312005686c414596c1adf2d7
Author: Keith Lee <[email protected]>
AuthorDate: Sun Jan 25 01:18:59 2026 +0000
feat: Implement Admin APIs for partitioning (#208)
---
crates/fluss/src/client/admin.rs | 76 +++-
crates/fluss/src/client/table/scanner.rs | 3 +-
crates/fluss/src/client/write/accumulator.rs | 2 +-
crates/fluss/src/client/write/sender.rs | 3 +-
crates/fluss/src/lib.rs | 4 +-
crates/fluss/src/metadata/mod.rs | 2 +
crates/fluss/src/metadata/partition.rs | 469 +++++++++++++++++++++
crates/fluss/src/metadata/table.rs | 66 ++-
crates/fluss/src/proto/fluss_api.proto | 36 +-
crates/fluss/src/rpc/api_key.rs | 12 +
crates/fluss/src/rpc/message/create_partition.rs | 59 +++
crates/fluss/src/rpc/message/drop_partition.rs | 59 +++
crates/fluss/src/rpc/message/list_offsets.rs | 10 +-
.../fluss/src/rpc/message/list_partition_infos.rs | 63 +++
crates/fluss/src/rpc/message/mod.rs | 6 +
crates/fluss/src/util/mod.rs | 3 +-
crates/fluss/tests/integration/admin.rs | 145 ++++++-
17 files changed, 983 insertions(+), 35 deletions(-)
diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs
index 6646f97..bffe0f5 100644
--- a/crates/fluss/src/client/admin.rs
+++ b/crates/fluss/src/client/admin.rs
@@ -17,20 +17,21 @@
use crate::client::metadata::Metadata;
use crate::metadata::{
- DatabaseDescriptor, DatabaseInfo, JsonSerde, LakeSnapshot, TableBucket,
TableDescriptor,
- TableInfo, TablePath,
+ DatabaseDescriptor, DatabaseInfo, JsonSerde, LakeSnapshot, PartitionInfo,
PartitionSpec,
+ TableBucket, TableDescriptor, TableInfo, TablePath,
};
use crate::rpc::message::{
- CreateDatabaseRequest, CreateTableRequest, DatabaseExistsRequest,
DropDatabaseRequest,
- DropTableRequest, GetDatabaseInfoRequest, GetLatestLakeSnapshotRequest,
GetTableRequest,
- ListDatabasesRequest, ListTablesRequest, TableExistsRequest,
+ CreateDatabaseRequest, CreatePartitionRequest, CreateTableRequest,
DatabaseExistsRequest,
+ DropDatabaseRequest, DropPartitionRequest, DropTableRequest,
GetDatabaseInfoRequest,
+ GetLatestLakeSnapshotRequest, GetTableRequest, ListDatabasesRequest,
ListPartitionInfosRequest,
+ ListTablesRequest, TableExistsRequest,
};
use crate::rpc::message::{ListOffsetsRequest, OffsetSpec};
use crate::rpc::{RpcClient, ServerConnection};
-use crate::BucketId;
use crate::error::{Error, Result};
use crate::proto::GetTableInfoResponse;
+use crate::{BucketId, PartitionId, TableId};
use std::collections::HashMap;
use std::slice::from_ref;
use std::sync::Arc;
@@ -138,6 +139,63 @@ impl FlussAdmin {
Ok(response.table_name)
}
+ /// List all partitions in the given table.
+ pub async fn list_partition_infos(&self, table_path: &TablePath) ->
Result<Vec<PartitionInfo>> {
+ self.list_partition_infos_with_spec(table_path, None).await
+ }
+
+ /// List partitions in the given table that match the partial partition
spec.
+ pub async fn list_partition_infos_with_spec(
+ &self,
+ table_path: &TablePath,
+ partial_partition_spec: Option<&PartitionSpec>,
+ ) -> Result<Vec<PartitionInfo>> {
+ let response = self
+ .admin_gateway
+ .request(ListPartitionInfosRequest::new(
+ table_path,
+ partial_partition_spec,
+ ))
+ .await?;
+ Ok(response.get_partitions_info())
+ }
+
+ /// Create a new partition for a partitioned table.
+ pub async fn create_partition(
+ &self,
+ table_path: &TablePath,
+ partition_spec: &PartitionSpec,
+ ignore_if_exists: bool,
+ ) -> Result<()> {
+ let _response = self
+ .admin_gateway
+ .request(CreatePartitionRequest::new(
+ table_path,
+ partition_spec,
+ ignore_if_exists,
+ ))
+ .await?;
+ Ok(())
+ }
+
+ /// Drop a partition from a partitioned table.
+ pub async fn drop_partition(
+ &self,
+ table_path: &TablePath,
+ partition_spec: &PartitionSpec,
+ ignore_if_not_exists: bool,
+ ) -> Result<()> {
+ let _response = self
+ .admin_gateway
+ .request(DropPartitionRequest::new(
+ table_path,
+ partition_spec,
+ ignore_if_not_exists,
+ ))
+ .await?;
+ Ok(())
+ }
+
/// Check if a table exists
pub async fn table_exists(&self, table_path: &TablePath) -> Result<bool> {
let response = self
@@ -263,13 +321,13 @@ impl FlussAdmin {
fn prepare_list_offsets_requests(
&self,
- table_id: i64,
- partition_id: Option<i64>,
+ table_id: TableId,
+ partition_id: Option<PartitionId>,
buckets: &[BucketId],
offset_spec: OffsetSpec,
) -> Result<HashMap<i32, ListOffsetsRequest>> {
let cluster = self.metadata.get_cluster();
- let mut node_for_bucket_list: HashMap<i32, Vec<i32>> = HashMap::new();
+ let mut node_for_bucket_list: HashMap<i32, Vec<BucketId>> =
HashMap::new();
for bucket_id in buckets {
let table_bucket = TableBucket::new(table_id, *bucket_id);
diff --git a/crates/fluss/src/client/table/scanner.rs
b/crates/fluss/src/client/table/scanner.rs
index 8712650..61ed56e 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -27,6 +27,7 @@ use std::{
};
use tempfile::TempDir;
+use crate::TableId;
use crate::client::connection::FlussConnection;
use crate::client::credentials::CredentialsCache;
use crate::client::metadata::Metadata;
@@ -264,7 +265,7 @@ pub struct RecordBatchLogScanner {
/// Private shared implementation for both scanner types
struct LogScannerInner {
table_path: TablePath,
- table_id: i64,
+ table_id: TableId,
metadata: Arc<Metadata>,
log_scanner_status: Arc<LogScannerStatus>,
log_fetcher: LogFetcher,
diff --git a/crates/fluss/src/client/write/accumulator.rs
b/crates/fluss/src/client/write/accumulator.rs
index 46c822c..96114fb 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -401,7 +401,7 @@ impl RecordAccumulator {
ready_write_batch.write_batch.re_enqueued();
let table_path = ready_write_batch.write_batch.table_path().clone();
let bucket_id = ready_write_batch.table_bucket.bucket_id();
- let table_id =
u64::try_from(ready_write_batch.table_bucket.table_id()).unwrap_or(0);
+ let table_id = ready_write_batch.table_bucket.table_id();
let dq = {
let mut binding =
diff --git a/crates/fluss/src/client/write/sender.rs
b/crates/fluss/src/client/write/sender.rs
index ceed245..1ffda58 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use crate::TableId;
use crate::client::broadcast;
use crate::client::metadata::Metadata;
use crate::client::write::batch::WriteBatch;
@@ -144,7 +145,7 @@ impl Sender {
return Ok(());
}
let mut records_by_bucket = HashMap::new();
- let mut write_batch_by_table: HashMap<i64, Vec<TableBucket>> =
HashMap::new();
+ let mut write_batch_by_table: HashMap<TableId, Vec<TableBucket>> =
HashMap::new();
for batch in batches {
let table_bucket = batch.table_bucket.clone();
diff --git a/crates/fluss/src/lib.rs b/crates/fluss/src/lib.rs
index e8d822f..f079db2 100644
--- a/crates/fluss/src/lib.rs
+++ b/crates/fluss/src/lib.rs
@@ -34,8 +34,8 @@ mod util;
#[cfg(test)]
mod test_utils;
-pub type TableId = u64;
-pub type PartitionId = u64;
+pub type TableId = i64;
+pub type PartitionId = i64;
pub type BucketId = i32;
pub mod proto {
diff --git a/crates/fluss/src/metadata/mod.rs b/crates/fluss/src/metadata/mod.rs
index 9c0b1b4..0ca654a 100644
--- a/crates/fluss/src/metadata/mod.rs
+++ b/crates/fluss/src/metadata/mod.rs
@@ -19,10 +19,12 @@ mod data_lake_format;
mod database;
mod datatype;
mod json_serde;
+mod partition;
mod table;
pub use data_lake_format::*;
pub use database::*;
pub use datatype::*;
pub use json_serde::*;
+pub use partition::*;
pub use table::*;
diff --git a/crates/fluss/src/metadata/partition.rs
b/crates/fluss/src/metadata/partition.rs
new file mode 100644
index 0000000..1ecc0dc
--- /dev/null
+++ b/crates/fluss/src/metadata/partition.rs
@@ -0,0 +1,469 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::PartitionId;
+use crate::error::{Error, Result};
+use crate::proto::{PbKeyValue, PbPartitionInfo, PbPartitionSpec};
+use std::collections::HashMap;
+use std::fmt::{Display, Formatter};
+
+/// Represents a partition spec in fluss. Partition columns and values are NOT
of strict order, and
+/// they need to be re-arranged to the correct order by comparing with a list
of strictly ordered
+/// partition keys.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct PartitionSpec {
+ partition_spec: HashMap<String, String>,
+}
+
+impl PartitionSpec {
+ pub fn new(partition_spec: HashMap<String, String>) -> Self {
+ Self { partition_spec }
+ }
+
+ pub fn get_spec_map(&self) -> &HashMap<String, String> {
+ &self.partition_spec
+ }
+
+ pub fn to_pb(&self) -> PbPartitionSpec {
+ PbPartitionSpec {
+ partition_key_values: self
+ .partition_spec
+ .iter()
+ .map(|(k, v)| PbKeyValue {
+ key: k.clone(),
+ value: v.clone(),
+ })
+ .collect(),
+ }
+ }
+
+ pub fn from_pb(pb: &PbPartitionSpec) -> Self {
+ let partition_spec = pb
+ .partition_key_values
+ .iter()
+ .map(|kv| (kv.key.clone(), kv.value.clone()))
+ .collect();
+ Self { partition_spec }
+ }
+}
+
+impl Display for PartitionSpec {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(f, "PartitionSpec{{{:?}}}", self.partition_spec)
+ }
+}
+
+/// Represents a partition, which is the resolved version of PartitionSpec.
The partition
+/// spec is re-arranged into the correct order by comparing it with a list of
strictly ordered
+/// partition keys.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct ResolvedPartitionSpec {
+ partition_keys: Vec<String>,
+ partition_values: Vec<String>,
+}
+
+pub const PARTITION_SPEC_SEPARATOR: &str = "$";
+
+impl ResolvedPartitionSpec {
+ pub fn new(partition_keys: Vec<String>, partition_values: Vec<String>) ->
Result<Self> {
+ if partition_keys.len() != partition_values.len() {
+ return Err(Error::IllegalArgument {
+ message: "The number of partition keys and partition values
should be the same."
+ .to_string(),
+ });
+ }
+ Ok(Self {
+ partition_keys,
+ partition_values,
+ })
+ }
+
+ pub fn from_partition_spec(
+ partition_keys: Vec<String>,
+ partition_spec: &PartitionSpec,
+ ) -> Self {
+ let partition_values =
+ Self::get_reordered_partition_values(&partition_keys,
partition_spec);
+ Self {
+ partition_keys,
+ partition_values,
+ }
+ }
+
+ pub fn from_partition_value(partition_key: String, partition_value:
String) -> Self {
+ Self {
+ partition_keys: vec![partition_key],
+ partition_values: vec![partition_value],
+ }
+ }
+
+ pub fn from_partition_name(partition_keys: Vec<String>, partition_name:
&str) -> Self {
+ let partition_values: Vec<String> = partition_name
+ .split(PARTITION_SPEC_SEPARATOR)
+ .map(|s| s.to_string())
+ .collect();
+ Self {
+ partition_keys,
+ partition_values,
+ }
+ }
+
+ pub fn from_partition_qualified_name(qualified_partition_name: &str) ->
Result<Self> {
+ let mut keys = Vec::new();
+ let mut values = Vec::new();
+
+ for pair in qualified_partition_name.split('/') {
+ let parts: Vec<&str> = pair.splitn(2, '=').collect();
+ if parts.len() != 2 {
+ return Err(Error::IllegalArgument {
+ message: format!(
+ "Invalid partition name format. Expected key=value,
got: {}",
+ pair
+ ),
+ });
+ }
+ keys.push(parts[0].to_string());
+ values.push(parts[1].to_string());
+ }
+
+ Ok(Self {
+ partition_keys: keys,
+ partition_values: values,
+ })
+ }
+
+ pub fn get_partition_keys(&self) -> &[String] {
+ &self.partition_keys
+ }
+
+ pub fn get_partition_values(&self) -> &[String] {
+ &self.partition_values
+ }
+
+ pub fn to_partition_spec(&self) -> PartitionSpec {
+ let mut spec_map = HashMap::new();
+ for (i, key) in self.partition_keys.iter().enumerate() {
+ spec_map.insert(key.clone(), self.partition_values[i].clone());
+ }
+ PartitionSpec::new(spec_map)
+ }
+
+ /// Generate the partition name for a partition table with specified
partition values.
+ ///
+ /// The partition name is in the following format: value1$value2$...$valueN
+ pub fn get_partition_name(&self) -> String {
+ self.partition_values.join(PARTITION_SPEC_SEPARATOR)
+ }
+
+ /// Returns the qualified partition name for a partition spec.
+ /// The format is: key1=value1/key2=value2/.../keyN=valueN
+ pub fn get_partition_qualified_name(&self) -> String {
+ let mut sb = String::new();
+ for (i, key) in self.partition_keys.iter().enumerate() {
+ sb.push_str(key);
+ sb.push('=');
+ sb.push_str(&self.partition_values[i]);
+ if i != self.partition_keys.len() - 1 {
+ sb.push('/');
+ }
+ }
+ sb
+ }
+
+ pub fn contains(&self, other: &ResolvedPartitionSpec) -> Result<bool> {
+ let other_partition_keys = other.get_partition_keys();
+ let other_partition_values = other.get_partition_values();
+
+ let mut expected_partition_values = Vec::new();
+ for other_partition_key in other_partition_keys {
+ let key_index = self
+ .partition_keys
+ .iter()
+ .position(|k| k == other_partition_key);
+ match key_index {
+ Some(idx) =>
expected_partition_values.push(self.partition_values[idx].clone()),
+ None => {
+ return Err(Error::IllegalArgument {
+ message: format!(
+ "table does not contain partitionKey: {}",
+ other_partition_key
+ ),
+ });
+ }
+ }
+ }
+
+ let expected_partition_name =
expected_partition_values.join(PARTITION_SPEC_SEPARATOR);
+ let other_partition_name =
other_partition_values.join(PARTITION_SPEC_SEPARATOR);
+
+ Ok(expected_partition_name == other_partition_name)
+ }
+
+ pub fn to_pb(&self) -> PbPartitionSpec {
+ PbPartitionSpec {
+ partition_key_values: self
+ .partition_keys
+ .iter()
+ .zip(self.partition_values.iter())
+ .map(|(k, v)| PbKeyValue {
+ key: k.clone(),
+ value: v.clone(),
+ })
+ .collect(),
+ }
+ }
+
+ pub fn from_pb(pb: &PbPartitionSpec) -> Self {
+ let partition_keys = pb
+ .partition_key_values
+ .iter()
+ .map(|kv| kv.key.clone())
+ .collect();
+ let partition_values = pb
+ .partition_key_values
+ .iter()
+ .map(|kv| kv.value.clone())
+ .collect();
+ Self {
+ partition_keys,
+ partition_values,
+ }
+ }
+
+ fn get_reordered_partition_values(
+ partition_keys: &[String],
+ partition_spec: &PartitionSpec,
+ ) -> Vec<String> {
+ let partition_spec_map = partition_spec.get_spec_map();
+ partition_keys
+ .iter()
+ .map(|key|
partition_spec_map.get(key).cloned().unwrap_or_default())
+ .collect()
+ }
+}
+
+impl Display for ResolvedPartitionSpec {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{}", self.get_partition_qualified_name())
+ }
+}
+
+/// Information of a partition metadata, includes the partition's name and the
partition id that
+/// represents the unique identifier of the partition.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct PartitionInfo {
+ partition_id: PartitionId,
+ partition_spec: ResolvedPartitionSpec,
+}
+
+impl PartitionInfo {
+ pub fn new(partition_id: PartitionId, partition_spec:
ResolvedPartitionSpec) -> Self {
+ Self {
+ partition_id,
+ partition_spec,
+ }
+ }
+
+ /// Get the partition id. The id is globally unique in the Fluss cluster.
+ pub fn get_partition_id(&self) -> PartitionId {
+ self.partition_id
+ }
+
+ /// Get the partition name.
+ pub fn get_partition_name(&self) -> String {
+ self.partition_spec.get_partition_name()
+ }
+
+ pub fn get_resolved_partition_spec(&self) -> &ResolvedPartitionSpec {
+ &self.partition_spec
+ }
+
+ pub fn get_partition_spec(&self) -> PartitionSpec {
+ self.partition_spec.to_partition_spec()
+ }
+
+ pub fn to_pb(&self) -> PbPartitionInfo {
+ PbPartitionInfo {
+ partition_id: self.partition_id,
+ partition_spec: self.partition_spec.to_pb(),
+ }
+ }
+
+ pub fn from_pb(pb: &PbPartitionInfo) -> Self {
+ Self {
+ partition_id: pb.partition_id,
+ partition_spec: ResolvedPartitionSpec::from_pb(&pb.partition_spec),
+ }
+ }
+}
+
+impl Display for PartitionInfo {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "Partition{{name='{}', id={}}}",
+ self.get_partition_name(),
+ self.partition_id
+ )
+ }
+}
+
+/// A class to identify a table partition, containing the table id and the
partition id.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct TablePartition {
+ table_id: i64,
+ partition_id: PartitionId,
+}
+
+impl TablePartition {
+ pub fn new(table_id: i64, partition_id: PartitionId) -> Self {
+ Self {
+ table_id,
+ partition_id,
+ }
+ }
+
+ pub fn get_table_id(&self) -> i64 {
+ self.table_id
+ }
+
+ pub fn get_partition_id(&self) -> PartitionId {
+ self.partition_id
+ }
+}
+
+impl Display for TablePartition {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "TablePartition{{tableId={}, partitionId={}}}",
+ self.table_id, self.partition_id
+ )
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_resolved_partition_spec_name() {
+ let spec = ResolvedPartitionSpec::new(
+ vec!["date".to_string(), "region".to_string()],
+ vec!["2024-01-15".to_string(), "US".to_string()],
+ )
+ .unwrap();
+
+ assert_eq!(spec.get_partition_name(), "2024-01-15$US");
+ assert_eq!(
+ spec.get_partition_qualified_name(),
+ "date=2024-01-15/region=US"
+ );
+ }
+
+ #[test]
+ fn test_resolved_partition_spec_from_partition_name() {
+ let spec = ResolvedPartitionSpec::from_partition_name(
+ vec!["date".to_string(), "region".to_string()],
+ "2024-01-15$US",
+ );
+
+ assert_eq!(spec.get_partition_values(), &["2024-01-15", "US"]);
+ }
+
+ #[test]
+ fn test_resolved_partition_spec_from_qualified_name() {
+ let spec =
+
ResolvedPartitionSpec::from_partition_qualified_name("date=2024-01-15/region=US")
+ .unwrap();
+
+ assert_eq!(spec.get_partition_keys(), &["date", "region"]);
+ assert_eq!(spec.get_partition_values(), &["2024-01-15", "US"]);
+ }
+
+ #[test]
+ fn test_resolved_partition_spec_mismatched_lengths() {
+ let result = ResolvedPartitionSpec::new(
+ vec!["date".to_string(), "region".to_string()],
+ vec!["2024-01-15".to_string()],
+ );
+
+ assert!(result.is_err());
+ }
+
+ #[test]
+ fn test_partition_info() {
+ let spec =
+ ResolvedPartitionSpec::new(vec!["date".to_string()],
vec!["2024-01-15".to_string()])
+ .unwrap();
+
+ let info = PartitionInfo::new(42, spec);
+ assert_eq!(info.get_partition_id(), 42);
+ assert_eq!(info.get_partition_name(), "2024-01-15");
+ }
+
+ #[test]
+ fn test_table_partition() {
+ let tp = TablePartition::new(100, 42);
+ assert_eq!(tp.get_table_id(), 100);
+ assert_eq!(tp.get_partition_id(), 42);
+ }
+
+ #[test]
+ fn test_partition_spec_pb_roundtrip() {
+ let mut map = HashMap::new();
+ map.insert("date".to_string(), "2024-01-15".to_string());
+ let spec = PartitionSpec::new(map);
+
+ let pb = spec.to_pb();
+ let restored = PartitionSpec::from_pb(&pb);
+
+ assert_eq!(
+ spec.get_spec_map().get("date"),
+ restored.get_spec_map().get("date")
+ );
+ }
+
+ #[test]
+ fn test_partition_info_pb_roundtrip() {
+ let spec =
+ ResolvedPartitionSpec::new(vec!["date".to_string()],
vec!["2024-01-15".to_string()])
+ .unwrap();
+ let info = PartitionInfo::new(42, spec);
+
+ let pb = info.to_pb();
+ let restored = PartitionInfo::from_pb(&pb);
+
+ assert_eq!(info.get_partition_id(), restored.get_partition_id());
+ assert_eq!(info.get_partition_name(), restored.get_partition_name());
+ }
+
+ #[test]
+ fn test_contains() {
+ let full_spec = ResolvedPartitionSpec::new(
+ vec!["date".to_string(), "region".to_string()],
+ vec!["2024-01-15".to_string(), "US".to_string()],
+ )
+ .unwrap();
+
+ let partial_spec =
+ ResolvedPartitionSpec::new(vec!["date".to_string()],
vec!["2024-01-15".to_string()])
+ .unwrap();
+
+ assert!(full_spec.contains(&partial_spec).unwrap());
+ }
+}
diff --git a/crates/fluss/src/metadata/table.rs
b/crates/fluss/src/metadata/table.rs
index f4cf972..c4a9195 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -20,6 +20,7 @@ use crate::error::Error::{IllegalArgument, InvalidTableError};
use crate::error::{Error, Result};
use crate::metadata::DataLakeFormat;
use crate::metadata::datatype::{DataField, DataType, RowType};
+use crate::{BucketId, PartitionId, TableId};
use core::fmt;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
@@ -697,32 +698,71 @@ impl TablePath {
}
}
+/// A database name, table name and partition name combo. It's used to
represent the physical path of
+/// a bucket. If the bucket belongs to a partition (i.e., the table is a
partitioned table),
+/// `partition_name` will be `Some(...)`; otherwise, it will be `None`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PhysicalTablePath {
table_path: TablePath,
- #[allow(dead_code)]
- partition: Option<String>,
+ partition_name: Option<String>,
}
impl PhysicalTablePath {
pub fn of(table_path: TablePath) -> Self {
Self {
table_path,
- partition: None,
+ partition_name: None,
}
}
- // TODO: support partition
+ pub fn of_partitioned(table_path: TablePath, partition_name:
Option<String>) -> Self {
+ Self {
+ table_path,
+ partition_name,
+ }
+ }
+
+ pub fn of_with_names(
+ database_name: String,
+ table_name: String,
+ partition_name: Option<String>,
+ ) -> Self {
+ Self {
+ table_path: TablePath::new(database_name, table_name),
+ partition_name,
+ }
+ }
pub fn get_table_path(&self) -> &TablePath {
&self.table_path
}
+
+ pub fn get_database_name(&self) -> &str {
+ self.table_path.database()
+ }
+
+ pub fn get_table_name(&self) -> &str {
+ self.table_path.table()
+ }
+
+ pub fn get_partition_name(&self) -> Option<&String> {
+ self.partition_name.as_ref()
+ }
+}
+
+impl Display for PhysicalTablePath {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+ match &self.partition_name {
+ Some(partition) => write!(f, "{}(p={})", self.table_path,
partition),
+ None => write!(f, "{}", self.table_path),
+ }
+ }
}
#[derive(Debug, Clone)]
pub struct TableInfo {
pub table_path: TablePath,
- pub table_id: i64,
+ pub table_id: TableId,
pub schema_id: i32,
pub schema: Schema,
pub row_type: RowType,
@@ -819,7 +859,7 @@ impl TableInfo {
#[allow(clippy::too_many_arguments)]
pub fn new(
table_path: TablePath,
- table_id: i64,
+ table_id: TableId,
schema_id: i32,
schema: Schema,
bucket_keys: Vec<String>,
@@ -1000,13 +1040,13 @@ impl Display for TableInfo {
#[derive(Debug, Clone, Serialize, Deserialize, Hash, PartialEq, Eq)]
pub struct TableBucket {
- table_id: i64,
- partition_id: Option<i64>,
- bucket: i32,
+ table_id: TableId,
+ partition_id: Option<PartitionId>,
+ bucket: BucketId,
}
impl TableBucket {
- pub fn new(table_id: i64, bucket: i32) -> Self {
+ pub fn new(table_id: TableId, bucket: BucketId) -> Self {
TableBucket {
table_id,
partition_id: None,
@@ -1014,15 +1054,15 @@ impl TableBucket {
}
}
- pub fn table_id(&self) -> i64 {
+ pub fn table_id(&self) -> TableId {
self.table_id
}
- pub fn bucket_id(&self) -> i32 {
+ pub fn bucket_id(&self) -> BucketId {
self.bucket
}
- pub fn partition_id(&self) -> Option<i64> {
+ pub fn partition_id(&self) -> Option<PartitionId> {
self.partition_id
}
}
diff --git a/crates/fluss/src/proto/fluss_api.proto
b/crates/fluss/src/proto/fluss_api.proto
index 65eddce..eca4cf3 100644
--- a/crates/fluss/src/proto/fluss_api.proto
+++ b/crates/fluss/src/proto/fluss_api.proto
@@ -374,4 +374,38 @@ message PbLookupRespForBucket {
message PbValue {
optional bytes values = 1;
-}
\ No newline at end of file
+}
+
+message PbPartitionSpec {
+ repeated PbKeyValue partition_key_values = 1;
+}
+
+message PbPartitionInfo {
+ required int64 partition_id = 1;
+ required PbPartitionSpec partition_spec = 2;
+}
+
+message ListPartitionInfosRequest {
+ required PbTablePath table_path = 1;
+ optional PbPartitionSpec partial_partition_spec = 2;
+}
+
+message ListPartitionInfosResponse {
+ repeated PbPartitionInfo partitions_info = 1;
+}
+
+message CreatePartitionRequest {
+ required PbTablePath table_path = 1;
+ required PbPartitionSpec partition_spec = 2;
+ required bool ignore_if_exists = 3;
+}
+
+message CreatePartitionResponse {}
+
+message DropPartitionRequest {
+ required PbTablePath table_path = 1;
+ required PbPartitionSpec partition_spec = 2;
+ required bool ignore_if_not_exists = 3;
+}
+
+message DropPartitionResponse {}
\ No newline at end of file
diff --git a/crates/fluss/src/rpc/api_key.rs b/crates/fluss/src/rpc/api_key.rs
index 66e4beb..f6009c0 100644
--- a/crates/fluss/src/rpc/api_key.rs
+++ b/crates/fluss/src/rpc/api_key.rs
@@ -27,6 +27,7 @@ pub enum ApiKey {
DropTable,
GetTable,
ListTables,
+ ListPartitionInfos,
TableExists,
MetaData,
ProduceLog,
@@ -37,6 +38,8 @@ pub enum ApiKey {
GetFileSystemSecurityToken,
GetDatabaseInfo,
GetLatestLakeSnapshot,
+ CreatePartition,
+ DropPartition,
Unknown(i16),
}
@@ -51,6 +54,7 @@ impl From<i16> for ApiKey {
1006 => ApiKey::DropTable,
1007 => ApiKey::GetTable,
1008 => ApiKey::ListTables,
+ 1009 => ApiKey::ListPartitionInfos,
1010 => ApiKey::TableExists,
1012 => ApiKey::MetaData,
1014 => ApiKey::ProduceLog,
@@ -61,6 +65,8 @@ impl From<i16> for ApiKey {
1025 => ApiKey::GetFileSystemSecurityToken,
1032 => ApiKey::GetLatestLakeSnapshot,
1035 => ApiKey::GetDatabaseInfo,
+ 1036 => ApiKey::CreatePartition,
+ 1037 => ApiKey::DropPartition,
_ => Unknown(key),
}
}
@@ -77,6 +83,7 @@ impl From<ApiKey> for i16 {
ApiKey::DropTable => 1006,
ApiKey::GetTable => 1007,
ApiKey::ListTables => 1008,
+ ApiKey::ListPartitionInfos => 1009,
ApiKey::TableExists => 1010,
ApiKey::MetaData => 1012,
ApiKey::ProduceLog => 1014,
@@ -87,6 +94,8 @@ impl From<ApiKey> for i16 {
ApiKey::GetFileSystemSecurityToken => 1025,
ApiKey::GetLatestLakeSnapshot => 1032,
ApiKey::GetDatabaseInfo => 1035,
+ ApiKey::CreatePartition => 1036,
+ ApiKey::DropPartition => 1037,
Unknown(x) => x,
}
}
@@ -107,6 +116,7 @@ mod tests {
(1006, ApiKey::DropTable),
(1007, ApiKey::GetTable),
(1008, ApiKey::ListTables),
+ (1009, ApiKey::ListPartitionInfos),
(1010, ApiKey::TableExists),
(1012, ApiKey::MetaData),
(1014, ApiKey::ProduceLog),
@@ -117,6 +127,8 @@ mod tests {
(1025, ApiKey::GetFileSystemSecurityToken),
(1032, ApiKey::GetLatestLakeSnapshot),
(1035, ApiKey::GetDatabaseInfo),
+ (1036, ApiKey::CreatePartition),
+ (1037, ApiKey::DropPartition),
];
for (raw, key) in cases {
diff --git a/crates/fluss/src/rpc/message/create_partition.rs
b/crates/fluss/src/rpc/message/create_partition.rs
new file mode 100644
index 0000000..93dbf70
--- /dev/null
+++ b/crates/fluss/src/rpc/message/create_partition.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::{PartitionSpec, TablePath};
+use crate::proto::CreatePartitionResponse;
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::convert::to_table_path;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct CreatePartitionRequest {
+ pub inner_request: proto::CreatePartitionRequest,
+}
+
+impl CreatePartitionRequest {
+ pub fn new(
+ table_path: &TablePath,
+ partition_spec: &PartitionSpec,
+ ignore_if_exists: bool,
+ ) -> Self {
+ CreatePartitionRequest {
+ inner_request: proto::CreatePartitionRequest {
+ table_path: to_table_path(table_path),
+ partition_spec: partition_spec.to_pb(),
+ ignore_if_exists,
+ },
+ }
+ }
+}
+
+impl RequestBody for CreatePartitionRequest {
+ type ResponseBody = CreatePartitionResponse;
+
+ const API_KEY: ApiKey = ApiKey::CreatePartition;
+
+ const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(CreatePartitionRequest);
+impl_read_version_type!(CreatePartitionResponse);
diff --git a/crates/fluss/src/rpc/message/drop_partition.rs
b/crates/fluss/src/rpc/message/drop_partition.rs
new file mode 100644
index 0000000..ddc97d8
--- /dev/null
+++ b/crates/fluss/src/rpc/message/drop_partition.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::{PartitionSpec, TablePath};
+use crate::proto::DropPartitionResponse;
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::convert::to_table_path;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct DropPartitionRequest {
+ pub inner_request: proto::DropPartitionRequest,
+}
+
+impl DropPartitionRequest {
+ pub fn new(
+ table_path: &TablePath,
+ partition_spec: &PartitionSpec,
+ ignore_if_not_exists: bool,
+ ) -> Self {
+ DropPartitionRequest {
+ inner_request: proto::DropPartitionRequest {
+ table_path: to_table_path(table_path),
+ partition_spec: partition_spec.to_pb(),
+ ignore_if_not_exists,
+ },
+ }
+ }
+}
+
+impl RequestBody for DropPartitionRequest {
+ type ResponseBody = DropPartitionResponse;
+
+ const API_KEY: ApiKey = ApiKey::DropPartition;
+
+ const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(DropPartitionRequest);
+impl_read_version_type!(DropPartitionResponse);
diff --git a/crates/fluss/src/rpc/message/list_offsets.rs
b/crates/fluss/src/rpc/message/list_offsets.rs
index fcecb41..262645a 100644
--- a/crates/fluss/src/rpc/message/list_offsets.rs
+++ b/crates/fluss/src/rpc/message/list_offsets.rs
@@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-use crate::{impl_read_version_type, impl_write_version_type, proto};
+use crate::{
+ BucketId, PartitionId, TableId, impl_read_version_type,
impl_write_version_type, proto,
+};
use crate::error::Result as FlussResult;
use crate::error::{Error, FlussError};
@@ -74,9 +76,9 @@ pub struct ListOffsetsRequest {
impl ListOffsetsRequest {
pub fn new(
- table_id: i64,
- partition_id: Option<i64>,
- bucket_ids: Vec<i32>,
+ table_id: TableId,
+ partition_id: Option<PartitionId>,
+ bucket_ids: Vec<BucketId>,
offset_spec: OffsetSpec,
) -> Self {
ListOffsetsRequest {
diff --git a/crates/fluss/src/rpc/message/list_partition_infos.rs
b/crates/fluss/src/rpc/message/list_partition_infos.rs
new file mode 100644
index 0000000..ab69367
--- /dev/null
+++ b/crates/fluss/src/rpc/message/list_partition_infos.rs
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::{PartitionInfo, PartitionSpec, TablePath};
+use crate::proto::ListPartitionInfosResponse;
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::convert::to_table_path;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type, proto};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+#[derive(Debug)]
+pub struct ListPartitionInfosRequest {
+ pub inner_request: proto::ListPartitionInfosRequest,
+}
+
+impl ListPartitionInfosRequest {
+ pub fn new(table_path: &TablePath, partial_partition_spec:
Option<&PartitionSpec>) -> Self {
+ ListPartitionInfosRequest {
+ inner_request: proto::ListPartitionInfosRequest {
+ table_path: to_table_path(table_path),
+ partial_partition_spec: partial_partition_spec.map(|s|
s.to_pb()),
+ },
+ }
+ }
+}
+
+impl RequestBody for ListPartitionInfosRequest {
+ type ResponseBody = ListPartitionInfosResponse;
+
+ const API_KEY: ApiKey = ApiKey::ListPartitionInfos;
+
+ const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(ListPartitionInfosRequest);
+impl_read_version_type!(ListPartitionInfosResponse);
+
+impl ListPartitionInfosResponse {
+ pub fn get_partitions_info(&self) -> Vec<PartitionInfo> {
+ self.partitions_info
+ .iter()
+ .map(PartitionInfo::from_pb)
+ .collect()
+ }
+}
diff --git a/crates/fluss/src/rpc/message/mod.rs
b/crates/fluss/src/rpc/message/mod.rs
index 881a64f..addb97a 100644
--- a/crates/fluss/src/rpc/message/mod.rs
+++ b/crates/fluss/src/rpc/message/mod.rs
@@ -21,9 +21,11 @@ use crate::rpc::frame::{ReadError, WriteError};
use bytes::{Buf, BufMut};
mod create_database;
+mod create_partition;
mod create_table;
mod database_exists;
mod drop_database;
+mod drop_partition;
mod drop_table;
mod fetch;
mod get_database_info;
@@ -33,6 +35,7 @@ mod get_table;
mod header;
mod list_databases;
mod list_offsets;
+mod list_partition_infos;
mod list_tables;
mod lookup;
mod produce_log;
@@ -42,9 +45,11 @@ mod update_metadata;
pub use crate::rpc::RpcError;
pub use create_database::*;
+pub use create_partition::*;
pub use create_table::*;
pub use database_exists::*;
pub use drop_database::*;
+pub use drop_partition::*;
pub use drop_table::*;
pub use fetch::*;
pub use get_database_info::*;
@@ -54,6 +59,7 @@ pub use get_table::*;
pub use header::*;
pub use list_databases::*;
pub use list_offsets::*;
+pub use list_partition_infos::*;
pub use list_tables::*;
pub use lookup::*;
pub use produce_log::*;
diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs
index 156ef04..3760487 100644
--- a/crates/fluss/src/util/mod.rs
+++ b/crates/fluss/src/util/mod.rs
@@ -18,6 +18,7 @@
pub mod murmur_hash;
pub mod varint;
+use crate::TableId;
use crate::metadata::TableBucket;
use linked_hash_map::LinkedHashMap;
use std::collections::{HashMap, HashSet};
@@ -151,7 +152,7 @@ impl<S> FairBucketStatusMap<S> {
self.map.clear();
// Group buckets by table ID
- let mut table_to_buckets: LinkedHashMap<i64, Vec<TableBucket>> =
LinkedHashMap::new();
+ let mut table_to_buckets: LinkedHashMap<TableId, Vec<TableBucket>> =
LinkedHashMap::new();
for bucket in bucket_to_status.keys() {
table_to_buckets
.entry(bucket.table_id())
diff --git a/crates/fluss/tests/integration/admin.rs
b/crates/fluss/tests/integration/admin.rs
index fbdb295..9842a5a 100644
--- a/crates/fluss/tests/integration/admin.rs
+++ b/crates/fluss/tests/integration/admin.rs
@@ -37,9 +37,10 @@ mod admin_test {
use crate::integration::utils::{get_cluster, start_cluster, stop_cluster};
use fluss::error::FlussError;
use fluss::metadata::{
- DataTypes, DatabaseDescriptorBuilder, KvFormat, LogFormat, Schema,
TableDescriptor,
- TablePath,
+ DataTypes, DatabaseDescriptorBuilder, KvFormat, LogFormat,
PartitionSpec, Schema,
+ TableDescriptor, TablePath,
};
+ use std::collections::HashMap;
use std::sync::Arc;
fn before_all() {
@@ -223,6 +224,146 @@ mod admin_test {
assert_eq!(admin.database_exists(test_db_name).await.unwrap(), false);
}
+ #[tokio::test]
+ async fn test_partition_apis() {
+ let cluster = get_fluss_cluster();
+ let connection = cluster.get_fluss_connection().await;
+ let admin = connection
+ .get_admin()
+ .await
+ .expect("Failed to get admin client");
+
+ let test_db_name = "test_partition_apis_db";
+ let db_descriptor = DatabaseDescriptorBuilder::default()
+ .comment("Database for test_partition_apis")
+ .build();
+
+ admin
+ .create_database(test_db_name, true, Some(&db_descriptor))
+ .await
+ .expect("Failed to create test database");
+
+ let test_table_name = "partitioned_table";
+ let table_path = TablePath::new(test_db_name.to_string(),
test_table_name.to_string());
+
+ let table_schema = Schema::builder()
+ .column("id", DataTypes::int())
+ .column("name", DataTypes::string())
+ .column("dt", DataTypes::string())
+ .column("region", DataTypes::string())
+ .primary_key(vec![
+ "id".to_string(),
+ "dt".to_string(),
+ "region".to_string(),
+ ])
+ .build()
+ .expect("Failed to build table schema");
+
+ let table_descriptor = TableDescriptor::builder()
+ .schema(table_schema)
+ .distributed_by(Some(3), vec!["id".to_string()])
+ .partitioned_by(vec!["dt".to_string(), "region".to_string()])
+ .property("table.replication.factor", "1")
+ .log_format(LogFormat::ARROW)
+ .kv_format(KvFormat::COMPACTED)
+ .build()
+ .expect("Failed to build table descriptor");
+
+ admin
+ .create_table(&table_path, &table_descriptor, true)
+ .await
+ .expect("Failed to create partitioned table");
+
+ let partitions = admin
+ .list_partition_infos(&table_path)
+ .await
+ .expect("Failed to list partitions");
+ assert!(
+ partitions.is_empty(),
+ "Expected no partitions initially, found {}",
+ partitions.len()
+ );
+
+ let mut partition_values = HashMap::new();
+ partition_values.insert("dt".to_string(), "2024-01-15".to_string());
+ partition_values.insert("region".to_string(), "EMEA".to_string());
+ let partition_spec = PartitionSpec::new(partition_values);
+
+ admin
+ .create_partition(&table_path, &partition_spec, false)
+ .await
+ .expect("Failed to create partition");
+
+ let partitions = admin
+ .list_partition_infos(&table_path)
+ .await
+ .expect("Failed to list partitions");
+ assert_eq!(
+ partitions.len(),
+ 1,
+ "Expected exactly one partition after creation"
+ );
+ assert_eq!(
+ partitions[0].get_partition_name(),
+ "2024-01-15$EMEA",
+ "Partition name mismatch"
+ );
+
+ // list with partial spec filter - should find the partition
+ let mut partition_values = HashMap::new();
+ partition_values.insert("dt".to_string(), "2024-01-15".to_string());
+ let partial_partition_spec = PartitionSpec::new(partition_values);
+
+ let partitions_with_spec = admin
+ .list_partition_infos_with_spec(&table_path,
Some(&partial_partition_spec))
+ .await
+ .expect("Failed to list partitions with spec");
+ assert_eq!(
+ partitions_with_spec.len(),
+ 1,
+ "Expected one partition matching the spec"
+ );
+ assert_eq!(
+ partitions_with_spec[0].get_partition_name(),
+ "2024-01-15$EMEA",
+ "Partition name mismatch with spec filter"
+ );
+
+ // list with non-matching spec - should find no partitions
+ let mut non_matching_values = HashMap::new();
+ non_matching_values.insert("dt".to_string(), "2024-01-16".to_string());
+ let non_matching_spec = PartitionSpec::new(non_matching_values);
+ let partitions_non_matching = admin
+ .list_partition_infos_with_spec(&table_path,
Some(&non_matching_spec))
+ .await
+ .expect("Failed to list partitions with non-matching spec");
+ assert!(
+ partitions_non_matching.is_empty(),
+ "Expected no partitions for non-matching spec"
+ );
+
+ admin
+ .drop_partition(&table_path, &partition_spec, false)
+ .await
+ .expect("Failed to drop partition");
+
+ let partitions = admin
+ .list_partition_infos(&table_path)
+ .await
+ .expect("Failed to list partitions");
+ assert!(
+ partitions.is_empty(),
+ "Expected no partitions after drop, found {}",
+ partitions.len()
+ );
+
+ admin
+ .drop_table(&table_path, true)
+ .await
+ .expect("Failed to drop table");
+ admin.drop_database(test_db_name, true, true).await;
+ }
+
#[tokio::test]
async fn test_fluss_error_response() {
let cluster = get_fluss_cluster();