This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 88f9f03  feat: align python table APIs with rust (#267)
88f9f03 is described below

commit 88f9f0368ddca9e5d49a6f63a6215efc1887f95d
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Jan 26 21:09:15 2025 -0600

    feat: align python table APIs with rust (#267)
---
 .github/workflows/ci.yml                           |  2 +-
 crates/core/src/file_group/reader.rs               |  2 +-
 crates/core/src/table/mod.rs                       | 88 ++++++++++++++++------
 demo/{app => }/.gitignore                          |  0
 demo/compose.yaml                                  |  2 +-
 demo/{run_app.sh => run_demo.sh}                   |  8 +-
 demo/{app/rust => sql-datafusion}/Cargo.toml       |  4 +-
 demo/{app/.gitignore => sql-datafusion/run.sh}     |  7 +-
 demo/{app/rust => sql-datafusion}/src/main.rs      | 55 +++++++-------
 demo/{app/.gitignore => table-api-python/run.sh}   |  7 +-
 .../python => table-api-python}/src/__init__.py    |  0
 demo/{app/python => table-api-python}/src/main.py  |  9 ++-
 demo/{app/rust => table-api-rust}/Cargo.toml       |  7 +-
 demo/{app/.gitignore => table-api-rust/run.sh}     |  7 +-
 demo/{app/rust => table-api-rust}/src/main.rs      | 43 ++++++-----
 python/hudi/_internal.pyi                          | 58 ++++++++++----
 python/src/internal.rs                             | 64 +++++++++++++++-
 python/tests/test_table_read.py                    | 21 +++++-
 18 files changed, 271 insertions(+), 113 deletions(-)
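
As a quick illustration of the alignment, here is a minimal sketch of the
time-travel APIs this commit adds to the Python bindings (the table path is a
placeholder; the timestamp matches the one used in the updated test below):

    import pyarrow as pa

    from hudi import HudiTable

    table = HudiTable("/path/to/hudi_table")  # placeholder path

    # Snapshot read as of a commit timestamp, mirroring Rust's read_snapshot_as_of.
    batches = table.read_snapshot_as_of("20240402123035233")
    arrow_table = pa.Table.from_batches(batches)

    # File slices as of the same timestamp, split into 2 groups for parallel reads.
    splits = table.get_file_slices_splits_as_of(2, "20240402123035233")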

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 6fc487c..80861c0 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -120,7 +120,7 @@ jobs:
       - name: Integration tests
         run: |
           cd demo
-          ./run_app.sh
+          ./run_demo.sh
 
   publish-coverage:
     name: Publish coverage reports to codecov.io
diff --git a/crates/core/src/file_group/reader.rs b/crates/core/src/file_group/reader.rs
index 2a0610e..113a27a 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -121,7 +121,7 @@ impl FileGroupReader {
             .map_err(|e| ReadFileSliceError(format!("Failed to filter records: {e:?}")))
     }
 
-    pub async fn read_file_slice(
+    pub(crate) async fn read_file_slice(
         &self,
         file_slice: &FileSlice,
         base_file_only: bool,
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 0a163b8..ee8114d 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -140,6 +140,27 @@ impl Table {
             .await
     }
 
+    pub fn hudi_options(&self) -> HashMap<String, String> {
+        self.hudi_configs.as_options()
+    }
+
+    pub fn storage_options(&self) -> HashMap<String, String> {
+        self.storage_options.as_ref().clone()
+    }
+
+    #[cfg(feature = "datafusion")]
+    pub fn register_storage(
+        &self,
+        runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>,
+    ) {
+        self.timeline
+            .storage
+            .register_object_store(runtime_env.clone());
+        self.file_system_view
+            .storage
+            .register_object_store(runtime_env.clone());
+    }
+
     pub fn base_url(&self) -> Url {
         let err_msg = format!("{:?} is missing or invalid.", HudiTableConfig::BasePath);
         self.hudi_configs
@@ -165,27 +186,6 @@ impl Table {
             .to::<String>()
     }
 
-    pub fn hudi_options(&self) -> HashMap<String, String> {
-        self.hudi_configs.as_options()
-    }
-
-    pub fn storage_options(&self) -> HashMap<String, String> {
-        self.storage_options.as_ref().clone()
-    }
-
-    #[cfg(feature = "datafusion")]
-    pub fn register_storage(
-        &self,
-        runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>,
-    ) {
-        self.timeline
-            .storage
-            .register_object_store(runtime_env.clone());
-        self.file_system_view
-            .storage
-            .register_object_store(runtime_env.clone());
-    }
-
     /// Get the latest [Schema] of the table.
     pub async fn get_schema(&self) -> Result<Schema> {
         self.timeline.get_latest_schema().await
@@ -222,7 +222,36 @@ impl Table {
         n: usize,
         filters: &[(&str, &str, &str)],
     ) -> Result<Vec<Vec<FileSlice>>> {
-        let file_slices = self.get_file_slices(filters).await?;
+        let filters = from_str_tuples(filters)?;
+        if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
+            self.get_file_slices_splits_internal(n, timestamp.to::<String>().as_str(), &filters)
+                .await
+        } else if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+            self.get_file_slices_splits_internal(n, timestamp, &filters)
+                .await
+        } else {
+            Ok(Vec::new())
+        }
+    }
+
+    pub async fn get_file_slices_splits_as_of(
+        &self,
+        n: usize,
+        timestamp: &str,
+        filters: &[(&str, &str, &str)],
+    ) -> Result<Vec<Vec<FileSlice>>> {
+        let filters = from_str_tuples(filters)?;
+        self.get_file_slices_splits_internal(n, timestamp, &filters)
+            .await
+    }
+
+    async fn get_file_slices_splits_internal(
+        &self,
+        n: usize,
+        timestamp: &str,
+        filters: &[Filter],
+    ) -> Result<Vec<Vec<FileSlice>>> {
+        let file_slices = self.get_file_slices_internal(timestamp, filters).await?;
         if file_slices.is_empty() {
             return Ok(Vec::new());
         }
@@ -282,7 +311,7 @@ impl Table {
         )
     }
 
-    pub fn create_file_group_reader_with_filters(
+    fn create_file_group_reader_with_filters(
         &self,
         filters: &[(&str, &str, &str)],
         schema: &Schema,
@@ -797,6 +826,19 @@ mod tests {
         assert_eq!(file_slices_splits[1].len(), 1);
     }
 
+    #[tokio::test]
+    async fn hudi_table_get_file_slices_splits_as_of() {
+        let base_url = SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor();
+
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let file_slices_splits = hudi_table
+            .get_file_slices_splits_as_of(2, "20250121000702475", &[])
+            .await
+            .unwrap();
+        assert_eq!(file_slices_splits.len(), 1);
+        assert_eq!(file_slices_splits[0].len(), 1);
+    }
+
     #[tokio::test]
     async fn hudi_table_get_file_slices_as_of_timestamps() {
         let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
diff --git a/demo/app/.gitignore b/demo/.gitignore
similarity index 100%
copy from demo/app/.gitignore
copy to demo/.gitignore
diff --git a/demo/compose.yaml b/demo/compose.yaml
index f56cd64..ab299c9 100644
--- a/demo/compose.yaml
+++ b/demo/compose.yaml
@@ -64,5 +64,5 @@ services:
       AWS_REGION: us-east-1 # minio default
 
 networks:
-  app_network:
+  demo_network:
     driver: bridge
diff --git a/demo/run_app.sh b/demo/run_demo.sh
similarity index 86%
rename from demo/run_app.sh
rename to demo/run_demo.sh
index 839eeba..cec677b 100755
--- a/demo/run_app.sh
+++ b/demo/run_demo.sh
@@ -34,11 +34,11 @@ if [ $attempt -eq $max_attempts ]; then
   exit 1
 fi
 
-# install dependencies and run the app
+# install dependencies and run the demo apps
 docker compose exec -T runner /bin/bash -c "
   cd /opt/hudi-rs/python && \
   make setup develop && \
-  cd /opt/hudi-rs/demo/app && \
-  cargo run --manifest-path=rust/Cargo.toml && \
-  python -m python.src.main
+  cd /opt/hudi-rs/demo/sql-datafusion && ./run.sh &&\
+  cd /opt/hudi-rs/demo/table-api-python && ./run.sh && \
+  cd /opt/hudi-rs/demo/table-api-rust && ./run.sh
   "
diff --git a/demo/app/rust/Cargo.toml b/demo/sql-datafusion/Cargo.toml
similarity index 91%
copy from demo/app/rust/Cargo.toml
copy to demo/sql-datafusion/Cargo.toml
index 349a2d2..9cce925 100644
--- a/demo/app/rust/Cargo.toml
+++ b/demo/sql-datafusion/Cargo.toml
@@ -19,11 +19,11 @@
 # keep this empty such that it won't be linked to the repo workspace
 
 [package]
-name = "app"
+name = "demo-sql-datafusion"
 version = "0.1.0"
 edition = "2021"
 
 [dependencies]
 tokio = "^1"
 datafusion = "^43"
-hudi = { path = "../../../crates/hudi", features = ["datafusion"] }
+hudi = { path = "../../crates/hudi", features = ["datafusion"] }
diff --git a/demo/app/.gitignore b/demo/sql-datafusion/run.sh
old mode 100644
new mode 100755
similarity index 96%
copy from demo/app/.gitignore
copy to demo/sql-datafusion/run.sh
index afcc356..241a60b
--- a/demo/app/.gitignore
+++ b/demo/sql-datafusion/run.sh
@@ -1,3 +1,5 @@
+#!/bin/bash
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -14,7 +16,6 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+#
 
-venv/
-Cargo.lock
-**/target/
+cargo run
diff --git a/demo/app/rust/src/main.rs b/demo/sql-datafusion/src/main.rs
similarity index 61%
copy from demo/app/rust/src/main.rs
copy to demo/sql-datafusion/src/main.rs
index 1bfbfd3..c9623ba 100644
--- a/demo/app/rust/src/main.rs
+++ b/demo/sql-datafusion/src/main.rs
@@ -29,37 +29,38 @@ async fn main() -> Result<()> {
     let hudi = HudiDataSource::new("s3://hudi-demo/cow/v6_complexkeygen_hivestyle").await?;
     ctx.register_table("cow_v6_table", Arc::new(hudi))?;
     let df: DataFrame = ctx.sql("SELECT * from cow_v6_table").await?;
-    assert!(
+    assert_eq!(
         df.schema()
             .columns()
             .iter()
             .map(|c| c.name())
-            .collect::<Vec<_>>()
-            == vec![
-                "_hoodie_commit_time",
-                "_hoodie_commit_seqno",
-                "_hoodie_record_key",
-                "_hoodie_partition_path",
-                "_hoodie_file_name",
-                "id",
-                "name",
-                "isActive",
-                "intField",
-                "longField",
-                "floatField",
-                "doubleField",
-                "decimalField",
-                "dateField",
-                "timestampField",
-                "binaryField",
-                "arrayField",
-                "mapField",
-                "structField",
-                "byteField",
-                "shortField",
-            ]
+            .collect::<Vec<_>>(),
+        vec![
+            "_hoodie_commit_time",
+            "_hoodie_commit_seqno",
+            "_hoodie_record_key",
+            "_hoodie_partition_path",
+            "_hoodie_file_name",
+            "id",
+            "name",
+            "isActive",
+            "intField",
+            "longField",
+            "floatField",
+            "doubleField",
+            "decimalField",
+            "dateField",
+            "timestampField",
+            "binaryField",
+            "arrayField",
+            "mapField",
+            "structField",
+            "byteField",
+            "shortField",
+        ]
     );
-    assert!(df.count().await.unwrap() == 4);
-    println!("Rust API: read snapshot successfully!");
+    assert_eq!(df.count().await?, 4);
+
+    println!("SQL (DataFusion): read snapshot successfully!");
     Ok(())
 }
diff --git a/demo/app/.gitignore b/demo/table-api-python/run.sh
old mode 100644
new mode 100755
similarity index 95%
copy from demo/app/.gitignore
copy to demo/table-api-python/run.sh
index afcc356..e80a057
--- a/demo/app/.gitignore
+++ b/demo/table-api-python/run.sh
@@ -1,3 +1,5 @@
+#!/bin/bash
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -14,7 +16,6 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+#
 
-venv/
-Cargo.lock
-**/target/
+python -m src.main
diff --git a/demo/app/python/src/__init__.py b/demo/table-api-python/src/__init__.py
similarity index 100%
rename from demo/app/python/src/__init__.py
rename to demo/table-api-python/src/__init__.py
diff --git a/demo/app/python/src/main.py b/demo/table-api-python/src/main.py
similarity index 91%
rename from demo/app/python/src/main.py
rename to demo/table-api-python/src/main.py
index cc87068..826584f 100644
--- a/demo/app/python/src/main.py
+++ b/demo/table-api-python/src/main.py
@@ -15,17 +15,18 @@
 #  specific language governing permissions and limitations
 #  under the License.
 
-from hudi import HudiTableBuilder
 import pyarrow as pa
 
+from hudi import HudiTableBuilder
+
 for url in [
     "s3://hudi-demo/cow/v6_complexkeygen_hivestyle",
     "s3://hudi-demo/mor/v6_complexkeygen_hivestyle",
 ]:
     hudi_table = HudiTableBuilder.from_base_uri(url).build()
-    records = hudi_table.read_snapshot()
+    batches = hudi_table.read_snapshot()
 
-    arrow_table = pa.Table.from_batches(records)
+    arrow_table = pa.Table.from_batches(batches)
     assert arrow_table.schema.names == [
         "_hoodie_commit_time",
         "_hoodie_commit_seqno",
@@ -51,4 +52,4 @@ for url in [
     ]
     assert arrow_table.num_rows == 4
 
-print("Python API: read snapshot successfully!")
+print("Table API (Python): read snapshot successfully!")
diff --git a/demo/app/rust/Cargo.toml b/demo/table-api-rust/Cargo.toml
similarity index 88%
rename from demo/app/rust/Cargo.toml
rename to demo/table-api-rust/Cargo.toml
index 349a2d2..06fd623 100644
--- a/demo/app/rust/Cargo.toml
+++ b/demo/table-api-rust/Cargo.toml
@@ -19,11 +19,12 @@
 # keep this empty such that it won't be linked to the repo workspace
 
 [package]
-name = "app"
+name = "demo-table-api-rust"
 version = "0.1.0"
 edition = "2021"
 
 [dependencies]
 tokio = "^1"
-datafusion = "^43"
-hudi = { path = "../../../crates/hudi", features = ["datafusion"] }
+arrow = { version = "= 53.3.0", features = ["pyarrow"] }
+
+hudi = { path = "../../crates/hudi" }
diff --git a/demo/app/.gitignore b/demo/table-api-rust/run.sh
old mode 100644
new mode 100755
similarity index 96%
rename from demo/app/.gitignore
rename to demo/table-api-rust/run.sh
index afcc356..241a60b
--- a/demo/app/.gitignore
+++ b/demo/table-api-rust/run.sh
@@ -1,3 +1,5 @@
+#!/bin/bash
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -14,7 +16,6 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+#
 
-venv/
-Cargo.lock
-**/target/
+cargo run
diff --git a/demo/app/rust/src/main.rs b/demo/table-api-rust/src/main.rs
similarity index 66%
rename from demo/app/rust/src/main.rs
rename to demo/table-api-rust/src/main.rs
index 1bfbfd3..e9209dc 100644
--- a/demo/app/rust/src/main.rs
+++ b/demo/table-api-rust/src/main.rs
@@ -17,25 +17,28 @@
  * under the License.
  */
 
-use std::sync::Arc;
-
-use datafusion::error::Result;
-use datafusion::prelude::{DataFrame, SessionContext};
-use hudi::HudiDataSource;
+use arrow::compute::concat_batches;
+use hudi::error::Result;
+use hudi::table::builder::TableBuilder as HudiTableBuilder;
 
 #[tokio::main]
 async fn main() -> Result<()> {
-    let ctx = SessionContext::new();
-    let hudi = HudiDataSource::new("s3://hudi-demo/cow/v6_complexkeygen_hivestyle").await?;
-    ctx.register_table("cow_v6_table", Arc::new(hudi))?;
-    let df: DataFrame = ctx.sql("SELECT * from cow_v6_table").await?;
-    assert!(
-        df.schema()
-            .columns()
-            .iter()
-            .map(|c| c.name())
-            .collect::<Vec<_>>()
-            == vec![
+    for url in [
+        "s3://hudi-demo/cow/v6_complexkeygen_hivestyle",
+        "s3://hudi-demo/mor/v6_complexkeygen_hivestyle",
+    ] {
+        let hudi_table = HudiTableBuilder::from_base_uri(url).build().await?;
+        let batches = hudi_table.read_snapshot(&[]).await?;
+
+        let batch = concat_batches(&batches[0].schema(), &batches)?;
+        assert_eq!(
+            batch
+                .schema()
+                .fields()
+                .iter()
+                .map(|f| f.name())
+                .collect::<Vec<_>>(),
+            vec![
                 "_hoodie_commit_time",
                 "_hoodie_commit_seqno",
                 "_hoodie_record_key",
@@ -58,8 +61,10 @@ async fn main() -> Result<()> {
                 "byteField",
                 "shortField",
             ]
-    );
-    assert!(df.count().await.unwrap() == 4);
-    println!("Rust API: read snapshot successfully!");
+        );
+        assert_eq!(batch.num_rows(), 4);
+    }
+
+    println!("Table API (Rust): read snapshot successfully!");
     Ok(())
 }
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index eb4b110..d6f6623 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -109,36 +109,36 @@ class HudiTable:
             options (Optional[Dict[str, str]]): Additional configuration options (optional).
         """
         ...
-    def get_schema(self) -> "pyarrow.Schema":
+    def hudi_options(self) -> Dict[str, str]:
         """
-        Returns the schema of the Hudi table.
+        Get hudi options for table.
 
         Returns:
-            pyarrow.Schema: The schema of the table.
+            Dict[str, str]: A dictionary of hudi options.
         """
         ...
-    def get_partition_schema(self) -> "pyarrow.Schema":
+    def storage_options(self) -> Dict[str, str]:
         """
-        Returns the partition schema of the Hudi table.
+        Get storage options set for table instance.
 
         Returns:
-            pyarrow.Schema: The schema used for partitioning the table.
+            Dict[str, str]: A dictionary of storage options.
         """
         ...
-    def hudi_options(self) -> Dict[str, str]:
+    def get_schema(self) -> "pyarrow.Schema":
         """
-        Get hudi options for table.
+        Returns the schema of the Hudi table.
 
         Returns:
-            Dict[str, str]: A dictionary of hudi options.
+            pyarrow.Schema: The schema of the table.
         """
         ...
-    def storage_options(self) -> Dict[str, str]:
+    def get_partition_schema(self) -> "pyarrow.Schema":
         """
-        Get storage options set for table instance.
+        Returns the partition schema of the Hudi table.
 
         Returns:
-            Dict[str, str]: A dictionary of storage options.
+            pyarrow.Schema: The schema used for partitioning the table.
         """
         ...
     def get_file_slices_splits(
@@ -155,6 +155,13 @@ class HudiTable:
             List[List[HudiFileSlice]]: A list of file slice groups, each group being a list of HudiFileSlice objects.
         """
         ...
+    def get_file_slices_splits_as_of(
+        self, n: int, timestamp: str, filters: Optional[List[Tuple[str, str, str]]]
+    ) -> List[List[HudiFileSlice]]:
+        """
+        Retrieves all file slices in the Hudi table as of a timestamp in 'n' splits, optionally filtered by given filters.
+        """
+        ...
     def get_file_slices(
         self, filters: Optional[List[Tuple[str, str, str]]]
     ) -> List[HudiFileSlice]:
@@ -168,6 +175,13 @@ class HudiTable:
             List[HudiFileSlice]: A list of file slices matching the filters.
         """
         ...
+    def get_file_slices_as_of(
+        self, timestamp: str, filters: Optional[List[Tuple[str, str, str]]]
+    ) -> List[HudiFileSlice]:
+        """
+        Retrieves all file slices in the Hudi table as of a timestamp, optionally filtered by the provided filters.
+        """
+        ...
     def create_file_group_reader(self) -> HudiFileGroupReader:
         """
         Creates a HudiFileGroupReader for reading records from file groups in the Hudi table.
@@ -189,9 +203,27 @@ class HudiTable:
             List[pyarrow.RecordBatch]: A list of record batches from the snapshot of the table.
         """
         ...
+    def read_snapshot_as_of(
+        self, timestamp: str, filters: Optional[List[Tuple[str, str, str]]]
+    ) -> List["pyarrow.RecordBatch"]:
+        """
+        Reads the snapshot of the Hudi table as of a timestamp, optionally filtered by the provided filters.
+        """
+        ...
     def read_incremental_records(
         self, start_timestamp: str, end_timestamp: Optional[str]
-    ) -> List["pyarrow.RecordBatch"]: ...
+    ) -> List["pyarrow.RecordBatch"]:
+        """
+        Reads incremental records from the Hudi table between the given timestamps.
+
+        Parameters:
+            start_timestamp (str): The start timestamp (exclusive).
+            end_timestamp (Optional[str]): The end timestamp (inclusive).
+
+        Returns:
+            List[pyarrow.RecordBatch]: A list of record batches containing incremental records.
+        """
+        ...
 
 def build_hudi_table(
     base_uri: str,
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 1bfb953..b09ea12 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -17,12 +17,11 @@
  * under the License.
  */
 
+use arrow::pyarrow::ToPyArrow;
 use std::collections::HashMap;
 use std::convert::From;
 use std::path::PathBuf;
 use std::sync::OnceLock;
-
-use arrow::pyarrow::ToPyArrow;
 use tokio::runtime::Runtime;
 
 use hudi::error::CoreError;
@@ -208,6 +207,30 @@ impl HudiTable {
         })
     }
 
+    #[pyo3(signature = (n, timestamp, filters=None))]
+    fn get_file_slices_splits_as_of(
+        &self,
+        n: usize,
+        timestamp: &str,
+        filters: Option<Vec<(String, String, String)>>,
+        py: Python,
+    ) -> PyResult<Vec<Vec<HudiFileSlice>>> {
+        let filters = filters.unwrap_or_default();
+
+        py.allow_threads(|| {
+            let file_slices = rt()
+                .block_on(
+                    self.inner
+                        .get_file_slices_splits_as_of(n, timestamp, &filters.as_strs()),
+                )
+                .map_err(PythonError::from)?;
+            Ok(file_slices
+                .iter()
+                .map(|inner_vec| inner_vec.iter().map(convert_file_slice).collect())
+                .collect())
+        })
+    }
+
     #[pyo3(signature = (filters=None))]
     fn get_file_slices(
         &self,
@@ -224,6 +247,26 @@ impl HudiTable {
         })
     }
 
+    #[pyo3(signature = (timestamp, filters=None))]
+    fn get_file_slices_as_of(
+        &self,
+        timestamp: &str,
+        filters: Option<Vec<(String, String, String)>>,
+        py: Python,
+    ) -> PyResult<Vec<HudiFileSlice>> {
+        let filters = filters.unwrap_or_default();
+
+        py.allow_threads(|| {
+            let file_slices = rt()
+                .block_on(
+                    self.inner
+                        .get_file_slices_as_of(timestamp, &filters.as_strs()),
+                )
+                .map_err(PythonError::from)?;
+            Ok(file_slices.iter().map(convert_file_slice).collect())
+        })
+    }
+
     fn create_file_group_reader(&self) -> PyResult<HudiFileGroupReader> {
         let fg_reader = self.inner.create_file_group_reader();
         Ok(HudiFileGroupReader { inner: fg_reader })
@@ -242,6 +285,23 @@ impl HudiTable {
             .to_pyarrow(py)
     }
 
+    #[pyo3(signature = (timestamp, filters=None))]
+    fn read_snapshot_as_of(
+        &self,
+        timestamp: &str,
+        filters: Option<Vec<(String, String, String)>>,
+        py: Python,
+    ) -> PyResult<PyObject> {
+        let filters = filters.unwrap_or_default();
+
+        rt().block_on(
+            self.inner
+                .read_snapshot_as_of(timestamp, &filters.as_strs()),
+        )
+        .map_err(PythonError::from)?
+        .to_pyarrow(py)
+    }
+
     #[pyo3(signature = (start_timestamp, end_timestamp=None))]
     fn read_incremental_records(
         &self,
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index c35be59..c2931aa 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -15,6 +15,8 @@
 #  specific language governing permissions and limitations
 #  under the License.
 
+from itertools import chain
+
 import pyarrow as pa
 
 from hudi import HudiTable
@@ -153,13 +155,24 @@ def test_read_table_for_partition(get_sample_table):
     ]
 
 
-def test_read_table_as_of_timestamp(get_sample_table):
+def test_table_apis_as_of_timestamp(get_sample_table):
     table_path = get_sample_table
-    table = HudiTable(
-        table_path, options={"hoodie.read.as.of.timestamp": "20240402123035233"}
+    table = HudiTable(table_path)
+    timestamp = "20240402123035233"
+
+    file_slices_gen = table.get_file_slices_splits_as_of(2, timestamp)
+    file_slices_base_paths = set(
+        f.base_file_relative_path() for f in chain.from_iterable(file_slices_gen)
     )
+    assert file_slices_base_paths == {
+        "san_francisco/780b8586-3ad0-48ef-a6a1-d2217845ce4a-0_0-8-0_20240402123035233.parquet",
+        "san_francisco/d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0_1-9-0_20240402123035233.parquet",
+        "san_francisco/5a226868-2934-4f84-a16f-55124630c68d-0_2-10-0_20240402123035233.parquet",
+        "sao_paulo/ee915c68-d7f8-44f6-9759-e691add290d8-0_3-11-0_20240402123035233.parquet",
+        "chennai/68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0_4-12-0_20240402123035233.parquet",
+    }
 
-    batches = table.read_snapshot()
+    batches = table.read_snapshot_as_of(timestamp)
     t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts")
     assert t.to_pylist() == [
         {
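
For completeness, a minimal sketch of the incremental read documented in the
updated stubs above (table path and timestamp are placeholders):

    from hudi import HudiTable

    table = HudiTable("/path/to/hudi_table")  # placeholder path

    # start_timestamp is exclusive; end_timestamp is inclusive and optional.
    batches = table.read_incremental_records("20240402123035233", None)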
