This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new ece3834 chore: accept Into<String> to improve api usage (#234)
ece3834 is described below
commit ece3834593fa43021bc93079997098af6ca08596
Author: yuxia Luo <[email protected]>
AuthorDate: Wed Feb 4 07:43:24 2026 +0800
chore: accept Into<String> to improve api usage (#234)
---
bindings/python/src/metadata.rs | 6 +-
crates/examples/src/example_kv_table.rs | 4 +-
.../examples/src/example_partitioned_kv_table.rs | 10 +-
crates/examples/src/example_table.rs | 2 +-
crates/fluss/src/client/table/log_fetch_buffer.rs | 10 +-
crates/fluss/src/client/table/partition_getter.rs | 46 ++------
crates/fluss/src/client/table/upsert.rs | 30 +++---
crates/fluss/src/metadata/database.rs | 18 ++--
crates/fluss/src/metadata/datatype.rs | 36 ++++---
crates/fluss/src/metadata/json_serde.rs | 16 +--
crates/fluss/src/metadata/partition.rs | 10 +-
crates/fluss/src/metadata/table.rs | 117 +++++++++++++--------
crates/fluss/src/record/arrow.rs | 12 +--
.../fluss/src/record/kv/kv_record_read_context.rs | 2 +-
crates/fluss/src/test_utils.rs | 6 +-
crates/fluss/tests/integration/admin.rs | 30 ++----
crates/fluss/tests/integration/kv_table.rs | 23 ++--
crates/fluss/tests/integration/log_table.rs | 20 ++--
.../fluss/tests/integration/table_remote_scan.rs | 5 +-
crates/fluss/tests/integration/utils.rs | 2 +-
20 files changed, 194 insertions(+), 211 deletions(-)
diff --git a/bindings/python/src/metadata.rs b/bindings/python/src/metadata.rs
index bc5f288..235df56 100644
--- a/bindings/python/src/metadata.rs
+++ b/bindings/python/src/metadata.rs
@@ -218,11 +218,11 @@ impl TableDescriptor {
schema: &Schema, // fluss schema
kwargs: Option<&Bound<'_, PyDict>>,
) -> PyResult<Self> {
- let mut partition_keys = Vec::new();
+ let mut partition_keys: Vec<String> = Vec::new();
let mut bucket_count = None;
let mut bucket_keys = Vec::new();
- let mut properties = std::collections::HashMap::new();
- let mut custom_properties = std::collections::HashMap::new();
+ let mut properties: HashMap<String, String> = HashMap::new();
+ let mut custom_properties: HashMap<String, String> = HashMap::new();
let mut comment: Option<String> = None;
let mut log_format = None;
let mut kv_format = None;
diff --git a/crates/examples/src/example_kv_table.rs
b/crates/examples/src/example_kv_table.rs
index 325d842..042e384 100644
--- a/crates/examples/src/example_kv_table.rs
+++ b/crates/examples/src/example_kv_table.rs
@@ -36,12 +36,12 @@ pub async fn main() -> Result<()> {
.column("id", DataTypes::int())
.column("name", DataTypes::string())
.column("age", DataTypes::bigint())
- .primary_key(vec!["id".to_string()])
+ .primary_key(vec!["id"])
.build()?,
)
.build()?;
- let table_path = TablePath::new("fluss".to_owned(),
"rust_upsert_lookup_example".to_owned());
+ let table_path = TablePath::new("fluss", "rust_upsert_lookup_example");
let admin = conn.get_admin().await?;
admin
diff --git a/crates/examples/src/example_partitioned_kv_table.rs
b/crates/examples/src/example_partitioned_kv_table.rs
index ab28758..d1b6814 100644
--- a/crates/examples/src/example_partitioned_kv_table.rs
+++ b/crates/examples/src/example_partitioned_kv_table.rs
@@ -38,17 +38,13 @@ pub async fn main() -> Result<()> {
.column("region", DataTypes::string())
.column("zone", DataTypes::bigint())
.column("score", DataTypes::bigint())
- .primary_key(vec![
- "id".to_string(),
- "region".to_string(),
- "zone".to_string(),
- ])
+ .primary_key(vec!["id", "region", "zone"])
.build()?,
)
- .partitioned_by(vec!["region".to_string(), "zone".to_string()])
+ .partitioned_by(vec!["region", "zone"])
.build()?;
- let table_path = TablePath::new("fluss".to_owned(),
"partitioned_kv_example".to_owned());
+ let table_path = TablePath::new("fluss", "partitioned_kv_example");
let mut admin = conn.get_admin().await?;
admin
diff --git a/crates/examples/src/example_table.rs
b/crates/examples/src/example_table.rs
index ae21b1b..733b13e 100644
--- a/crates/examples/src/example_table.rs
+++ b/crates/examples/src/example_table.rs
@@ -44,7 +44,7 @@ pub async fn main() -> Result<()> {
)
.build()?;
- let table_path = TablePath::new("fluss".to_owned(),
"rust_test_long".to_owned());
+ let table_path = TablePath::new("fluss", "rust_test_long");
let admin = conn.get_admin().await?;
diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs
b/crates/fluss/src/client/table/log_fetch_buffer.rs
index b622f19..edab91d 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -840,11 +840,7 @@ mod tests {
use std::sync::Arc;
fn test_read_context() -> Result<ReadContext> {
- let row_type = RowType::new(vec![DataField::new(
- "id".to_string(),
- DataTypes::int(),
- None,
- )]);
+ let row_type = RowType::new(vec![DataField::new("id",
DataTypes::int(), None)]);
Ok(ReadContext::new(to_arrow_schema(&row_type)?, false))
}
@@ -897,8 +893,8 @@ mod tests {
#[test]
fn default_completed_fetch_reads_records() -> Result<()> {
let row_type = RowType::new(vec![
- DataField::new("id".to_string(), DataTypes::int(), None),
- DataField::new("name".to_string(), DataTypes::string(), None),
+ DataField::new("id", DataTypes::int(), None),
+ DataField::new("name", DataTypes::string(), None),
]);
let table_path = TablePath::new("db".to_string(), "tbl".to_string());
let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1));
diff --git a/crates/fluss/src/client/table/partition_getter.rs
b/crates/fluss/src/client/table/partition_getter.rs
index 9136801..a1aad2d 100644
--- a/crates/fluss/src/client/table/partition_getter.rs
+++ b/crates/fluss/src/client/table/partition_getter.rs
@@ -109,12 +109,8 @@ mod tests {
#[test]
fn test_partition_getter_single_key() {
let row_type = RowType::new(vec![
- DataField::new("id".to_string(), DataType::Int(IntType::new()),
None),
- DataField::new(
- "region".to_string(),
- DataType::String(StringType::new()),
- None,
- ),
+ DataField::new("id", DataType::Int(IntType::new()), None),
+ DataField::new("region", DataType::String(StringType::new()),
None),
]);
let getter = PartitionGetter::new(&row_type,
Arc::from(["region".to_string()]))
@@ -128,17 +124,9 @@ mod tests {
#[test]
fn test_partition_getter_multiple_keys() {
let row_type = RowType::new(vec![
- DataField::new("id".to_string(), DataType::Int(IntType::new()),
None),
- DataField::new(
- "date".to_string(),
- DataType::String(StringType::new()),
- None,
- ),
- DataField::new(
- "region".to_string(),
- DataType::String(StringType::new()),
- None,
- ),
+ DataField::new("id", DataType::Int(IntType::new()), None),
+ DataField::new("date", DataType::String(StringType::new()), None),
+ DataField::new("region", DataType::String(StringType::new()),
None),
]);
let getter = PartitionGetter::new(
@@ -159,7 +147,7 @@ mod tests {
#[test]
fn test_partition_getter_invalid_column() {
let row_type = RowType::new(vec![DataField::new(
- "id".to_string(),
+ "id",
DataType::Int(IntType::new()),
None,
)]);
@@ -171,12 +159,8 @@ mod tests {
#[test]
fn test_partition_getter_null_value() {
let row_type = RowType::new(vec![
- DataField::new("id".to_string(), DataType::Int(IntType::new()),
None),
- DataField::new(
- "region".to_string(),
- DataType::String(StringType::new()),
- None,
- ),
+ DataField::new("id", DataType::Int(IntType::new()), None),
+ DataField::new("region", DataType::String(StringType::new()),
None),
]);
let getter = PartitionGetter::new(&row_type,
Arc::from(["region".to_string()]))
@@ -190,17 +174,9 @@ mod tests {
#[test]
fn test_get_partition_spec() {
let row_type = RowType::new(vec![
- DataField::new("id".to_string(), DataType::Int(IntType::new()),
None),
- DataField::new(
- "date".to_string(),
- DataType::String(StringType::new()),
- None,
- ),
- DataField::new(
- "region".to_string(),
- DataType::String(StringType::new()),
- None,
- ),
+ DataField::new("id", DataType::Int(IntType::new()), None),
+ DataField::new("date", DataType::String(StringType::new()), None),
+ DataField::new("region", DataType::String(StringType::new()),
None),
]);
let getter = PartitionGetter::new(
diff --git a/crates/fluss/src/client/table/upsert.rs
b/crates/fluss/src/client/table/upsert.rs
index 0595397..bb6c651 100644
--- a/crates/fluss/src/client/table/upsert.rs
+++ b/crates/fluss/src/client/table/upsert.rs
@@ -403,8 +403,8 @@ mod tests {
fn sanity_check() {
// No target columns specified but table has auto-increment column
let fields = vec![
- DataField::new("id".to_string(),
DataTypes::int().as_non_nullable(), None),
- DataField::new("name".to_string(), DataTypes::string(), None),
+ DataField::new("id", DataTypes::int().as_non_nullable(), None),
+ DataField::new("name", DataTypes::string(), None),
];
let row_type = RowType::new(fields);
let primary_keys = vec!["id".to_string()];
@@ -424,9 +424,9 @@ mod tests {
// Target columns do not contain primary key
let fields = vec![
- DataField::new("id".to_string(),
DataTypes::int().as_non_nullable(), None),
- DataField::new("name".to_string(), DataTypes::string(), None),
- DataField::new("value".to_string(), DataTypes::int(), None),
+ DataField::new("id", DataTypes::int().as_non_nullable(), None),
+ DataField::new("name", DataTypes::string(), None),
+ DataField::new("value", DataTypes::int(), None),
];
let row_type = RowType::new(fields);
let primary_keys = vec!["id".to_string()];
@@ -449,8 +449,8 @@ mod tests {
// Primary key column not found in row type
let fields = vec![
- DataField::new("id".to_string(),
DataTypes::int().as_non_nullable(), None),
- DataField::new("name".to_string(), DataTypes::string(), None),
+ DataField::new("id", DataTypes::int().as_non_nullable(), None),
+ DataField::new("name", DataTypes::string(), None),
];
let row_type = RowType::new(fields);
let primary_keys = vec!["nonexistent_pk".to_string()];
@@ -473,13 +473,9 @@ mod tests {
// Target columns include auto-increment column
let fields = vec![
- DataField::new("id".to_string(),
DataTypes::int().as_non_nullable(), None),
- DataField::new(
- "seq".to_string(),
- DataTypes::bigint().as_non_nullable(),
- None,
- ),
- DataField::new("name".to_string(), DataTypes::string(), None),
+ DataField::new("id", DataTypes::int().as_non_nullable(), None),
+ DataField::new("seq", DataTypes::bigint().as_non_nullable(), None),
+ DataField::new("name", DataTypes::string(), None),
];
let row_type = RowType::new(fields);
let primary_keys = vec!["id".to_string()];
@@ -499,13 +495,13 @@ mod tests {
// Non-nullable column not in target columns (partial update requires
nullable)
let fields = vec![
- DataField::new("id".to_string(),
DataTypes::int().as_non_nullable(), None),
+ DataField::new("id", DataTypes::int().as_non_nullable(), None),
DataField::new(
- "required_field".to_string(),
+ "required_field",
DataTypes::string().as_non_nullable(),
None,
),
- DataField::new("optional_field".to_string(), DataTypes::int(),
None),
+ DataField::new("optional_field", DataTypes::int(), None),
];
let row_type = RowType::new(fields);
let primary_keys = vec!["id".to_string()];
diff --git a/crates/fluss/src/metadata/database.rs
b/crates/fluss/src/metadata/database.rs
index fad1498..15fefb5 100644
--- a/crates/fluss/src/metadata/database.rs
+++ b/crates/fluss/src/metadata/database.rs
@@ -89,19 +89,23 @@ impl DatabaseDescriptor {
}
impl DatabaseDescriptorBuilder {
- pub fn comment(mut self, comment: &str) -> Self {
- self.comment = Some(comment.to_string());
+ pub fn comment<C: Into<String>>(mut self, comment: C) -> Self {
+ self.comment = Some(comment.into());
self
}
- pub fn custom_properties(mut self, properties: HashMap<String, String>) ->
Self {
- self.custom_properties = properties;
+ pub fn custom_properties<K: Into<String>, V: Into<String>>(
+ mut self,
+ properties: HashMap<K, V>,
+ ) -> Self {
+ for (k, v) in properties {
+ self.custom_properties.insert(k.into(), v.into());
+ }
self
}
- pub fn custom_property(mut self, key: &str, value: &str) -> Self {
- self.custom_properties
- .insert(key.to_string(), value.to_string());
+ pub fn custom_property<K: Into<String>, V: Into<String>>(mut self, key: K,
value: V) -> Self {
+ self.custom_properties.insert(key.into(), value.into());
self
}
diff --git a/crates/fluss/src/metadata/datatype.rs
b/crates/fluss/src/metadata/datatype.rs
index 6431d3a..3da270b 100644
--- a/crates/fluss/src/metadata/datatype.rs
+++ b/crates/fluss/src/metadata/datatype.rs
@@ -1114,13 +1114,13 @@ impl DataTypes {
}
/// Field definition with field name and data type.
- pub fn field(name: String, data_type: DataType) -> DataField {
+ pub fn field<N: Into<String>>(name: N, data_type: DataType) -> DataField {
DataField::new(name, data_type, None)
}
/// Field definition with field name, data type, and a description.
- pub fn field_with_description(
- name: String,
+ pub fn field_with_description<N: Into<String>>(
+ name: N,
data_type: DataType,
description: String,
) -> DataField {
@@ -1151,9 +1151,13 @@ pub struct DataField {
}
impl DataField {
- pub fn new(name: String, data_type: DataType, description: Option<String>)
-> DataField {
+ pub fn new<N: Into<String>>(
+ name: N,
+ data_type: DataType,
+ description: Option<String>,
+ ) -> DataField {
DataField {
- name,
+ name: name.into(),
data_type,
description,
}
@@ -1318,13 +1322,13 @@ fn test_map_display() {
#[test]
fn test_row_display() {
let fields = vec![
- DataTypes::field("id".to_string(), DataTypes::int()),
- DataTypes::field("name".to_string(), DataTypes::string()),
+ DataTypes::field("id", DataTypes::int()),
+ DataTypes::field("name", DataTypes::string()),
];
let row_type = RowType::new(fields);
assert_eq!(row_type.to_string(), "ROW<id INT, name STRING>");
- let fields_non_null = vec![DataTypes::field("age".to_string(),
DataTypes::bigint())];
+ let fields_non_null = vec![DataTypes::field("age", DataTypes::bigint())];
let row_type_non_null = RowType::with_nullable(false, fields_non_null);
assert_eq!(row_type_non_null.to_string(), "ROW<age BIGINT> NOT NULL");
}
@@ -1354,23 +1358,23 @@ fn test_datatype_display() {
#[test]
fn test_datafield_display() {
- let field = DataTypes::field("user_id".to_string(), DataTypes::bigint());
+ let field = DataTypes::field("user_id", DataTypes::bigint());
assert_eq!(field.to_string(), "user_id BIGINT");
- let field2 = DataTypes::field("email".to_string(), DataTypes::string());
+ let field2 = DataTypes::field("email", DataTypes::string());
assert_eq!(field2.to_string(), "email STRING");
- let field3 = DataTypes::field("score".to_string(), DataTypes::decimal(10,
2));
+ let field3 = DataTypes::field("score", DataTypes::decimal(10, 2));
assert_eq!(field3.to_string(), "score DECIMAL(10, 2)");
}
#[test]
fn test_complex_nested_display() {
let row_type = DataTypes::row(vec![
- DataTypes::field("id".to_string(), DataTypes::int()),
- DataTypes::field("tags".to_string(),
DataTypes::array(DataTypes::string())),
+ DataTypes::field("id", DataTypes::int()),
+ DataTypes::field("tags", DataTypes::array(DataTypes::string())),
DataTypes::field(
- "metadata".to_string(),
+ "metadata",
DataTypes::map(DataTypes::string(), DataTypes::string()),
),
]);
@@ -1394,8 +1398,8 @@ fn test_deeply_nested_types() {
let nested = DataTypes::array(DataTypes::map(
DataTypes::string(),
DataTypes::row(vec![
- DataTypes::field("x".to_string(), DataTypes::int()),
- DataTypes::field("y".to_string(), DataTypes::int()),
+ DataTypes::field("x", DataTypes::int()),
+ DataTypes::field("y", DataTypes::int()),
]),
));
assert_eq!(nested.to_string(), "ARRAY<MAP<STRING, ROW<x INT, y INT>>>");
diff --git a/crates/fluss/src/metadata/json_serde.rs
b/crates/fluss/src/metadata/json_serde.rs
index d0d56ef..d58fb7e 100644
--- a/crates/fluss/src/metadata/json_serde.rs
+++ b/crates/fluss/src/metadata/json_serde.rs
@@ -403,8 +403,7 @@ impl JsonSerde for Column {
.and_then(|v| v.as_str())
.ok_or_else(|| Error::JsonSerdeError {
message: format!("Missing required field: {}", Self::NAME),
- })?
- .to_string();
+ })?;
let data_type_node = node
.get(Self::DATA_TYPE)
@@ -414,7 +413,7 @@ impl JsonSerde for Column {
let data_type = DataType::deserialize_json(data_type_node)?;
- let mut column = Column::new(&name, data_type);
+ let mut column = Column::new(name, data_type);
if let Some(comment) = node.get(Self::COMMENT).and_then(|v|
v.as_str()) {
column = column.with_comment(comment);
@@ -483,14 +482,9 @@ impl JsonSerde for Schema {
let mut primary_keys = Vec::with_capacity(pk_array.len());
for name_node in pk_array {
- primary_keys.push(
- name_node
- .as_str()
- .ok_or_else(|| Error::InvalidTableError {
- message: "Primary key element must be a
string".to_string(),
- })?
- .to_string(),
- );
+ primary_keys.push(name_node.as_str().ok_or_else(||
Error::InvalidTableError {
+ message: "Primary key element must be a
string".to_string(),
+ })?);
}
schema_builder = schema_builder.primary_key(primary_keys);
diff --git a/crates/fluss/src/metadata/partition.rs
b/crates/fluss/src/metadata/partition.rs
index bc1935c..1840235 100644
--- a/crates/fluss/src/metadata/partition.rs
+++ b/crates/fluss/src/metadata/partition.rs
@@ -31,8 +31,14 @@ pub struct PartitionSpec {
}
impl PartitionSpec {
- pub fn new(partition_spec: HashMap<String, String>) -> Self {
- Self { partition_spec }
+ pub fn new<K: Into<String>, V: Into<String>>(partition_spec: HashMap<K,
V>) -> Self {
+ let mut new_map = HashMap::new();
+ for (k, v) in partition_spec {
+ new_map.insert(k.into(), v.into());
+ }
+ Self {
+ partition_spec: new_map,
+ }
}
pub fn get_spec_map(&self) -> &HashMap<String, String> {
diff --git a/crates/fluss/src/metadata/table.rs
b/crates/fluss/src/metadata/table.rs
index ce362c4..908f446 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -36,16 +36,16 @@ pub struct Column {
}
impl Column {
- pub fn new(name: &str, data_type: DataType) -> Self {
+ pub fn new<N: Into<String>>(name: N, data_type: DataType) -> Self {
Self {
- name: name.to_string(),
+ name: name.into(),
data_type,
comment: None,
}
}
- pub fn with_comment(mut self, comment: &str) -> Self {
- self.comment = Some(comment.to_string());
+ pub fn with_comment<C: Into<String>>(mut self, comment: C) -> Self {
+ self.comment = Some(comment.into());
self
}
@@ -78,9 +78,9 @@ pub struct PrimaryKey {
}
impl PrimaryKey {
- pub fn new(constraint_name: &str, column_names: Vec<String>) -> Self {
+ pub fn new<N: Into<String>>(constraint_name: N, column_names: Vec<String>)
-> Self {
Self {
- constraint_name: constraint_name.to_string(),
+ constraint_name: constraint_name.into(),
column_names,
}
}
@@ -178,8 +178,8 @@ impl SchemaBuilder {
}
}
- pub fn column(mut self, name: &str, data_type: DataType) -> Self {
- self.columns.push(Column::new(name, data_type));
+ pub fn column<N: Into<String>>(mut self, name: N, data_type: DataType) ->
Self {
+ self.columns.push(Column::new(name.into(), data_type));
self
}
@@ -188,20 +188,34 @@ impl SchemaBuilder {
self
}
- pub fn with_comment(mut self, comment: &str) -> Self {
+ pub fn with_comment<C: Into<String>>(mut self, comment: C) -> Self {
if let Some(last) = self.columns.last_mut() {
- *last = last.clone().with_comment(comment);
+ *last = last.clone().with_comment(comment.into());
}
self
}
- pub fn primary_key(self, column_names: Vec<String>) -> Self {
- let constraint_name = format!("PK_{}", column_names.join("_"));
- self.primary_key_named(&constraint_name, column_names)
+ pub fn primary_key<I, S>(self, column_names: I) -> Self
+ where
+ I: IntoIterator<Item = S>,
+ S: Into<String>,
+ {
+ let names: Vec<String> = column_names.into_iter().map(|s|
s.into()).collect();
+
+ let constraint_name = format!("PK_{}", names.join("_"));
+
+ self.primary_key_named(&constraint_name, names)
}
- pub fn primary_key_named(mut self, constraint_name: &str, column_names:
Vec<String>) -> Self {
- self.primary_key = Some(PrimaryKey::new(constraint_name,
column_names));
+ pub fn primary_key_named<N: Into<String>, P: Into<String>>(
+ mut self,
+ constraint_name: N,
+ column_names: Vec<P>,
+ ) -> Self {
+ self.primary_key = Some(PrimaryKey::new(
+ constraint_name.into(),
+ column_names.into_iter().map(|s| s.into()).collect(),
+ ));
self
}
@@ -209,14 +223,14 @@ impl SchemaBuilder {
/// whenever a new row is inserted into the table, the new row will be
assigned with the next
/// available value from the auto-increment sequence. A table can have at
most one auto
/// increment column.
- pub fn enable_auto_increment(mut self, column_name: &str) -> Result<Self> {
+ pub fn enable_auto_increment<N: Into<String>>(mut self, column_name: N) ->
Result<Self> {
if !self.auto_increment_col_names.is_empty() {
return Err(IllegalArgument {
message: "Multiple auto increment columns are not supported
yet.".to_string(),
});
}
- self.auto_increment_col_names.push(column_name.to_string());
+ self.auto_increment_col_names.push(column_name.into());
Ok(self)
}
@@ -353,29 +367,43 @@ impl TableDescriptorBuilder {
self
}
- pub fn property<T: ToString>(mut self, key: &str, value: T) -> Self {
- self.properties.insert(key.to_string(), value.to_string());
+ pub fn property<K: Into<String>, V: Into<String>>(mut self, key: K, value:
V) -> Self {
+ self.properties.insert(key.into(), value.into());
self
}
- pub fn properties(mut self, properties: HashMap<String, String>) -> Self {
- self.properties.extend(properties);
+ pub fn properties<K: Into<String>, V: Into<String>>(
+ mut self,
+ properties: HashMap<K, V>,
+ ) -> Self {
+ for (k, v) in properties {
+ self.properties.insert(k.into(), v.into());
+ }
self
}
- pub fn custom_property(mut self, key: &str, value: &str) -> Self {
- self.custom_properties
- .insert(key.to_string(), value.to_string());
+ pub fn custom_property<K: Into<String>, V: Into<String>>(mut self, key: K,
value: V) -> Self {
+ self.custom_properties.insert(key.into(), value.into());
self
}
- pub fn custom_properties(mut self, custom_properties: HashMap<String,
String>) -> Self {
- self.custom_properties.extend(custom_properties);
+ pub fn custom_properties<K: Into<String>, V: Into<String>>(
+ mut self,
+ custom_properties: HashMap<K, V>,
+ ) -> Self {
+ for (k, v) in custom_properties {
+ self.custom_properties.insert(k.into(), v.into());
+ }
self
}
- pub fn partitioned_by(mut self, partition_keys: Vec<String>) -> Self {
- self.partition_keys = Arc::from(partition_keys);
+ pub fn partitioned_by<P: Into<String>>(mut self, partition_keys: Vec<P>)
-> Self {
+ self.partition_keys = Arc::from(
+ partition_keys
+ .into_iter()
+ .map(|s| s.into())
+ .collect::<Vec<String>>(),
+ );
self
}
@@ -387,8 +415,8 @@ impl TableDescriptorBuilder {
self
}
- pub fn comment(mut self, comment: &str) -> Self {
- self.comment = Some(comment.to_string());
+ pub fn comment<S: Into<String>>(mut self, comment: S) -> Self {
+ self.comment = Some(comment.into());
self
}
@@ -487,9 +515,16 @@ impl TableDescriptor {
})
}
- pub fn with_properties(&self, new_properties: HashMap<String, String>) ->
Self {
+ pub fn with_properties<K: Into<String>, V: Into<String>>(
+ &self,
+ new_properties: HashMap<K, V>,
+ ) -> Self {
+ let mut properties = HashMap::new();
+ for (k, v) in new_properties {
+ properties.insert(k.into(), v.into());
+ }
Self {
- properties: new_properties,
+ properties,
..self.clone()
}
}
@@ -684,10 +719,10 @@ const MAX_NAME_LENGTH: usize = 200;
const INTERNAL_NAME_PREFIX: &str = "__";
impl TablePath {
- pub fn new(db: String, tbl: String) -> Self {
+ pub fn new<D: Into<String>, T: Into<String>>(db: D, tbl: T) -> Self {
TablePath {
- database: db,
- table: tbl,
+ database: db.into(),
+ table: tbl.into(),
}
}
@@ -769,14 +804,14 @@ impl PhysicalTablePath {
}
}
- pub fn of_with_names(
- database_name: String,
- table_name: String,
- partition_name: Option<String>,
+ pub fn of_with_names<D: Into<String>, T: Into<String>, P: Into<String>>(
+ database_name: D,
+ table_name: T,
+ partition_name: Option<P>,
) -> Self {
Self {
table_path: Arc::new(TablePath::new(database_name, table_name)),
- partition_name,
+ partition_name: partition_name.map(|p| p.into()),
}
}
@@ -1122,7 +1157,7 @@ impl TableInfo {
.custom_properties(self.custom_properties.clone());
if let Some(comment) = &self.comment {
- builder = builder.comment(&comment.clone());
+ builder = builder.comment(comment.clone());
}
builder.build()
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index b798896..fe2f2f4 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -1590,8 +1590,8 @@ mod tests {
assert_eq!(
to_arrow_type(&DataTypes::row(vec![
- DataTypes::field("f1".to_string(), DataTypes::int()),
- DataTypes::field("f2".to_string(), DataTypes::string()),
+ DataTypes::field("f1", DataTypes::int()),
+ DataTypes::field("f2", DataTypes::string()),
]))
.unwrap(),
ArrowDataType::Struct(arrow_schema::Fields::from(vec![
@@ -1654,8 +1654,8 @@ mod tests {
#[test]
fn projection_rejects_out_of_bounds_index() {
let row_type = RowType::new(vec![
- DataField::new("id".to_string(), DataTypes::int(), None),
- DataField::new("name".to_string(), DataTypes::string(), None),
+ DataField::new("id", DataTypes::int(), None),
+ DataField::new("name", DataTypes::string(), None),
]);
let schema = to_arrow_schema(&row_type).unwrap();
let result = ReadContext::with_projection_pushdown(schema, vec![0, 2],
false);
@@ -1722,7 +1722,7 @@ mod tests {
// Test 1: Rescaling from scale 3 to scale 2
let row_type = RowType::new(vec![DataField::new(
- "amount".to_string(),
+ "amount",
DataTypes::decimal(10, 2),
None,
)]);
@@ -1743,7 +1743,7 @@ mod tests {
// Test 2: Precision overflow (should error)
let row_type = RowType::new(vec![DataField::new(
- "amount".to_string(),
+ "amount",
DataTypes::decimal(5, 2),
None,
)]);
diff --git a/crates/fluss/src/record/kv/kv_record_read_context.rs
b/crates/fluss/src/record/kv/kv_record_read_context.rs
index 9236321..77cdbcb 100644
--- a/crates/fluss/src/record/kv/kv_record_read_context.rs
+++ b/crates/fluss/src/record/kv/kv_record_read_context.rs
@@ -120,7 +120,7 @@ mod tests {
fn new(data_types: Vec<crate::metadata::DataType>) -> Self {
let mut builder = Schema::builder();
for (i, dt) in data_types.iter().enumerate() {
- builder = builder.column(&format!("field{i}"), dt.clone());
+ builder = builder.column(format!("field{i}"), dt.clone());
}
let schema = builder.build().expect("Failed to build schema");
diff --git a/crates/fluss/src/test_utils.rs b/crates/fluss/src/test_utils.rs
index 8e8fbe4..752d422 100644
--- a/crates/fluss/src/test_utils.rs
+++ b/crates/fluss/src/test_utils.rs
@@ -24,11 +24,7 @@ use std::collections::HashMap;
use std::sync::Arc;
pub(crate) fn build_table_info(table_path: TablePath, table_id: i64, buckets:
i32) -> TableInfo {
- let row_type = DataTypes::row(vec![DataField::new(
- "id".to_string(),
- DataTypes::int(),
- None,
- )]);
+ let row_type = DataTypes::row(vec![DataField::new("id", DataTypes::int(),
None)]);
let mut schema_builder = Schema::builder().with_row_type(&row_type);
let schema = schema_builder.build().expect("schema build");
let table_descriptor = TableDescriptor::builder()
diff --git a/crates/fluss/tests/integration/admin.rs
b/crates/fluss/tests/integration/admin.rs
index e94b67c..c6c98b8 100644
--- a/crates/fluss/tests/integration/admin.rs
+++ b/crates/fluss/tests/integration/admin.rs
@@ -64,13 +64,7 @@ mod admin_test {
let db_descriptor = DatabaseDescriptorBuilder::default()
.comment("test_db")
- .custom_properties(
- [
- ("k1".to_string(), "v1".to_string()),
- ("k2".to_string(), "v2".to_string()),
- ]
- .into(),
- )
+ .custom_properties([("k1", "v1"), ("k2", "v2")].into())
.build();
let db_name = "test_create_database";
@@ -128,7 +122,7 @@ mod admin_test {
.expect("Failed to create test database");
let test_table_name = "test_user_table";
- let table_path = TablePath::new(test_db_name.to_string(),
test_table_name.to_string());
+ let table_path = TablePath::new(test_db_name, test_table_name);
// build table schema
let table_schema = Schema::builder()
@@ -250,25 +244,21 @@ mod admin_test {
.expect("Failed to create test database");
let test_table_name = "partitioned_table";
- let table_path = TablePath::new(test_db_name.to_string(),
test_table_name.to_string());
+ let table_path = TablePath::new(test_db_name, test_table_name);
let table_schema = Schema::builder()
.column("id", DataTypes::int())
.column("name", DataTypes::string())
.column("dt", DataTypes::string())
.column("region", DataTypes::string())
- .primary_key(vec![
- "id".to_string(),
- "dt".to_string(),
- "region".to_string(),
- ])
+ .primary_key(vec!["id", "dt", "region"])
.build()
.expect("Failed to build table schema");
let table_descriptor = TableDescriptor::builder()
.schema(table_schema)
.distributed_by(Some(3), vec!["id".to_string()])
- .partitioned_by(vec!["dt".to_string(), "region".to_string()])
+ .partitioned_by(vec!["dt", "region"])
.property("table.replication.factor", "1")
.log_format(LogFormat::ARROW)
.kv_format(KvFormat::COMPACTED)
@@ -291,8 +281,8 @@ mod admin_test {
);
let mut partition_values = HashMap::new();
- partition_values.insert("dt".to_string(), "2024-01-15".to_string());
- partition_values.insert("region".to_string(), "EMEA".to_string());
+ partition_values.insert("dt", "2024-01-15");
+ partition_values.insert("region", "EMEA");
let partition_spec = PartitionSpec::new(partition_values);
admin
@@ -317,7 +307,7 @@ mod admin_test {
// list with partial spec filter - should find the partition
let mut partition_values = HashMap::new();
- partition_values.insert("dt".to_string(), "2024-01-15".to_string());
+ partition_values.insert("dt", "2024-01-15");
let partial_partition_spec = PartitionSpec::new(partition_values);
let partitions_with_spec = admin
@@ -337,7 +327,7 @@ mod admin_test {
// list with non-matching spec - should find no partitions
let mut non_matching_values = HashMap::new();
- non_matching_values.insert("dt".to_string(), "2024-01-16".to_string());
+ non_matching_values.insert("dt", "2024-01-16");
let non_matching_spec = PartitionSpec::new(non_matching_values);
let partitions_non_matching = admin
.list_partition_infos_with_spec(&table_path,
Some(&non_matching_spec))
@@ -382,7 +372,7 @@ mod admin_test {
.await
.expect("Failed to get admin client");
- let table_path = TablePath::new("fluss".to_string(),
"not_exist".to_string());
+ let table_path = TablePath::new("fluss", "not_exist");
let result = admin.get_table(&table_path).await;
assert!(result.is_err(), "Expected error but got Ok");
diff --git a/crates/fluss/tests/integration/kv_table.rs
b/crates/fluss/tests/integration/kv_table.rs
index 3691d65..c419ed9 100644
--- a/crates/fluss/tests/integration/kv_table.rs
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -66,7 +66,7 @@ mod kv_table_test {
let admin = connection.get_admin().await.expect("Failed to get admin");
- let table_path = TablePath::new("fluss".to_string(),
"test_upsert_and_lookup".to_string());
+ let table_path = TablePath::new("fluss", "test_upsert_and_lookup");
let table_descriptor = TableDescriptor::builder()
.schema(
@@ -74,7 +74,7 @@ mod kv_table_test {
.column("id", DataTypes::int())
.column("name", DataTypes::string())
.column("age", DataTypes::bigint())
- .primary_key(vec!["id".to_string()])
+ .primary_key(vec!["id"])
.build()
.expect("Failed to build schema"),
)
@@ -223,7 +223,7 @@ mod kv_table_test {
let admin = connection.get_admin().await.expect("Failed to get admin");
- let table_path = TablePath::new("fluss".to_string(),
"test_composite_pk".to_string());
+ let table_path = TablePath::new("fluss", "test_composite_pk");
let table_descriptor = TableDescriptor::builder()
.schema(
@@ -231,7 +231,7 @@ mod kv_table_test {
.column("region", DataTypes::string())
.column("user_id", DataTypes::int())
.column("score", DataTypes::bigint())
- .primary_key(vec!["region".to_string(),
"user_id".to_string()])
+ .primary_key(vec!["region", "user_id"])
.build()
.expect("Failed to build schema"),
)
@@ -335,7 +335,7 @@ mod kv_table_test {
let admin = connection.get_admin().await.expect("Failed to get admin");
- let table_path = TablePath::new("fluss".to_string(), "test_partial_update".to_string());
+ let table_path = TablePath::new("fluss", "test_partial_update");
let table_descriptor = TableDescriptor::builder()
.schema(
@@ -344,7 +344,7 @@ mod kv_table_test {
.column("name", DataTypes::string())
.column("age", DataTypes::bigint())
.column("score", DataTypes::bigint())
- .primary_key(vec!["id".to_string()])
+ .primary_key(vec!["id"])
.build()
.expect("Failed to build schema"),
)
@@ -446,8 +446,7 @@ mod kv_table_test {
let admin = connection.get_admin().await.expect("Failed to get admin");
- let table_path =
- TablePath::new("fluss".to_string(), "test_partitioned_kv_table".to_string());
+ let table_path = TablePath::new("fluss", "test_partitioned_kv_table");
// Create a partitioned KV table with region as partition key
let table_descriptor = TableDescriptor::builder()
@@ -457,11 +456,11 @@ mod kv_table_test {
.column("user_id", DataTypes::int())
.column("name", DataTypes::string())
.column("score", DataTypes::bigint())
- .primary_key(vec!["region".to_string(), "user_id".to_string()])
+ .primary_key(vec!["region", "user_id"])
.build()
.expect("Failed to build schema"),
)
- .partitioned_by(vec!["region".to_string()])
+ .partitioned_by(vec!["region"])
.build()
.expect("Failed to build table");
@@ -614,7 +613,7 @@ mod kv_table_test {
let admin = connection.get_admin().await.expect("Failed to get admin");
- let table_path = TablePath::new("fluss".to_string(), "test_all_datatypes".to_string());
+ let table_path = TablePath::new("fluss", "test_all_datatypes");
// Create a table with all supported primitive datatypes
let table_descriptor = TableDescriptor::builder()
@@ -645,7 +644,7 @@ mod kv_table_test {
// Binary types
.column("col_bytes", DataTypes::bytes())
.column("col_binary", DataTypes::binary(20))
- .primary_key(vec!["pk_int".to_string()])
+ .primary_key(vec!["pk_int"])
.build()
.expect("Failed to build schema"),
)
diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs
index 27b4d83..514df82 100644
--- a/crates/fluss/tests/integration/log_table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -69,10 +69,7 @@ mod table_test {
let admin = connection.get_admin().await.expect("Failed to get admin");
- let table_path = TablePath::new(
- "fluss".to_string(),
- "test_append_record_batch_and_scan".to_string(),
- );
+ let table_path = TablePath::new("fluss", "test_append_record_batch_and_scan");
let table_descriptor = TableDescriptor::builder()
.schema(
@@ -171,7 +168,7 @@ mod table_test {
let admin = connection.get_admin().await.expect("Failed to get admin");
- let table_path = TablePath::new("fluss".to_string(), "test_list_offsets".to_string());
+ let table_path = TablePath::new("fluss", "test_list_offsets");
let table_descriptor = TableDescriptor::builder()
.schema(
@@ -295,7 +292,7 @@ mod table_test {
let admin = connection.get_admin().await.expect("Failed to get admin");
- let table_path = TablePath::new("fluss".to_string(), "test_project".to_string());
+ let table_path = TablePath::new("fluss", "test_project");
let table_descriptor = TableDescriptor::builder()
.schema(
@@ -451,7 +448,7 @@ mod table_test {
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.expect("Failed to get admin");
- let table_path = TablePath::new("fluss".to_string(), "test_poll_batches".to_string());
+ let table_path = TablePath::new("fluss", "test_poll_batches");
let schema = Schema::builder()
.column("id", DataTypes::int())
.column("name", DataTypes::string())
@@ -595,7 +592,7 @@ mod table_test {
let admin = connection.get_admin().await.expect("Failed to get admin");
- let table_path = TablePath::new("fluss".to_string(), "test_log_all_datatypes".to_string());
+ let table_path = TablePath::new("fluss", "test_log_all_datatypes");
// Create a log table with all supported datatypes for append/scan
let table_descriptor = TableDescriptor::builder()
@@ -983,10 +980,7 @@ mod table_test {
let admin = connection.get_admin().await.expect("Failed to get admin");
- let table_path = TablePath::new(
- "fluss".to_string(),
- "test_partitioned_log_append".to_string(),
- );
+ let table_path = TablePath::new("fluss", "test_partitioned_log_append");
// Create a partitioned log table
let table_descriptor = TableDescriptor::builder()
@@ -998,7 +992,7 @@ mod table_test {
.build()
.expect("Failed to build schema"),
)
- .partitioned_by(vec!["region".to_string()])
+ .partitioned_by(vec!["region"])
.build()
.expect("Failed to build table");
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs b/crates/fluss/tests/integration/table_remote_scan.rs
index ce0c137..0efe388 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -115,10 +115,7 @@ mod table_remote_scan_test {
let admin = connection.get_admin().await.expect("Failed to get admin");
- let table_path = TablePath::new(
- "fluss".to_string(),
- "test_append_record_batch_and_scan".to_string(),
- );
+ let table_path = TablePath::new("fluss", "test_append_record_batch_and_scan");
let table_descriptor = TableDescriptor::builder()
.schema(
diff --git a/crates/fluss/tests/integration/utils.rs b/crates/fluss/tests/integration/utils.rs
index fd5145a..ae61d3a 100644
--- a/crates/fluss/tests/integration/utils.rs
+++ b/crates/fluss/tests/integration/utils.rs
@@ -119,7 +119,7 @@ pub async fn create_partitions(
) {
for value in partition_values {
let mut partition_map = HashMap::new();
- partition_map.insert(partition_column.to_string(), value.to_string());
+ partition_map.insert(partition_column, *value);
admin
.create_partition(table_path, &PartitionSpec::new(partition_map), true)
.await