This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 64b7b3b feat: add table and timeline APIs for retrieving useful info
(#313)
64b7b3b is described below
commit 64b7b3b69687486ba67c4235686f92700a4cc14c
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Apr 13 15:07:28 2025 -0500
feat: add table and timeline APIs for retrieving useful info (#313)
Add new Rust APIs for `Table` and `Timeline` to get useful info:
- table name
- table type
- timezone
- check if it's MOR
- get avro schema
- get commits, replace commits, delta commits, clustering commits
Add the corresponding Python APIs.
---
crates/core/src/table/mod.rs | 46 ++++++---
crates/core/src/timeline/instant.rs | 31 +++---
crates/core/src/timeline/mod.rs | 171 ++++++++++++++++++++++++++++-----
crates/core/src/timeline/selector.rs | 37 +++----
python/hudi/__init__.py | 10 +-
python/hudi/_internal.pyi | 73 ++++++++++++++
python/src/internal.rs | 181 +++++++++++++++++++++++++++++++++++
python/src/lib.rs | 4 +-
8 files changed, 485 insertions(+), 68 deletions(-)
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 2087ead..e47ff9b 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -106,7 +106,6 @@ use crate::config::read::HudiReadConfig;
use arrow::record_batch::RecordBatch;
use arrow_schema::{Field, Schema};
use std::collections::{HashMap, HashSet};
-use std::str::FromStr;
use std::sync::Arc;
use url::Url;
@@ -168,14 +167,24 @@ impl Table {
.expect(&err_msg)
}
- pub fn table_type(&self) -> TableTypeValue {
+ pub fn table_name(&self) -> String {
+ let err_msg = format!("{:?} is missing or invalid.",
HudiTableConfig::TableName);
+ self.hudi_configs
+ .get(HudiTableConfig::TableName)
+ .expect(&err_msg)
+ .to::<String>()
+ }
+
+ pub fn table_type(&self) -> String {
let err_msg = format!("{:?} is missing or invalid.",
HudiTableConfig::TableType);
- let table_type = self
- .hudi_configs
+ self.hudi_configs
.get(HudiTableConfig::TableType)
.expect(&err_msg)
- .to::<String>();
- TableTypeValue::from_str(table_type.as_str()).expect(&err_msg)
+ .to::<String>()
+ }
+
+ pub fn is_mor(&self) -> bool {
+ self.table_type() == TableTypeValue::MergeOnRead.as_ref()
}
pub fn timezone(&self) -> String {
@@ -184,12 +193,17 @@ impl Table {
.to::<String>()
}
- /// Get the latest [Schema] of the table.
+ /// Get the latest Avro schema string of the table.
+ pub async fn get_avro_schema(&self) -> Result<String> {
+ self.timeline.get_latest_avro_schema().await
+ }
+
+ /// Get the latest [arrow_schema::Schema] of the table.
pub async fn get_schema(&self) -> Result<Schema> {
self.timeline.get_latest_schema().await
}
- /// Get the latest partition [Schema] of the table
+ /// Get the latest partition [arrow_schema::Schema] of the table.
pub async fn get_partition_schema(&self) -> Result<Schema> {
let partition_fields: HashSet<String> = self
.hudi_configs
@@ -209,6 +223,11 @@ impl Table {
Ok(Schema::new(partition_fields))
}
+ /// Get the [Timeline] of the table.
+ pub fn get_timeline(&self) -> &Timeline {
+ &self.timeline
+ }
+
/// Get all the [FileSlice]s in splits from the table.
///
/// # Arguments
@@ -219,7 +238,7 @@ impl Table {
n: usize,
filters: &[(&str, &str, &str)],
) -> Result<Vec<Vec<FileSlice>>> {
- if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+ if let Some(timestamp) =
self.timeline.get_latest_commit_timestamp_as_option() {
let filters = from_str_tuples(filters)?;
self.get_file_slices_splits_internal(n, timestamp, &filters)
.await
@@ -273,7 +292,7 @@ impl Table {
/// # Notes
/// * This API is useful for implementing snapshot query.
pub async fn get_file_slices(&self, filters: &[(&str, &str, &str)]) ->
Result<Vec<FileSlice>> {
- if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+ if let Some(timestamp) =
self.timeline.get_latest_commit_timestamp_as_option() {
let filters = from_str_tuples(filters)?;
self.get_file_slices_internal(timestamp, &filters).await
} else {
@@ -329,7 +348,8 @@ impl Table {
end_timestamp: Option<&str>,
) -> Result<Vec<FileSlice>> {
// If the end timestamp is not provided, use the latest commit
timestamp.
- let Some(end) = end_timestamp.or_else(||
self.timeline.get_latest_commit_timestamp())
+ let Some(end) =
+ end_timestamp.or_else(||
self.timeline.get_latest_commit_timestamp_as_option())
else {
// No latest commit timestamp means the table is empty.
return Ok(Vec::new());
@@ -387,7 +407,7 @@ impl Table {
/// # Arguments
/// * `filters` - Partition filters to apply.
pub async fn read_snapshot(&self, filters: &[(&str, &str, &str)]) ->
Result<Vec<RecordBatch>> {
- if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+ if let Some(timestamp) =
self.timeline.get_latest_commit_timestamp_as_option() {
let filters = from_str_tuples(filters)?;
self.read_snapshot_internal(timestamp, &filters).await
} else {
@@ -440,7 +460,7 @@ impl Table {
) -> Result<Vec<RecordBatch>> {
// If the end timestamp is not provided, use the latest commit
timestamp.
let Some(end_timestamp) =
- end_timestamp.or_else(||
self.timeline.get_latest_commit_timestamp())
+ end_timestamp.or_else(||
self.timeline.get_latest_commit_timestamp_as_option())
else {
return Ok(Vec::new());
};
diff --git a/crates/core/src/timeline/instant.rs
b/crates/core/src/timeline/instant.rs
index ff9ef46..e3e191e 100644
--- a/crates/core/src/timeline/instant.rs
+++ b/crates/core/src/timeline/instant.rs
@@ -18,6 +18,7 @@
*/
use crate::config::table::TimelineTimezoneValue;
use crate::error::CoreError;
+use crate::metadata::HUDI_METADATA_DIR;
use crate::storage::error::StorageError;
use crate::Result;
use chrono::{DateTime, Local, NaiveDateTime, TimeZone, Timelike, Utc};
@@ -45,15 +46,17 @@ impl FromStr for Action {
}
}
-impl Action {
- pub fn as_str(&self) -> &str {
+impl AsRef<str> for Action {
+ fn as_ref(&self) -> &str {
match self {
Action::Commit => "commit",
Action::DeltaCommit => "deltacommit",
Action::ReplaceCommit => "replacecommit",
}
}
+}
+impl Action {
pub fn is_replacecommit(&self) -> bool {
self == &Action::ReplaceCommit
}
@@ -84,8 +87,8 @@ impl FromStr for State {
}
}
-impl State {
- pub fn as_str(&self) -> &str {
+impl AsRef<str> for State {
+ fn as_ref(&self) -> &str {
match self {
State::Requested => "requested",
State::Inflight => "inflight",
@@ -200,21 +203,21 @@ impl Instant {
pub fn file_name(&self) -> String {
match (&self.action, &self.state) {
- (_, State::Completed) => format!("{}.{}", self.timestamp,
self.action.as_str()),
+ (_, State::Completed) => format!("{}.{}", self.timestamp,
self.action.as_ref()),
(Action::Commit, State::Inflight) => {
- format!("{}.{}", self.timestamp, self.state.as_str())
+ format!("{}.{}", self.timestamp, self.state.as_ref())
}
_ => format!(
"{}.{}.{}",
self.timestamp,
- self.action.as_str(),
- self.state.as_str()
+ self.action.as_ref(),
+ self.state.as_ref()
),
}
}
pub fn relative_path(&self) -> Result<String> {
- let mut commit_file_path = PathBuf::from(".hoodie");
+ let mut commit_file_path = PathBuf::from(HUDI_METADATA_DIR);
commit_file_path.push(self.file_name());
commit_file_path
.to_str()
@@ -237,8 +240,8 @@ mod tests {
#[test]
fn test_action_methods() {
- assert_eq!(Action::Commit.as_str(), "commit");
- assert_eq!(Action::ReplaceCommit.as_str(), "replacecommit");
+ assert_eq!(Action::Commit.as_ref(), "commit");
+ assert_eq!(Action::ReplaceCommit.as_ref(), "replacecommit");
assert!(!Action::Commit.is_replacecommit());
assert!(Action::ReplaceCommit.is_replacecommit());
@@ -256,9 +259,9 @@ mod tests {
#[test]
fn test_state_methods() {
- assert_eq!(State::Requested.as_str(), "requested");
- assert_eq!(State::Inflight.as_str(), "inflight");
- assert_eq!(State::Completed.as_str(), "");
+ assert_eq!(State::Requested.as_ref(), "requested");
+ assert_eq!(State::Inflight.as_ref(), "inflight");
+ assert_eq!(State::Completed.as_ref(), "");
}
#[test]
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index 67dad41..ac78903 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -16,14 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-mod instant;
+pub mod instant;
pub(crate) mod selector;
use crate::config::HudiConfigs;
use crate::error::CoreError;
use crate::file_group::builder::{build_file_groups,
build_replaced_file_groups, FileGroupMerger};
use crate::file_group::FileGroup;
+use crate::metadata::HUDI_METADATA_DIR;
use crate::storage::Storage;
+use crate::timeline::instant::Action;
use crate::timeline::selector::TimelineSelector;
use crate::Result;
use arrow_schema::Schema;
@@ -46,6 +48,8 @@ pub struct Timeline {
}
pub const EARLIEST_START_TIMESTAMP: &str = "19700101000000000";
+pub const DEFAULT_LOADING_ACTIONS: &[Action] =
+ &[Action::Commit, Action::DeltaCommit, Action::ReplaceCommit];
impl Timeline {
#[cfg(test)]
@@ -67,8 +71,13 @@ impl Timeline {
storage_options: Arc<HashMap<String, String>>,
) -> Result<Self> {
let storage = Storage::new(storage_options.clone(),
hudi_configs.clone())?;
- let selector =
TimelineSelector::completed_commits(hudi_configs.clone())?;
- let completed_commits = Self::load_instants(&selector,
&storage).await?;
+ let selector = TimelineSelector::completed_actions_in_range(
+ DEFAULT_LOADING_ACTIONS,
+ hudi_configs.clone(),
+ None,
+ None,
+ )?;
+ let completed_commits = Self::load_instants(&selector, &storage,
false).await?;
Ok(Self {
hudi_configs,
storage,
@@ -76,8 +85,12 @@ impl Timeline {
})
}
- async fn load_instants(selector: &TimelineSelector, storage: &Storage) ->
Result<Vec<Instant>> {
- let files = storage.list_files(Some(".hoodie")).await?;
+ async fn load_instants(
+ selector: &TimelineSelector,
+ storage: &Storage,
+ desc: bool,
+ ) -> Result<Vec<Instant>> {
+ let files = storage.list_files(Some(HUDI_METADATA_DIR)).await?;
// For most cases, we load completed instants, so we can pre-allocate
the vector with a
// capacity of 1/3 of the total number of listed files,
@@ -103,24 +116,86 @@ impl Timeline {
// so we can save some memory by shrinking the capacity.
instants.shrink_to_fit();
- Ok(instants)
+ if desc {
+ Ok(instants.into_iter().rev().collect())
+ } else {
+ Ok(instants)
+ }
}
- pub(crate) fn get_latest_commit_timestamp(&self) -> Option<&str> {
- self.completed_commits
- .iter()
- .next_back()
- .map(|instant| instant.timestamp.as_str())
+ /// Get the completed commit [Instant]s in the timeline.
+ ///
+ /// * For Copy-on-write tables, this includes commit instants.
+ /// * For Merge-on-read tables, this includes compaction commit instants.
+ ///
+ /// # Arguments
+ ///
+ /// * `desc` - If true, the [Instant]s are sorted in descending order.
+ pub async fn get_completed_commits(&self, desc: bool) ->
Result<Vec<Instant>> {
+ let selector =
+
TimelineSelector::completed_commits_in_range(self.hudi_configs.clone(), None,
None)?;
+ Self::load_instants(&selector, &self.storage, desc).await
}
- async fn get_latest_commit_metadata(&self) -> Result<Map<String, Value>> {
- match self.completed_commits.iter().next_back() {
- Some(instant) => self.get_commit_metadata(instant).await,
- None => Ok(Map::new()),
+ /// Get the completed deltacommit [Instant]s in the timeline.
+ ///
+ /// Only applicable for Merge-on-read tables. An empty vector will be
returned for Copy-on-write tables.
+ ///
+ /// # Arguments
+ ///
+ /// * `desc` - If true, the [Instant]s are sorted in descending order.
+ pub async fn get_completed_deltacommits(&self, desc: bool) ->
Result<Vec<Instant>> {
+ let selector = TimelineSelector::completed_deltacommits_in_range(
+ self.hudi_configs.clone(),
+ None,
+ None,
+ )?;
+ Self::load_instants(&selector, &self.storage, desc).await
+ }
+
+ /// Get the completed replacecommit [Instant]s in the timeline.
+ ///
+ /// # Arguments
+ ///
+ /// * `desc` - If true, the [Instant]s are sorted in descending order.
+ pub async fn get_completed_replacecommits(&self, desc: bool) ->
Result<Vec<Instant>> {
+ let selector = TimelineSelector::completed_replacecommits_in_range(
+ self.hudi_configs.clone(),
+ None,
+ None,
+ )?;
+ Self::load_instants(&selector, &self.storage, desc).await
+ }
+
+ /// Get the completed clustering commit [Instant]s in the timeline.
+ ///
+ /// # Arguments
+ ///
+ /// * `desc` - If true, the [Instant]s are sorted in descending order.
+ pub async fn get_completed_clustering_commits(&self, desc: bool) ->
Result<Vec<Instant>> {
+ let selector = TimelineSelector::completed_replacecommits_in_range(
+ self.hudi_configs.clone(),
+ None,
+ None,
+ )?;
+ let instants = Self::load_instants(&selector, &self.storage,
desc).await?;
+ let mut clustering_instants = Vec::new();
+ for instant in instants {
+ let metadata = self.get_instant_metadata(&instant).await?;
+ let op_type = metadata
+ .get("operationType")
+ .and_then(|v| v.as_str())
+ .ok_or_else(|| {
+ CoreError::CommitMetadata("Failed to get operation
type".to_string())
+ })?;
+ if op_type == "cluster" {
+ clustering_instants.push(instant);
+ }
}
+ Ok(clustering_instants)
}
- async fn get_commit_metadata(&self, instant: &Instant) ->
Result<Map<String, Value>> {
+ async fn get_instant_metadata(&self, instant: &Instant) ->
Result<Map<String, Value>> {
let path = instant.relative_path()?;
let bytes = self.storage.get_file_data(path.as_str()).await?;
@@ -128,7 +203,58 @@ impl Timeline {
.map_err(|e| CoreError::Timeline(format!("Failed to get commit
metadata: {}", e)))
}
- pub(crate) async fn get_latest_schema(&self) -> Result<Schema> {
+ /// Get the instant metadata in JSON format.
+ pub async fn get_instant_metadata_in_json(&self, instant: &Instant) ->
Result<String> {
+ let path = instant.relative_path()?;
+ let bytes = self.storage.get_file_data(path.as_str()).await?;
+ String::from_utf8(bytes.to_vec())
+ .map_err(|e| CoreError::Timeline(format!("Failed to get commit
metadata: {}", e)))
+ }
+
+ async fn get_latest_commit_metadata(&self) -> Result<Map<String, Value>> {
+ match self.completed_commits.iter().next_back() {
+ Some(instant) => self.get_instant_metadata(instant).await,
+ None => Ok(Map::new()),
+ }
+ }
+
+ pub(crate) fn get_latest_commit_timestamp_as_option(&self) -> Option<&str>
{
+ self.completed_commits
+ .iter()
+ .next_back()
+ .map(|instant| instant.timestamp.as_str())
+ }
+
+ /// Get the latest commit timestamp from the [Timeline].
+ ///
+ /// Only completed commits are considered.
+ pub fn get_latest_commit_timestamp(&self) -> Result<String> {
+ self.get_latest_commit_timestamp_as_option().map_or_else(
+ || Err(CoreError::Timeline("No commits found".to_string())),
+ |t| Ok(t.to_string()),
+ )
+ }
+
+ /// Get the latest Avro schema string from the [Timeline].
+ pub async fn get_latest_avro_schema(&self) -> Result<String> {
+ let commit_metadata = self.get_latest_commit_metadata().await?;
+ commit_metadata
+ .get("extraMetadata")
+ .and_then(|v| v.as_object())
+ .and_then(|obj| {
+ obj.get("schema")
+ .and_then(|v| v.as_str())
+ .map(|s| s.to_string())
+ })
+ .ok_or_else(|| {
+ CoreError::CommitMetadata(
+ "Failed to resolve the latest schema: no schema
found".to_string(),
+ )
+ })
+ }
+
+ /// Get the latest [arrow_schema::Schema] from the [Timeline].
+ pub async fn get_latest_schema(&self) -> Result<Schema> {
let commit_metadata = self.get_latest_commit_metadata().await?;
let first_partition = commit_metadata
@@ -194,7 +320,7 @@ impl Timeline {
Some(timestamp),
)?;
for instant in selector.select(self)? {
- let commit_metadata = self.get_commit_metadata(&instant).await?;
+ let commit_metadata = self.get_instant_metadata(&instant).await?;
file_groups.extend(build_replaced_file_groups(&commit_metadata)?);
}
@@ -212,14 +338,15 @@ impl Timeline {
) -> Result<HashSet<FileGroup>> {
let mut file_groups: HashSet<FileGroup> = HashSet::new();
let mut replaced_file_groups: HashSet<FileGroup> = HashSet::new();
- let selector = TimelineSelector::completed_commits_in_range(
+ let selector = TimelineSelector::completed_actions_in_range(
+ DEFAULT_LOADING_ACTIONS,
self.hudi_configs.clone(),
start_timestamp,
end_timestamp,
)?;
let commits = selector.select(self)?;
for commit in commits {
- let commit_metadata = self.get_commit_metadata(&commit).await?;
+ let commit_metadata = self.get_instant_metadata(&commit).await?;
file_groups.merge(build_file_groups(&commit_metadata)?)?;
if commit.is_replacecommit() {
@@ -308,7 +435,7 @@ mod tests {
let instant = Instant::from_str("20240402123035233.commit").unwrap();
// Test error when reading empty commit metadata file
- let result = timeline.get_commit_metadata(&instant).await;
+ let result = timeline.get_instant_metadata(&instant).await;
assert!(result.is_err());
let err = result.unwrap_err();
assert!(matches!(err, CoreError::Timeline(_)));
@@ -317,7 +444,7 @@ mod tests {
let instant = Instant::from_str("20240402144910683.commit").unwrap();
// Test error when reading a commit metadata file with invalid JSON
- let result = timeline.get_commit_metadata(&instant).await;
+ let result = timeline.get_instant_metadata(&instant).await;
assert!(result.is_err());
let err = result.unwrap_err();
assert!(matches!(err, CoreError::Timeline(_)));
diff --git a/crates/core/src/timeline/selector.rs
b/crates/core/src/timeline/selector.rs
index ff02910..078ff05 100644
--- a/crates/core/src/timeline/selector.rs
+++ b/crates/core/src/timeline/selector.rs
@@ -158,11 +158,8 @@ impl TimelineSelector {
.transpose()
}
- pub fn completed_commits(hudi_configs: Arc<HudiConfigs>) -> Result<Self> {
- Self::completed_commits_in_range(hudi_configs, None, None)
- }
-
- pub fn completed_commits_in_range(
+ pub fn completed_actions_in_range(
+ actions: &[Action],
hudi_configs: Arc<HudiConfigs>,
start: Option<&str>,
end: Option<&str>,
@@ -175,27 +172,33 @@ impl TimelineSelector {
start_datetime,
end_datetime,
states: vec![State::Completed],
- actions: vec![Action::Commit, Action::DeltaCommit,
Action::ReplaceCommit],
+ actions: actions.to_vec(),
include_archived: false,
})
}
+ pub fn completed_commits_in_range(
+ hudi_configs: Arc<HudiConfigs>,
+ start: Option<&str>,
+ end: Option<&str>,
+ ) -> Result<Self> {
+ Self::completed_actions_in_range(&[Action::Commit], hudi_configs,
start, end)
+ }
+
+ pub fn completed_deltacommits_in_range(
+ hudi_configs: Arc<HudiConfigs>,
+ start: Option<&str>,
+ end: Option<&str>,
+ ) -> Result<Self> {
+ Self::completed_actions_in_range(&[Action::DeltaCommit], hudi_configs,
start, end)
+ }
+
pub fn completed_replacecommits_in_range(
hudi_configs: Arc<HudiConfigs>,
start: Option<&str>,
end: Option<&str>,
) -> Result<Self> {
- let timezone = Self::get_timezone_from_configs(&hudi_configs);
- let start_datetime = Self::parse_datetime(&timezone, start)?;
- let end_datetime = Self::parse_datetime(&timezone, end)?;
- Ok(Self {
- timezone,
- start_datetime,
- end_datetime,
- states: vec![State::Completed],
- actions: vec![Action::ReplaceCommit],
- include_archived: false,
- })
+ Self::completed_actions_in_range(&[Action::ReplaceCommit],
hudi_configs, start, end)
}
pub fn should_include_action(&self, action: &Action) -> bool {
diff --git a/python/hudi/__init__.py b/python/hudi/__init__.py
index ae6b9aa..35df3af 100644
--- a/python/hudi/__init__.py
+++ b/python/hudi/__init__.py
@@ -16,13 +16,21 @@
# under the License.
-from hudi._internal import HudiFileGroupReader, HudiFileSlice, HudiTable
+from hudi._internal import (
+ HudiFileGroupReader,
+ HudiFileSlice,
+ HudiInstant,
+ HudiTable,
+ HudiTimeline,
+)
from hudi._internal import __version__ as __version__
from hudi.table.builder import HudiTableBuilder
__all__ = [
"HudiFileGroupReader",
"HudiFileSlice",
+ "HudiInstant",
"HudiTable",
"HudiTableBuilder",
+ "HudiTimeline",
]
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index a884265..b16c5a0 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -107,6 +107,17 @@ class HudiFileSlice:
"""
...
+@dataclass(init=False)
+class HudiInstant:
+ @property
+ def timestamp(self) -> str: ...
+ @property
+ def action(self) -> str: ...
+ @property
+ def state(self) -> str: ...
+ @property
+ def epoch_mills(self) -> int: ...
+
@dataclass(init=False)
class HudiTable:
"""
@@ -146,6 +157,50 @@ class HudiTable:
Dict[str, str]: A dictionary of storage options.
"""
...
+ @property
+ def table_name(self) -> str:
+ """
+ Get table name.
+
+ Returns:
+ str: The name of the table.
+ """
+ ...
+ @property
+ def table_type(self) -> str:
+ """
+ Get table type.
+
+ Returns:
+ str: The type of the table.
+ """
+ ...
+ @property
+ def is_mor(self) -> str:
+ """
+ Get whether the table is an MOR table.
+
+ Returns:
+ bool: True if the table is an MOR table, False otherwise.
+ """
+ ...
+ @property
+ def timezone(self) -> str:
+ """
+ Get timezone.
+
+ Returns:
+ str: The timezone of the table.
+ """
+ ...
+ def get_avro_schema(self) -> str:
+ """
+ Returns the Avro schema of the Hudi table.
+
+ Returns:
+ str: The Avro schema of the table.
+ """
+ ...
def get_schema(self) -> "pyarrow.Schema":
"""
Returns the schema of the Hudi table.
@@ -162,6 +217,11 @@ class HudiTable:
pyarrow.Schema: The schema used for partitioning the table.
"""
...
+ def get_timeline(self) -> HudiTimeline:
+ """
+ Returns the timeline of the Hudi table.
+ """
+ ...
def get_file_slices_splits(
self, n: int, filters: Optional[List[Tuple[str, str, str]]]
) -> List[List[HudiFileSlice]]:
@@ -257,6 +317,19 @@ class HudiTable:
"""
...
+@dataclass(init=False)
+class HudiTimeline:
+ def get_completed_commits(self, desc: bool = False) -> List[HudiInstant]:
...
+ def get_completed_deltacommits(self, desc: bool = False) ->
List[HudiInstant]: ...
+ def get_completed_replacecommits(self, desc: bool = False) ->
List[HudiInstant]: ...
+ def get_completed_clustering_commits(
+ self, desc: bool = False
+ ) -> List[HudiInstant]: ...
+ def get_instant_metadata_in_json(self, instant: HudiInstant) -> str: ...
+ def get_latest_commit_timestamp(self) -> str: ...
+ def get_latest_avro_schema(self) -> str: ...
+ def get_latest_schema(self) -> "pyarrow.Schema": ...
+
def build_hudi_table(
base_uri: str,
hudi_options: Optional[Dict[str, str]] = None,
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 90eb4af..86c50e0 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -31,6 +31,8 @@ use hudi::file_group::FileGroup;
use hudi::storage::error::StorageError;
use hudi::table::builder::TableBuilder;
use hudi::table::Table;
+use hudi::timeline::instant::Instant;
+use hudi::timeline::Timeline;
use hudi::util::StrTupleRef;
use pyo3::exceptions::PyException;
use pyo3::{create_exception, pyclass, pyfunction, pymethods, PyErr, PyObject,
PyResult, Python};
@@ -199,6 +201,45 @@ impl From<&FileSlice> for HudiFileSlice {
}
}
+#[cfg(not(tarpaulin_include))]
+#[derive(Clone, Debug)]
+#[pyclass]
+pub struct HudiInstant {
+ inner: Instant,
+}
+
+#[cfg(not(tarpaulin_include))]
+#[pymethods]
+impl HudiInstant {
+ #[getter]
+ pub fn timestamp(&self) -> String {
+ self.inner.timestamp.to_string()
+ }
+
+ #[getter]
+ pub fn action(&self) -> String {
+ self.inner.action.as_ref().to_string()
+ }
+
+ #[getter]
+ pub fn state(&self) -> String {
+ self.inner.state.as_ref().to_string()
+ }
+
+ #[getter]
+ pub fn epoch_mills(&self) -> i64 {
+ self.inner.epoch_millis
+ }
+}
+
+impl From<&Instant> for HudiInstant {
+ fn from(i: &Instant) -> Self {
+ HudiInstant {
+ inner: i.to_owned(),
+ }
+ }
+}
+
#[cfg(not(tarpaulin_include))]
#[pyclass]
pub struct HudiTable {
@@ -231,6 +272,35 @@ impl HudiTable {
self.inner.storage_options()
}
+ #[getter]
+ fn table_name(&self) -> String {
+ self.inner.table_name()
+ }
+
+ #[getter]
+ fn table_type(&self) -> String {
+ self.inner.table_type()
+ }
+
+ #[getter]
+ fn is_mor(&self) -> bool {
+ self.inner.is_mor()
+ }
+
+ #[getter]
+ fn timezone(&self) -> String {
+ self.inner.timezone()
+ }
+
+ fn get_avro_schema(&self, py: Python) -> PyResult<String> {
+ py.allow_threads(|| {
+ let avro_schema = rt()
+ .block_on(self.inner.get_avro_schema())
+ .map_err(PythonError::from)?;
+ Ok(avro_schema)
+ })
+ }
+
fn get_schema(&self, py: Python) -> PyResult<PyObject> {
rt().block_on(self.inner.get_schema())
.map_err(PythonError::from)?
@@ -243,6 +313,13 @@ impl HudiTable {
.to_pyarrow(py)
}
+ fn get_timeline(&self, py: Python) -> HudiTimeline {
+ py.allow_threads(|| {
+ let timeline = self.inner.get_timeline();
+ HudiTimeline::from(timeline)
+ })
+ }
+
#[pyo3(signature = (n, filters=None))]
fn get_file_slices_splits(
&self,
@@ -399,6 +476,110 @@ impl HudiTable {
}
}
+#[cfg(not(tarpaulin_include))]
+#[pyclass]
+pub struct HudiTimeline {
+ inner: Timeline,
+}
+
+#[cfg(not(tarpaulin_include))]
+#[pymethods]
+impl HudiTimeline {
+ #[pyo3(signature = (desc=false))]
+ pub fn get_completed_commits(&self, desc: bool, py: Python) ->
PyResult<Vec<HudiInstant>> {
+ py.allow_threads(|| {
+ let instants = rt()
+ .block_on(self.inner.get_completed_commits(desc))
+ .map_err(PythonError::from)?;
+ Ok(instants.iter().map(HudiInstant::from).collect())
+ })
+ }
+
+ #[pyo3(signature = (desc=false))]
+ pub fn get_completed_deltacommits(&self, desc: bool, py: Python) ->
PyResult<Vec<HudiInstant>> {
+ py.allow_threads(|| {
+ let instants = rt()
+ .block_on(self.inner.get_completed_deltacommits(desc))
+ .map_err(PythonError::from)?;
+ Ok(instants.iter().map(HudiInstant::from).collect())
+ })
+ }
+
+ #[pyo3(signature = (desc=false))]
+ pub fn get_completed_replacecommits(
+ &self,
+ desc: bool,
+ py: Python,
+ ) -> PyResult<Vec<HudiInstant>> {
+ py.allow_threads(|| {
+ let instants = rt()
+ .block_on(self.inner.get_completed_replacecommits(desc))
+ .map_err(PythonError::from)?;
+ Ok(instants.iter().map(HudiInstant::from).collect())
+ })
+ }
+
+ #[pyo3(signature = (desc=false))]
+ pub fn get_completed_clustering_commits(
+ &self,
+ desc: bool,
+ py: Python,
+ ) -> PyResult<Vec<HudiInstant>> {
+ py.allow_threads(|| {
+ let instants = rt()
+ .block_on(self.inner.get_completed_clustering_commits(desc))
+ .map_err(PythonError::from)?;
+ Ok(instants.iter().map(HudiInstant::from).collect())
+ })
+ }
+
+ pub fn get_instant_metadata_in_json(
+ &self,
+ instant: &HudiInstant,
+ py: Python,
+ ) -> PyResult<String> {
+ py.allow_threads(|| {
+ let commit_metadata = rt()
+
.block_on(self.inner.get_instant_metadata_in_json(&instant.inner))
+ .map_err(PythonError::from)?;
+ Ok(commit_metadata)
+ })
+ }
+
+ pub fn get_latest_commit_timestamp(&self, py: Python) -> PyResult<String> {
+ py.allow_threads(|| {
+ let commit_timestamp = self
+ .inner
+ .get_latest_commit_timestamp()
+ .map_err(PythonError::from)?;
+ Ok(commit_timestamp)
+ })
+ }
+
+ pub fn get_latest_avro_schema(&self, py: Python) -> PyResult<String> {
+ py.allow_threads(|| {
+ let schema = rt()
+ .block_on(self.inner.get_latest_avro_schema())
+ .map_err(PythonError::from)?;
+ Ok(schema)
+ })
+ }
+
+ pub fn get_latest_schema(&self, py: Python) -> PyResult<PyObject> {
+ rt().block_on(self.inner.get_latest_schema())
+ .map_err(PythonError::from)?
+ .to_pyarrow(py)
+ }
+}
+
+impl From<&Timeline> for HudiTimeline {
+ fn from(t: &Timeline) -> Self {
+ HudiTimeline {
+ inner: t.to_owned(),
+ }
+ }
+}
+
#[cfg(not(tarpaulin_include))]
#[pyfunction]
#[pyo3(signature = (base_uri, hudi_options=None, storage_options=None,
options=None))]
diff --git a/python/src/lib.rs b/python/src/lib.rs
index afd3e59..ff822b9 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -25,10 +25,12 @@ mod internal;
fn _internal(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add("__version__", env!("CARGO_PKG_VERSION"))?;
- use internal::{HudiFileGroupReader, HudiFileSlice, HudiTable};
+ use internal::{HudiFileGroupReader, HudiFileSlice, HudiInstant, HudiTable,
HudiTimeline};
m.add_class::<HudiFileGroupReader>()?;
m.add_class::<HudiFileSlice>()?;
+ m.add_class::<HudiInstant>()?;
m.add_class::<HudiTable>()?;
+ m.add_class::<HudiTimeline>()?;
use internal::build_hudi_table;
m.add_function(wrap_pyfunction!(build_hudi_table, m)?)?;