This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 147f6d0  feat: introduce filesystem catalog (#93)
147f6d0 is described below

commit 147f6d01115b5ab7846643f2ca7c3de98bc89b13
Author: yuxia Luo <[email protected]>
AuthorDate: Mon Mar 2 11:54:57 2026 +0800

    feat: introduce filesystem catalog (#93)
---
 crates/paimon/Cargo.toml                          |   1 +
 crates/paimon/src/catalog/filesystem.rs           | 548 ++++++++++++++++++++++
 crates/paimon/src/catalog/mod.rs                  |  12 +
 crates/paimon/src/error.rs                        |   5 +
 crates/paimon/src/file_index/file_index_format.rs |   2 +-
 crates/paimon/src/io/file_io.rs                   |  51 +-
 crates/paimon/src/spec/schema.rs                  |  65 +++
 crates/paimon/src/spec/types.rs                   |   8 +
 crates/paimon/src/table.rs                        |  49 +-
 9 files changed, 727 insertions(+), 14 deletions(-)

diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index 948c0cb..7ea9c2c 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -55,3 +55,4 @@ indexmap = "2.5.0"
 [dev-dependencies]
 rand = "0.8.5"
 serde_avro_fast = { version = "1.1.2", features = ["snappy"] }
+tempfile = "3"
diff --git a/crates/paimon/src/catalog/filesystem.rs 
b/crates/paimon/src/catalog/filesystem.rs
new file mode 100644
index 0000000..2068550
--- /dev/null
+++ b/crates/paimon/src/catalog/filesystem.rs
@@ -0,0 +1,548 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filesystem catalog implementation for Apache Paimon.
+//!
+//! Reference: 
[org.apache.paimon.catalog.FileSystemCatalog](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java)
+
+use std::collections::HashMap;
+
+use crate::catalog::{Catalog, Identifier, DB_LOCATION_PROP, DB_SUFFIX};
+use crate::error::{Error, Result};
+use crate::io::FileIO;
+use crate::spec::{Schema, TableSchema};
+use crate::table::Table;
+use async_trait::async_trait;
+use bytes::Bytes;
+use opendal::raw::get_basename;
+use snafu::{FromString, Whatever};
+
+/// Name of the schema directory under each table path.
+const SCHEMA_DIR: &str = "schema";
+/// Prefix for schema files (under the schema directory).
+const SCHEMA_PREFIX: &str = "schema-";
+
+fn make_path(parent: &str, child: &str) -> String {
+    format!("{parent}/{child}")
+}
+
+/// Filesystem catalog implementation.
+///
+/// This catalog stores metadata on a filesystem with the following structure:
+/// ```text
+/// warehouse/
+///   ├── database1.db/
+///   │   ├── table1/
+///   │   │   └── schema/
+///   │   │       ├── schema-0
+///   │   │       ├── schema-1
+///   │   │       └── ...
+///   │   └── table2/
+///   │       └── schema/
+///   │           ├── schema-0
+///   │           └── ...
+///   └── database2.db/
+///       └── ...
+/// ```
+///
+/// Reference: 
[org.apache.paimon.catalog.FileSystemCatalog](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java)
+#[derive(Clone, Debug)]
+pub struct FileSystemCatalog {
+    file_io: FileIO,
+    warehouse: String,
+}
+
+#[allow(dead_code)]
+impl FileSystemCatalog {
+    /// Create a new filesystem catalog.
+    ///
+    /// # Arguments
+    /// * `warehouse` - The root warehouse path
+    pub fn new(warehouse: impl Into<String>) -> crate::Result<Self> {
+        let warehouse = warehouse.into();
+        Ok(Self {
+            file_io: FileIO::from_path(warehouse.as_str())?.build()?,
+            warehouse,
+        })
+    }
+
+    /// Get the warehouse path.
+    pub fn warehouse(&self) -> &str {
+        &self.warehouse
+    }
+
+    /// Get the FileIO instance.
+    pub fn file_io(&self) -> &FileIO {
+        &self.file_io
+    }
+
+    /// Get the path for a database (warehouse / `name` + [DB_SUFFIX]).
+    fn database_path(&self, database_name: &str) -> String {
+        make_path(
+            &self.warehouse,
+            format!("{database_name}{DB_SUFFIX}").as_str(),
+        )
+    }
+
+    /// Get the path for a table (warehouse / `database.db` / table).
+    fn table_path(&self, identifier: &Identifier) -> String {
+        make_path(
+            &self.database_path(identifier.database()),
+            identifier.object(),
+        )
+    }
+
+    /// Path to the schema directory under a table path.
+    fn schema_dir_path(&self, table_path: &str) -> String {
+        make_path(table_path, SCHEMA_DIR)
+    }
+
+    /// Get the schema file path for a given version 
(table_path/schema/schema-{version}).
+    fn schema_file_path(&self, table_path: &str, schema_id: i64) -> String {
+        make_path(
+            make_path(table_path, SCHEMA_DIR).as_str(),
+            format!("{SCHEMA_PREFIX}{schema_id}").as_str(),
+        )
+    }
+
+    /// List directories in the given path.
+    async fn list_directories(&self, path: &str) -> Result<Vec<String>> {
+        let statuses = self.file_io.list_status(path).await?;
+        let mut dirs = Vec::new();
+        for status in statuses {
+            if status.is_dir {
+                if let Some(p) = get_basename(status.path.as_str())
+                    // opendal's get_basename keeps a trailing "/" for directories,
+                    // so we strip that suffix to get the real base name
+                    .strip_suffix("/")
+                {
+                    dirs.push(p.to_string());
+                }
+            }
+        }
+        Ok(dirs)
+    }
+
+    /// Load the latest schema for a table (highest schema-{version} file 
under table_path/schema).
+    async fn load_latest_table_schema(&self, table_path: &str) -> 
Result<Option<TableSchema>> {
+        let schema_dir = self.schema_dir_path(table_path);
+        if !self.file_io.exists(&schema_dir).await? {
+            return Ok(None);
+        }
+        let statuses = self.file_io.list_status(&schema_dir).await?;
+
+        let latest_schema_id = statuses
+            .into_iter()
+            .filter(|s| !s.is_dir)
+            .filter_map(|s| {
+                get_basename(s.path.as_str())
+                    .strip_prefix(SCHEMA_PREFIX)?
+                    .parse::<i64>()
+                    .ok()
+            })
+            .max();
+
+        if let Some(schema_id) = latest_schema_id {
+            let schema_path = self.schema_file_path(table_path, schema_id);
+            let input_file = self.file_io.new_input(&schema_path)?;
+            let content = input_file.read().await?;
+            let schema: TableSchema =
+                serde_json::from_slice(&content).map_err(|e| 
Error::DataInvalid {
+                    message: format!("Failed to parse schema file: 
{schema_path}"),
+                    source: Whatever::without_source(e.to_string()),
+                })?;
+            return Ok(Some(schema));
+        }
+
+        Ok(None)
+    }
+
+    /// Save a table schema to a file.
+    async fn save_table_schema(&self, table_path: &str, schema: &TableSchema) 
-> Result<()> {
+        let schema_dir = self.schema_dir_path(table_path);
+        self.file_io.mkdirs(&schema_dir).await?;
+        let schema_path = self.schema_file_path(table_path, schema.id());
+        let output_file = self.file_io.new_output(&schema_path)?;
+        let content =
+            Bytes::from(
+                serde_json::to_string(schema).map_err(|e| Error::DataInvalid {
+                    message: format!("Failed to serialize schema: {e}"),
+                    source: Whatever::without_source(e.to_string()),
+                })?,
+            );
+        output_file.write(content).await?;
+        Ok(())
+    }
+
+    /// Check if a database exists.
+    async fn database_exists(&self, name: &str) -> Result<bool> {
+        self.file_io.exists(&self.database_path(name)).await
+    }
+
+    /// Check if a table exists.
+    async fn table_exists(&self, identifier: &Identifier) -> Result<bool> {
+        self.file_io.exists(&self.table_path(identifier)).await
+    }
+}
+
+#[async_trait]
+impl Catalog for FileSystemCatalog {
+    async fn list_databases(&self) -> Result<Vec<String>> {
+        let dirs = self.list_directories(&self.warehouse).await?;
+        Ok(dirs
+            .into_iter()
+            .filter_map(|name| name.strip_suffix(DB_SUFFIX).map(|s| 
s.to_string()))
+            .collect())
+    }
+
+    async fn create_database(
+        &self,
+        name: &str,
+        ignore_if_exists: bool,
+        properties: HashMap<String, String>,
+    ) -> Result<()> {
+        if properties.contains_key(DB_LOCATION_PROP) {
+            return Err(Error::ConfigInvalid {
+                message: "Cannot specify location for a database when using 
fileSystem catalog."
+                    .to_string(),
+            });
+        }
+
+        let path = self.database_path(name);
+        let database_exists = self.database_exists(name).await?;
+
+        if !ignore_if_exists && database_exists {
+            return Err(Error::DatabaseAlreadyExist {
+                database: name.to_string(),
+            });
+        }
+
+        if !database_exists {
+            self.file_io.mkdirs(&path).await?;
+        }
+
+        Ok(())
+    }
+
+    async fn drop_database(
+        &self,
+        name: &str,
+        ignore_if_not_exists: bool,
+        cascade: bool,
+    ) -> Result<()> {
+        let path = self.database_path(name);
+
+        let database_exists = self.database_exists(name).await?;
+        if !ignore_if_not_exists && !database_exists {
+            return Err(Error::DatabaseNotExist {
+                database: name.to_string(),
+            });
+        }
+
+        if !database_exists {
+            return Ok(());
+        }
+
+        let tables = self.list_directories(&path).await?;
+        if !tables.is_empty() && !cascade {
+            return Err(Error::DatabaseNotEmpty {
+                database: name.to_string(),
+            });
+        }
+
+        self.file_io.delete_dir(&path).await?;
+        Ok(())
+    }
+
+    async fn get_table(&self, identifier: &Identifier) -> Result<Table> {
+        let table_path = self.table_path(identifier);
+
+        if !self.table_exists(identifier).await? {
+            return Err(Error::TableNotExist {
+                full_name: identifier.full_name(),
+            });
+        }
+
+        let schema = self
+            .load_latest_table_schema(&table_path)
+            .await?
+            .ok_or_else(|| Error::TableNotExist {
+                full_name: identifier.full_name(),
+            })?;
+
+        Ok(Table::new(
+            self.file_io.clone(),
+            identifier.clone(),
+            table_path,
+            schema,
+        ))
+    }
+
+    async fn list_tables(&self, database_name: &str) -> Result<Vec<String>> {
+        let path = self.database_path(database_name);
+
+        if !self.database_exists(database_name).await? {
+            return Err(Error::DatabaseNotExist {
+                database: database_name.to_string(),
+            });
+        }
+
+        self.list_directories(&path).await
+    }
+
+    async fn create_table(
+        &self,
+        identifier: &Identifier,
+        creation: Schema,
+        ignore_if_exists: bool,
+    ) -> Result<()> {
+        let table_path = self.table_path(identifier);
+
+        let table_exists = self.table_exists(identifier).await?;
+
+        if !ignore_if_exists && table_exists {
+            return Err(Error::TableAlreadyExist {
+                full_name: identifier.full_name(),
+            });
+        }
+
+        if !self.database_exists(identifier.database()).await? {
+            return Err(Error::DatabaseNotExist {
+                database: identifier.database().to_string(),
+            });
+        }
+
+        // TODO: consider protecting this check-then-create with a lock
+        if !table_exists {
+            self.file_io.mkdirs(&table_path).await?;
+            let table_schema = TableSchema::new(0, &creation);
+            self.save_table_schema(&table_path, &table_schema).await?;
+        }
+
+        Ok(())
+    }
+
+    async fn drop_table(&self, identifier: &Identifier, ignore_if_not_exists: 
bool) -> Result<()> {
+        let table_path = self.table_path(identifier);
+
+        let table_exists = self.table_exists(identifier).await?;
+
+        if !ignore_if_not_exists && !table_exists {
+            return Err(Error::TableNotExist {
+                full_name: identifier.full_name(),
+            });
+        }
+
+        if !table_exists {
+            return Ok(());
+        }
+
+        self.file_io.delete_dir(&table_path).await?;
+        Ok(())
+    }
+
+    async fn rename_table(
+        &self,
+        from: &Identifier,
+        to: &Identifier,
+        ignore_if_not_exists: bool,
+    ) -> Result<()> {
+        let from_path = self.table_path(from);
+        let to_path = self.table_path(to);
+
+        let table_exists = self.table_exists(from).await?;
+
+        if !ignore_if_not_exists && !table_exists {
+            return Err(Error::TableNotExist {
+                full_name: from.full_name(),
+            });
+        }
+
+        if !table_exists {
+            return Ok(());
+        }
+
+        if self.table_exists(to).await? {
+            return Err(Error::TableAlreadyExist {
+                full_name: to.full_name(),
+            });
+        }
+
+        self.file_io.rename(&from_path, &to_path).await?;
+        Ok(())
+    }
+
+    async fn alter_table(
+        &self,
+        identifier: &Identifier,
+        _changes: Vec<crate::spec::SchemaChange>,
+        ignore_if_not_exists: bool,
+    ) -> Result<()> {
+        if !ignore_if_not_exists && !self.table_exists(identifier).await? {
+            return Err(Error::TableNotExist {
+                full_name: identifier.full_name(),
+            });
+        }
+
+        // TODO: Implement alter table with schema versioning
+        Err(Error::Unsupported {
+            message: "Alter table is not yet implemented for filesystem 
catalog".to_string(),
+        })
+    }
+}
+
+#[cfg(test)]
+#[cfg(not(windows))] // Skip on Windows due to path compatibility issues
+mod tests {
+    use super::*;
+    use tempfile::TempDir;
+
+    /// Returns a temp dir guard (keep alive for test duration) and a catalog 
using it as warehouse.
+    fn create_test_catalog() -> (TempDir, FileSystemCatalog) {
+        let temp_dir = TempDir::new().unwrap();
+        let warehouse = temp_dir.path().to_str().unwrap().to_string();
+        let catalog = FileSystemCatalog::new(warehouse).unwrap();
+        (temp_dir, catalog)
+    }
+
+    fn testing_schema() -> Schema {
+        Schema::builder()
+            .column(
+                "id",
+                crate::spec::DataType::Int(crate::spec::IntType::new()),
+            )
+            .build()
+            .unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_database_operations() {
+        let (_temp_dir, catalog) = create_test_catalog();
+
+        // create and list
+        catalog
+            .create_database("db1", false, HashMap::new())
+            .await
+            .unwrap();
+        catalog
+            .create_database("db2", false, HashMap::new())
+            .await
+            .unwrap();
+        let databases = catalog.list_databases().await.unwrap();
+        assert_eq!(databases.len(), 2);
+        assert!(databases.contains(&"db1".to_string()));
+        assert!(databases.contains(&"db2".to_string()));
+
+        // create same db without ignore_if_exists -> error
+        let result = catalog.create_database("db1", false, 
HashMap::new()).await;
+        assert!(result.is_err());
+        assert!(matches!(result, Err(Error::DatabaseAlreadyExist { .. })));
+
+        // create same db with ignore_if_exists -> ok
+        catalog
+            .create_database("db1", true, HashMap::new())
+            .await
+            .unwrap();
+
+        // create_database with location property -> error (filesystem catalog)
+        let mut props = HashMap::new();
+        props.insert(DB_LOCATION_PROP.to_string(), "/some/path".to_string());
+        let result = catalog.create_database("dbx", false, props).await;
+        assert!(result.is_err());
+        assert!(matches!(result, Err(Error::ConfigInvalid { .. })));
+
+        // drop empty database
+        catalog.drop_database("db1", false, false).await.unwrap();
+        let databases = catalog.list_databases().await.unwrap();
+        assert_eq!(databases.len(), 1);
+        assert!(databases.contains(&"db2".to_string()));
+
+        // drop non-empty database without cascade -> error
+        catalog
+            .create_database("db1", false, HashMap::new())
+            .await
+            .unwrap();
+        catalog
+            .create_table(&Identifier::new("db1", "table1"), testing_schema(), 
false)
+            .await
+            .unwrap();
+        let result = catalog.drop_database("db1", false, false).await;
+        assert!(result.is_err());
+        assert!(matches!(result, Err(Error::DatabaseNotEmpty { .. })));
+
+        // drop database with cascade
+        catalog.drop_database("db1", false, true).await.unwrap();
+        assert!(!catalog.database_exists("db1").await.unwrap());
+    }
+
+    #[tokio::test]
+    async fn test_table_operations() {
+        let (_temp_dir, catalog) = create_test_catalog();
+        catalog
+            .create_database("db1", false, HashMap::new())
+            .await
+            .unwrap();
+
+        // create and list tables
+        let schema = testing_schema();
+        catalog
+            .create_table(&Identifier::new("db1", "table1"), schema.clone(), 
false)
+            .await
+            .unwrap();
+        catalog
+            .create_table(&Identifier::new("db1", "table2"), schema, false)
+            .await
+            .unwrap();
+        let tables = catalog.list_tables("db1").await.unwrap();
+        assert_eq!(tables.len(), 2);
+        assert!(tables.contains(&"table1".to_string()));
+        assert!(tables.contains(&"table2".to_string()));
+
+        // get_table and check schema
+        let schema_with_name = Schema::builder()
+            .column(
+                "id",
+                crate::spec::DataType::Int(crate::spec::IntType::new()),
+            )
+            .column(
+                "name",
+                
crate::spec::DataType::VarChar(crate::spec::VarCharType::string_type()),
+            )
+            .build()
+            .unwrap();
+        catalog
+            .create_table(&Identifier::new("db1", "table3"), schema_with_name, 
false)
+            .await
+            .unwrap();
+        let table = catalog
+            .get_table(&Identifier::new("db1", "table3"))
+            .await
+            .unwrap();
+        let table_schema = table.schema();
+        assert_eq!(table_schema.id(), 0);
+        assert_eq!(table_schema.fields().len(), 2);
+
+        // drop table
+        catalog
+            .drop_table(&Identifier::new("db1", "table1"), false)
+            .await
+            .unwrap();
+        let tables = catalog.list_tables("db1").await.unwrap();
+        assert_eq!(tables.len(), 2);
+        assert!(!tables.contains(&"table1".to_string()));
+    }
+}
diff --git a/crates/paimon/src/catalog/mod.rs b/crates/paimon/src/catalog/mod.rs
index d416d43..086e98b 100644
--- a/crates/paimon/src/catalog/mod.rs
+++ b/crates/paimon/src/catalog/mod.rs
@@ -20,6 +20,8 @@
 //! Design aligns with [Paimon Java 
Catalog](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java)
 //! and follows API patterns from Apache Iceberg Rust.
 
+mod filesystem;
+
 use std::collections::HashMap;
 use std::fmt;
 
@@ -36,6 +38,14 @@ pub const SYSTEM_BRANCH_PREFIX: &str = "branch_";
 pub const DEFAULT_MAIN_BRANCH: &str = "main";
 /// Database value when the database is not known; [`Identifier::full_name`] 
returns only the object.
 pub const UNKNOWN_DATABASE: &str = "unknown";
+/// Database property key for custom location. Not allowed for filesystem 
catalog.
+/// See 
[Catalog.DB_LOCATION_PROP](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java).
+#[allow(dead_code)] // Public API - allow unused until used by external code
+pub const DB_LOCATION_PROP: &str = "location";
+/// Suffix for database directory names in the filesystem (e.g. `mydb.db`).
+/// See 
[Catalog.DB_SUFFIX](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java).
+#[allow(dead_code)] // Public API - allow unused until used by external code
+pub const DB_SUFFIX: &str = ".db";
 
 // ======================= Identifier ===============================
 
@@ -53,6 +63,7 @@ pub struct Identifier {
     object: String,
 }
 
+#[allow(dead_code)]
 impl Identifier {
     /// Create an identifier from database and object name.
     pub fn new(database: impl Into<String>, object: impl Into<String>) -> Self 
{
@@ -110,6 +121,7 @@ use crate::Result;
 ///
 /// Corresponds to 
[org.apache.paimon.catalog.Catalog](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java).
 #[async_trait]
+#[allow(dead_code)]
 pub trait Catalog: Send + Sync {
     // ======================= database methods ===============================
 
diff --git a/crates/paimon/src/error.rs b/crates/paimon/src/error.rs
index 10c1f4e..d49d883 100644
--- a/crates/paimon/src/error.rs
+++ b/crates/paimon/src/error.rs
@@ -29,6 +29,11 @@ pub enum Error {
         #[snafu(backtrace)]
         source: snafu::Whatever,
     },
+    #[snafu(
+        visibility(pub(crate)),
+        display("Paimon hitting unsupported error {}", message)
+    )]
+    Unsupported { message: String },
     #[snafu(
         visibility(pub(crate)),
         display("Paimon data type invalid for {}", message)
diff --git a/crates/paimon/src/file_index/file_index_format.rs 
b/crates/paimon/src/file_index/file_index_format.rs
index 7a23871..c473e47 100644
--- a/crates/paimon/src/file_index/file_index_format.rs
+++ b/crates/paimon/src/file_index/file_index_format.rs
@@ -101,7 +101,7 @@ pub async fn write_column_indexes(
     path: &str,
     indexes: HashMap<String, HashMap<String, Bytes>>,
 ) -> crate::Result<OutputFile> {
-    let file_io = FileIO::from_url(path)?.build()?;
+    let file_io = FileIO::from_path(path)?.build()?;
     let output = file_io.new_output(path)?;
     let mut writer = output.writer().await?;
 
diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs
index ee1b08f..e10e59d 100644
--- a/crates/paimon/src/io/file_io.rs
+++ b/crates/paimon/src/io/file_io.rs
@@ -22,7 +22,8 @@ use std::sync::Arc;
 
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
-use opendal::Operator;
+use opendal::raw::normalize_root;
+use opendal::{Metakey, Operator};
 use snafu::ResultExt;
 use url::Url;
 
@@ -45,6 +46,25 @@ impl FileIO {
         Ok(FileIOBuilder::new(url.scheme()))
     }
 
+    /// Try to infer file io scheme from path. See [`FileIO`] for supported 
schemes.
+    ///
+    /// - If it's a valid url, for example `s3://bucket/a`, url scheme will be 
used, and the rest of the url will be ignored.
+    /// - If it's not a valid url, will try to detect if it's a file path.
+    ///
+    /// Otherwise will return parsing error.
+    pub fn from_path(path: impl AsRef<str>) -> crate::Result<FileIOBuilder> {
+        let url = Url::parse(path.as_ref())
+            .map_err(|_| Error::ConfigInvalid {
+                message: format!("Invalid URL: {}", path.as_ref()),
+            })
+            .or_else(|_| {
+                Url::from_file_path(path.as_ref()).map_err(|_| 
Error::ConfigInvalid {
+                    message: format!("Input {} is neither a valid url nor 
path", path.as_ref()),
+                })
+            })?;
+        Ok(FileIOBuilder::new(url.scheme()))
+    }
+
     /// Create a new input file to read data.
     ///
     /// Reference: 
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L76>
@@ -97,10 +117,18 @@ impl FileIO {
     /// FIXME: how to handle large dir? Better to return a stream instead?
     pub async fn list_status(&self, path: &str) -> Result<Vec<FileStatus>> {
         let (op, relative_path) = self.storage.create(path)?;
-
-        let entries = op.list(relative_path).await.context(IoUnexpectedSnafu {
-            message: format!("Failed to list files in '{path}'"),
-        })?;
+        // Opendal list() expects a directory path to end with `/`;
+        // use normalize_root to make sure it ends with `/`.
+        let list_path = normalize_root(relative_path);
+
+        // Request ContentLength and LastModified so that meta.content_length()
+        // and meta.last_modified() below return populated values.
+        let entries = op
+            .list_with(&list_path)
+            .metakey(Metakey::ContentLength | Metakey::LastModified)
+            .await
+            .context(IoUnexpectedSnafu {
+                message: format!("Failed to list files in '{path}'"),
+            })?;
 
         let mut statuses = Vec::new();
 
@@ -109,7 +137,7 @@ impl FileIO {
             statuses.push(FileStatus {
                 size: meta.content_length(),
                 is_dir: meta.is_dir(),
-                path: path.to_string(),
+                path: entry.path().to_string(),
                 last_modified: meta.last_modified(),
             });
         }
@@ -163,12 +191,11 @@ impl FileIO {
     /// Reference: 
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L150>
     pub async fn mkdirs(&self, path: &str) -> Result<()> {
         let (op, relative_path) = self.storage.create(path)?;
-
-        op.create_dir(relative_path)
-            .await
-            .context(IoUnexpectedSnafu {
-                message: format!("Failed to create directory '{path}'"),
-            })?;
+        // Opendal create_dir expects the path to end with `/` to indicate a 
directory.
+        let dir_path = normalize_root(relative_path);
+        op.create_dir(&dir_path).await.context(IoUnexpectedSnafu {
+            message: format!("Failed to create directory '{path}'"),
+        })?;
 
         Ok(())
     }
diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs
index 927a51e..06cdd11 100644
--- a/crates/paimon/src/spec/schema.rs
+++ b/crates/paimon/src/spec/schema.rs
@@ -38,6 +38,71 @@ pub struct TableSchema {
     time_millis: i64,
 }
 
+impl TableSchema {
+    pub const CURRENT_VERSION: i32 = 3;
+
+    /// Create a TableSchema from a Schema with the given ID.
+    ///
+    /// Reference: 
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java#L373>
+    pub fn new(id: i64, schema: &Schema) -> Self {
+        let fields = schema.fields().to_vec();
+        let highest_field_id = Self::current_highest_field_id(&fields);
+
+        Self {
+            version: Self::CURRENT_VERSION,
+            id,
+            fields,
+            highest_field_id,
+            partition_keys: schema.partition_keys().to_vec(),
+            primary_keys: schema.primary_keys().to_vec(),
+            options: schema.options().clone(),
+            comment: schema.comment().map(|s| s.to_string()),
+            time_millis: chrono::Utc::now().timestamp_millis(),
+        }
+    }
+
+    /// Get the highest field ID from a list of fields.
+    pub fn current_highest_field_id(fields: &[DataField]) -> i32 {
+        fields.iter().map(|f| f.id()).max().unwrap_or(-1)
+    }
+
+    pub fn version(&self) -> i32 {
+        self.version
+    }
+
+    pub fn id(&self) -> i64 {
+        self.id
+    }
+
+    pub fn fields(&self) -> &[DataField] {
+        &self.fields
+    }
+
+    pub fn highest_field_id(&self) -> i32 {
+        self.highest_field_id
+    }
+
+    pub fn partition_keys(&self) -> &[String] {
+        &self.partition_keys
+    }
+
+    pub fn primary_keys(&self) -> &[String] {
+        &self.primary_keys
+    }
+
+    pub fn options(&self) -> &HashMap<String, String> {
+        &self.options
+    }
+
+    pub fn comment(&self) -> Option<&str> {
+        self.comment.as_deref()
+    }
+
+    pub fn time_millis(&self) -> i64 {
+        self.time_millis
+    }
+}
+
 /// Data field for paimon table.
 ///
 /// Impl Reference: 
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/types/DataField.java#L40>
diff --git a/crates/paimon/src/spec/types.rs b/crates/paimon/src/spec/types.rs
index 815c7c6..4001638 100644
--- a/crates/paimon/src/spec/types.rs
+++ b/crates/paimon/src/spec/types.rs
@@ -1270,6 +1270,14 @@ impl VarCharType {
 
     pub const DEFAULT_LENGTH: u32 = 1;
 
+    /// Variable-length string type with maximum length (Java: 
`VarCharType.STRING_TYPE`).
+    ///
+    /// Reference: 
[VarCharType.STRING_TYPE](https://github.com/apache/paimon/blob/release-1.3/paimon-api/src/main/java/org/apache/paimon/types/VarCharType.java).
+    #[inline]
+    pub fn string_type() -> Self {
+        Self::with_nullable(true, Self::MAX_LENGTH).unwrap()
+    }
+
     pub fn new(length: u32) -> Result<Self, Error> {
         Self::with_nullable(true, length)
     }
diff --git a/crates/paimon/src/table.rs b/crates/paimon/src/table.rs
index 5e8d262..718937b 100644
--- a/crates/paimon/src/table.rs
+++ b/crates/paimon/src/table.rs
@@ -17,6 +17,53 @@
 
 //! Table API for Apache Paimon
 
+use crate::catalog::Identifier;
+use crate::io::FileIO;
+use crate::spec::TableSchema;
+
 /// Table represents a table in the catalog.
 #[derive(Debug, Clone)]
-pub struct Table {}
+pub struct Table {
+    file_io: FileIO,
+    identifier: Identifier,
+    location: String,
+    schema: TableSchema,
+}
+
+#[allow(dead_code)]
+impl Table {
+    /// Create a new table.
+    pub fn new(
+        file_io: FileIO,
+        identifier: Identifier,
+        location: String,
+        schema: TableSchema,
+    ) -> Self {
+        Self {
+            file_io,
+            identifier,
+            location,
+            schema,
+        }
+    }
+
+    /// Get the table's identifier.
+    pub fn identifier(&self) -> &Identifier {
+        &self.identifier
+    }
+
+    /// Get the table's location.
+    pub fn location(&self) -> &str {
+        &self.location
+    }
+
+    /// Get the table's schema.
+    pub fn schema(&self) -> &TableSchema {
+        &self.schema
+    }
+
+    /// Get the FileIO instance for this table.
+    pub fn file_io(&self) -> &FileIO {
+        &self.file_io
+    }
+}

Reply via email to