This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 0d71bd7 feat: implement `HudiTable` as python API (#23)
0d71bd7 is described below
commit 0d71bd7a0c3ec8d2aae631f5425b97166de8a76d
Author: Shiyan Xu <[email protected]>
AuthorDate: Wed Jun 19 15:39:40 2024 -0500
feat: implement `HudiTable` as python API (#23)
---
Cargo.toml | 2 +-
.../.hoodie/hoodie.properties | 44 ++---
crates/core/src/error.rs | 2 +
crates/core/src/file_group/mod.rs | 9 +-
crates/core/src/lib.rs | 7 +-
crates/core/src/table/config.rs | 147 ++++++++++++++
crates/core/src/table/file_system_view.rs | 104 ----------
.../core/src/table/{meta_client.rs => fs_view.rs} | 112 ++++++-----
crates/core/src/table/metadata.rs | 55 ++++++
crates/core/src/table/mod.rs | 216 ++++++++++++++++++---
crates/core/src/timeline/mod.rs | 6 +-
python/src/lib.rs => crates/core/src/utils.rs | 28 ++-
crates/datafusion/src/lib.rs | 7 +-
python/Cargo.toml | 6 +-
python/Makefile | 2 +-
python/hudi/__init__.py | 4 +
python/hudi/_internal.pyi | 10 +
python/hudi/{_internal.pyi => table.py} | 16 +-
python/src/lib.rs | 32 +++
19 files changed, 572 insertions(+), 237 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index e24d132..29294d8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -41,7 +41,7 @@ arrow-ord = { version = "50" }
arrow-row = { version = "50" }
arrow-schema = { version = "50" }
arrow-select = { version = "50" }
-object_store = { version = "0.9" }
+object_store = { version = "0.10.1" }
parquet = { version = "50" }
# datafusion
diff --git a/python/Cargo.toml b/crates/core/fixtures/table_metadata/sample_table_properties/.hoodie/hoodie.properties
similarity index 51%
copy from python/Cargo.toml
copy to crates/core/fixtures/table_metadata/sample_table_properties/.hoodie/hoodie.properties
index 0a6c666..9c0c7e2 100644
--- a/python/Cargo.toml
+++ b/crates/core/fixtures/table_metadata/sample_table_properties/.hoodie/hoodie.properties
@@ -1,3 +1,4 @@
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -14,27 +15,24 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+#
-[package]
-name = "hudi-python"
-version = "0.1.0"
-edition = "2021"
-
-# See more keys and their definitions at
https://doc.rust-lang.org/cargo/reference/manifest.html
-[lib]
-name = "hudi"
-crate-type = ["cdylib"]
-doc = false
-
-[dependencies]
-# runtime
-futures = { workspace = true }
-num_cpus = { workspace = true }
-tokio = { workspace = true, features = ["rt-multi-thread"] }
-
-[dependencies.pyo3]
-version = "0.21.2"
-features = ["extension-module", "abi3", "abi3-py38", "gil-refs"]
-
-[dependencies.hudi]
-path = "../crates/hudi"
+hoodie.table.type=COPY_ON_WRITE
+hoodie.table.metadata.partitions=files
+hoodie.table.precombine.field=ts
+hoodie.table.partition.fields=city
+hoodie.archivelog.folder=archived
+hoodie.table.cdc.enabled=false
+hoodie.timeline.layout.version=1
+hoodie.table.checksum=3761586722
+hoodie.datasource.write.drop.partition.columns=false
+hoodie.table.recordkey.fields=uuid
+hoodie.table.name=trips
+hoodie.partition.metafile.use.base.format=false
+hoodie.datasource.write.hive_style_partitioning=false
+hoodie.table.metadata.partitions.inflight=
+hoodie.populate.meta.fields=true
+hoodie.table.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
+hoodie.table.base.file.format=PARQUET
+hoodie.datasource.write.partitionpath.urlencode=false
+hoodie.table.version=6
diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs
index f7f0bd7..4a2f0f2 100644
--- a/crates/core/src/error.rs
+++ b/crates/core/src/error.rs
@@ -40,4 +40,6 @@ pub enum HudiCoreError {
FailToLoadFileGroup(#[from] HudiFileGroupError),
#[error("Failed to build file system view")]
FailToBuildFileSystemView(#[from] HudiFileSystemViewError),
+ #[error("Failed to load table properties")]
+ LoadTablePropertiesError,
}
diff --git a/crates/core/src/file_group/mod.rs b/crates/core/src/file_group/mod.rs
index 609b6e6..29761bd 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -59,9 +59,8 @@ pub struct FileSlice {
pub partition_path: Option<String>,
}
-#[allow(dead_code)]
impl FileSlice {
- pub fn file_path(&self) -> Option<&str> {
+ pub fn base_file_path(&self) -> Option<&str> {
match &self.base_file.metadata {
None => None,
Some(file_metadata) => Some(file_metadata.path.as_str()),
@@ -72,16 +71,12 @@ impl FileSlice {
&self.base_file.file_group_id
}
- pub fn base_instant_time(&self) -> &str {
- &self.base_file.commit_time
- }
-
pub fn set_base_file(&mut self, base_file: BaseFile) {
self.base_file = base_file
}
}
-#[derive(Debug)]
+#[derive(Clone, Debug)]
pub struct FileGroup {
pub id: String,
pub partition_path: Option<String>,
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index abf9a66..86a191f 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -24,12 +24,7 @@ mod file_group;
pub mod table;
pub type HudiTable = Table;
mod timeline;
-
-pub const BASE_FILE_EXTENSIONS: [&str; 1] = ["parquet"];
-
-pub fn is_base_file_format_supported(ext: &str) -> bool {
- BASE_FILE_EXTENSIONS.contains(&ext)
-}
+mod utils;
pub fn crate_version() -> &'static str {
env!("CARGO_PKG_VERSION")
diff --git a/crates/core/src/table/config.rs b/crates/core/src/table/config.rs
new file mode 100644
index 0000000..05a75db
--- /dev/null
+++ b/crates/core/src/table/config.rs
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::error::HudiCoreError;
+use std::str::FromStr;
+
+/// Keys of the table configs persisted in `.hoodie/hoodie.properties`.
+///
+/// Use [`AsRef::as_ref`] to obtain the property name to look up.
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+pub enum ConfigKey {
+    BaseFileFormat,
+    Checksum,
+    DatabaseName,
+    DropsPartitionFields,
+    IsHiveStylePartitioning,
+    IsPartitionPathUrlencoded,
+    KeyGeneratorClass,
+    PartitionFields,
+    PrecombineField,
+    PopulatesMetaFields,
+    RecordKeyFields,
+    TableName,
+    TableType,
+    TableVersion,
+    TimelineLayoutVersion,
+}
+
+impl AsRef<str> for ConfigKey {
+    /// Returns the property name under which this config is stored.
+    fn as_ref(&self) -> &str {
+        match self {
+            Self::BaseFileFormat => "hoodie.table.base.file.format",
+            Self::Checksum => "hoodie.table.checksum",
+            Self::DatabaseName => "hoodie.database.name",
+            Self::DropsPartitionFields => "hoodie.datasource.write.drop.partition.columns",
+            Self::IsHiveStylePartitioning => "hoodie.datasource.write.hive_style_partitioning",
+            Self::IsPartitionPathUrlencoded => "hoodie.datasource.write.partitionpath.urlencode",
+            Self::KeyGeneratorClass => "hoodie.table.keygenerator.class",
+            Self::PartitionFields => "hoodie.table.partition.fields",
+            Self::PrecombineField => "hoodie.table.precombine.field",
+            Self::PopulatesMetaFields => "hoodie.populate.meta.fields",
+            Self::RecordKeyFields => "hoodie.table.recordkey.fields",
+            Self::TableName => "hoodie.table.name",
+            Self::TableType => "hoodie.table.type",
+            Self::TableVersion => "hoodie.table.version",
+            Self::TimelineLayoutVersion => "hoodie.timeline.layout.version",
+        }
+    }
+}
+
+/// Storage layout of a Hudi table, as recorded under `hoodie.table.type`.
+#[derive(Debug, PartialEq)]
+pub enum TableType {
+    CopyOnWrite,
+    MergeOnRead,
+}
+
+impl FromStr for TableType {
+    type Err = HudiCoreError;
+
+    /// Parses a table type case-insensitively. The full names are accepted
+    /// with either `_` or `-` as separator, as are the `cow` / `mor` short forms.
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let normalized = s.to_ascii_lowercase();
+        match normalized.as_str() {
+            "copy_on_write" | "copy-on-write" | "cow" => Ok(Self::CopyOnWrite),
+            "merge_on_read" | "merge-on-read" | "mor" => Ok(Self::MergeOnRead),
+            _ => Err(HudiCoreError::LoadTablePropertiesError),
+        }
+    }
+}
+
+/// File format of a table's base files, as recorded under
+/// `hoodie.table.base.file.format`.
+#[derive(Debug, PartialEq)]
+pub enum BaseFileFormat {
+    Parquet,
+}
+
+impl FromStr for BaseFileFormat {
+    type Err = HudiCoreError;
+
+    /// Parses a base file format case-insensitively; only Parquet is known.
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        if s.eq_ignore_ascii_case("parquet") {
+            Ok(Self::Parquet)
+        } else {
+            Err(HudiCoreError::LoadTablePropertiesError)
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::table::config::{BaseFileFormat, TableType};
+    use std::str::FromStr;
+
+    #[test]
+    fn create_table_type() {
+        assert_eq!(TableType::from_str("cow").unwrap(), TableType::CopyOnWrite);
+        assert_eq!(
+            TableType::from_str("copy_on_write").unwrap(),
+            TableType::CopyOnWrite
+        );
+        assert_eq!(
+            TableType::from_str("COPY-ON-WRITE").unwrap(),
+            TableType::CopyOnWrite
+        );
+        assert_eq!(TableType::from_str("MOR").unwrap(), TableType::MergeOnRead);
+        assert_eq!(
+            TableType::from_str("Merge_on_read").unwrap(),
+            TableType::MergeOnRead
+        );
+        assert_eq!(
+            TableType::from_str("Merge-on-read").unwrap(),
+            TableType::MergeOnRead
+        );
+        // Spellings without a separator, and unknown values, are rejected.
+        assert!(TableType::from_str("").is_err());
+        assert!(TableType::from_str("copyonwrite").is_err());
+        assert!(TableType::from_str("MERGEONREAD").is_err());
+        assert!(TableType::from_str("foo").is_err());
+    }
+
+    #[test]
+    fn create_base_file_format() {
+        assert_eq!(
+            BaseFileFormat::from_str("parquet").unwrap(),
+            BaseFileFormat::Parquet
+        );
+        assert_eq!(
+            BaseFileFormat::from_str("PArquet").unwrap(),
+            BaseFileFormat::Parquet
+        );
+        // Only Parquet is supported: empty input and other formats must be
+        // rejected by the `BaseFileFormat` parser itself.
+        assert!(BaseFileFormat::from_str("").is_err());
+        assert!(
+            BaseFileFormat::from_str("orc").is_err(),
+            "orc is not yet supported."
+        );
+    }
+}
diff --git a/crates/core/src/table/file_system_view.rs b/crates/core/src/table/file_system_view.rs
deleted file mode 100644
index 7714a8c..0000000
--- a/crates/core/src/table/file_system_view.rs
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::error::HudiFileSystemViewError;
-use crate::error::HudiFileSystemViewError::FailToLoadPartitions;
-use crate::file_group::{FileGroup, FileSlice};
-use crate::table::meta_client::MetaClient;
-use hashbrown::HashMap;
-
-pub struct FileSystemView {
- meta_client: MetaClient,
- partition_to_file_groups: HashMap<String, Vec<FileGroup>>,
-}
-
-impl FileSystemView {
- pub fn init(meta_client: MetaClient) -> Result<Self,
HudiFileSystemViewError> {
- let mut fs_view = FileSystemView {
- meta_client,
- partition_to_file_groups: HashMap::new(),
- };
- fs_view.load_partitions()?;
- Ok(fs_view)
- }
-
- fn load_partitions(&mut self) -> Result<(), HudiFileSystemViewError> {
- match self.meta_client.get_partition_paths() {
- Ok(partition_paths) => {
- for p in partition_paths {
- match self.meta_client.get_file_groups(p.as_str()) {
- Ok(file_groups) => {
- self.partition_to_file_groups.insert(p,
file_groups);
- }
- Err(e) => return Err(FailToLoadPartitions(e)),
- }
- }
- }
- Err(e) => return Err(FailToLoadPartitions(Box::new(e))),
- }
- Ok(())
- }
-
- pub fn get_latest_file_slices(&self) -> Vec<&FileSlice> {
- let mut file_slices = Vec::new();
- for fgs in self.partition_to_file_groups.values() {
- for fg in fgs {
- if let Some(file_slice) = fg.get_latest_file_slice() {
- file_slices.push(file_slice)
- }
- }
- }
- file_slices
- }
-}
-
-#[cfg(test)]
-mod tests {
- use crate::table::file_system_view::FileSystemView;
- use crate::table::meta_client::MetaClient;
- use hudi_fs::test_utils::extract_test_table;
- use std::collections::HashSet;
- use std::path::Path;
-
- #[test]
- fn meta_client_get_file_groups() {
- let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
- let target_table_path = extract_test_table(fixture_path);
- let meta_client = MetaClient::new(&target_table_path);
- let fs_view = FileSystemView::init(meta_client).unwrap();
- let file_slices = fs_view.get_latest_file_slices();
- assert_eq!(file_slices.len(), 5);
- let mut fg_ids = Vec::new();
- for f in file_slices {
- let fp = f.file_group_id();
- fg_ids.push(fp);
- }
- let actual: HashSet<&str> = fg_ids.into_iter().collect();
- assert_eq!(
- actual,
- HashSet::from_iter(vec![
- "780b8586-3ad0-48ef-a6a1-d2217845ce4a-0",
- "d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0",
- "ee915c68-d7f8-44f6-9759-e691add290d8-0",
- "68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0",
- "5a226868-2934-4f84-a16f-55124630c68d-0"
- ])
- );
- }
-}
diff --git a/crates/core/src/table/meta_client.rs b/crates/core/src/table/fs_view.rs
similarity index 61%
rename from crates/core/src/table/meta_client.rs
rename to crates/core/src/table/fs_view.rs
index 15f8495..f672dfc 100644
--- a/crates/core/src/table/meta_client.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -25,42 +25,42 @@ use hashbrown::HashMap;
use hudi_fs::file_systems::FileMetadata;
-use crate::file_group::{BaseFile, FileGroup};
-use crate::timeline::Timeline;
+use crate::error::HudiFileSystemViewError;
+use crate::error::HudiFileSystemViewError::FailToLoadPartitions;
+use crate::file_group::{BaseFile, FileGroup, FileSlice};
+use crate::utils::get_leaf_dirs;
-#[derive(Debug, Clone)]
-pub struct MetaClient {
+/// Listing of a table's partitions and the file groups found in each.
+///
+/// Built eagerly by `FileSystemView::new`, which loads every partition.
+#[derive(Clone, Debug)]
+pub struct FileSystemView {
pub base_path: PathBuf,
- pub timeline: Timeline,
+    /// File groups keyed by partition path, as returned by
+    /// `get_partition_paths` (i.e. with the `base_path` prefix stripped).
+ partition_to_file_groups: HashMap<String, Vec<FileGroup>>,
}
-impl MetaClient {
- pub fn new(base_path: &Path) -> Self {
- match Timeline::init(base_path) {
- Ok(timeline) => Self {
- base_path: base_path.to_path_buf(),
- timeline,
- },
- Err(e) => panic!("Failed to instantiate meta client: {}", e),
- }
+impl FileSystemView {
+    /// Creates a view of the table rooted at `base_path` and eagerly loads the
+    /// file groups of every partition.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the partition paths or any partition's file groups
+    /// cannot be listed.
+    pub fn new(base_path: &Path) -> Result<Self, HudiFileSystemViewError> {
+        let mut view = Self {
+            base_path: PathBuf::from(base_path),
+            partition_to_file_groups: HashMap::new(),
+        };
+        view.load_partitions().map(|()| view)
}
- fn get_leaf_dirs(path: &Path) -> Result<Vec<PathBuf>, io::Error> {
- let mut leaf_dirs = Vec::new();
- let mut is_leaf_dir = true;
- for entry in fs::read_dir(path)? {
- let entry = entry?;
- if entry.path().is_dir() {
- is_leaf_dir = false;
- let curr_sub_dir = entry.path();
- let curr = Self::get_leaf_dirs(&curr_sub_dir)?;
- leaf_dirs.extend(curr);
+    /// Fills `partition_to_file_groups` with the file groups of every
+    /// partition path.
+    ///
+    /// # Errors
+    ///
+    /// Stops at the first failure and returns it wrapped in
+    /// `FailToLoadPartitions`. Partitions inserted before that failure remain
+    /// in the map.
+ fn load_partitions(&mut self) -> Result<(), HudiFileSystemViewError> {
+ match self.get_partition_paths() {
+ Ok(partition_paths) => {
+ for p in partition_paths {
+ match self.get_file_groups(p.as_str()) {
+ Ok(file_groups) => {
+ self.partition_to_file_groups.insert(p,
file_groups);
+ }
+ Err(e) => return Err(FailToLoadPartitions(e)),
+ }
+ }
}
+            // `get_partition_paths` fails with a plain `io::Error`, which is
+            // boxed here to fit the `FailToLoadPartitions` payload.
+ Err(e) => return Err(FailToLoadPartitions(Box::new(e))),
}
- if is_leaf_dir {
- leaf_dirs.push(path.to_path_buf())
- }
- Ok(leaf_dirs)
+ Ok(())
}
pub fn get_partition_paths(&self) -> Result<Vec<String>, io::Error> {
@@ -73,7 +73,7 @@ impl MetaClient {
}
let mut full_partition_paths: Vec<PathBuf> = Vec::new();
for p in first_level_partition_paths {
- full_partition_paths.extend(Self::get_leaf_dirs(p.as_path())?)
+ full_partition_paths.extend(get_leaf_dirs(p.as_path())?)
}
let common_prefix_len = self.base_path.to_str().unwrap().len() + 1;
let mut partition_paths = Vec::new();
@@ -110,21 +110,35 @@ impl MetaClient {
}
Ok(file_groups)
}
+
+    /// Collects the latest file slice of every file group across all
+    /// partitions, skipping file groups that have none.
+    ///
+    /// The order follows the iteration order of the partition map.
+    pub fn get_latest_file_slices(&self) -> Vec<&FileSlice> {
+        self.partition_to_file_groups
+            .values()
+            .flat_map(|file_groups| file_groups.iter())
+            .filter_map(|file_group| file_group.get_latest_file_slice())
+            .collect()
+    }
}
#[cfg(test)]
mod tests {
- use crate::table::meta_client::MetaClient;
- use hudi_fs::test_utils::extract_test_table;
use std::collections::HashSet;
use std::path::Path;
+ use hudi_fs::test_utils::extract_test_table;
+
+ use crate::table::fs_view::FileSystemView;
+
#[test]
- fn meta_client_get_partition_paths() {
+ fn get_partition_paths() {
let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
let target_table_path = extract_test_table(fixture_path);
- let meta_client = MetaClient::new(&target_table_path);
- let partition_paths = meta_client.get_partition_paths().unwrap();
+ let fs_view = FileSystemView::new(&target_table_path).unwrap();
+ let partition_paths = fs_view.get_partition_paths().unwrap();
let partition_path_set: HashSet<&str> =
HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
assert_eq!(
@@ -134,27 +148,27 @@ mod tests {
}
#[test]
- fn meta_client_get_file_groups() {
+ fn get_latest_file_slices() {
let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
let target_table_path = extract_test_table(fixture_path);
- let meta_client = MetaClient::new(&target_table_path);
- let file_groups =
meta_client.get_file_groups("san_francisco").unwrap();
- assert_eq!(file_groups.len(), 3);
- let fg_ids: HashSet<&str> =
HashSet::from_iter(file_groups.iter().map(|fg| fg.id.as_str()));
+ let fs_view = FileSystemView::new(&target_table_path).unwrap();
+ let file_slices = fs_view.get_latest_file_slices();
+ assert_eq!(file_slices.len(), 5);
+ let mut fg_ids = Vec::new();
+ for f in file_slices {
+ let fp = f.file_group_id();
+ fg_ids.push(fp);
+ }
+ let actual: HashSet<&str> = fg_ids.into_iter().collect();
assert_eq!(
- fg_ids,
+ actual,
HashSet::from_iter(vec![
- "5a226868-2934-4f84-a16f-55124630c68d-0",
"780b8586-3ad0-48ef-a6a1-d2217845ce4a-0",
- "d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0"
+ "d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0",
+ "ee915c68-d7f8-44f6-9759-e691add290d8-0",
+ "68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0",
+ "5a226868-2934-4f84-a16f-55124630c68d-0"
])
);
}
- #[test]
- fn meta_client_active_timeline_init_as_expected() {
- let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
- let target_table_path = extract_test_table(fixture_path);
- let meta_client = MetaClient::new(&target_table_path);
- assert_eq!(meta_client.timeline.instants.len(), 2)
- }
}
diff --git a/crates/core/src/table/metadata.rs b/crates/core/src/table/metadata.rs
new file mode 100644
index 0000000..fb2d41b
--- /dev/null
+++ b/crates/core/src/table/metadata.rs
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::table::config::{BaseFileFormat, TableType};
+
+/// Read-only accessors for table-level metadata.
+///
+/// For `Table`, each value comes from the corresponding `hoodie.*` entry of
+/// `.hoodie/hoodie.properties`.
+pub trait ProvidesTableMetadata {
+    /// Base file format (`hoodie.table.base.file.format`).
+ fn base_file_format(&self) -> BaseFileFormat;
+
+    /// Checksum of the table config (`hoodie.table.checksum`).
+ fn checksum(&self) -> i64;
+
+    /// Database name (`hoodie.database.name`); `Table` falls back to `"default"`.
+ fn database_name(&self) -> String;
+
+    /// Value of `hoodie.datasource.write.drop.partition.columns`.
+ fn drops_partition_fields(&self) -> bool;
+
+    /// Value of `hoodie.datasource.write.hive_style_partitioning`.
+ fn is_hive_style_partitioning(&self) -> bool;
+
+    /// Value of `hoodie.datasource.write.partitionpath.urlencode`.
+ fn is_partition_path_urlencoded(&self) -> bool;
+
+    /// Whether the table is partitioned; `Table` derives this from the key
+    /// generator class.
+ fn is_partitioned(&self) -> bool;
+
+    /// Key generator class name (`hoodie.table.keygenerator.class`).
+ fn key_generator_class(&self) -> String;
+
+    /// Table location; for `Table` this is its base path.
+ fn location(&self) -> String;
+
+    /// Partition fields from the comma-separated `hoodie.table.partition.fields`.
+ fn partition_fields(&self) -> Vec<String>;
+
+    /// Precombine field (`hoodie.table.precombine.field`).
+ fn precombine_field(&self) -> String;
+
+    /// Value of `hoodie.populate.meta.fields`.
+ fn populates_meta_fields(&self) -> bool;
+
+    /// Record key fields from the comma-separated `hoodie.table.recordkey.fields`.
+ fn record_key_fields(&self) -> Vec<String>;
+
+    /// Table name (`hoodie.table.name`).
+ fn table_name(&self) -> String;
+
+    /// Table type (`hoodie.table.type`).
+ fn table_type(&self) -> TableType;
+
+    /// Table version (`hoodie.table.version`).
+ fn table_version(&self) -> u32;
+
+    /// Timeline layout version (`hoodie.timeline.layout.version`).
+ fn timeline_layout_version(&self) -> u32;
+}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index d1c7dc7..2548554 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -17,48 +17,101 @@
* under the License.
*/
+use std::collections::HashMap;
use std::error::Error;
-use std::path::PathBuf;
+use std::fs::File;
+use std::io::{BufRead, BufReader};
+use std::path::{Path, PathBuf};
+use std::str::FromStr;
use arrow_schema::SchemaRef;
-use crate::table::file_system_view::FileSystemView;
-use crate::table::meta_client::MetaClient;
+use crate::file_group::FileSlice;
+use crate::table::config::BaseFileFormat;
+use crate::table::config::{ConfigKey, TableType};
+use crate::table::fs_view::FileSystemView;
+use crate::table::metadata::ProvidesTableMetadata;
+use crate::timeline::Timeline;
-mod file_system_view;
-mod meta_client;
+mod config;
+mod fs_view;
+mod metadata;
+/// A Hudi table rooted at `base_path`, with the contents of
+/// `.hoodie/hoodie.properties` loaded into `props`.
#[derive(Debug, Clone)]
pub struct Table {
pub base_path: PathBuf,
- meta_client: MetaClient,
+    /// Raw key/value pairs parsed from `.hoodie/hoodie.properties`.
+ pub props: HashMap<String, String>,
}
impl Table {
- pub fn new(base_path: &str) -> Self {
- let p = PathBuf::from(base_path);
- let meta_client = MetaClient::new(p.as_path());
- Self {
- base_path: p,
- meta_client,
+    /// Opens the Hudi table at `table_base_path` by reading its
+    /// `.hoodie/hoodie.properties` file.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the properties file cannot be opened or read.
+    pub fn new(table_base_path: &str) -> Self {
+        let base_path = PathBuf::from(table_base_path);
+        let props_path = base_path.join(".hoodie").join("hoodie.properties");
+        let props = Self::load_properties(&props_path)
+            .unwrap_or_else(|e| panic!("Failed to load table properties: {}", e));
+        Self { base_path, props }
+    }
+
+    /// Parses a Java-style `.properties` file into a key/value map.
+    ///
+    /// - Blank lines and lines starting with `#` or `!` are skipped.
+    /// - Each remaining line is split at its first `=`. Whitespace around
+    ///   both key and value is trimmed.
+    /// - A line without `=` maps its key to an empty value.
+    /// - A repeated key keeps its last value.
+    /// - Escape sequences (e.g. `\:`) and line continuations are not
+    ///   interpreted.
+    ///
+    /// # Errors
+    ///
+    /// Returns any I/O error raised while opening or reading `path`.
+    fn load_properties(path: &Path) -> Result<HashMap<String, String>, std::io::Error> {
+        let reader = BufReader::new(File::open(path)?);
+        let mut properties: HashMap<String, String> = HashMap::new();
+        for line in reader.lines() {
+            let line = line?;
+            let trimmed_line = line.trim();
+            if trimmed_line.is_empty() || trimmed_line.starts_with(['#', '!']) {
+                continue;
+            }
+            // Trim around the separator so `key = value` is stored as `key` -> `value`.
+            let (key, value) = trimmed_line.split_once('=').unwrap_or((trimmed_line, ""));
+            properties.insert(key.trim().to_owned(), value.trim().to_owned());
}
+        Ok(properties)
+    }
+
+    /// Returns the raw value stored for table property `key`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `key` is absent from the loaded properties.
+    pub fn get_property(&self, key: &str) -> &str {
+        self.props
+            .get(key)
+            .map(String::as_str)
+            .unwrap_or_else(|| panic!("Failed to retrieve property {}", key))
+    }
+
+    /// Loads a fresh timeline of completed commit instants from the base path.
+    pub fn get_timeline(&self) -> Result<Timeline, std::io::Error> {
+        Timeline::new(&self.base_path)
}
+    /// Returns the latest table schema, resolved from a newly loaded timeline.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the timeline cannot be loaded or its latest schema cannot be
+    /// resolved.
pub fn schema(&self) -> SchemaRef {
- match self.meta_client.timeline.get_latest_schema() {
- Ok(table_schema) => SchemaRef::from(table_schema),
+        // NOTE(review): this builds the timeline exactly like `get_timeline`; consider reusing it.
+ match Timeline::new(self.base_path.as_path()) {
+ Ok(timeline) => match timeline.get_latest_schema() {
+ Ok(schema) => SchemaRef::from(schema),
+ Err(e) => {
+ panic!("Failed to resolve table schema: {}", e)
+ }
+ },
Err(e) => {
panic!("Failed to resolve table schema: {}", e)
}
}
}
- pub fn get_snapshot_file_paths(&self) -> Result<Vec<String>, Box<dyn
Error>> {
- let meta_client = MetaClient::new(&self.base_path);
- let fs_view = FileSystemView::init(meta_client)?;
- let mut file_paths = Vec::new();
+    /// Returns owned copies of the latest file slice of every file group,
+    /// listed through a newly built `FileSystemView`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the file system view cannot be built.
+ pub fn get_latest_file_slices(&self) -> Result<Vec<FileSlice>, Box<dyn
Error>> {
+ let mut file_slices = Vec::new();
+ let fs_view = FileSystemView::new(self.base_path.as_path())?;
for f in fs_view.get_latest_file_slices() {
- if let Some(f) = f.file_path() {
+            // Clone because the slices are borrowed from the local `fs_view`.
+ file_slices.push(f.clone());
+ }
+ Ok(file_slices)
+ }
+
+ pub fn get_latest_file_paths(&self) -> Result<Vec<String>, Box<dyn Error>>
{
+ let mut file_paths = Vec::new();
+ for f in self.get_latest_file_slices()? {
+ if let Some(f) = f.base_file_path() {
file_paths.push(f.to_string());
}
}
@@ -66,18 +119,135 @@ impl Table {
}
}
+impl ProvidesTableMetadata for Table {
+    fn base_file_format(&self) -> BaseFileFormat {
+        parse_required_property(self, ConfigKey::BaseFileFormat)
+    }
+
+    fn checksum(&self) -> i64 {
+        parse_required_property(self, ConfigKey::Checksum)
+    }
+
+    /// Falls back to `"default"` when no database name is configured.
+    fn database_name(&self) -> String {
+        self.props
+            .get(ConfigKey::DatabaseName.as_ref())
+            .cloned()
+            .unwrap_or_else(|| "default".to_string())
+    }
+
+    fn drops_partition_fields(&self) -> bool {
+        parse_required_property(self, ConfigKey::DropsPartitionFields)
+    }
+
+    fn is_hive_style_partitioning(&self) -> bool {
+        parse_required_property(self, ConfigKey::IsHiveStylePartitioning)
+    }
+
+    fn is_partition_path_urlencoded(&self) -> bool {
+        parse_required_property(self, ConfigKey::IsPartitionPathUrlencoded)
+    }
+
+    /// The table counts as partitioned unless its key generator class ends
+    /// with `NonpartitionedKeyGenerator`.
+    fn is_partitioned(&self) -> bool {
+        !self
+            .key_generator_class()
+            .ends_with("NonpartitionedKeyGenerator")
+    }
+
+    fn key_generator_class(&self) -> String {
+        self.get_property(ConfigKey::KeyGeneratorClass.as_ref())
+            .to_string()
+    }
+
+    /// The base path as a string. Non-UTF-8 components are replaced lossily
+    /// instead of panicking.
+    fn location(&self) -> String {
+        self.base_path.to_string_lossy().into_owned()
+    }
+
+    /// An absent or empty property yields no partition fields, rather than a
+    /// panic or a single empty field name.
+    fn partition_fields(&self) -> Vec<String> {
+        self.props
+            .get(ConfigKey::PartitionFields.as_ref())
+            .map(String::as_str)
+            .map(split_fields)
+            .unwrap_or_default()
+    }
+
+    fn precombine_field(&self) -> String {
+        self.get_property(ConfigKey::PrecombineField.as_ref())
+            .to_string()
+    }
+
+    fn populates_meta_fields(&self) -> bool {
+        parse_required_property(self, ConfigKey::PopulatesMetaFields)
+    }
+
+    fn record_key_fields(&self) -> Vec<String> {
+        split_fields(self.get_property(ConfigKey::RecordKeyFields.as_ref()))
+    }
+
+    fn table_name(&self) -> String {
+        self.get_property(ConfigKey::TableName.as_ref()).to_string()
+    }
+
+    fn table_type(&self) -> TableType {
+        parse_required_property(self, ConfigKey::TableType)
+    }
+
+    fn table_version(&self) -> u32 {
+        parse_required_property(self, ConfigKey::TableVersion)
+    }
+
+    fn timeline_layout_version(&self) -> u32 {
+        parse_required_property(self, ConfigKey::TimelineLayoutVersion)
+    }
+}
+
+/// Parses the required property `key` of `table` into `T`.
+///
+/// # Panics
+///
+/// Panics if the property is missing or its value does not parse as `T`. The
+/// message names both the property and the offending value.
+fn parse_required_property<T>(table: &Table, key: ConfigKey) -> T
+where
+    T: FromStr,
+    T::Err: std::fmt::Display,
+{
+    let name: &str = key.as_ref();
+    let raw = table.get_property(name);
+    T::from_str(raw).unwrap_or_else(|e| {
+        panic!("Failed to parse property {}={}: {}", name, raw, e)
+    })
+}
+
+/// Splits a comma-separated field list, trimming entries and dropping empty ones.
+fn split_fields(value: &str) -> Vec<String> {
+    value
+        .split(',')
+        .map(str::trim)
+        .filter(|field| !field.is_empty())
+        .map(str::to_string)
+        .collect()
+}
+
#[cfg(test)]
mod tests {
+ use std::path::Path;
+
+ use crate::table::config::BaseFileFormat::Parquet;
+ use crate::table::config::TableType::CopyOnWrite;
+ use crate::table::metadata::ProvidesTableMetadata;
use crate::table::Table;
use hudi_fs::test_utils::extract_test_table;
- use std::path::Path;
+    // End-to-end read of the extracted 0.x COW partitioned fixture: timeline
+    // instants, latest file paths and schema.
#[test]
- fn load_snapshot_file_paths() {
+ fn hudi_table_get_latest_file_paths() {
let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
let target_table_path = extract_test_table(fixture_path);
- let hudi_table =
Table::new(target_table_path.as_path().to_str().unwrap());
- assert_eq!(hudi_table.get_snapshot_file_paths().unwrap().len(), 5);
+ let hudi_table = Table::new(target_table_path.to_str().unwrap());
+ assert_eq!(hudi_table.get_timeline().unwrap().instants.len(), 2);
+ assert_eq!(hudi_table.get_latest_file_paths().unwrap().len(), 5);
println!("{}", hudi_table.schema());
}
+
+    // Checks every `ProvidesTableMetadata` accessor against the sample
+    // `hoodie.properties` fixture. The fixture path is relative, so this
+    // relies on tests running from the crate root.
+ #[test]
+ fn hudi_table_get_table_metadata() {
+ let fixture_path =
Path::new("fixtures/table_metadata/sample_table_properties");
+ let table = Table::new(fixture_path.to_str().unwrap());
+ assert_eq!(table.base_file_format(), Parquet);
+ assert_eq!(table.checksum(), 3761586722);
+ assert_eq!(table.database_name(), "default");
+ assert!(!table.drops_partition_fields());
+ assert!(!table.is_hive_style_partitioning());
+ assert!(!table.is_partition_path_urlencoded());
+ assert!(table.is_partitioned());
+ assert_eq!(
+ table.key_generator_class(),
+ "org.apache.hudi.keygen.SimpleKeyGenerator"
+ );
+ assert_eq!(
+ table.location(),
+ "fixtures/table_metadata/sample_table_properties"
+ );
+ assert_eq!(table.partition_fields(), vec!["city"]);
+ assert_eq!(table.precombine_field(), "ts");
+ assert!(table.populates_meta_fields());
+ assert_eq!(table.record_key_fields(), vec!["uuid"]);
+ assert_eq!(table.table_name(), "trips");
+ assert_eq!(table.table_type(), CopyOnWrite);
+ assert_eq!(table.table_version(), 6);
+ assert_eq!(table.timeline_layout_version(), 1);
+ }
}
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index 75d89d5..1cd9e13 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -64,7 +64,7 @@ pub struct Timeline {
}
impl Timeline {
- pub fn init(base_path: &Path) -> Result<Self, io::Error> {
+ pub fn new(base_path: &Path) -> Result<Self, io::Error> {
let instants = Self::load_completed_commit_instants(base_path)?;
Ok(Self {
base_path: base_path.to_path_buf(),
@@ -140,7 +140,7 @@ mod tests {
fn read_latest_schema() {
let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
let target_table_path = extract_test_table(fixture_path);
- let timeline = Timeline::init(target_table_path.as_path()).unwrap();
+ let timeline = Timeline::new(target_table_path.as_path()).unwrap();
let table_schema = timeline.get_latest_schema().unwrap();
assert_eq!(table_schema.fields.len(), 11)
}
@@ -148,7 +148,7 @@ mod tests {
#[test]
fn init_commits_timeline() {
let fixture_path = Path::new("fixtures/timeline/commits_stub");
- let timeline = Timeline::init(fixture_path).unwrap();
+ let timeline = Timeline::new(fixture_path).unwrap();
assert_eq!(
timeline.instants,
vec![
diff --git a/python/src/lib.rs b/crates/core/src/utils.rs
similarity index 59%
copy from python/src/lib.rs
copy to crates/core/src/utils.rs
index ad03f49..c4189c5 100644
--- a/python/src/lib.rs
+++ b/crates/core/src/utils.rs
@@ -17,15 +17,23 @@
* under the License.
*/
-use pyo3::prelude::*;
+use std::path::{Path, PathBuf};
+use std::{fs, io};
-#[pyfunction]
-fn rust_core_version() -> &'static str {
- hudi::crate_version()
-}
-
-#[pymodule]
-fn _internal(_py: Python, m: &PyModule) -> PyResult<()> {
- m.add_function(wrap_pyfunction!(rust_core_version, m)?)?;
- Ok(())
+/// Recursively collects the leaf directories under `path`.
+///
+/// A directory is a leaf when it has no sub-directories. If `path` itself has
+/// none, it is returned as the only element. Sub-directories are visited
+/// depth-first in `fs::read_dir` order. `Path::is_dir` follows symbolic links.
+///
+/// # Errors
+///
+/// Propagates any I/O error from reading a directory or one of its entries.
+pub fn get_leaf_dirs(path: &Path) -> Result<Vec<PathBuf>, io::Error> {
+    let mut found = Vec::new();
+    let mut has_sub_dir = false;
+    for dir_entry in fs::read_dir(path)? {
+        let child = dir_entry?.path();
+        if !child.is_dir() {
+            continue;
+        }
+        has_sub_dir = true;
+        found.append(&mut get_leaf_dirs(&child)?);
+    }
+    if !has_sub_dir {
+        found.push(path.to_path_buf());
+    }
+    Ok(found)
}
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index ff603de..56aae9d 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -44,8 +44,9 @@ pub struct HudiDataSource {
impl HudiDataSource {
+    /// Creates a data source backed by the Hudi table at `base_path`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `HudiTable::new` cannot load the table properties.
pub fn new(base_path: &str) -> Self {
- let table = HudiTable::new(base_path);
- Self { table }
+ Self {
+ table: HudiTable::new(base_path),
+ }
}
pub(crate) async fn create_physical_plan(
&self,
@@ -56,7 +57,7 @@ impl HudiDataSource {
}
fn get_record_batches(&self) ->
datafusion_common::Result<Vec<RecordBatch>> {
- match self.table.get_snapshot_file_paths() {
+ match self.table.get_latest_file_paths() {
Ok(file_paths) => {
let mut record_batches = Vec::new();
for f in file_paths {
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 0a6c666..6c5702b 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -20,17 +20,13 @@ name = "hudi-python"
version = "0.1.0"
edition = "2021"
-# See more keys and their definitions at
https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
name = "hudi"
crate-type = ["cdylib"]
doc = false
[dependencies]
-# runtime
-futures = { workspace = true }
-num_cpus = { workspace = true }
-tokio = { workspace = true, features = ["rt-multi-thread"] }
+object_store = { workspace = true }
[dependencies.pyo3]
version = "0.21.2"
diff --git a/python/Makefile b/python/Makefile
index c73bacc..8017d2f 100644
--- a/python/Makefile
+++ b/python/Makefile
@@ -17,7 +17,7 @@
.DEFAULT_GOAL := help
-VENV := .venv
+VENV := venv
MATURIN_VERSION := $(shell grep 'requires =' pyproject.toml | cut -d= -f2- |
tr -d '[ "]')
PACKAGE_VERSION := $(shell grep version Cargo.toml | head -n 1 | awk '{print
$$3}' | tr -d '"' )
diff --git a/python/hudi/__init__.py b/python/hudi/__init__.py
index a67d5ea..fa70485 100644
--- a/python/hudi/__init__.py
+++ b/python/hudi/__init__.py
@@ -14,3 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
+from ._internal import __version__ as __version__
+from ._internal import rust_core_version as rust_core_version
+from .table import HudiTable as HudiTable
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index 75aaa86..8759c2e 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -15,6 +15,16 @@
# specific language governing permissions and limitations
# under the License.
+from typing import List
+
+# Version of the hudi-python extension crate (its CARGO_PKG_VERSION).
__version__: str
+
+# Version string of the Rust core crate, as reported by ``hudi::crate_version()``.
def rust_core_version() -> str: ...
+
+
+class BindingHudiTable:
+    """Native handle to a Hudi table, implemented as a pyo3 class in Rust."""
+
+ def __init__(self, table_uri: str): ...
+
+    # Base file paths of the latest file slice of each file group.
+ def get_latest_file_paths(self) -> List[str]: ...
diff --git a/python/hudi/_internal.pyi b/python/hudi/table.py
similarity index 66%
copy from python/hudi/_internal.pyi
copy to python/hudi/table.py
index 75aaa86..a8cfbcb 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/table.py
@@ -15,6 +15,18 @@
# specific language governing permissions and limitations
# under the License.
-__version__: str
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Union, List
-def rust_core_version() -> str: ...
+from python.hudi._internal import BindingHudiTable
+
+
+@dataclass(init=False)
+class HudiTable:
+
+ def __init__(self, table_uri: Union[str, Path, ""]):
+ self._table = BindingHudiTable(str(table_uri))
+
+ def get_latest_file_paths(self) -> List[str]:
+ return self._table.get_latest_file_paths()
diff --git a/python/src/lib.rs b/python/src/lib.rs
index ad03f49..8ac33f4 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -19,6 +19,35 @@
use pyo3::prelude::*;
+use hudi::table::Table;
+
+/// Python-visible handle to a Hudi table, registered in the `_internal` module.
+#[pyclass]
+struct BindingHudiTable {
+    _table: hudi::HudiTable,
+}
+
+#[pymethods]
+impl BindingHudiTable {
+    /// Opens the table at `table_uri`, releasing the GIL while `Table::new`
+    /// loads the table properties.
+    #[new]
+    #[pyo3(signature = (table_uri))]
+    fn new(py: Python, table_uri: &str) -> PyResult<Self> {
+        py.allow_threads(|| {
+            Ok(BindingHudiTable {
+                _table: Table::new(table_uri),
+            })
+        })
+    }
+
+    /// Returns the base file paths of the table's latest file slices.
+    ///
+    /// A failure is raised in Python as an `Exception` that carries the
+    /// underlying error message.
+    pub fn get_latest_file_paths(&self) -> PyResult<Vec<String>> {
+        self._table.get_latest_file_paths().map_err(|e| {
+            pyo3::exceptions::PyException::new_err(format!(
+                "Failed to retrieve the latest file paths: {}",
+                e
+            ))
+        })
+    }
+}
+
#[pyfunction]
fn rust_core_version() -> &'static str {
hudi::crate_version()
@@ -26,6 +55,9 @@ fn rust_core_version() -> &'static str {
+/// Initializer of the `hudi._internal` extension module. It exposes
+/// `__version__`, `rust_core_version` and the `BindingHudiTable` class.
#[pymodule]
fn _internal(_py: Python, m: &PyModule) -> PyResult<()> {
+ m.add("__version__", env!("CARGO_PKG_VERSION"))?;
m.add_function(wrap_pyfunction!(rust_core_version, m)?)?;
+
+ m.add_class::<BindingHudiTable>()?;
Ok(())
}