This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 147f6d0 feat: introduce filesystem catalog (#93)
147f6d0 is described below
commit 147f6d01115b5ab7846643f2ca7c3de98bc89b13
Author: yuxia Luo <[email protected]>
AuthorDate: Mon Mar 2 11:54:57 2026 +0800
feat: introduce filesystem catalog (#93)
---
crates/paimon/Cargo.toml | 1 +
crates/paimon/src/catalog/filesystem.rs | 548 ++++++++++++++++++++++
crates/paimon/src/catalog/mod.rs | 12 +
crates/paimon/src/error.rs | 5 +
crates/paimon/src/file_index/file_index_format.rs | 2 +-
crates/paimon/src/io/file_io.rs | 51 +-
crates/paimon/src/spec/schema.rs | 65 +++
crates/paimon/src/spec/types.rs | 8 +
crates/paimon/src/table.rs | 49 +-
9 files changed, 727 insertions(+), 14 deletions(-)
diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index 948c0cb..7ea9c2c 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -55,3 +55,4 @@ indexmap = "2.5.0"
[dev-dependencies]
rand = "0.8.5"
serde_avro_fast = { version = "1.1.2", features = ["snappy"] }
+tempfile = "3"
diff --git a/crates/paimon/src/catalog/filesystem.rs
b/crates/paimon/src/catalog/filesystem.rs
new file mode 100644
index 0000000..2068550
--- /dev/null
+++ b/crates/paimon/src/catalog/filesystem.rs
@@ -0,0 +1,548 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filesystem catalog implementation for Apache Paimon.
+//!
+//! Reference:
[org.apache.paimon.catalog.FileSystemCatalog](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java)
+
+use std::collections::HashMap;
+
+use crate::catalog::{Catalog, Identifier, DB_LOCATION_PROP, DB_SUFFIX};
+use crate::error::{Error, Result};
+use crate::io::FileIO;
+use crate::spec::{Schema, TableSchema};
+use crate::table::Table;
+use async_trait::async_trait;
+use bytes::Bytes;
+use opendal::raw::get_basename;
+use snafu::{FromString, Whatever};
+
+/// Name of the schema directory under each table path.
+const SCHEMA_DIR: &str = "schema";
+/// Prefix for schema files (under the schema directory).
+const SCHEMA_PREFIX: &str = "schema-";
+
+fn make_path(parent: &str, child: &str) -> String {
+ format!("{parent}/{child}")
+}
+
+/// Filesystem catalog implementation.
+///
+/// This catalog stores metadata on a filesystem with the following structure:
+/// ```text
+/// warehouse/
+/// ├── database1.db/
+/// │ ├── table1/
+/// │ │ └── schema/
+/// │ │ ├── schema-0
+/// │ │ ├── schema-1
+/// │ │ └── ...
+/// │ └── table2/
+/// │ └── schema/
+/// │ ├── schema-0
+/// │ └── ...
+/// └── database2.db/
+/// └── ...
+/// ```
+///
+/// Reference:
[org.apache.paimon.catalog.FileSystemCatalog](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java)
+#[derive(Clone, Debug)]
+pub struct FileSystemCatalog {
+ file_io: FileIO,
+ warehouse: String,
+}
+
+#[allow(dead_code)]
+impl FileSystemCatalog {
+ /// Create a new filesystem catalog.
+ ///
+ /// # Arguments
+ /// * `warehouse` - The root warehouse path
+ pub fn new(warehouse: impl Into<String>) -> crate::Result<Self> {
+ let warehouse = warehouse.into();
+ Ok(Self {
+ file_io: FileIO::from_path(warehouse.as_str())?.build()?,
+ warehouse,
+ })
+ }
+
+ /// Get the warehouse path.
+ pub fn warehouse(&self) -> &str {
+ &self.warehouse
+ }
+
+ /// Get the FileIO instance.
+ pub fn file_io(&self) -> &FileIO {
+ &self.file_io
+ }
+
+ /// Get the path for a database (warehouse / `name` + [DB_SUFFIX]).
+ fn database_path(&self, database_name: &str) -> String {
+ make_path(
+ &self.warehouse,
+ format!("{database_name}{DB_SUFFIX}").as_str(),
+ )
+ }
+
+ /// Get the path for a table (warehouse / `database.db` / table).
+ fn table_path(&self, identifier: &Identifier) -> String {
+ make_path(
+ &self.database_path(identifier.database()),
+ identifier.object(),
+ )
+ }
+
+ /// Path to the schema directory under a table path.
+ fn schema_dir_path(&self, table_path: &str) -> String {
+ make_path(table_path, SCHEMA_DIR)
+ }
+
+    /// Get the schema file path for a given schema id
(table_path/schema/schema-{schema_id}).
+ fn schema_file_path(&self, table_path: &str, schema_id: i64) -> String {
+ make_path(
+ make_path(table_path, SCHEMA_DIR).as_str(),
+ format!("{SCHEMA_PREFIX}{schema_id}").as_str(),
+ )
+ }
+
+ /// List directories in the given path.
+ async fn list_directories(&self, path: &str) -> Result<Vec<String>> {
+ let statuses = self.file_io.list_status(path).await?;
+ let mut dirs = Vec::new();
+ for status in statuses {
+ if status.is_dir {
+ if let Some(p) = get_basename(status.path.as_str())
+                // opendal get_basename keeps the trailing "/" for directories,
+                // so we strip it to get the real base name
+ .strip_suffix("/")
+ {
+ dirs.push(p.to_string());
+ }
+ }
+ }
+ Ok(dirs)
+ }
+
+ /// Load the latest schema for a table (highest schema-{version} file
under table_path/schema).
+ async fn load_latest_table_schema(&self, table_path: &str) ->
Result<Option<TableSchema>> {
+ let schema_dir = self.schema_dir_path(table_path);
+ if !self.file_io.exists(&schema_dir).await? {
+ return Ok(None);
+ }
+ let statuses = self.file_io.list_status(&schema_dir).await?;
+
+ let latest_schema_id = statuses
+ .into_iter()
+ .filter(|s| !s.is_dir)
+ .filter_map(|s| {
+ get_basename(s.path.as_str())
+ .strip_prefix(SCHEMA_PREFIX)?
+ .parse::<i64>()
+ .ok()
+ })
+ .max();
+
+ if let Some(schema_id) = latest_schema_id {
+ let schema_path = self.schema_file_path(table_path, schema_id);
+ let input_file = self.file_io.new_input(&schema_path)?;
+ let content = input_file.read().await?;
+ let schema: TableSchema =
+ serde_json::from_slice(&content).map_err(|e|
Error::DataInvalid {
+ message: format!("Failed to parse schema file:
{schema_path}"),
+ source: Whatever::without_source(e.to_string()),
+ })?;
+ return Ok(Some(schema));
+ }
+
+ Ok(None)
+ }
+
+ /// Save a table schema to a file.
+ async fn save_table_schema(&self, table_path: &str, schema: &TableSchema)
-> Result<()> {
+ let schema_dir = self.schema_dir_path(table_path);
+ self.file_io.mkdirs(&schema_dir).await?;
+ let schema_path = self.schema_file_path(table_path, schema.id());
+ let output_file = self.file_io.new_output(&schema_path)?;
+ let content =
+ Bytes::from(
+ serde_json::to_string(schema).map_err(|e| Error::DataInvalid {
+ message: format!("Failed to serialize schema: {e}"),
+ source: Whatever::without_source(e.to_string()),
+ })?,
+ );
+ output_file.write(content).await?;
+ Ok(())
+ }
+
+ /// Check if a database exists.
+ async fn database_exists(&self, name: &str) -> Result<bool> {
+ self.file_io.exists(&self.database_path(name)).await
+ }
+
+ /// Check if a table exists.
+ async fn table_exists(&self, identifier: &Identifier) -> Result<bool> {
+ self.file_io.exists(&self.table_path(identifier)).await
+ }
+}
+
+#[async_trait]
+impl Catalog for FileSystemCatalog {
+ async fn list_databases(&self) -> Result<Vec<String>> {
+ let dirs = self.list_directories(&self.warehouse).await?;
+ Ok(dirs
+ .into_iter()
+ .filter_map(|name| name.strip_suffix(DB_SUFFIX).map(|s|
s.to_string()))
+ .collect())
+ }
+
+ async fn create_database(
+ &self,
+ name: &str,
+ ignore_if_exists: bool,
+ properties: HashMap<String, String>,
+ ) -> Result<()> {
+ if properties.contains_key(DB_LOCATION_PROP) {
+ return Err(Error::ConfigInvalid {
+ message: "Cannot specify location for a database when using
fileSystem catalog."
+ .to_string(),
+ });
+ }
+
+ let path = self.database_path(name);
+ let database_exists = self.database_exists(name).await?;
+
+ if !ignore_if_exists && database_exists {
+ return Err(Error::DatabaseAlreadyExist {
+ database: name.to_string(),
+ });
+ }
+
+ if !database_exists {
+ self.file_io.mkdirs(&path).await?;
+ }
+
+ Ok(())
+ }
+
+ async fn drop_database(
+ &self,
+ name: &str,
+ ignore_if_not_exists: bool,
+ cascade: bool,
+ ) -> Result<()> {
+ let path = self.database_path(name);
+
+ let database_exists = self.database_exists(name).await?;
+ if !ignore_if_not_exists && !database_exists {
+ return Err(Error::DatabaseNotExist {
+ database: name.to_string(),
+ });
+ }
+
+ if !database_exists {
+ return Ok(());
+ }
+
+ let tables = self.list_directories(&path).await?;
+ if !tables.is_empty() && !cascade {
+ return Err(Error::DatabaseNotEmpty {
+ database: name.to_string(),
+ });
+ }
+
+ self.file_io.delete_dir(&path).await?;
+ Ok(())
+ }
+
+ async fn get_table(&self, identifier: &Identifier) -> Result<Table> {
+ let table_path = self.table_path(identifier);
+
+ if !self.table_exists(identifier).await? {
+ return Err(Error::TableNotExist {
+ full_name: identifier.full_name(),
+ });
+ }
+
+ let schema = self
+ .load_latest_table_schema(&table_path)
+ .await?
+ .ok_or_else(|| Error::TableNotExist {
+ full_name: identifier.full_name(),
+ })?;
+
+ Ok(Table::new(
+ self.file_io.clone(),
+ identifier.clone(),
+ table_path,
+ schema,
+ ))
+ }
+
+ async fn list_tables(&self, database_name: &str) -> Result<Vec<String>> {
+ let path = self.database_path(database_name);
+
+ if !self.database_exists(database_name).await? {
+ return Err(Error::DatabaseNotExist {
+ database: database_name.to_string(),
+ });
+ }
+
+ self.list_directories(&path).await
+ }
+
+ async fn create_table(
+ &self,
+ identifier: &Identifier,
+ creation: Schema,
+ ignore_if_exists: bool,
+ ) -> Result<()> {
+ let table_path = self.table_path(identifier);
+
+ let table_exists = self.table_exists(identifier).await?;
+
+ if !ignore_if_exists && table_exists {
+ return Err(Error::TableAlreadyExist {
+ full_name: identifier.full_name(),
+ });
+ }
+
+ if !self.database_exists(identifier.database()).await? {
+ return Err(Error::DatabaseNotExist {
+ database: identifier.database().to_string(),
+ });
+ }
+
+ // todo: consider with lock
+ if !table_exists {
+ self.file_io.mkdirs(&table_path).await?;
+ let table_schema = TableSchema::new(0, &creation);
+ self.save_table_schema(&table_path, &table_schema).await?;
+ }
+
+ Ok(())
+ }
+
+ async fn drop_table(&self, identifier: &Identifier, ignore_if_not_exists:
bool) -> Result<()> {
+ let table_path = self.table_path(identifier);
+
+ let table_exists = self.table_exists(identifier).await?;
+
+ if !ignore_if_not_exists && !table_exists {
+ return Err(Error::TableNotExist {
+ full_name: identifier.full_name(),
+ });
+ }
+
+ if !table_exists {
+ return Ok(());
+ }
+
+ self.file_io.delete_dir(&table_path).await?;
+ Ok(())
+ }
+
+ async fn rename_table(
+ &self,
+ from: &Identifier,
+ to: &Identifier,
+ ignore_if_not_exists: bool,
+ ) -> Result<()> {
+ let from_path = self.table_path(from);
+ let to_path = self.table_path(to);
+
+ let table_exists = self.table_exists(from).await?;
+
+ if !ignore_if_not_exists && !table_exists {
+ return Err(Error::TableNotExist {
+ full_name: from.full_name(),
+ });
+ }
+
+ if !table_exists {
+ return Ok(());
+ }
+
+ if self.table_exists(to).await? {
+ return Err(Error::TableAlreadyExist {
+ full_name: to.full_name(),
+ });
+ }
+
+ self.file_io.rename(&from_path, &to_path).await?;
+ Ok(())
+ }
+
+ async fn alter_table(
+ &self,
+ identifier: &Identifier,
+ _changes: Vec<crate::spec::SchemaChange>,
+ ignore_if_not_exists: bool,
+ ) -> Result<()> {
+ if !ignore_if_not_exists && !self.table_exists(identifier).await? {
+ return Err(Error::TableNotExist {
+ full_name: identifier.full_name(),
+ });
+ }
+
+ // TODO: Implement alter table with schema versioning
+ Err(Error::Unsupported {
+ message: "Alter table is not yet implemented for filesystem
catalog".to_string(),
+ })
+ }
+}
+
+#[cfg(test)]
+#[cfg(not(windows))] // Skip on Windows due to path compatibility issues
+mod tests {
+ use super::*;
+ use tempfile::TempDir;
+
+ /// Returns a temp dir guard (keep alive for test duration) and a catalog
using it as warehouse.
+ fn create_test_catalog() -> (TempDir, FileSystemCatalog) {
+ let temp_dir = TempDir::new().unwrap();
+ let warehouse = temp_dir.path().to_str().unwrap().to_string();
+ let catalog = FileSystemCatalog::new(warehouse).unwrap();
+ (temp_dir, catalog)
+ }
+
+ fn testing_schema() -> Schema {
+ Schema::builder()
+ .column(
+ "id",
+ crate::spec::DataType::Int(crate::spec::IntType::new()),
+ )
+ .build()
+ .unwrap()
+ }
+
+ #[tokio::test]
+ async fn test_database_operations() {
+ let (_temp_dir, catalog) = create_test_catalog();
+
+ // create and list
+ catalog
+ .create_database("db1", false, HashMap::new())
+ .await
+ .unwrap();
+ catalog
+ .create_database("db2", false, HashMap::new())
+ .await
+ .unwrap();
+ let databases = catalog.list_databases().await.unwrap();
+ assert_eq!(databases.len(), 2);
+ assert!(databases.contains(&"db1".to_string()));
+ assert!(databases.contains(&"db2".to_string()));
+
+ // create same db without ignore_if_exists -> error
+ let result = catalog.create_database("db1", false,
HashMap::new()).await;
+ assert!(result.is_err());
+ assert!(matches!(result, Err(Error::DatabaseAlreadyExist { .. })));
+
+ // create same db with ignore_if_exists -> ok
+ catalog
+ .create_database("db1", true, HashMap::new())
+ .await
+ .unwrap();
+
+ // create_database with location property -> error (filesystem catalog)
+ let mut props = HashMap::new();
+ props.insert(DB_LOCATION_PROP.to_string(), "/some/path".to_string());
+ let result = catalog.create_database("dbx", false, props).await;
+ assert!(result.is_err());
+ assert!(matches!(result, Err(Error::ConfigInvalid { .. })));
+
+ // drop empty database
+ catalog.drop_database("db1", false, false).await.unwrap();
+ let databases = catalog.list_databases().await.unwrap();
+ assert_eq!(databases.len(), 1);
+ assert!(databases.contains(&"db2".to_string()));
+
+ // drop non-empty database without cascade -> error
+ catalog
+ .create_database("db1", false, HashMap::new())
+ .await
+ .unwrap();
+ catalog
+ .create_table(&Identifier::new("db1", "table1"), testing_schema(),
false)
+ .await
+ .unwrap();
+ let result = catalog.drop_database("db1", false, false).await;
+ assert!(result.is_err());
+ assert!(matches!(result, Err(Error::DatabaseNotEmpty { .. })));
+
+ // drop database with cascade
+ catalog.drop_database("db1", false, true).await.unwrap();
+ assert!(!catalog.database_exists("db1").await.unwrap());
+ }
+
+ #[tokio::test]
+ async fn test_table_operations() {
+ let (_temp_dir, catalog) = create_test_catalog();
+ catalog
+ .create_database("db1", false, HashMap::new())
+ .await
+ .unwrap();
+
+ // create and list tables
+ let schema = testing_schema();
+ catalog
+ .create_table(&Identifier::new("db1", "table1"), schema.clone(),
false)
+ .await
+ .unwrap();
+ catalog
+ .create_table(&Identifier::new("db1", "table2"), schema, false)
+ .await
+ .unwrap();
+ let tables = catalog.list_tables("db1").await.unwrap();
+ assert_eq!(tables.len(), 2);
+ assert!(tables.contains(&"table1".to_string()));
+ assert!(tables.contains(&"table2".to_string()));
+
+ // get_table and check schema
+ let schema_with_name = Schema::builder()
+ .column(
+ "id",
+ crate::spec::DataType::Int(crate::spec::IntType::new()),
+ )
+ .column(
+ "name",
+
crate::spec::DataType::VarChar(crate::spec::VarCharType::string_type()),
+ )
+ .build()
+ .unwrap();
+ catalog
+ .create_table(&Identifier::new("db1", "table3"), schema_with_name,
false)
+ .await
+ .unwrap();
+ let table = catalog
+ .get_table(&Identifier::new("db1", "table3"))
+ .await
+ .unwrap();
+ let table_schema = table.schema();
+ assert_eq!(table_schema.id(), 0);
+ assert_eq!(table_schema.fields().len(), 2);
+
+ // drop table
+ catalog
+ .drop_table(&Identifier::new("db1", "table1"), false)
+ .await
+ .unwrap();
+ let tables = catalog.list_tables("db1").await.unwrap();
+ assert_eq!(tables.len(), 2);
+ assert!(!tables.contains(&"table1".to_string()));
+ }
+}
diff --git a/crates/paimon/src/catalog/mod.rs b/crates/paimon/src/catalog/mod.rs
index d416d43..086e98b 100644
--- a/crates/paimon/src/catalog/mod.rs
+++ b/crates/paimon/src/catalog/mod.rs
@@ -20,6 +20,8 @@
//! Design aligns with [Paimon Java
Catalog](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java)
//! and follows API patterns from Apache Iceberg Rust.
+mod filesystem;
+
use std::collections::HashMap;
use std::fmt;
@@ -36,6 +38,14 @@ pub const SYSTEM_BRANCH_PREFIX: &str = "branch_";
pub const DEFAULT_MAIN_BRANCH: &str = "main";
/// Database value when the database is not known; [`Identifier::full_name`]
returns only the object.
pub const UNKNOWN_DATABASE: &str = "unknown";
+/// Database property key for custom location. Not allowed for filesystem
catalog.
+/// See
[Catalog.DB_LOCATION_PROP](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java).
+#[allow(dead_code)] // Public API - allow unused until used by external code
+pub const DB_LOCATION_PROP: &str = "location";
+/// Suffix for database directory names in the filesystem (e.g. `mydb.db`).
+/// See
[Catalog.DB_SUFFIX](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java).
+#[allow(dead_code)] // Public API - allow unused until used by external code
+pub const DB_SUFFIX: &str = ".db";
// ======================= Identifier ===============================
@@ -53,6 +63,7 @@ pub struct Identifier {
object: String,
}
+#[allow(dead_code)]
impl Identifier {
/// Create an identifier from database and object name.
pub fn new(database: impl Into<String>, object: impl Into<String>) -> Self
{
@@ -110,6 +121,7 @@ use crate::Result;
///
/// Corresponds to
[org.apache.paimon.catalog.Catalog](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java).
#[async_trait]
+#[allow(dead_code)]
pub trait Catalog: Send + Sync {
// ======================= database methods ===============================
diff --git a/crates/paimon/src/error.rs b/crates/paimon/src/error.rs
index 10c1f4e..d49d883 100644
--- a/crates/paimon/src/error.rs
+++ b/crates/paimon/src/error.rs
@@ -29,6 +29,11 @@ pub enum Error {
#[snafu(backtrace)]
source: snafu::Whatever,
},
+ #[snafu(
+ visibility(pub(crate)),
+ display("Paimon hitting unsupported error {}", message)
+ )]
+ Unsupported { message: String },
#[snafu(
visibility(pub(crate)),
display("Paimon data type invalid for {}", message)
diff --git a/crates/paimon/src/file_index/file_index_format.rs
b/crates/paimon/src/file_index/file_index_format.rs
index 7a23871..c473e47 100644
--- a/crates/paimon/src/file_index/file_index_format.rs
+++ b/crates/paimon/src/file_index/file_index_format.rs
@@ -101,7 +101,7 @@ pub async fn write_column_indexes(
path: &str,
indexes: HashMap<String, HashMap<String, Bytes>>,
) -> crate::Result<OutputFile> {
- let file_io = FileIO::from_url(path)?.build()?;
+ let file_io = FileIO::from_path(path)?.build()?;
let output = file_io.new_output(path)?;
let mut writer = output.writer().await?;
diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs
index ee1b08f..e10e59d 100644
--- a/crates/paimon/src/io/file_io.rs
+++ b/crates/paimon/src/io/file_io.rs
@@ -22,7 +22,8 @@ use std::sync::Arc;
use bytes::Bytes;
use chrono::{DateTime, Utc};
-use opendal::Operator;
+use opendal::raw::normalize_root;
+use opendal::{Metakey, Operator};
use snafu::ResultExt;
use url::Url;
@@ -45,6 +46,25 @@ impl FileIO {
Ok(FileIOBuilder::new(url.scheme()))
}
+ /// Try to infer file io scheme from path. See [`FileIO`] for supported
schemes.
+ ///
+ /// - If it's a valid url, for example `s3://bucket/a`, url scheme will be
used, and the rest of the url will be ignored.
+ /// - If it's not a valid url, will try to detect if it's a file path.
+ ///
+ /// Otherwise will return parsing error.
+ pub fn from_path(path: impl AsRef<str>) -> crate::Result<FileIOBuilder> {
+ let url = Url::parse(path.as_ref())
+ .map_err(|_| Error::ConfigInvalid {
+ message: format!("Invalid URL: {}", path.as_ref()),
+ })
+ .or_else(|_| {
+ Url::from_file_path(path.as_ref()).map_err(|_|
Error::ConfigInvalid {
+ message: format!("Input {} is neither a valid url nor
path", path.as_ref()),
+ })
+ })?;
+ Ok(FileIOBuilder::new(url.scheme()))
+ }
+
/// Create a new input file to read data.
///
/// Reference:
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L76>
@@ -97,10 +117,18 @@ impl FileIO {
/// FIXME: how to handle large dir? Better to return a stream instead?
pub async fn list_status(&self, path: &str) -> Result<Vec<FileStatus>> {
let (op, relative_path) = self.storage.create(path)?;
-
- let entries = op.list(relative_path).await.context(IoUnexpectedSnafu {
- message: format!("Failed to list files in '{path}'"),
- })?;
+ // Opendal list() expects directory path to end with `/`.
+        // use normalize_root to make sure it ends with `/`.
+ let list_path = normalize_root(relative_path);
+
+        // Request ContentLength and LastModified so that
meta.content_length() / last_modified() return populated values below.
+ let entries = op
+ .list_with(&list_path)
+ .metakey(Metakey::ContentLength | Metakey::LastModified)
+ .await
+ .context(IoUnexpectedSnafu {
+ message: format!("Failed to list files in '{path}'"),
+ })?;
let mut statuses = Vec::new();
@@ -109,7 +137,7 @@ impl FileIO {
statuses.push(FileStatus {
size: meta.content_length(),
is_dir: meta.is_dir(),
- path: path.to_string(),
+ path: entry.path().to_string(),
last_modified: meta.last_modified(),
});
}
@@ -163,12 +191,11 @@ impl FileIO {
/// Reference:
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L150>
pub async fn mkdirs(&self, path: &str) -> Result<()> {
let (op, relative_path) = self.storage.create(path)?;
-
- op.create_dir(relative_path)
- .await
- .context(IoUnexpectedSnafu {
- message: format!("Failed to create directory '{path}'"),
- })?;
+ // Opendal create_dir expects the path to end with `/` to indicate a
directory.
+ let dir_path = normalize_root(relative_path);
+ op.create_dir(&dir_path).await.context(IoUnexpectedSnafu {
+ message: format!("Failed to create directory '{path}'"),
+ })?;
Ok(())
}
diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs
index 927a51e..06cdd11 100644
--- a/crates/paimon/src/spec/schema.rs
+++ b/crates/paimon/src/spec/schema.rs
@@ -38,6 +38,71 @@ pub struct TableSchema {
time_millis: i64,
}
+impl TableSchema {
+ pub const CURRENT_VERSION: i32 = 3;
+
+ /// Create a TableSchema from a Schema with the given ID.
+ ///
+ /// Reference:
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java#L373>
+ pub fn new(id: i64, schema: &Schema) -> Self {
+ let fields = schema.fields().to_vec();
+ let highest_field_id = Self::current_highest_field_id(&fields);
+
+ Self {
+ version: Self::CURRENT_VERSION,
+ id,
+ fields,
+ highest_field_id,
+ partition_keys: schema.partition_keys().to_vec(),
+ primary_keys: schema.primary_keys().to_vec(),
+ options: schema.options().clone(),
+ comment: schema.comment().map(|s| s.to_string()),
+ time_millis: chrono::Utc::now().timestamp_millis(),
+ }
+ }
+
+ /// Get the highest field ID from a list of fields.
+ pub fn current_highest_field_id(fields: &[DataField]) -> i32 {
+ fields.iter().map(|f| f.id()).max().unwrap_or(-1)
+ }
+
+ pub fn version(&self) -> i32 {
+ self.version
+ }
+
+ pub fn id(&self) -> i64 {
+ self.id
+ }
+
+ pub fn fields(&self) -> &[DataField] {
+ &self.fields
+ }
+
+ pub fn highest_field_id(&self) -> i32 {
+ self.highest_field_id
+ }
+
+ pub fn partition_keys(&self) -> &[String] {
+ &self.partition_keys
+ }
+
+ pub fn primary_keys(&self) -> &[String] {
+ &self.primary_keys
+ }
+
+ pub fn options(&self) -> &HashMap<String, String> {
+ &self.options
+ }
+
+ pub fn comment(&self) -> Option<&str> {
+ self.comment.as_deref()
+ }
+
+ pub fn time_millis(&self) -> i64 {
+ self.time_millis
+ }
+}
+
/// Data field for paimon table.
///
/// Impl Reference:
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/types/DataField.java#L40>
diff --git a/crates/paimon/src/spec/types.rs b/crates/paimon/src/spec/types.rs
index 815c7c6..4001638 100644
--- a/crates/paimon/src/spec/types.rs
+++ b/crates/paimon/src/spec/types.rs
@@ -1270,6 +1270,14 @@ impl VarCharType {
pub const DEFAULT_LENGTH: u32 = 1;
+ /// Variable-length string type with maximum length (Java:
`VarCharType.STRING_TYPE`).
+ ///
+ /// Reference:
[VarCharType.STRING_TYPE](https://github.com/apache/paimon/blob/release-1.3/paimon-api/src/main/java/org/apache/paimon/types/VarCharType.java).
+ #[inline]
+ pub fn string_type() -> Self {
+ Self::with_nullable(true, Self::MAX_LENGTH).unwrap()
+ }
+
pub fn new(length: u32) -> Result<Self, Error> {
Self::with_nullable(true, length)
}
diff --git a/crates/paimon/src/table.rs b/crates/paimon/src/table.rs
index 5e8d262..718937b 100644
--- a/crates/paimon/src/table.rs
+++ b/crates/paimon/src/table.rs
@@ -17,6 +17,53 @@
//! Table API for Apache Paimon
+use crate::catalog::Identifier;
+use crate::io::FileIO;
+use crate::spec::TableSchema;
+
/// Table represents a table in the catalog.
#[derive(Debug, Clone)]
-pub struct Table {}
+pub struct Table {
+ file_io: FileIO,
+ identifier: Identifier,
+ location: String,
+ schema: TableSchema,
+}
+
+#[allow(dead_code)]
+impl Table {
+ /// Create a new table.
+ pub fn new(
+ file_io: FileIO,
+ identifier: Identifier,
+ location: String,
+ schema: TableSchema,
+ ) -> Self {
+ Self {
+ file_io,
+ identifier,
+ location,
+ schema,
+ }
+ }
+
+ /// Get the table's identifier.
+ pub fn identifier(&self) -> &Identifier {
+ &self.identifier
+ }
+
+ /// Get the table's location.
+ pub fn location(&self) -> &str {
+ &self.location
+ }
+
+ /// Get the table's schema.
+ pub fn schema(&self) -> &TableSchema {
+ &self.schema
+ }
+
+ /// Get the FileIO instance for this table.
+ pub fn file_io(&self) -> &FileIO {
+ &self.file_io
+ }
+}