This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new ab9fb84 fix: validate filesystem catalog identifiers (#334)
ab9fb84 is described below
commit ab9fb84aeae008712c127fc6e3005250215813de
Author: Minh Vu <[email protected]>
AuthorDate: Thu May 21 05:40:32 2026 +0200
fix: validate filesystem catalog identifiers (#334)
---
crates/paimon/src/catalog/filesystem.rs | 92 +++++++++++++++++++++++++++++++++
crates/paimon/src/catalog/mod.rs | 85 +++++++++++++++++++++++++++++-
2 files changed, 176 insertions(+), 1 deletion(-)
diff --git a/crates/paimon/src/catalog/filesystem.rs b/crates/paimon/src/catalog/filesystem.rs
index c544a15..7f18f95 100644
--- a/crates/paimon/src/catalog/filesystem.rs
+++ b/crates/paimon/src/catalog/filesystem.rs
@@ -215,6 +215,8 @@ impl Catalog for FileSystemCatalog {
ignore_if_exists: bool,
properties: HashMap<String, String>,
) -> Result<()> {
+ Identifier::validate_database_name(name)?;
+
if properties.contains_key(DB_LOCATION_PROP) {
return Err(Error::ConfigInvalid {
message: "Cannot specify location for a database when using
fileSystem catalog."
@@ -239,6 +241,8 @@ impl Catalog for FileSystemCatalog {
}
async fn get_database(&self, name: &str) -> Result<Database> {
+ Identifier::validate_database_name(name)?;
+
if !self.database_exists(name).await? {
return Err(Error::DatabaseNotExist {
database: name.to_string(),
@@ -254,6 +258,8 @@ impl Catalog for FileSystemCatalog {
ignore_if_not_exists: bool,
cascade: bool,
) -> Result<()> {
+ Identifier::validate_database_name(name)?;
+
let path = self.database_path(name);
let database_exists = self.database_exists(name).await?;
@@ -279,6 +285,8 @@ impl Catalog for FileSystemCatalog {
}
async fn get_table(&self, identifier: &Identifier) -> Result<Table> {
+ identifier.validate()?;
+
let table_path = self.table_path(identifier);
if !self.table_exists(identifier).await? {
@@ -304,6 +312,8 @@ impl Catalog for FileSystemCatalog {
}
async fn list_tables(&self, database_name: &str) -> Result<Vec<String>> {
+ Identifier::validate_database_name(database_name)?;
+
let path = self.database_path(database_name);
if !self.database_exists(database_name).await? {
@@ -321,6 +331,8 @@ impl Catalog for FileSystemCatalog {
creation: Schema,
ignore_if_exists: bool,
) -> Result<()> {
+ identifier.validate()?;
+
let table_path = self.table_path(identifier);
let table_exists = self.table_exists(identifier).await?;
@@ -348,6 +360,8 @@ impl Catalog for FileSystemCatalog {
}
async fn drop_table(&self, identifier: &Identifier, ignore_if_not_exists:
bool) -> Result<()> {
+ identifier.validate()?;
+
let table_path = self.table_path(identifier);
let table_exists = self.table_exists(identifier).await?;
@@ -372,6 +386,9 @@ impl Catalog for FileSystemCatalog {
to: &Identifier,
ignore_if_not_exists: bool,
) -> Result<()> {
+ from.validate()?;
+ to.validate()?;
+
let from_path = self.table_path(from);
let to_path = self.table_path(to);
@@ -403,6 +420,8 @@ impl Catalog for FileSystemCatalog {
changes: Vec<crate::spec::SchemaChange>,
ignore_if_not_exists: bool,
) -> Result<()> {
+ identifier.validate()?;
+
let table_path = self.table_path(identifier);
if !self.table_exists(identifier).await? {
if ignore_if_not_exists {
@@ -511,6 +530,31 @@ mod tests {
assert!(!catalog.database_exists("db1").await.unwrap());
}
+ /// Creating a database whose name contains `../` must fail with
+ /// `Error::IdentifierInvalid` and must not create anything at the escaped path.
+ #[tokio::test]
+ async fn test_create_database_should_reject_path_traversal_name() {
+ let (temp_dir, catalog) = create_test_catalog();
+ // Where the traversal would presumably land if the name were accepted
+ // (assumes the warehouse sits one level below `temp_dir` and database
+ // directories get a `.db` suffix — confirm against `create_test_catalog`).
+ let escaped_path = temp_dir.path().join("escaped.db");
+
+ let result = catalog
+ .create_database("../escaped", false, HashMap::new())
+ .await;
+
+ // The call must be rejected as an invalid identifier...
+ assert!(matches!(result, Err(Error::IdentifierInvalid { .. })));
+ // ...and the filesystem outside the catalog must be untouched.
+ assert!(!escaped_path.exists());
+ }
+
+ /// Dropping a database whose name contains `../` must fail with
+ /// `Error::IdentifierInvalid` and must leave the escaped directory in place.
+ #[tokio::test]
+ async fn test_drop_database_should_reject_path_traversal_name() {
+ let (temp_dir, catalog) = create_test_catalog();
+ // Pre-create the directory the traversal would presumably resolve to, so a
+ // missing validation would show up as this directory being deleted.
+ let escaped_path = temp_dir.path().join("escaped.db");
+ std::fs::create_dir(&escaped_path).unwrap();
+
+ // NOTE(review): the two flags look like `ignore_if_not_exists = false` and
+ // `cascade = true` — confirm against the `drop_database` signature.
+ let result = catalog.drop_database("../escaped", false, true).await;
+
+ assert!(matches!(result, Err(Error::IdentifierInvalid { .. })));
+ // The directory outside the catalog must survive the rejected drop.
+ assert!(escaped_path.exists());
+ }
+
#[tokio::test]
async fn test_table_operations() {
let (_temp_dir, catalog) = create_test_catalog();
@@ -568,6 +612,54 @@ mod tests {
assert!(!tables.contains(&"table1".to_string()));
}
+ /// Creating a table whose object name contains `../` must fail with
+ /// `Error::IdentifierInvalid` and must not create anything at the escaped path.
+ #[tokio::test]
+ async fn test_create_table_should_reject_path_traversal_object_name() {
+ let (temp_dir, catalog) = create_test_catalog();
+ // A valid database is created first so that only the object name is invalid.
+ catalog
+ .create_database("db1", false, HashMap::new())
+ .await
+ .unwrap();
+ // Where `../../table_escape` would presumably resolve to relative to the
+ // database directory — confirm against `table_path`.
+ let escaped_path = temp_dir.path().join("table_escape");
+
+ let result = catalog
+ .create_table(
+ &Identifier::new("db1", "../../table_escape"),
+ testing_schema(),
+ false,
+ )
+ .await;
+
+ assert!(matches!(result, Err(Error::IdentifierInvalid { .. })));
+ // Nothing may have been written outside the catalog.
+ assert!(!escaped_path.exists());
+ }
+
+ /// Renaming a table to a target whose object name contains `../` must fail
+ /// with `Error::IdentifierInvalid`, create nothing at the escaped path, and
+ /// leave the source table readable.
+ #[tokio::test]
+ async fn test_rename_table_should_reject_path_traversal_target_name() {
+ let (temp_dir, catalog) = create_test_catalog();
+ catalog
+ .create_database("db1", false, HashMap::new())
+ .await
+ .unwrap();
+ // A real, valid source table so that only the rename target is invalid.
+ let source = Identifier::new("db1", "source");
+ catalog
+ .create_table(&source, testing_schema(), false)
+ .await
+ .unwrap();
+ // Where `../../renamed_escape` would presumably resolve to relative to the
+ // database directory — confirm against `table_path`.
+ let escaped_path = temp_dir.path().join("renamed_escape");
+
+ let result = catalog
+ .rename_table(
+ &source,
+ &Identifier::new("db1", "../../renamed_escape"),
+ false,
+ )
+ .await;
+
+ assert!(matches!(result, Err(Error::IdentifierInvalid { .. })));
+ assert!(!escaped_path.exists());
+ // The rejected rename must not have moved or removed the source table.
+ assert!(catalog.get_table(&source).await.is_ok());
+ }
+
#[tokio::test]
async fn test_list_partitions_default_table_not_found_errors() {
let (_temp_dir, catalog) = create_test_catalog();
diff --git a/crates/paimon/src/catalog/mod.rs b/crates/paimon/src/catalog/mod.rs
index 0c944a4..166d59a 100644
--- a/crates/paimon/src/catalog/mod.rs
+++ b/crates/paimon/src/catalog/mod.rs
@@ -29,6 +29,7 @@ mod rest;
use std::collections::HashMap;
use std::fmt;
+use crate::{Error, Result};
pub use database::*;
pub use factory::*;
pub use filesystem::*;
@@ -76,6 +77,22 @@ impl Identifier {
}
}
+ /// Validate this identifier's database and object names.
+ ///
+ /// The database name is checked first, then the object name; the first
+ /// failure is returned and `Ok(())` means both names passed.
+ pub(crate) fn validate(&self) -> Result<()> {
+ Self::validate_database_name(&self.database)?;
+ Self::validate_object_name(&self.object)
+ }
+
+ /// Validate a database name for path-safe catalog use.
+ ///
+ /// Delegates to `validate_identifier_name` with the kind label `"database"`,
+ /// which becomes the leading word of the error message on rejection.
+ pub(crate) fn validate_database_name(name: &str) -> Result<()> {
+ validate_identifier_name("database", name)
+ }
+
+ /// Validate an object name for path-safe catalog use.
+ ///
+ /// Delegates to `validate_identifier_name` with the kind label `"object"`,
+ /// which becomes the leading word of the error message on rejection.
+ pub(crate) fn validate_object_name(name: &str) -> Result<()> {
+ validate_identifier_name("object", name)
+ }
+
/// Database name.
pub fn database(&self) -> &str {
&self.database
@@ -97,6 +114,28 @@ impl Identifier {
}
}
+/// Reject identifier names that are unsafe to embed in a catalog path.
+///
+/// `kind` is a label (callers pass `"database"` or `"object"`) used only to
+/// build the error message; `name` is the value being checked.
+///
+/// The rules are evaluated in order and only the first matching reason is
+/// reported:
+/// 1. empty, or consisting solely of whitespace;
+/// 2. exactly `.` or `..`;
+/// 3. containing `/` or `\`;
+/// 4. containing any character for which `char::is_control` is true.
+///
+/// # Errors
+///
+/// Returns `Error::IdentifierInvalid` whose message has the form
+/// `"{kind} name {reason}: {name:?}"` when any rule matches.
+fn validate_identifier_name(kind: &str, name: &str) -> Result<()> {
+ // `trim()` is used only for the emptiness test: a name with surrounding
+ // whitespace but other content is not rejected by this branch.
+ let invalid = if name.trim().is_empty() {
+ Some("cannot be empty or whitespace")
+ } else if matches!(name, "." | "..") {
+ Some("cannot be '.' or '..'")
+ } else if name.contains('/') || name.contains('\\') {
+ Some("cannot contain path separators")
+ } else if name.chars().any(char::is_control) {
+ Some("cannot contain control characters")
+ } else {
+ None
+ };
+
+ if let Some(reason) = invalid {
+ return Err(Error::IdentifierInvalid {
+ // `{name:?}` prints the Debug form, so control characters appear escaped.
+ message: format!("{kind} name {reason}: {name:?}"),
+ });
+ }
+
+ Ok(())
+}
+
impl fmt::Display for Identifier {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.full_name())
@@ -119,7 +158,6 @@ use async_trait::async_trait;
use crate::api::PagedList;
use crate::spec::{Partition, Schema, SchemaChange};
use crate::table::Table;
-use crate::Result;
/// Catalog API for reading and writing metadata (databases, tables) in Paimon.
///
@@ -257,3 +295,48 @@ pub trait Catalog: Send + Sync {
))
}
}
+
+// Unit tests for `Identifier::validate`.
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ /// Each pair below has exactly one bad component (database or object);
+ /// `Identifier::validate` must reject every one with `Error::IdentifierInvalid`.
+ #[test]
+ fn test_identifier_validate_should_reject_path_control_names() {
+ for (database, object) in [
+ // Invalid database names paired with a valid object name.
+ ("", "table"),
+ (" ", "table"),
+ (".", "table"),
+ ("..", "table"),
+ ("../escaped", "table"),
+ ("db\\escaped", "table"),
+ ("db\nescaped", "table"),
+ // Valid database name paired with invalid object names.
+ ("db", ""),
+ ("db", " "),
+ ("db", "."),
+ ("db", ".."),
+ ("db", "../escaped"),
+ ("db", "nested/table"),
+ ("db", "nested\\table"),
+ ("db", "table\0name"),
+ ] {
+ let result = Identifier::new(database, object).validate();
+ assert!(
+ matches!(result, Err(Error::IdentifierInvalid { .. })),
+ "expected invalid identifier for database={database:?},
object={object:?}, got {result:?}"
+ );
+ }
+ }
+
+ /// Names containing `$` or non-ASCII characters must pass validation, and
+ /// the accessors must return them unchanged afterwards.
+ #[test]
+ fn test_identifier_validate_should_allow_system_suffix_and_unicode_names()
{
+ let identifier = Identifier::new("analytics", "orders$snapshots");
+ identifier.validate().unwrap();
+ assert_eq!(identifier.database(), "analytics");
+ assert_eq!(identifier.object(), "orders$snapshots");
+
+ let identifier = Identifier::new("数据", "订单");
+ identifier.validate().unwrap();
+ assert_eq!(identifier.database(), "数据");
+ assert_eq!(identifier.object(), "订单");
+ }
+}