This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new ab9fb84  fix: validate filesystem catalog identifiers (#334)
ab9fb84 is described below

commit ab9fb84aeae008712c127fc6e3005250215813de
Author: Minh Vu <[email protected]>
AuthorDate: Thu May 21 05:40:32 2026 +0200

    fix: validate filesystem catalog identifiers (#334)
---
 crates/paimon/src/catalog/filesystem.rs | 92 +++++++++++++++++++++++++++++++++
 crates/paimon/src/catalog/mod.rs        | 85 +++++++++++++++++++++++++++++-
 2 files changed, 176 insertions(+), 1 deletion(-)

diff --git a/crates/paimon/src/catalog/filesystem.rs b/crates/paimon/src/catalog/filesystem.rs
index c544a15..7f18f95 100644
--- a/crates/paimon/src/catalog/filesystem.rs
+++ b/crates/paimon/src/catalog/filesystem.rs
@@ -215,6 +215,8 @@ impl Catalog for FileSystemCatalog {
         ignore_if_exists: bool,
         properties: HashMap<String, String>,
     ) -> Result<()> {
+        Identifier::validate_database_name(name)?;
+
         if properties.contains_key(DB_LOCATION_PROP) {
             return Err(Error::ConfigInvalid {
                 message: "Cannot specify location for a database when using fileSystem catalog."
@@ -239,6 +241,8 @@ impl Catalog for FileSystemCatalog {
     }
 
     async fn get_database(&self, name: &str) -> Result<Database> {
+        Identifier::validate_database_name(name)?;
+
         if !self.database_exists(name).await? {
             return Err(Error::DatabaseNotExist {
                 database: name.to_string(),
@@ -254,6 +258,8 @@ impl Catalog for FileSystemCatalog {
         ignore_if_not_exists: bool,
         cascade: bool,
     ) -> Result<()> {
+        Identifier::validate_database_name(name)?;
+
         let path = self.database_path(name);
 
         let database_exists = self.database_exists(name).await?;
@@ -279,6 +285,8 @@ impl Catalog for FileSystemCatalog {
     }
 
     async fn get_table(&self, identifier: &Identifier) -> Result<Table> {
+        identifier.validate()?;
+
         let table_path = self.table_path(identifier);
 
         if !self.table_exists(identifier).await? {
@@ -304,6 +312,8 @@ impl Catalog for FileSystemCatalog {
     }
 
     async fn list_tables(&self, database_name: &str) -> Result<Vec<String>> {
+        Identifier::validate_database_name(database_name)?;
+
         let path = self.database_path(database_name);
 
         if !self.database_exists(database_name).await? {
@@ -321,6 +331,8 @@ impl Catalog for FileSystemCatalog {
         creation: Schema,
         ignore_if_exists: bool,
     ) -> Result<()> {
+        identifier.validate()?;
+
         let table_path = self.table_path(identifier);
 
         let table_exists = self.table_exists(identifier).await?;
@@ -348,6 +360,8 @@ impl Catalog for FileSystemCatalog {
     }
 
     async fn drop_table(&self, identifier: &Identifier, ignore_if_not_exists: bool) -> Result<()> {
+        identifier.validate()?;
+
         let table_path = self.table_path(identifier);
 
         let table_exists = self.table_exists(identifier).await?;
@@ -372,6 +386,9 @@ impl Catalog for FileSystemCatalog {
         to: &Identifier,
         ignore_if_not_exists: bool,
     ) -> Result<()> {
+        from.validate()?;
+        to.validate()?;
+
         let from_path = self.table_path(from);
         let to_path = self.table_path(to);
 
@@ -403,6 +420,8 @@ impl Catalog for FileSystemCatalog {
         changes: Vec<crate::spec::SchemaChange>,
         ignore_if_not_exists: bool,
     ) -> Result<()> {
+        identifier.validate()?;
+
         let table_path = self.table_path(identifier);
         if !self.table_exists(identifier).await? {
             if ignore_if_not_exists {
@@ -511,6 +530,31 @@ mod tests {
         assert!(!catalog.database_exists("db1").await.unwrap());
     }
 
+    #[tokio::test]
+    async fn test_create_database_should_reject_path_traversal_name() {
+        // The traversal name aims at `escaped.db` in the temp dir; nothing may appear there.
+        let (temp_dir, catalog) = create_test_catalog();
+        let outside_db = temp_dir.path().join("escaped.db");
+        let created = catalog
+            .create_database("../escaped", false, HashMap::new())
+            .await;
+
+        assert!(matches!(created, Err(Error::IdentifierInvalid { .. })));
+        assert!(!outside_db.exists());
+    }
+
+    #[tokio::test]
+    async fn test_drop_database_should_reject_path_traversal_name() {
+        let (temp_dir, catalog) = create_test_catalog();
+        // Pre-create the directory that a cascading "../escaped" drop would aim at.
+        let victim = temp_dir.path().join("escaped.db");
+        std::fs::create_dir(&victim).unwrap();
+        let dropped = catalog.drop_database("../escaped", false, true).await;
+
+        assert!(matches!(dropped, Err(Error::IdentifierInvalid { .. })));
+        assert!(victim.exists());
+    }
+
     #[tokio::test]
     async fn test_table_operations() {
         let (_temp_dir, catalog) = create_test_catalog();
@@ -568,6 +612,54 @@ mod tests {
         assert!(!tables.contains(&"table1".to_string()));
     }
 
+    #[tokio::test]
+    async fn test_create_table_should_reject_path_traversal_object_name() {
+        let (temp_dir, catalog) = create_test_catalog();
+        catalog
+            .create_database("db1", false, HashMap::new())
+            .await
+            .unwrap();
+
+        // "../../table_escape" tries to climb out of the database directory; the
+        // catalog must reject it before creating anything at the escape target.
+        let traversal = Identifier::new("db1", "../../table_escape");
+        let outside_table = temp_dir.path().join("table_escape");
+
+        let created = catalog
+            .create_table(&traversal, testing_schema(), false)
+            .await;
+
+        assert!(matches!(created, Err(Error::IdentifierInvalid { .. })));
+        assert!(!outside_table.exists());
+    }
+
+    #[tokio::test]
+    async fn test_rename_table_should_reject_path_traversal_target_name() {
+        let (temp_dir, catalog) = create_test_catalog();
+        catalog
+            .create_database("db1", false, HashMap::new())
+            .await
+            .unwrap();
+        let source = Identifier::new("db1", "source");
+        catalog
+            .create_table(&source, testing_schema(), false)
+            .await
+            .unwrap();
+
+        // Renaming onto a traversal target must fail validation before any
+        // directory is created at the escape target.
+        let target = Identifier::new("db1", "../../renamed_escape");
+        let outside_table = temp_dir.path().join("renamed_escape");
+        let renamed = catalog.rename_table(&source, &target, false).await;
+
+        assert!(matches!(renamed, Err(Error::IdentifierInvalid { .. })));
+        assert!(!outside_table.exists());
+
+        // The rejected rename must leave the source table loadable.
+        let source_still_loads = catalog.get_table(&source).await.is_ok();
+        assert!(source_still_loads);
+    }
+
     #[tokio::test]
     async fn test_list_partitions_default_table_not_found_errors() {
         let (_temp_dir, catalog) = create_test_catalog();
diff --git a/crates/paimon/src/catalog/mod.rs b/crates/paimon/src/catalog/mod.rs
index 0c944a4..166d59a 100644
--- a/crates/paimon/src/catalog/mod.rs
+++ b/crates/paimon/src/catalog/mod.rs
@@ -29,6 +29,7 @@ mod rest;
 use std::collections::HashMap;
 use std::fmt;
 
+use crate::{Error, Result};
 pub use database::*;
 pub use factory::*;
 pub use filesystem::*;
@@ -76,6 +77,22 @@ impl Identifier {
         }
     }
 
+    /// Check both parts of this identifier, stopping at the first invalid one.
+    pub(crate) fn validate(&self) -> Result<()> {
+        Self::validate_database_name(&self.database)
+            .and_then(|()| Self::validate_object_name(&self.object))
+    }
+
+    /// Validate a database name, rejecting values that are unsafe as a single path component.
+    pub(crate) fn validate_database_name(name: &str) -> Result<()> {
+        validate_identifier_name("database", name)
+    }
+
+    /// Validate an object (e.g. table) name, rejecting values unsafe as a single path component.
+    pub(crate) fn validate_object_name(name: &str) -> Result<()> {
+        validate_identifier_name("object", name)
+    }
+
     /// Database name.
     pub fn database(&self) -> &str {
         &self.database
@@ -97,6 +114,28 @@ impl Identifier {
     }
 }
 
+/// Check that `name` is safe to use as a single path component.
+///
+/// `kind` ("database" or "object") only labels the error message. Rules are
+/// checked in order and the first failing one is reported as `IdentifierInvalid`.
+fn validate_identifier_name(kind: &str, name: &str) -> Result<()> {
+    let reason = match name {
+        // Nothing usable once surrounding whitespace is removed.
+        n if n.trim().is_empty() => "cannot be empty or whitespace",
+        // Relative segments that refer to the current or parent directory.
+        "." | ".." => "cannot be '.' or '..'",
+        // Both POSIX and Windows separators would split the name into components.
+        n if n.contains(['/', '\\']) => "cannot contain path separators",
+        // Covers NUL, newlines and every other Unicode control character.
+        n if n.chars().any(char::is_control) => "cannot contain control characters",
+        _ => return Ok(()),
+    };
+
+    Err(Error::IdentifierInvalid {
+        message: format!("{kind} name {reason}: {name:?}"),
+    })
+}
+
 impl fmt::Display for Identifier {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(f, "{}", self.full_name())
@@ -119,7 +158,6 @@ use async_trait::async_trait;
 use crate::api::PagedList;
 use crate::spec::{Partition, Schema, SchemaChange};
 use crate::table::Table;
-use crate::Result;
 
 /// Catalog API for reading and writing metadata (databases, tables) in Paimon.
 ///
@@ -257,3 +295,48 @@ pub trait Catalog: Send + Sync {
         ))
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_identifier_validate_should_reject_path_control_names() {
+        for (database, object) in [
+            ("", "table"),
+            ("   ", "table"),
+            (".", "table"),
+            ("..", "table"),
+            ("../escaped", "table"),
+            ("db\\escaped", "table"),
+            ("db\nescaped", "table"),
+            ("db", ""),
+            ("db", "   "),
+            ("db", "."),
+            ("db", ".."),
+            ("db", "../escaped"),
+            ("db", "nested/table"),
+            ("db", "nested\\table"),
+            ("db", "table\0name"),
+        ] {
+            let result = Identifier::new(database, object).validate();
+            assert!(
+                matches!(result, Err(Error::IdentifierInvalid { .. })),
+                "expected invalid identifier for database={database:?}, object={object:?}, got {result:?}"
+            );
+        }
+    }
+
+    #[test]
+    fn test_identifier_validate_should_allow_system_suffix_and_unicode_names() {
+        let identifier = Identifier::new("analytics", "orders$snapshots");
+        identifier.validate().unwrap();
+        assert_eq!(identifier.database(), "analytics");
+        assert_eq!(identifier.object(), "orders$snapshots");
+
+        let identifier = Identifier::new("数据", "订单");
+        identifier.validate().unwrap();
+        assert_eq!(identifier.database(), "数据");
+        assert_eq!(identifier.object(), "订单");
+    }
+}

Reply via email to