This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 64b7b3b  feat: add table and timeline APIs for retrieving useful info 
(#313)
64b7b3b is described below

commit 64b7b3b69687486ba67c4235686f92700a4cc14c
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Apr 13 15:07:28 2025 -0500

    feat: add table and timeline APIs for retrieving useful info (#313)
    
    Add new Rust APIs for `Table` and `Timeline` to get useful info:
    
    - table name
    - table type
    - timezone
    - check if it's MOR
    - get avro schema
    - get commits, replace commits, delta commits, clustering commits
    
    Add the corresponding Python APIs.
---
 crates/core/src/table/mod.rs         |  46 ++++++---
 crates/core/src/timeline/instant.rs  |  31 +++---
 crates/core/src/timeline/mod.rs      | 171 ++++++++++++++++++++++++++++-----
 crates/core/src/timeline/selector.rs |  37 +++----
 python/hudi/__init__.py              |  10 +-
 python/hudi/_internal.pyi            |  73 ++++++++++++++
 python/src/internal.rs               | 181 +++++++++++++++++++++++++++++++++++
 python/src/lib.rs                    |   4 +-
 8 files changed, 485 insertions(+), 68 deletions(-)

diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 2087ead..e47ff9b 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -106,7 +106,6 @@ use crate::config::read::HudiReadConfig;
 use arrow::record_batch::RecordBatch;
 use arrow_schema::{Field, Schema};
 use std::collections::{HashMap, HashSet};
-use std::str::FromStr;
 use std::sync::Arc;
 use url::Url;
 
@@ -168,14 +167,24 @@ impl Table {
             .expect(&err_msg)
     }
 
-    pub fn table_type(&self) -> TableTypeValue {
+    pub fn table_name(&self) -> String {
+        let err_msg = format!("{:?} is missing or invalid.", 
HudiTableConfig::TableName);
+        self.hudi_configs
+            .get(HudiTableConfig::TableName)
+            .expect(&err_msg)
+            .to::<String>()
+    }
+
+    pub fn table_type(&self) -> String {
         let err_msg = format!("{:?} is missing or invalid.", 
HudiTableConfig::TableType);
-        let table_type = self
-            .hudi_configs
+        self.hudi_configs
             .get(HudiTableConfig::TableType)
             .expect(&err_msg)
-            .to::<String>();
-        TableTypeValue::from_str(table_type.as_str()).expect(&err_msg)
+            .to::<String>()
+    }
+
+    pub fn is_mor(&self) -> bool {
+        self.table_type() == TableTypeValue::MergeOnRead.as_ref()
     }
 
     pub fn timezone(&self) -> String {
@@ -184,12 +193,17 @@ impl Table {
             .to::<String>()
     }
 
-    /// Get the latest [Schema] of the table.
+    /// Get the latest Avro schema string of the table.
+    pub async fn get_avro_schema(&self) -> Result<String> {
+        self.timeline.get_latest_avro_schema().await
+    }
+
+    /// Get the latest [arrow_schema::Schema] of the table.
     pub async fn get_schema(&self) -> Result<Schema> {
         self.timeline.get_latest_schema().await
     }
 
-    /// Get the latest partition [Schema] of the table
+    /// Get the latest partition [arrow_schema::Schema] of the table.
     pub async fn get_partition_schema(&self) -> Result<Schema> {
         let partition_fields: HashSet<String> = self
             .hudi_configs
@@ -209,6 +223,11 @@ impl Table {
         Ok(Schema::new(partition_fields))
     }
 
+    /// Get the [Timeline] of the table.
+    pub fn get_timeline(&self) -> &Timeline {
+        &self.timeline
+    }
+
     /// Get all the [FileSlice]s in splits from the table.
     ///
     /// # Arguments
@@ -219,7 +238,7 @@ impl Table {
         n: usize,
         filters: &[(&str, &str, &str)],
     ) -> Result<Vec<Vec<FileSlice>>> {
-        if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+        if let Some(timestamp) = 
self.timeline.get_latest_commit_timestamp_as_option() {
             let filters = from_str_tuples(filters)?;
             self.get_file_slices_splits_internal(n, timestamp, &filters)
                 .await
@@ -273,7 +292,7 @@ impl Table {
     /// # Notes
     ///     * This API is useful for implementing snapshot query.
     pub async fn get_file_slices(&self, filters: &[(&str, &str, &str)]) -> 
Result<Vec<FileSlice>> {
-        if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+        if let Some(timestamp) = 
self.timeline.get_latest_commit_timestamp_as_option() {
             let filters = from_str_tuples(filters)?;
             self.get_file_slices_internal(timestamp, &filters).await
         } else {
@@ -329,7 +348,8 @@ impl Table {
         end_timestamp: Option<&str>,
     ) -> Result<Vec<FileSlice>> {
         // If the end timestamp is not provided, use the latest commit 
timestamp.
-        let Some(end) = end_timestamp.or_else(|| 
self.timeline.get_latest_commit_timestamp())
+        let Some(end) =
+            end_timestamp.or_else(|| 
self.timeline.get_latest_commit_timestamp_as_option())
         else {
             // No latest commit timestamp means the table is empty.
             return Ok(Vec::new());
@@ -387,7 +407,7 @@ impl Table {
     /// # Arguments
     ///     * `filters` - Partition filters to apply.
     pub async fn read_snapshot(&self, filters: &[(&str, &str, &str)]) -> 
Result<Vec<RecordBatch>> {
-        if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+        if let Some(timestamp) = 
self.timeline.get_latest_commit_timestamp_as_option() {
             let filters = from_str_tuples(filters)?;
             self.read_snapshot_internal(timestamp, &filters).await
         } else {
@@ -440,7 +460,7 @@ impl Table {
     ) -> Result<Vec<RecordBatch>> {
         // If the end timestamp is not provided, use the latest commit 
timestamp.
         let Some(end_timestamp) =
-            end_timestamp.or_else(|| 
self.timeline.get_latest_commit_timestamp())
+            end_timestamp.or_else(|| 
self.timeline.get_latest_commit_timestamp_as_option())
         else {
             return Ok(Vec::new());
         };
diff --git a/crates/core/src/timeline/instant.rs 
b/crates/core/src/timeline/instant.rs
index ff9ef46..e3e191e 100644
--- a/crates/core/src/timeline/instant.rs
+++ b/crates/core/src/timeline/instant.rs
@@ -18,6 +18,7 @@
  */
 use crate::config::table::TimelineTimezoneValue;
 use crate::error::CoreError;
+use crate::metadata::HUDI_METADATA_DIR;
 use crate::storage::error::StorageError;
 use crate::Result;
 use chrono::{DateTime, Local, NaiveDateTime, TimeZone, Timelike, Utc};
@@ -45,15 +46,17 @@ impl FromStr for Action {
     }
 }
 
-impl Action {
-    pub fn as_str(&self) -> &str {
+impl AsRef<str> for Action {
+    fn as_ref(&self) -> &str {
         match self {
             Action::Commit => "commit",
             Action::DeltaCommit => "deltacommit",
             Action::ReplaceCommit => "replacecommit",
         }
     }
+}
 
+impl Action {
     pub fn is_replacecommit(&self) -> bool {
         self == &Action::ReplaceCommit
     }
@@ -84,8 +87,8 @@ impl FromStr for State {
     }
 }
 
-impl State {
-    pub fn as_str(&self) -> &str {
+impl AsRef<str> for State {
+    fn as_ref(&self) -> &str {
         match self {
             State::Requested => "requested",
             State::Inflight => "inflight",
@@ -200,21 +203,21 @@ impl Instant {
 
     pub fn file_name(&self) -> String {
         match (&self.action, &self.state) {
-            (_, State::Completed) => format!("{}.{}", self.timestamp, 
self.action.as_str()),
+            (_, State::Completed) => format!("{}.{}", self.timestamp, 
self.action.as_ref()),
             (Action::Commit, State::Inflight) => {
-                format!("{}.{}", self.timestamp, self.state.as_str())
+                format!("{}.{}", self.timestamp, self.state.as_ref())
             }
             _ => format!(
                 "{}.{}.{}",
                 self.timestamp,
-                self.action.as_str(),
-                self.state.as_str()
+                self.action.as_ref(),
+                self.state.as_ref()
             ),
         }
     }
 
     pub fn relative_path(&self) -> Result<String> {
-        let mut commit_file_path = PathBuf::from(".hoodie");
+        let mut commit_file_path = PathBuf::from(HUDI_METADATA_DIR);
         commit_file_path.push(self.file_name());
         commit_file_path
             .to_str()
@@ -237,8 +240,8 @@ mod tests {
 
     #[test]
     fn test_action_methods() {
-        assert_eq!(Action::Commit.as_str(), "commit");
-        assert_eq!(Action::ReplaceCommit.as_str(), "replacecommit");
+        assert_eq!(Action::Commit.as_ref(), "commit");
+        assert_eq!(Action::ReplaceCommit.as_ref(), "replacecommit");
 
         assert!(!Action::Commit.is_replacecommit());
         assert!(Action::ReplaceCommit.is_replacecommit());
@@ -256,9 +259,9 @@ mod tests {
 
     #[test]
     fn test_state_methods() {
-        assert_eq!(State::Requested.as_str(), "requested");
-        assert_eq!(State::Inflight.as_str(), "inflight");
-        assert_eq!(State::Completed.as_str(), "");
+        assert_eq!(State::Requested.as_ref(), "requested");
+        assert_eq!(State::Inflight.as_ref(), "inflight");
+        assert_eq!(State::Completed.as_ref(), "");
     }
 
     #[test]
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index 67dad41..ac78903 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -16,14 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-mod instant;
+pub mod instant;
 pub(crate) mod selector;
 
 use crate::config::HudiConfigs;
 use crate::error::CoreError;
 use crate::file_group::builder::{build_file_groups, 
build_replaced_file_groups, FileGroupMerger};
 use crate::file_group::FileGroup;
+use crate::metadata::HUDI_METADATA_DIR;
 use crate::storage::Storage;
+use crate::timeline::instant::Action;
 use crate::timeline::selector::TimelineSelector;
 use crate::Result;
 use arrow_schema::Schema;
@@ -46,6 +48,8 @@ pub struct Timeline {
 }
 
 pub const EARLIEST_START_TIMESTAMP: &str = "19700101000000000";
+pub const DEFAULT_LOADING_ACTIONS: &[Action] =
+    &[Action::Commit, Action::DeltaCommit, Action::ReplaceCommit];
 
 impl Timeline {
     #[cfg(test)]
@@ -67,8 +71,13 @@ impl Timeline {
         storage_options: Arc<HashMap<String, String>>,
     ) -> Result<Self> {
         let storage = Storage::new(storage_options.clone(), 
hudi_configs.clone())?;
-        let selector = 
TimelineSelector::completed_commits(hudi_configs.clone())?;
-        let completed_commits = Self::load_instants(&selector, 
&storage).await?;
+        let selector = TimelineSelector::completed_actions_in_range(
+            DEFAULT_LOADING_ACTIONS,
+            hudi_configs.clone(),
+            None,
+            None,
+        )?;
+        let completed_commits = Self::load_instants(&selector, &storage, 
false).await?;
         Ok(Self {
             hudi_configs,
             storage,
@@ -76,8 +85,12 @@ impl Timeline {
         })
     }
 
-    async fn load_instants(selector: &TimelineSelector, storage: &Storage) -> 
Result<Vec<Instant>> {
-        let files = storage.list_files(Some(".hoodie")).await?;
+    async fn load_instants(
+        selector: &TimelineSelector,
+        storage: &Storage,
+        desc: bool,
+    ) -> Result<Vec<Instant>> {
+        let files = storage.list_files(Some(HUDI_METADATA_DIR)).await?;
 
         // For most cases, we load completed instants, so we can pre-allocate 
the vector with a
         // capacity of 1/3 of the total number of listed files,
@@ -103,24 +116,86 @@ impl Timeline {
         // so we can save some memory by shrinking the capacity.
         instants.shrink_to_fit();
 
-        Ok(instants)
+        if desc {
+            Ok(instants.into_iter().rev().collect())
+        } else {
+            Ok(instants)
+        }
     }
 
-    pub(crate) fn get_latest_commit_timestamp(&self) -> Option<&str> {
-        self.completed_commits
-            .iter()
-            .next_back()
-            .map(|instant| instant.timestamp.as_str())
+    /// Get the completed commit [Instant]s in the timeline.
+    ///
+    /// * For Copy-on-write tables, this includes commit instants.
+    /// * For Merge-on-read tables, this includes compaction commit instants.
+    ///
+    /// # Arguments
+    ///
+    /// * `desc` - If true, the [Instant]s are sorted in descending order.
+    pub async fn get_completed_commits(&self, desc: bool) -> 
Result<Vec<Instant>> {
+        let selector =
+            
TimelineSelector::completed_commits_in_range(self.hudi_configs.clone(), None, 
None)?;
+        Self::load_instants(&selector, &self.storage, desc).await
     }
 
-    async fn get_latest_commit_metadata(&self) -> Result<Map<String, Value>> {
-        match self.completed_commits.iter().next_back() {
-            Some(instant) => self.get_commit_metadata(instant).await,
-            None => Ok(Map::new()),
+    /// Get the completed deltacommit [Instant]s in the timeline.
+    ///
+    /// Only applicable for Merge-on-read tables. Empty vector will be 
returned for Copy-on-write tables.
+    ///
+    /// # Arguments
+    ///
+    /// * `desc` - If true, the [Instant]s are sorted in descending order.
+    pub async fn get_completed_deltacommits(&self, desc: bool) -> 
Result<Vec<Instant>> {
+        let selector = TimelineSelector::completed_deltacommits_in_range(
+            self.hudi_configs.clone(),
+            None,
+            None,
+        )?;
+        Self::load_instants(&selector, &self.storage, desc).await
+    }
+
+    /// Get the completed replacecommit [Instant]s in the timeline.
+    ///
+    /// # Arguments
+    ///
+    /// * `desc` - If true, the [Instant]s are sorted in descending order.
+    pub async fn get_completed_replacecommits(&self, desc: bool) -> 
Result<Vec<Instant>> {
+        let selector = TimelineSelector::completed_replacecommits_in_range(
+            self.hudi_configs.clone(),
+            None,
+            None,
+        )?;
+        Self::load_instants(&selector, &self.storage, desc).await
+    }
+
+    /// Get the completed clustering commit [Instant]s in the timeline.
+    ///
+    /// # Arguments
+    ///
+    /// * `desc` - If true, the [Instant]s are sorted in descending order.
+    pub async fn get_completed_clustering_commits(&self, desc: bool) -> 
Result<Vec<Instant>> {
+        let selector = TimelineSelector::completed_replacecommits_in_range(
+            self.hudi_configs.clone(),
+            None,
+            None,
+        )?;
+        let instants = Self::load_instants(&selector, &self.storage, 
desc).await?;
+        let mut clustering_instants = Vec::new();
+        for instant in instants {
+            let metadata = self.get_instant_metadata(&instant).await?;
+            let op_type = metadata
+                .get("operationType")
+                .and_then(|v| v.as_str())
+                .ok_or_else(|| {
+                    CoreError::CommitMetadata("Failed to get operation 
type".to_string())
+                })?;
+            if op_type == "cluster" {
+                clustering_instants.push(instant);
+            }
         }
+        Ok(clustering_instants)
     }
 
-    async fn get_commit_metadata(&self, instant: &Instant) -> 
Result<Map<String, Value>> {
+    async fn get_instant_metadata(&self, instant: &Instant) -> 
Result<Map<String, Value>> {
         let path = instant.relative_path()?;
         let bytes = self.storage.get_file_data(path.as_str()).await?;
 
@@ -128,7 +203,58 @@ impl Timeline {
             .map_err(|e| CoreError::Timeline(format!("Failed to get commit 
metadata: {}", e)))
     }
 
-    pub(crate) async fn get_latest_schema(&self) -> Result<Schema> {
+    /// Get the instant metadata in JSON format.
+    pub async fn get_instant_metadata_in_json(&self, instant: &Instant) -> 
Result<String> {
+        let path = instant.relative_path()?;
+        let bytes = self.storage.get_file_data(path.as_str()).await?;
+        String::from_utf8(bytes.to_vec())
+            .map_err(|e| CoreError::Timeline(format!("Failed to get commit 
metadata: {}", e)))
+    }
+
+    async fn get_latest_commit_metadata(&self) -> Result<Map<String, Value>> {
+        match self.completed_commits.iter().next_back() {
+            Some(instant) => self.get_instant_metadata(instant).await,
+            None => Ok(Map::new()),
+        }
+    }
+
+    pub(crate) fn get_latest_commit_timestamp_as_option(&self) -> Option<&str> 
{
+        self.completed_commits
+            .iter()
+            .next_back()
+            .map(|instant| instant.timestamp.as_str())
+    }
+
+    /// Get the latest commit timestamp from the [Timeline].
+    ///
+    /// Only completed commits are considered.
+    pub fn get_latest_commit_timestamp(&self) -> Result<String> {
+        self.get_latest_commit_timestamp_as_option().map_or_else(
+            || Err(CoreError::Timeline("No commits found".to_string())),
+            |t| Ok(t.to_string()),
+        )
+    }
+
+    /// Get the latest Avro schema string from the [Timeline].
+    pub async fn get_latest_avro_schema(&self) -> Result<String> {
+        let commit_metadata = self.get_latest_commit_metadata().await?;
+        commit_metadata
+            .get("extraMetadata")
+            .and_then(|v| v.as_object())
+            .and_then(|obj| {
+                obj.get("schema")
+                    .and_then(|v| v.as_str())
+                    .map(|s| s.to_string())
+            })
+            .ok_or_else(|| {
+                CoreError::CommitMetadata(
+                    "Failed to resolve the latest schema: no schema 
found".to_string(),
+                )
+            })
+    }
+
+    /// Get the latest [arrow_schema::Schema] from the [Timeline].
+    pub async fn get_latest_schema(&self) -> Result<Schema> {
         let commit_metadata = self.get_latest_commit_metadata().await?;
 
         let first_partition = commit_metadata
@@ -194,7 +320,7 @@ impl Timeline {
             Some(timestamp),
         )?;
         for instant in selector.select(self)? {
-            let commit_metadata = self.get_commit_metadata(&instant).await?;
+            let commit_metadata = self.get_instant_metadata(&instant).await?;
             file_groups.extend(build_replaced_file_groups(&commit_metadata)?);
         }
 
@@ -212,14 +338,15 @@ impl Timeline {
     ) -> Result<HashSet<FileGroup>> {
         let mut file_groups: HashSet<FileGroup> = HashSet::new();
         let mut replaced_file_groups: HashSet<FileGroup> = HashSet::new();
-        let selector = TimelineSelector::completed_commits_in_range(
+        let selector = TimelineSelector::completed_actions_in_range(
+            DEFAULT_LOADING_ACTIONS,
             self.hudi_configs.clone(),
             start_timestamp,
             end_timestamp,
         )?;
         let commits = selector.select(self)?;
         for commit in commits {
-            let commit_metadata = self.get_commit_metadata(&commit).await?;
+            let commit_metadata = self.get_instant_metadata(&commit).await?;
             file_groups.merge(build_file_groups(&commit_metadata)?)?;
 
             if commit.is_replacecommit() {
@@ -308,7 +435,7 @@ mod tests {
         let instant = Instant::from_str("20240402123035233.commit").unwrap();
 
         // Test error when reading empty commit metadata file
-        let result = timeline.get_commit_metadata(&instant).await;
+        let result = timeline.get_instant_metadata(&instant).await;
         assert!(result.is_err());
         let err = result.unwrap_err();
         assert!(matches!(err, CoreError::Timeline(_)));
@@ -317,7 +444,7 @@ mod tests {
         let instant = Instant::from_str("20240402144910683.commit").unwrap();
 
         // Test error when reading a commit metadata file with invalid JSON
-        let result = timeline.get_commit_metadata(&instant).await;
+        let result = timeline.get_instant_metadata(&instant).await;
         assert!(result.is_err());
         let err = result.unwrap_err();
         assert!(matches!(err, CoreError::Timeline(_)));
diff --git a/crates/core/src/timeline/selector.rs 
b/crates/core/src/timeline/selector.rs
index ff02910..078ff05 100644
--- a/crates/core/src/timeline/selector.rs
+++ b/crates/core/src/timeline/selector.rs
@@ -158,11 +158,8 @@ impl TimelineSelector {
             .transpose()
     }
 
-    pub fn completed_commits(hudi_configs: Arc<HudiConfigs>) -> Result<Self> {
-        Self::completed_commits_in_range(hudi_configs, None, None)
-    }
-
-    pub fn completed_commits_in_range(
+    pub fn completed_actions_in_range(
+        actions: &[Action],
         hudi_configs: Arc<HudiConfigs>,
         start: Option<&str>,
         end: Option<&str>,
@@ -175,27 +172,33 @@ impl TimelineSelector {
             start_datetime,
             end_datetime,
             states: vec![State::Completed],
-            actions: vec![Action::Commit, Action::DeltaCommit, 
Action::ReplaceCommit],
+            actions: actions.to_vec(),
             include_archived: false,
         })
     }
 
+    pub fn completed_commits_in_range(
+        hudi_configs: Arc<HudiConfigs>,
+        start: Option<&str>,
+        end: Option<&str>,
+    ) -> Result<Self> {
+        Self::completed_actions_in_range(&[Action::Commit], hudi_configs, 
start, end)
+    }
+
+    pub fn completed_deltacommits_in_range(
+        hudi_configs: Arc<HudiConfigs>,
+        start: Option<&str>,
+        end: Option<&str>,
+    ) -> Result<Self> {
+        Self::completed_actions_in_range(&[Action::DeltaCommit], hudi_configs, 
start, end)
+    }
+
     pub fn completed_replacecommits_in_range(
         hudi_configs: Arc<HudiConfigs>,
         start: Option<&str>,
         end: Option<&str>,
     ) -> Result<Self> {
-        let timezone = Self::get_timezone_from_configs(&hudi_configs);
-        let start_datetime = Self::parse_datetime(&timezone, start)?;
-        let end_datetime = Self::parse_datetime(&timezone, end)?;
-        Ok(Self {
-            timezone,
-            start_datetime,
-            end_datetime,
-            states: vec![State::Completed],
-            actions: vec![Action::ReplaceCommit],
-            include_archived: false,
-        })
+        Self::completed_actions_in_range(&[Action::ReplaceCommit], 
hudi_configs, start, end)
     }
 
     pub fn should_include_action(&self, action: &Action) -> bool {
diff --git a/python/hudi/__init__.py b/python/hudi/__init__.py
index ae6b9aa..35df3af 100644
--- a/python/hudi/__init__.py
+++ b/python/hudi/__init__.py
@@ -16,13 +16,21 @@
 #  under the License.
 
 
-from hudi._internal import HudiFileGroupReader, HudiFileSlice, HudiTable
+from hudi._internal import (
+    HudiFileGroupReader,
+    HudiFileSlice,
+    HudiInstant,
+    HudiTable,
+    HudiTimeline,
+)
 from hudi._internal import __version__ as __version__
 from hudi.table.builder import HudiTableBuilder
 
 __all__ = [
     "HudiFileGroupReader",
     "HudiFileSlice",
+    "HudiInstant",
     "HudiTable",
     "HudiTableBuilder",
+    "HudiTimeline",
 ]
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index a884265..b16c5a0 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -107,6 +107,17 @@ class HudiFileSlice:
         """
         ...
 
+@dataclass(init=False)
+class HudiInstant:
+    @property
+    def timestamp(self) -> str: ...
+    @property
+    def action(self) -> str: ...
+    @property
+    def state(self) -> str: ...
+    @property
+    def epoch_mills(self) -> int: ...
+
 @dataclass(init=False)
 class HudiTable:
     """
@@ -146,6 +157,50 @@ class HudiTable:
             Dict[str, str]: A dictionary of storage options.
         """
         ...
+    @property
+    def table_name(self) -> str:
+        """
+        Get table name.
+
+        Returns:
+            str: The name of the table.
+        """
+        ...
+    @property
+    def table_type(self) -> str:
+        """
+        Get table type.
+
+        Returns:
+            str: The type of the table.
+        """
+        ...
+    @property
+    def is_mor(self) -> str:
+        """
+        Get whether the table is an MOR table.
+
+        Returns:
+            str: True if the table is a MOR table, False otherwise.
+        """
+        ...
+    @property
+    def timezone(self) -> str:
+        """
+        Get timezone.
+
+        Returns:
+            str: The timezone of the table.
+        """
+        ...
+    def get_avro_schema(self) -> str:
+        """
+        Returns the Avro schema of the Hudi table.
+
+        Returns:
+            str: The Avro schema of the table.
+        """
+        ...
     def get_schema(self) -> "pyarrow.Schema":
         """
         Returns the schema of the Hudi table.
@@ -162,6 +217,11 @@ class HudiTable:
             pyarrow.Schema: The schema used for partitioning the table.
         """
         ...
+    def get_timeline(self) -> HudiTimeline:
+        """
+        Returns the timeline of the Hudi table.
+        """
+        ...
     def get_file_slices_splits(
         self, n: int, filters: Optional[List[Tuple[str, str, str]]]
     ) -> List[List[HudiFileSlice]]:
@@ -257,6 +317,19 @@ class HudiTable:
         """
         ...
 
+@dataclass(init=False)
+class HudiTimeline:
+    def get_completed_commits(self, desc: bool = False) -> List[HudiInstant]: 
...
+    def get_completed_deltacommits(self, desc: bool = False) -> 
List[HudiInstant]: ...
+    def get_completed_replacecommits(self, desc: bool = False) -> 
List[HudiInstant]: ...
+    def get_completed_clustering_commits(
+        self, desc: bool = False
+    ) -> List[HudiInstant]: ...
+    def get_instant_metadata_in_json(self, instant: HudiInstant) -> str: ...
+    def get_latest_commit_timestamp(self) -> str: ...
+    def get_latest_avro_schema(self) -> str: ...
+    def get_latest_schema(self) -> "pyarrow.Schema": ...
+
 def build_hudi_table(
     base_uri: str,
     hudi_options: Optional[Dict[str, str]] = None,
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 90eb4af..86c50e0 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -31,6 +31,8 @@ use hudi::file_group::FileGroup;
 use hudi::storage::error::StorageError;
 use hudi::table::builder::TableBuilder;
 use hudi::table::Table;
+use hudi::timeline::instant::Instant;
+use hudi::timeline::Timeline;
 use hudi::util::StrTupleRef;
 use pyo3::exceptions::PyException;
 use pyo3::{create_exception, pyclass, pyfunction, pymethods, PyErr, PyObject, 
PyResult, Python};
@@ -199,6 +201,45 @@ impl From<&FileSlice> for HudiFileSlice {
     }
 }
 
+#[cfg(not(tarpaulin_include))]
+#[derive(Clone, Debug)]
+#[pyclass]
+pub struct HudiInstant {
+    inner: Instant,
+}
+
+#[cfg(not(tarpaulin_include))]
+#[pymethods]
+impl HudiInstant {
+    #[getter]
+    pub fn timestamp(&self) -> String {
+        self.inner.timestamp.to_string()
+    }
+
+    #[getter]
+    pub fn action(&self) -> String {
+        self.inner.action.as_ref().to_string()
+    }
+
+    #[getter]
+    pub fn state(&self) -> String {
+        self.inner.state.as_ref().to_string()
+    }
+
+    #[getter]
+    pub fn epoch_mills(&self) -> i64 {
+        self.inner.epoch_millis
+    }
+}
+
+impl From<&Instant> for HudiInstant {
+    fn from(i: &Instant) -> Self {
+        HudiInstant {
+            inner: i.to_owned(),
+        }
+    }
+}
+
 #[cfg(not(tarpaulin_include))]
 #[pyclass]
 pub struct HudiTable {
@@ -231,6 +272,35 @@ impl HudiTable {
         self.inner.storage_options()
     }
 
+    #[getter]
+    fn table_name(&self) -> String {
+        self.inner.table_name()
+    }
+
+    #[getter]
+    fn table_type(&self) -> String {
+        self.inner.table_type()
+    }
+
+    #[getter]
+    fn is_mor(&self) -> bool {
+        self.inner.is_mor()
+    }
+
+    #[getter]
+    fn timezone(&self) -> String {
+        self.inner.timezone()
+    }
+
+    fn get_avro_schema(&self, py: Python) -> PyResult<String> {
+        py.allow_threads(|| {
+            let avro_schema = rt()
+                .block_on(self.inner.get_avro_schema())
+                .map_err(PythonError::from)?;
+            Ok(avro_schema)
+        })
+    }
+
     fn get_schema(&self, py: Python) -> PyResult<PyObject> {
         rt().block_on(self.inner.get_schema())
             .map_err(PythonError::from)?
@@ -243,6 +313,13 @@ impl HudiTable {
             .to_pyarrow(py)
     }
 
+    fn get_timeline(&self, py: Python) -> HudiTimeline {
+        py.allow_threads(|| {
+            let timeline = self.inner.get_timeline();
+            HudiTimeline::from(timeline)
+        })
+    }
+
     #[pyo3(signature = (n, filters=None))]
     fn get_file_slices_splits(
         &self,
@@ -399,6 +476,110 @@ impl HudiTable {
     }
 }
 
+#[cfg(not(tarpaulin_include))]
+#[pyclass]
+pub struct HudiTimeline {
+    inner: Timeline,
+}
+
+#[cfg(not(tarpaulin_include))]
+#[pymethods]
+impl HudiTimeline {
+    #[pyo3(signature = (desc=false))]
+    pub fn get_completed_commits(&self, desc: bool, py: Python) -> 
PyResult<Vec<HudiInstant>> {
+        py.allow_threads(|| {
+            let instants = rt()
+                .block_on(self.inner.get_completed_commits(desc))
+                .map_err(PythonError::from)?;
+            Ok(instants.iter().map(HudiInstant::from).collect())
+        })
+    }
+
+    #[pyo3(signature = (desc=false))]
+    pub fn get_completed_deltacommits(&self, desc: bool, py: Python) -> 
PyResult<Vec<HudiInstant>> {
+        py.allow_threads(|| {
+            let instants = rt()
+                .block_on(self.inner.get_completed_deltacommits(desc))
+                .map_err(PythonError::from)?;
+            Ok(instants.iter().map(HudiInstant::from).collect())
+        })
+    }
+
+    #[pyo3(signature = (desc=false))]
+    pub fn get_completed_replacecommits(
+        &self,
+        desc: bool,
+        py: Python,
+    ) -> PyResult<Vec<HudiInstant>> {
+        py.allow_threads(|| {
+            let instants = rt()
+                .block_on(self.inner.get_completed_replacecommits(desc))
+                .map_err(PythonError::from)?;
+            Ok(instants.iter().map(HudiInstant::from).collect())
+        })
+    }
+
+    #[pyo3(signature = (desc=false))]
+    pub fn get_completed_clustering_commits(
+        &self,
+        desc: bool,
+        py: Python,
+    ) -> PyResult<Vec<HudiInstant>> {
+        py.allow_threads(|| {
+            let instants = rt()
+                .block_on(self.inner.get_completed_clustering_commits(desc))
+                .map_err(PythonError::from)?;
+            Ok(instants.iter().map(HudiInstant::from).collect())
+        })
+    }
+
+    pub fn get_instant_metadata_in_json(
+        &self,
+        instant: &HudiInstant,
+        py: Python,
+    ) -> PyResult<String> {
+        py.allow_threads(|| {
+            let commit_metadata = rt()
+                
.block_on(self.inner.get_instant_metadata_in_json(&instant.inner))
+                .map_err(PythonError::from)?;
+            Ok(commit_metadata)
+        })
+    }
+
+    pub fn get_latest_commit_timestamp(&self, py: Python) -> PyResult<String> {
+        py.allow_threads(|| {
+            let commit_timestamp = self
+                .inner
+                .get_latest_commit_timestamp()
+                .map_err(PythonError::from)?;
+            Ok(commit_timestamp)
+        })
+    }
+
+    pub fn get_latest_avro_schema(&self, py: Python) -> PyResult<String> {
+        py.allow_threads(|| {
+            let schema = rt()
+                .block_on(self.inner.get_latest_avro_schema())
+                .map_err(PythonError::from)?;
+            Ok(schema)
+        })
+    }
+
+    pub fn get_latest_schema(&self, py: Python) -> PyResult<PyObject> {
+        rt().block_on(self.inner.get_latest_schema())
+            .map_err(PythonError::from)?
+            .to_pyarrow(py)
+    }
+}
+
+impl From<&Timeline> for HudiTimeline {
+    fn from(t: &Timeline) -> Self {
+        HudiTimeline {
+            inner: t.to_owned(),
+        }
+    }
+}
+
 #[cfg(not(tarpaulin_include))]
 #[pyfunction]
 #[pyo3(signature = (base_uri, hudi_options=None, storage_options=None, 
options=None))]
diff --git a/python/src/lib.rs b/python/src/lib.rs
index afd3e59..ff822b9 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -25,10 +25,12 @@ mod internal;
 fn _internal(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add("__version__", env!("CARGO_PKG_VERSION"))?;
 
-    use internal::{HudiFileGroupReader, HudiFileSlice, HudiTable};
+    use internal::{HudiFileGroupReader, HudiFileSlice, HudiInstant, HudiTable, 
HudiTimeline};
     m.add_class::<HudiFileGroupReader>()?;
     m.add_class::<HudiFileSlice>()?;
+    m.add_class::<HudiInstant>()?;
     m.add_class::<HudiTable>()?;
+    m.add_class::<HudiTimeline>()?;
 
     use internal::build_hudi_table;
     m.add_function(wrap_pyfunction!(build_hudi_table, m)?)?;

Reply via email to