This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new ece3834  chore: accept Into<String> to improve api usage (#234)
ece3834 is described below

commit ece3834593fa43021bc93079997098af6ca08596
Author: yuxia Luo <[email protected]>
AuthorDate: Wed Feb 4 07:43:24 2026 +0800

    chore: accept Into<String> to improve api usage (#234)
---
 bindings/python/src/metadata.rs                    |   6 +-
 crates/examples/src/example_kv_table.rs            |   4 +-
 .../examples/src/example_partitioned_kv_table.rs   |  10 +-
 crates/examples/src/example_table.rs               |   2 +-
 crates/fluss/src/client/table/log_fetch_buffer.rs  |  10 +-
 crates/fluss/src/client/table/partition_getter.rs  |  46 ++------
 crates/fluss/src/client/table/upsert.rs            |  30 +++---
 crates/fluss/src/metadata/database.rs              |  18 ++--
 crates/fluss/src/metadata/datatype.rs              |  36 ++++---
 crates/fluss/src/metadata/json_serde.rs            |  16 +--
 crates/fluss/src/metadata/partition.rs             |  10 +-
 crates/fluss/src/metadata/table.rs                 | 117 +++++++++++++--------
 crates/fluss/src/record/arrow.rs                   |  12 +--
 .../fluss/src/record/kv/kv_record_read_context.rs  |   2 +-
 crates/fluss/src/test_utils.rs                     |   6 +-
 crates/fluss/tests/integration/admin.rs            |  30 ++----
 crates/fluss/tests/integration/kv_table.rs         |  23 ++--
 crates/fluss/tests/integration/log_table.rs        |  20 ++--
 .../fluss/tests/integration/table_remote_scan.rs   |   5 +-
 crates/fluss/tests/integration/utils.rs            |   2 +-
 20 files changed, 194 insertions(+), 211 deletions(-)

diff --git a/bindings/python/src/metadata.rs b/bindings/python/src/metadata.rs
index bc5f288..235df56 100644
--- a/bindings/python/src/metadata.rs
+++ b/bindings/python/src/metadata.rs
@@ -218,11 +218,11 @@ impl TableDescriptor {
         schema: &Schema, // fluss schema
         kwargs: Option<&Bound<'_, PyDict>>,
     ) -> PyResult<Self> {
-        let mut partition_keys = Vec::new();
+        let mut partition_keys: Vec<String> = Vec::new();
         let mut bucket_count = None;
         let mut bucket_keys = Vec::new();
-        let mut properties = std::collections::HashMap::new();
-        let mut custom_properties = std::collections::HashMap::new();
+        let mut properties: HashMap<String, String> = HashMap::new();
+        let mut custom_properties: HashMap<String, String> = HashMap::new();
         let mut comment: Option<String> = None;
         let mut log_format = None;
         let mut kv_format = None;
diff --git a/crates/examples/src/example_kv_table.rs 
b/crates/examples/src/example_kv_table.rs
index 325d842..042e384 100644
--- a/crates/examples/src/example_kv_table.rs
+++ b/crates/examples/src/example_kv_table.rs
@@ -36,12 +36,12 @@ pub async fn main() -> Result<()> {
                 .column("id", DataTypes::int())
                 .column("name", DataTypes::string())
                 .column("age", DataTypes::bigint())
-                .primary_key(vec!["id".to_string()])
+                .primary_key(vec!["id"])
                 .build()?,
         )
         .build()?;
 
-    let table_path = TablePath::new("fluss".to_owned(), 
"rust_upsert_lookup_example".to_owned());
+    let table_path = TablePath::new("fluss", "rust_upsert_lookup_example");
 
     let admin = conn.get_admin().await?;
     admin
diff --git a/crates/examples/src/example_partitioned_kv_table.rs 
b/crates/examples/src/example_partitioned_kv_table.rs
index ab28758..d1b6814 100644
--- a/crates/examples/src/example_partitioned_kv_table.rs
+++ b/crates/examples/src/example_partitioned_kv_table.rs
@@ -38,17 +38,13 @@ pub async fn main() -> Result<()> {
                 .column("region", DataTypes::string())
                 .column("zone", DataTypes::bigint())
                 .column("score", DataTypes::bigint())
-                .primary_key(vec![
-                    "id".to_string(),
-                    "region".to_string(),
-                    "zone".to_string(),
-                ])
+                .primary_key(vec!["id", "region", "zone"])
                 .build()?,
         )
-        .partitioned_by(vec!["region".to_string(), "zone".to_string()])
+        .partitioned_by(vec!["region", "zone"])
         .build()?;
 
-    let table_path = TablePath::new("fluss".to_owned(), 
"partitioned_kv_example".to_owned());
+    let table_path = TablePath::new("fluss", "partitioned_kv_example");
 
     let mut admin = conn.get_admin().await?;
     admin
diff --git a/crates/examples/src/example_table.rs 
b/crates/examples/src/example_table.rs
index ae21b1b..733b13e 100644
--- a/crates/examples/src/example_table.rs
+++ b/crates/examples/src/example_table.rs
@@ -44,7 +44,7 @@ pub async fn main() -> Result<()> {
         )
         .build()?;
 
-    let table_path = TablePath::new("fluss".to_owned(), 
"rust_test_long".to_owned());
+    let table_path = TablePath::new("fluss", "rust_test_long");
 
     let admin = conn.get_admin().await?;
 
diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs 
b/crates/fluss/src/client/table/log_fetch_buffer.rs
index b622f19..edab91d 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -840,11 +840,7 @@ mod tests {
     use std::sync::Arc;
 
     fn test_read_context() -> Result<ReadContext> {
-        let row_type = RowType::new(vec![DataField::new(
-            "id".to_string(),
-            DataTypes::int(),
-            None,
-        )]);
+        let row_type = RowType::new(vec![DataField::new("id", 
DataTypes::int(), None)]);
         Ok(ReadContext::new(to_arrow_schema(&row_type)?, false))
     }
 
@@ -897,8 +893,8 @@ mod tests {
     #[test]
     fn default_completed_fetch_reads_records() -> Result<()> {
         let row_type = RowType::new(vec![
-            DataField::new("id".to_string(), DataTypes::int(), None),
-            DataField::new("name".to_string(), DataTypes::string(), None),
+            DataField::new("id", DataTypes::int(), None),
+            DataField::new("name", DataTypes::string(), None),
         ]);
         let table_path = TablePath::new("db".to_string(), "tbl".to_string());
         let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1));
diff --git a/crates/fluss/src/client/table/partition_getter.rs 
b/crates/fluss/src/client/table/partition_getter.rs
index 9136801..a1aad2d 100644
--- a/crates/fluss/src/client/table/partition_getter.rs
+++ b/crates/fluss/src/client/table/partition_getter.rs
@@ -109,12 +109,8 @@ mod tests {
     #[test]
     fn test_partition_getter_single_key() {
         let row_type = RowType::new(vec![
-            DataField::new("id".to_string(), DataType::Int(IntType::new()), 
None),
-            DataField::new(
-                "region".to_string(),
-                DataType::String(StringType::new()),
-                None,
-            ),
+            DataField::new("id", DataType::Int(IntType::new()), None),
+            DataField::new("region", DataType::String(StringType::new()), 
None),
         ]);
 
         let getter = PartitionGetter::new(&row_type, 
Arc::from(["region".to_string()]))
@@ -128,17 +124,9 @@ mod tests {
     #[test]
     fn test_partition_getter_multiple_keys() {
         let row_type = RowType::new(vec![
-            DataField::new("id".to_string(), DataType::Int(IntType::new()), 
None),
-            DataField::new(
-                "date".to_string(),
-                DataType::String(StringType::new()),
-                None,
-            ),
-            DataField::new(
-                "region".to_string(),
-                DataType::String(StringType::new()),
-                None,
-            ),
+            DataField::new("id", DataType::Int(IntType::new()), None),
+            DataField::new("date", DataType::String(StringType::new()), None),
+            DataField::new("region", DataType::String(StringType::new()), 
None),
         ]);
 
         let getter = PartitionGetter::new(
@@ -159,7 +147,7 @@ mod tests {
     #[test]
     fn test_partition_getter_invalid_column() {
         let row_type = RowType::new(vec![DataField::new(
-            "id".to_string(),
+            "id",
             DataType::Int(IntType::new()),
             None,
         )]);
@@ -171,12 +159,8 @@ mod tests {
     #[test]
     fn test_partition_getter_null_value() {
         let row_type = RowType::new(vec![
-            DataField::new("id".to_string(), DataType::Int(IntType::new()), 
None),
-            DataField::new(
-                "region".to_string(),
-                DataType::String(StringType::new()),
-                None,
-            ),
+            DataField::new("id", DataType::Int(IntType::new()), None),
+            DataField::new("region", DataType::String(StringType::new()), 
None),
         ]);
 
         let getter = PartitionGetter::new(&row_type, 
Arc::from(["region".to_string()]))
@@ -190,17 +174,9 @@ mod tests {
     #[test]
     fn test_get_partition_spec() {
         let row_type = RowType::new(vec![
-            DataField::new("id".to_string(), DataType::Int(IntType::new()), 
None),
-            DataField::new(
-                "date".to_string(),
-                DataType::String(StringType::new()),
-                None,
-            ),
-            DataField::new(
-                "region".to_string(),
-                DataType::String(StringType::new()),
-                None,
-            ),
+            DataField::new("id", DataType::Int(IntType::new()), None),
+            DataField::new("date", DataType::String(StringType::new()), None),
+            DataField::new("region", DataType::String(StringType::new()), 
None),
         ]);
 
         let getter = PartitionGetter::new(
diff --git a/crates/fluss/src/client/table/upsert.rs 
b/crates/fluss/src/client/table/upsert.rs
index 0595397..bb6c651 100644
--- a/crates/fluss/src/client/table/upsert.rs
+++ b/crates/fluss/src/client/table/upsert.rs
@@ -403,8 +403,8 @@ mod tests {
     fn sanity_check() {
         // No target columns specified but table has auto-increment column
         let fields = vec![
-            DataField::new("id".to_string(), 
DataTypes::int().as_non_nullable(), None),
-            DataField::new("name".to_string(), DataTypes::string(), None),
+            DataField::new("id", DataTypes::int().as_non_nullable(), None),
+            DataField::new("name", DataTypes::string(), None),
         ];
         let row_type = RowType::new(fields);
         let primary_keys = vec!["id".to_string()];
@@ -424,9 +424,9 @@ mod tests {
 
         // Target columns do not contain primary key
         let fields = vec![
-            DataField::new("id".to_string(), 
DataTypes::int().as_non_nullable(), None),
-            DataField::new("name".to_string(), DataTypes::string(), None),
-            DataField::new("value".to_string(), DataTypes::int(), None),
+            DataField::new("id", DataTypes::int().as_non_nullable(), None),
+            DataField::new("name", DataTypes::string(), None),
+            DataField::new("value", DataTypes::int(), None),
         ];
         let row_type = RowType::new(fields);
         let primary_keys = vec!["id".to_string()];
@@ -449,8 +449,8 @@ mod tests {
 
         // Primary key column not found in row type
         let fields = vec![
-            DataField::new("id".to_string(), 
DataTypes::int().as_non_nullable(), None),
-            DataField::new("name".to_string(), DataTypes::string(), None),
+            DataField::new("id", DataTypes::int().as_non_nullable(), None),
+            DataField::new("name", DataTypes::string(), None),
         ];
         let row_type = RowType::new(fields);
         let primary_keys = vec!["nonexistent_pk".to_string()];
@@ -473,13 +473,9 @@ mod tests {
 
         // Target columns include auto-increment column
         let fields = vec![
-            DataField::new("id".to_string(), 
DataTypes::int().as_non_nullable(), None),
-            DataField::new(
-                "seq".to_string(),
-                DataTypes::bigint().as_non_nullable(),
-                None,
-            ),
-            DataField::new("name".to_string(), DataTypes::string(), None),
+            DataField::new("id", DataTypes::int().as_non_nullable(), None),
+            DataField::new("seq", DataTypes::bigint().as_non_nullable(), None),
+            DataField::new("name", DataTypes::string(), None),
         ];
         let row_type = RowType::new(fields);
         let primary_keys = vec!["id".to_string()];
@@ -499,13 +495,13 @@ mod tests {
 
         // Non-nullable column not in target columns (partial update requires 
nullable)
         let fields = vec![
-            DataField::new("id".to_string(), 
DataTypes::int().as_non_nullable(), None),
+            DataField::new("id", DataTypes::int().as_non_nullable(), None),
             DataField::new(
-                "required_field".to_string(),
+                "required_field",
                 DataTypes::string().as_non_nullable(),
                 None,
             ),
-            DataField::new("optional_field".to_string(), DataTypes::int(), 
None),
+            DataField::new("optional_field", DataTypes::int(), None),
         ];
         let row_type = RowType::new(fields);
         let primary_keys = vec!["id".to_string()];
diff --git a/crates/fluss/src/metadata/database.rs 
b/crates/fluss/src/metadata/database.rs
index fad1498..15fefb5 100644
--- a/crates/fluss/src/metadata/database.rs
+++ b/crates/fluss/src/metadata/database.rs
@@ -89,19 +89,23 @@ impl DatabaseDescriptor {
 }
 
 impl DatabaseDescriptorBuilder {
-    pub fn comment(mut self, comment: &str) -> Self {
-        self.comment = Some(comment.to_string());
+    pub fn comment<C: Into<String>>(mut self, comment: C) -> Self {
+        self.comment = Some(comment.into());
         self
     }
 
-    pub fn custom_properties(mut self, properties: HashMap<String, String>) -> 
Self {
-        self.custom_properties = properties;
+    pub fn custom_properties<K: Into<String>, V: Into<String>>(
+        mut self,
+        properties: HashMap<K, V>,
+    ) -> Self {
+        for (k, v) in properties {
+            self.custom_properties.insert(k.into(), v.into());
+        }
         self
     }
 
-    pub fn custom_property(mut self, key: &str, value: &str) -> Self {
-        self.custom_properties
-            .insert(key.to_string(), value.to_string());
+    pub fn custom_property<K: Into<String>, V: Into<String>>(mut self, key: K, 
value: V) -> Self {
+        self.custom_properties.insert(key.into(), value.into());
         self
     }
 
diff --git a/crates/fluss/src/metadata/datatype.rs 
b/crates/fluss/src/metadata/datatype.rs
index 6431d3a..3da270b 100644
--- a/crates/fluss/src/metadata/datatype.rs
+++ b/crates/fluss/src/metadata/datatype.rs
@@ -1114,13 +1114,13 @@ impl DataTypes {
     }
 
     /// Field definition with field name and data type.
-    pub fn field(name: String, data_type: DataType) -> DataField {
+    pub fn field<N: Into<String>>(name: N, data_type: DataType) -> DataField {
         DataField::new(name, data_type, None)
     }
 
     /// Field definition with field name, data type, and a description.
-    pub fn field_with_description(
-        name: String,
+    pub fn field_with_description<N: Into<String>>(
+        name: N,
         data_type: DataType,
         description: String,
     ) -> DataField {
@@ -1151,9 +1151,13 @@ pub struct DataField {
 }
 
 impl DataField {
-    pub fn new(name: String, data_type: DataType, description: Option<String>) 
-> DataField {
+    pub fn new<N: Into<String>>(
+        name: N,
+        data_type: DataType,
+        description: Option<String>,
+    ) -> DataField {
         DataField {
-            name,
+            name: name.into(),
             data_type,
             description,
         }
@@ -1318,13 +1322,13 @@ fn test_map_display() {
 #[test]
 fn test_row_display() {
     let fields = vec![
-        DataTypes::field("id".to_string(), DataTypes::int()),
-        DataTypes::field("name".to_string(), DataTypes::string()),
+        DataTypes::field("id", DataTypes::int()),
+        DataTypes::field("name", DataTypes::string()),
     ];
     let row_type = RowType::new(fields);
     assert_eq!(row_type.to_string(), "ROW<id INT, name STRING>");
 
-    let fields_non_null = vec![DataTypes::field("age".to_string(), 
DataTypes::bigint())];
+    let fields_non_null = vec![DataTypes::field("age", DataTypes::bigint())];
     let row_type_non_null = RowType::with_nullable(false, fields_non_null);
     assert_eq!(row_type_non_null.to_string(), "ROW<age BIGINT> NOT NULL");
 }
@@ -1354,23 +1358,23 @@ fn test_datatype_display() {
 
 #[test]
 fn test_datafield_display() {
-    let field = DataTypes::field("user_id".to_string(), DataTypes::bigint());
+    let field = DataTypes::field("user_id", DataTypes::bigint());
     assert_eq!(field.to_string(), "user_id BIGINT");
 
-    let field2 = DataTypes::field("email".to_string(), DataTypes::string());
+    let field2 = DataTypes::field("email", DataTypes::string());
     assert_eq!(field2.to_string(), "email STRING");
 
-    let field3 = DataTypes::field("score".to_string(), DataTypes::decimal(10, 
2));
+    let field3 = DataTypes::field("score", DataTypes::decimal(10, 2));
     assert_eq!(field3.to_string(), "score DECIMAL(10, 2)");
 }
 
 #[test]
 fn test_complex_nested_display() {
     let row_type = DataTypes::row(vec![
-        DataTypes::field("id".to_string(), DataTypes::int()),
-        DataTypes::field("tags".to_string(), 
DataTypes::array(DataTypes::string())),
+        DataTypes::field("id", DataTypes::int()),
+        DataTypes::field("tags", DataTypes::array(DataTypes::string())),
         DataTypes::field(
-            "metadata".to_string(),
+            "metadata",
             DataTypes::map(DataTypes::string(), DataTypes::string()),
         ),
     ]);
@@ -1394,8 +1398,8 @@ fn test_deeply_nested_types() {
     let nested = DataTypes::array(DataTypes::map(
         DataTypes::string(),
         DataTypes::row(vec![
-            DataTypes::field("x".to_string(), DataTypes::int()),
-            DataTypes::field("y".to_string(), DataTypes::int()),
+            DataTypes::field("x", DataTypes::int()),
+            DataTypes::field("y", DataTypes::int()),
         ]),
     ));
     assert_eq!(nested.to_string(), "ARRAY<MAP<STRING, ROW<x INT, y INT>>>");
diff --git a/crates/fluss/src/metadata/json_serde.rs 
b/crates/fluss/src/metadata/json_serde.rs
index d0d56ef..d58fb7e 100644
--- a/crates/fluss/src/metadata/json_serde.rs
+++ b/crates/fluss/src/metadata/json_serde.rs
@@ -403,8 +403,7 @@ impl JsonSerde for Column {
             .and_then(|v| v.as_str())
             .ok_or_else(|| Error::JsonSerdeError {
                 message: format!("Missing required field: {}", Self::NAME),
-            })?
-            .to_string();
+            })?;
 
         let data_type_node = node
             .get(Self::DATA_TYPE)
@@ -414,7 +413,7 @@ impl JsonSerde for Column {
 
         let data_type = DataType::deserialize_json(data_type_node)?;
 
-        let mut column = Column::new(&name, data_type);
+        let mut column = Column::new(name, data_type);
 
         if let Some(comment) = node.get(Self::COMMENT).and_then(|v| 
v.as_str()) {
             column = column.with_comment(comment);
@@ -483,14 +482,9 @@ impl JsonSerde for Schema {
 
             let mut primary_keys = Vec::with_capacity(pk_array.len());
             for name_node in pk_array {
-                primary_keys.push(
-                    name_node
-                        .as_str()
-                        .ok_or_else(|| Error::InvalidTableError {
-                            message: "Primary key element must be a 
string".to_string(),
-                        })?
-                        .to_string(),
-                );
+                primary_keys.push(name_node.as_str().ok_or_else(|| 
Error::InvalidTableError {
+                    message: "Primary key element must be a 
string".to_string(),
+                })?);
             }
 
             schema_builder = schema_builder.primary_key(primary_keys);
diff --git a/crates/fluss/src/metadata/partition.rs 
b/crates/fluss/src/metadata/partition.rs
index bc1935c..1840235 100644
--- a/crates/fluss/src/metadata/partition.rs
+++ b/crates/fluss/src/metadata/partition.rs
@@ -31,8 +31,14 @@ pub struct PartitionSpec {
 }
 
 impl PartitionSpec {
-    pub fn new(partition_spec: HashMap<String, String>) -> Self {
-        Self { partition_spec }
+    pub fn new<K: Into<String>, V: Into<String>>(partition_spec: HashMap<K, 
V>) -> Self {
+        let mut new_map = HashMap::new();
+        for (k, v) in partition_spec {
+            new_map.insert(k.into(), v.into());
+        }
+        Self {
+            partition_spec: new_map,
+        }
     }
 
     pub fn get_spec_map(&self) -> &HashMap<String, String> {
diff --git a/crates/fluss/src/metadata/table.rs 
b/crates/fluss/src/metadata/table.rs
index ce362c4..908f446 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -36,16 +36,16 @@ pub struct Column {
 }
 
 impl Column {
-    pub fn new(name: &str, data_type: DataType) -> Self {
+    pub fn new<N: Into<String>>(name: N, data_type: DataType) -> Self {
         Self {
-            name: name.to_string(),
+            name: name.into(),
             data_type,
             comment: None,
         }
     }
 
-    pub fn with_comment(mut self, comment: &str) -> Self {
-        self.comment = Some(comment.to_string());
+    pub fn with_comment<C: Into<String>>(mut self, comment: C) -> Self {
+        self.comment = Some(comment.into());
         self
     }
 
@@ -78,9 +78,9 @@ pub struct PrimaryKey {
 }
 
 impl PrimaryKey {
-    pub fn new(constraint_name: &str, column_names: Vec<String>) -> Self {
+    pub fn new<N: Into<String>>(constraint_name: N, column_names: Vec<String>) 
-> Self {
         Self {
-            constraint_name: constraint_name.to_string(),
+            constraint_name: constraint_name.into(),
             column_names,
         }
     }
@@ -178,8 +178,8 @@ impl SchemaBuilder {
         }
     }
 
-    pub fn column(mut self, name: &str, data_type: DataType) -> Self {
-        self.columns.push(Column::new(name, data_type));
+    pub fn column<N: Into<String>>(mut self, name: N, data_type: DataType) -> 
Self {
+        self.columns.push(Column::new(name.into(), data_type));
         self
     }
 
@@ -188,20 +188,34 @@ impl SchemaBuilder {
         self
     }
 
-    pub fn with_comment(mut self, comment: &str) -> Self {
+    pub fn with_comment<C: Into<String>>(mut self, comment: C) -> Self {
         if let Some(last) = self.columns.last_mut() {
-            *last = last.clone().with_comment(comment);
+            *last = last.clone().with_comment(comment.into());
         }
         self
     }
 
-    pub fn primary_key(self, column_names: Vec<String>) -> Self {
-        let constraint_name = format!("PK_{}", column_names.join("_"));
-        self.primary_key_named(&constraint_name, column_names)
+    pub fn primary_key<I, S>(self, column_names: I) -> Self
+    where
+        I: IntoIterator<Item = S>,
+        S: Into<String>,
+    {
+        let names: Vec<String> = column_names.into_iter().map(|s| 
s.into()).collect();
+
+        let constraint_name = format!("PK_{}", names.join("_"));
+
+        self.primary_key_named(&constraint_name, names)
     }
 
-    pub fn primary_key_named(mut self, constraint_name: &str, column_names: 
Vec<String>) -> Self {
-        self.primary_key = Some(PrimaryKey::new(constraint_name, 
column_names));
+    pub fn primary_key_named<N: Into<String>, P: Into<String>>(
+        mut self,
+        constraint_name: N,
+        column_names: Vec<P>,
+    ) -> Self {
+        self.primary_key = Some(PrimaryKey::new(
+            constraint_name.into(),
+            column_names.into_iter().map(|s| s.into()).collect(),
+        ));
         self
     }
 
@@ -209,14 +223,14 @@ impl SchemaBuilder {
     /// whenever a new row is inserted into the table, the new row will be 
assigned with the next
     /// available value from the auto-increment sequence. A table can have at 
most one auto
     /// increment column.
-    pub fn enable_auto_increment(mut self, column_name: &str) -> Result<Self> {
+    pub fn enable_auto_increment<N: Into<String>>(mut self, column_name: N) -> 
Result<Self> {
         if !self.auto_increment_col_names.is_empty() {
             return Err(IllegalArgument {
                 message: "Multiple auto increment columns are not supported 
yet.".to_string(),
             });
         }
 
-        self.auto_increment_col_names.push(column_name.to_string());
+        self.auto_increment_col_names.push(column_name.into());
         Ok(self)
     }
 
@@ -353,29 +367,43 @@ impl TableDescriptorBuilder {
         self
     }
 
-    pub fn property<T: ToString>(mut self, key: &str, value: T) -> Self {
-        self.properties.insert(key.to_string(), value.to_string());
+    pub fn property<K: Into<String>, V: Into<String>>(mut self, key: K, value: 
V) -> Self {
+        self.properties.insert(key.into(), value.into());
         self
     }
 
-    pub fn properties(mut self, properties: HashMap<String, String>) -> Self {
-        self.properties.extend(properties);
+    pub fn properties<K: Into<String>, V: Into<String>>(
+        mut self,
+        properties: HashMap<K, V>,
+    ) -> Self {
+        for (k, v) in properties {
+            self.properties.insert(k.into(), v.into());
+        }
         self
     }
 
-    pub fn custom_property(mut self, key: &str, value: &str) -> Self {
-        self.custom_properties
-            .insert(key.to_string(), value.to_string());
+    pub fn custom_property<K: Into<String>, V: Into<String>>(mut self, key: K, 
value: V) -> Self {
+        self.custom_properties.insert(key.into(), value.into());
         self
     }
 
-    pub fn custom_properties(mut self, custom_properties: HashMap<String, 
String>) -> Self {
-        self.custom_properties.extend(custom_properties);
+    pub fn custom_properties<K: Into<String>, V: Into<String>>(
+        mut self,
+        custom_properties: HashMap<K, V>,
+    ) -> Self {
+        for (k, v) in custom_properties {
+            self.custom_properties.insert(k.into(), v.into());
+        }
         self
     }
 
-    pub fn partitioned_by(mut self, partition_keys: Vec<String>) -> Self {
-        self.partition_keys = Arc::from(partition_keys);
+    pub fn partitioned_by<P: Into<String>>(mut self, partition_keys: Vec<P>) 
-> Self {
+        self.partition_keys = Arc::from(
+            partition_keys
+                .into_iter()
+                .map(|s| s.into())
+                .collect::<Vec<String>>(),
+        );
         self
     }
 
@@ -387,8 +415,8 @@ impl TableDescriptorBuilder {
         self
     }
 
-    pub fn comment(mut self, comment: &str) -> Self {
-        self.comment = Some(comment.to_string());
+    pub fn comment<S: Into<String>>(mut self, comment: S) -> Self {
+        self.comment = Some(comment.into());
         self
     }
 
@@ -487,9 +515,16 @@ impl TableDescriptor {
             })
     }
 
-    pub fn with_properties(&self, new_properties: HashMap<String, String>) -> 
Self {
+    pub fn with_properties<K: Into<String>, V: Into<String>>(
+        &self,
+        new_properties: HashMap<K, V>,
+    ) -> Self {
+        let mut properties = HashMap::new();
+        for (k, v) in new_properties {
+            properties.insert(k.into(), v.into());
+        }
         Self {
-            properties: new_properties,
+            properties,
             ..self.clone()
         }
     }
@@ -684,10 +719,10 @@ const MAX_NAME_LENGTH: usize = 200;
 const INTERNAL_NAME_PREFIX: &str = "__";
 
 impl TablePath {
-    pub fn new(db: String, tbl: String) -> Self {
+    pub fn new<D: Into<String>, T: Into<String>>(db: D, tbl: T) -> Self {
         TablePath {
-            database: db,
-            table: tbl,
+            database: db.into(),
+            table: tbl.into(),
         }
     }
 
@@ -769,14 +804,14 @@ impl PhysicalTablePath {
         }
     }
 
-    pub fn of_with_names(
-        database_name: String,
-        table_name: String,
-        partition_name: Option<String>,
+    pub fn of_with_names<D: Into<String>, T: Into<String>, P: Into<String>>(
+        database_name: D,
+        table_name: T,
+        partition_name: Option<P>,
     ) -> Self {
         Self {
             table_path: Arc::new(TablePath::new(database_name, table_name)),
-            partition_name,
+            partition_name: partition_name.map(|p| p.into()),
         }
     }
 
@@ -1122,7 +1157,7 @@ impl TableInfo {
             .custom_properties(self.custom_properties.clone());
 
         if let Some(comment) = &self.comment {
-            builder = builder.comment(&comment.clone());
+            builder = builder.comment(comment.clone());
         }
 
         builder.build()
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index b798896..fe2f2f4 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -1590,8 +1590,8 @@ mod tests {
 
         assert_eq!(
             to_arrow_type(&DataTypes::row(vec![
-                DataTypes::field("f1".to_string(), DataTypes::int()),
-                DataTypes::field("f2".to_string(), DataTypes::string()),
+                DataTypes::field("f1", DataTypes::int()),
+                DataTypes::field("f2", DataTypes::string()),
             ]))
             .unwrap(),
             ArrowDataType::Struct(arrow_schema::Fields::from(vec![
@@ -1654,8 +1654,8 @@ mod tests {
     #[test]
     fn projection_rejects_out_of_bounds_index() {
         let row_type = RowType::new(vec![
-            DataField::new("id".to_string(), DataTypes::int(), None),
-            DataField::new("name".to_string(), DataTypes::string(), None),
+            DataField::new("id", DataTypes::int(), None),
+            DataField::new("name", DataTypes::string(), None),
         ]);
         let schema = to_arrow_schema(&row_type).unwrap();
         let result = ReadContext::with_projection_pushdown(schema, vec![0, 2], 
false);
@@ -1722,7 +1722,7 @@ mod tests {
 
         // Test 1: Rescaling from scale 3 to scale 2
         let row_type = RowType::new(vec![DataField::new(
-            "amount".to_string(),
+            "amount",
             DataTypes::decimal(10, 2),
             None,
         )]);
@@ -1743,7 +1743,7 @@ mod tests {
 
         // Test 2: Precision overflow (should error)
         let row_type = RowType::new(vec![DataField::new(
-            "amount".to_string(),
+            "amount",
             DataTypes::decimal(5, 2),
             None,
         )]);
diff --git a/crates/fluss/src/record/kv/kv_record_read_context.rs 
b/crates/fluss/src/record/kv/kv_record_read_context.rs
index 9236321..77cdbcb 100644
--- a/crates/fluss/src/record/kv/kv_record_read_context.rs
+++ b/crates/fluss/src/record/kv/kv_record_read_context.rs
@@ -120,7 +120,7 @@ mod tests {
         fn new(data_types: Vec<crate::metadata::DataType>) -> Self {
             let mut builder = Schema::builder();
             for (i, dt) in data_types.iter().enumerate() {
-                builder = builder.column(&format!("field{i}"), dt.clone());
+                builder = builder.column(format!("field{i}"), dt.clone());
             }
             let schema = builder.build().expect("Failed to build schema");
 
diff --git a/crates/fluss/src/test_utils.rs b/crates/fluss/src/test_utils.rs
index 8e8fbe4..752d422 100644
--- a/crates/fluss/src/test_utils.rs
+++ b/crates/fluss/src/test_utils.rs
@@ -24,11 +24,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 pub(crate) fn build_table_info(table_path: TablePath, table_id: i64, buckets: 
i32) -> TableInfo {
-    let row_type = DataTypes::row(vec![DataField::new(
-        "id".to_string(),
-        DataTypes::int(),
-        None,
-    )]);
+    let row_type = DataTypes::row(vec![DataField::new("id", DataTypes::int(), 
None)]);
     let mut schema_builder = Schema::builder().with_row_type(&row_type);
     let schema = schema_builder.build().expect("schema build");
     let table_descriptor = TableDescriptor::builder()
diff --git a/crates/fluss/tests/integration/admin.rs 
b/crates/fluss/tests/integration/admin.rs
index e94b67c..c6c98b8 100644
--- a/crates/fluss/tests/integration/admin.rs
+++ b/crates/fluss/tests/integration/admin.rs
@@ -64,13 +64,7 @@ mod admin_test {
 
         let db_descriptor = DatabaseDescriptorBuilder::default()
             .comment("test_db")
-            .custom_properties(
-                [
-                    ("k1".to_string(), "v1".to_string()),
-                    ("k2".to_string(), "v2".to_string()),
-                ]
-                .into(),
-            )
+            .custom_properties([("k1", "v1"), ("k2", "v2")].into())
             .build();
 
         let db_name = "test_create_database";
@@ -128,7 +122,7 @@ mod admin_test {
             .expect("Failed to create test database");
 
         let test_table_name = "test_user_table";
-        let table_path = TablePath::new(test_db_name.to_string(), 
test_table_name.to_string());
+        let table_path = TablePath::new(test_db_name, test_table_name);
 
         // build table schema
         let table_schema = Schema::builder()
@@ -250,25 +244,21 @@ mod admin_test {
             .expect("Failed to create test database");
 
         let test_table_name = "partitioned_table";
-        let table_path = TablePath::new(test_db_name.to_string(), 
test_table_name.to_string());
+        let table_path = TablePath::new(test_db_name, test_table_name);
 
         let table_schema = Schema::builder()
             .column("id", DataTypes::int())
             .column("name", DataTypes::string())
             .column("dt", DataTypes::string())
             .column("region", DataTypes::string())
-            .primary_key(vec![
-                "id".to_string(),
-                "dt".to_string(),
-                "region".to_string(),
-            ])
+            .primary_key(vec!["id", "dt", "region"])
             .build()
             .expect("Failed to build table schema");
 
         let table_descriptor = TableDescriptor::builder()
             .schema(table_schema)
             .distributed_by(Some(3), vec!["id".to_string()])
-            .partitioned_by(vec!["dt".to_string(), "region".to_string()])
+            .partitioned_by(vec!["dt", "region"])
             .property("table.replication.factor", "1")
             .log_format(LogFormat::ARROW)
             .kv_format(KvFormat::COMPACTED)
@@ -291,8 +281,8 @@ mod admin_test {
         );
 
         let mut partition_values = HashMap::new();
-        partition_values.insert("dt".to_string(), "2024-01-15".to_string());
-        partition_values.insert("region".to_string(), "EMEA".to_string());
+        partition_values.insert("dt", "2024-01-15");
+        partition_values.insert("region", "EMEA");
         let partition_spec = PartitionSpec::new(partition_values);
 
         admin
@@ -317,7 +307,7 @@ mod admin_test {
 
         // list with partial spec filter - should find the partition
         let mut partition_values = HashMap::new();
-        partition_values.insert("dt".to_string(), "2024-01-15".to_string());
+        partition_values.insert("dt", "2024-01-15");
         let partial_partition_spec = PartitionSpec::new(partition_values);
 
         let partitions_with_spec = admin
@@ -337,7 +327,7 @@ mod admin_test {
 
         // list with non-matching spec - should find no partitions
         let mut non_matching_values = HashMap::new();
-        non_matching_values.insert("dt".to_string(), "2024-01-16".to_string());
+        non_matching_values.insert("dt", "2024-01-16");
         let non_matching_spec = PartitionSpec::new(non_matching_values);
         let partitions_non_matching = admin
             .list_partition_infos_with_spec(&table_path, 
Some(&non_matching_spec))
@@ -382,7 +372,7 @@ mod admin_test {
             .await
             .expect("Failed to get admin client");
 
-        let table_path = TablePath::new("fluss".to_string(), 
"not_exist".to_string());
+        let table_path = TablePath::new("fluss", "not_exist");
 
         let result = admin.get_table(&table_path).await;
         assert!(result.is_err(), "Expected error but got Ok");
diff --git a/crates/fluss/tests/integration/kv_table.rs 
b/crates/fluss/tests/integration/kv_table.rs
index 3691d65..c419ed9 100644
--- a/crates/fluss/tests/integration/kv_table.rs
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -66,7 +66,7 @@ mod kv_table_test {
 
         let admin = connection.get_admin().await.expect("Failed to get admin");
 
-        let table_path = TablePath::new("fluss".to_string(), 
"test_upsert_and_lookup".to_string());
+        let table_path = TablePath::new("fluss", "test_upsert_and_lookup");
 
         let table_descriptor = TableDescriptor::builder()
             .schema(
@@ -74,7 +74,7 @@ mod kv_table_test {
                     .column("id", DataTypes::int())
                     .column("name", DataTypes::string())
                     .column("age", DataTypes::bigint())
-                    .primary_key(vec!["id".to_string()])
+                    .primary_key(vec!["id"])
                     .build()
                     .expect("Failed to build schema"),
             )
@@ -223,7 +223,7 @@ mod kv_table_test {
 
         let admin = connection.get_admin().await.expect("Failed to get admin");
 
-        let table_path = TablePath::new("fluss".to_string(), 
"test_composite_pk".to_string());
+        let table_path = TablePath::new("fluss", "test_composite_pk");
 
         let table_descriptor = TableDescriptor::builder()
             .schema(
@@ -231,7 +231,7 @@ mod kv_table_test {
                     .column("region", DataTypes::string())
                     .column("user_id", DataTypes::int())
                     .column("score", DataTypes::bigint())
-                    .primary_key(vec!["region".to_string(), 
"user_id".to_string()])
+                    .primary_key(vec!["region", "user_id"])
                     .build()
                     .expect("Failed to build schema"),
             )
@@ -335,7 +335,7 @@ mod kv_table_test {
 
         let admin = connection.get_admin().await.expect("Failed to get admin");
 
-        let table_path = TablePath::new("fluss".to_string(), 
"test_partial_update".to_string());
+        let table_path = TablePath::new("fluss", "test_partial_update");
 
         let table_descriptor = TableDescriptor::builder()
             .schema(
@@ -344,7 +344,7 @@ mod kv_table_test {
                     .column("name", DataTypes::string())
                     .column("age", DataTypes::bigint())
                     .column("score", DataTypes::bigint())
-                    .primary_key(vec!["id".to_string()])
+                    .primary_key(vec!["id"])
                     .build()
                     .expect("Failed to build schema"),
             )
@@ -446,8 +446,7 @@ mod kv_table_test {
 
         let admin = connection.get_admin().await.expect("Failed to get admin");
 
-        let table_path =
-            TablePath::new("fluss".to_string(), 
"test_partitioned_kv_table".to_string());
+        let table_path = TablePath::new("fluss", "test_partitioned_kv_table");
 
         // Create a partitioned KV table with region as partition key
         let table_descriptor = TableDescriptor::builder()
@@ -457,11 +456,11 @@ mod kv_table_test {
                     .column("user_id", DataTypes::int())
                     .column("name", DataTypes::string())
                     .column("score", DataTypes::bigint())
-                    .primary_key(vec!["region".to_string(), 
"user_id".to_string()])
+                    .primary_key(vec!["region", "user_id"])
                     .build()
                     .expect("Failed to build schema"),
             )
-            .partitioned_by(vec!["region".to_string()])
+            .partitioned_by(vec!["region"])
             .build()
             .expect("Failed to build table");
 
@@ -614,7 +613,7 @@ mod kv_table_test {
 
         let admin = connection.get_admin().await.expect("Failed to get admin");
 
-        let table_path = TablePath::new("fluss".to_string(), 
"test_all_datatypes".to_string());
+        let table_path = TablePath::new("fluss", "test_all_datatypes");
 
         // Create a table with all supported primitive datatypes
         let table_descriptor = TableDescriptor::builder()
@@ -645,7 +644,7 @@ mod kv_table_test {
                     // Binary types
                     .column("col_bytes", DataTypes::bytes())
                     .column("col_binary", DataTypes::binary(20))
-                    .primary_key(vec!["pk_int".to_string()])
+                    .primary_key(vec!["pk_int"])
                     .build()
                     .expect("Failed to build schema"),
             )
diff --git a/crates/fluss/tests/integration/log_table.rs 
b/crates/fluss/tests/integration/log_table.rs
index 27b4d83..514df82 100644
--- a/crates/fluss/tests/integration/log_table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -69,10 +69,7 @@ mod table_test {
 
         let admin = connection.get_admin().await.expect("Failed to get admin");
 
-        let table_path = TablePath::new(
-            "fluss".to_string(),
-            "test_append_record_batch_and_scan".to_string(),
-        );
+        let table_path = TablePath::new("fluss", 
"test_append_record_batch_and_scan");
 
         let table_descriptor = TableDescriptor::builder()
             .schema(
@@ -171,7 +168,7 @@ mod table_test {
 
         let admin = connection.get_admin().await.expect("Failed to get admin");
 
-        let table_path = TablePath::new("fluss".to_string(), 
"test_list_offsets".to_string());
+        let table_path = TablePath::new("fluss", "test_list_offsets");
 
         let table_descriptor = TableDescriptor::builder()
             .schema(
@@ -295,7 +292,7 @@ mod table_test {
 
         let admin = connection.get_admin().await.expect("Failed to get admin");
 
-        let table_path = TablePath::new("fluss".to_string(), 
"test_project".to_string());
+        let table_path = TablePath::new("fluss", "test_project");
 
         let table_descriptor = TableDescriptor::builder()
             .schema(
@@ -451,7 +448,7 @@ mod table_test {
         let connection = cluster.get_fluss_connection().await;
         let admin = connection.get_admin().await.expect("Failed to get admin");
 
-        let table_path = TablePath::new("fluss".to_string(), 
"test_poll_batches".to_string());
+        let table_path = TablePath::new("fluss", "test_poll_batches");
         let schema = Schema::builder()
             .column("id", DataTypes::int())
             .column("name", DataTypes::string())
@@ -595,7 +592,7 @@ mod table_test {
 
         let admin = connection.get_admin().await.expect("Failed to get admin");
 
-        let table_path = TablePath::new("fluss".to_string(), 
"test_log_all_datatypes".to_string());
+        let table_path = TablePath::new("fluss", "test_log_all_datatypes");
 
         // Create a log table with all supported datatypes for append/scan
         let table_descriptor = TableDescriptor::builder()
@@ -983,10 +980,7 @@ mod table_test {
 
         let admin = connection.get_admin().await.expect("Failed to get admin");
 
-        let table_path = TablePath::new(
-            "fluss".to_string(),
-            "test_partitioned_log_append".to_string(),
-        );
+        let table_path = TablePath::new("fluss", 
"test_partitioned_log_append");
 
         // Create a partitioned log table
         let table_descriptor = TableDescriptor::builder()
@@ -998,7 +992,7 @@ mod table_test {
                     .build()
                     .expect("Failed to build schema"),
             )
-            .partitioned_by(vec!["region".to_string()])
+            .partitioned_by(vec!["region"])
             .build()
             .expect("Failed to build table");
 
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs 
b/crates/fluss/tests/integration/table_remote_scan.rs
index ce0c137..0efe388 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -115,10 +115,7 @@ mod table_remote_scan_test {
 
         let admin = connection.get_admin().await.expect("Failed to get admin");
 
-        let table_path = TablePath::new(
-            "fluss".to_string(),
-            "test_append_record_batch_and_scan".to_string(),
-        );
+        let table_path = TablePath::new("fluss", 
"test_append_record_batch_and_scan");
 
         let table_descriptor = TableDescriptor::builder()
             .schema(
diff --git a/crates/fluss/tests/integration/utils.rs 
b/crates/fluss/tests/integration/utils.rs
index fd5145a..ae61d3a 100644
--- a/crates/fluss/tests/integration/utils.rs
+++ b/crates/fluss/tests/integration/utils.rs
@@ -119,7 +119,7 @@ pub async fn create_partitions(
 ) {
     for value in partition_values {
         let mut partition_map = HashMap::new();
-        partition_map.insert(partition_column.to_string(), value.to_string());
+        partition_map.insert(partition_column, *value);
         admin
             .create_partition(table_path, &PartitionSpec::new(partition_map), 
true)
             .await


Reply via email to