This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 88f9f03 feat: align python table APIs with rust (#267)
88f9f03 is described below
commit 88f9f0368ddca9e5d49a6f63a6215efc1887f95d
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Jan 26 21:09:15 2025 -0600
feat: align python table APIs with rust (#267)
---
.github/workflows/ci.yml | 2 +-
crates/core/src/file_group/reader.rs | 2 +-
crates/core/src/table/mod.rs | 88 ++++++++++++++++------
demo/{app => }/.gitignore | 0
demo/compose.yaml | 2 +-
demo/{run_app.sh => run_demo.sh} | 8 +-
demo/{app/rust => sql-datafusion}/Cargo.toml | 4 +-
demo/{app/.gitignore => sql-datafusion/run.sh} | 7 +-
demo/{app/rust => sql-datafusion}/src/main.rs | 55 +++++++-------
demo/{app/.gitignore => table-api-python/run.sh} | 7 +-
.../python => table-api-python}/src/__init__.py | 0
demo/{app/python => table-api-python}/src/main.py | 9 ++-
demo/{app/rust => table-api-rust}/Cargo.toml | 7 +-
demo/{app/.gitignore => table-api-rust/run.sh} | 7 +-
demo/{app/rust => table-api-rust}/src/main.rs | 43 ++++++-----
python/hudi/_internal.pyi | 58 ++++++++++----
python/src/internal.rs | 64 +++++++++++++++-
python/tests/test_table_read.py | 21 +++++-
18 files changed, 271 insertions(+), 113 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 6fc487c..80861c0 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -120,7 +120,7 @@ jobs:
- name: Integration tests
run: |
cd demo
- ./run_app.sh
+ ./run_demo.sh
publish-coverage:
name: Publish coverage reports to codecov.io
diff --git a/crates/core/src/file_group/reader.rs b/crates/core/src/file_group/reader.rs
index 2a0610e..113a27a 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -121,7 +121,7 @@ impl FileGroupReader {
.map_err(|e| ReadFileSliceError(format!("Failed to filter records: {e:?}")))
}
- pub async fn read_file_slice(
+ pub(crate) async fn read_file_slice(
&self,
file_slice: &FileSlice,
base_file_only: bool,
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 0a163b8..ee8114d 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -140,6 +140,27 @@ impl Table {
.await
}
+ pub fn hudi_options(&self) -> HashMap<String, String> {
+ self.hudi_configs.as_options()
+ }
+
+ pub fn storage_options(&self) -> HashMap<String, String> {
+ self.storage_options.as_ref().clone()
+ }
+
+ #[cfg(feature = "datafusion")]
+ pub fn register_storage(
+ &self,
+ runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>,
+ ) {
+ self.timeline
+ .storage
+ .register_object_store(runtime_env.clone());
+ self.file_system_view
+ .storage
+ .register_object_store(runtime_env.clone());
+ }
+
pub fn base_url(&self) -> Url {
let err_msg = format!("{:?} is missing or invalid.", HudiTableConfig::BasePath);
self.hudi_configs
@@ -165,27 +186,6 @@ impl Table {
.to::<String>()
}
- pub fn hudi_options(&self) -> HashMap<String, String> {
- self.hudi_configs.as_options()
- }
-
- pub fn storage_options(&self) -> HashMap<String, String> {
- self.storage_options.as_ref().clone()
- }
-
- #[cfg(feature = "datafusion")]
- pub fn register_storage(
- &self,
- runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>,
- ) {
- self.timeline
- .storage
- .register_object_store(runtime_env.clone());
- self.file_system_view
- .storage
- .register_object_store(runtime_env.clone());
- }
-
/// Get the latest [Schema] of the table.
pub async fn get_schema(&self) -> Result<Schema> {
self.timeline.get_latest_schema().await
@@ -222,7 +222,36 @@ impl Table {
n: usize,
filters: &[(&str, &str, &str)],
) -> Result<Vec<Vec<FileSlice>>> {
- let file_slices = self.get_file_slices(filters).await?;
+ let filters = from_str_tuples(filters)?;
+ if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
+ self.get_file_slices_splits_internal(n, timestamp.to::<String>().as_str(), &filters)
+ .await
+ } else if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+ self.get_file_slices_splits_internal(n, timestamp, &filters)
+ .await
+ } else {
+ Ok(Vec::new())
+ }
+ }
+
+ pub async fn get_file_slices_splits_as_of(
+ &self,
+ n: usize,
+ timestamp: &str,
+ filters: &[(&str, &str, &str)],
+ ) -> Result<Vec<Vec<FileSlice>>> {
+ let filters = from_str_tuples(filters)?;
+ self.get_file_slices_splits_internal(n, timestamp, &filters)
+ .await
+ }
+
+ async fn get_file_slices_splits_internal(
+ &self,
+ n: usize,
+ timestamp: &str,
+ filters: &[Filter],
+ ) -> Result<Vec<Vec<FileSlice>>> {
+ let file_slices = self.get_file_slices_internal(timestamp, filters).await?;
if file_slices.is_empty() {
return Ok(Vec::new());
}
@@ -282,7 +311,7 @@ impl Table {
)
}
- pub fn create_file_group_reader_with_filters(
+ fn create_file_group_reader_with_filters(
&self,
filters: &[(&str, &str, &str)],
schema: &Schema,
@@ -797,6 +826,19 @@ mod tests {
assert_eq!(file_slices_splits[1].len(), 1);
}
+ #[tokio::test]
+ async fn hudi_table_get_file_slices_splits_as_of() {
+ let base_url = SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor();
+
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let file_slices_splits = hudi_table
+ .get_file_slices_splits_as_of(2, "20250121000702475", &[])
+ .await
+ .unwrap();
+ assert_eq!(file_slices_splits.len(), 1);
+ assert_eq!(file_slices_splits[0].len(), 1);
+ }
+
#[tokio::test]
async fn hudi_table_get_file_slices_as_of_timestamps() {
let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
diff --git a/demo/app/.gitignore b/demo/.gitignore
similarity index 100%
copy from demo/app/.gitignore
copy to demo/.gitignore
diff --git a/demo/compose.yaml b/demo/compose.yaml
index f56cd64..ab299c9 100644
--- a/demo/compose.yaml
+++ b/demo/compose.yaml
@@ -64,5 +64,5 @@ services:
AWS_REGION: us-east-1 # minio default
networks:
- app_network:
+ demo_network:
driver: bridge
diff --git a/demo/run_app.sh b/demo/run_demo.sh
similarity index 86%
rename from demo/run_app.sh
rename to demo/run_demo.sh
index 839eeba..cec677b 100755
--- a/demo/run_app.sh
+++ b/demo/run_demo.sh
@@ -34,11 +34,11 @@ if [ $attempt -eq $max_attempts ]; then
exit 1
fi
-# install dependencies and run the app
+# install dependencies and run the demo apps
docker compose exec -T runner /bin/bash -c "
cd /opt/hudi-rs/python && \
make setup develop && \
- cd /opt/hudi-rs/demo/app && \
- cargo run --manifest-path=rust/Cargo.toml && \
- python -m python.src.main
+ cd /opt/hudi-rs/demo/sql-datafusion && ./run.sh &&\
+ cd /opt/hudi-rs/demo/table-api-python && ./run.sh && \
+ cd /opt/hudi-rs/demo/table-api-rust && ./run.sh
"
diff --git a/demo/app/rust/Cargo.toml b/demo/sql-datafusion/Cargo.toml
similarity index 91%
copy from demo/app/rust/Cargo.toml
copy to demo/sql-datafusion/Cargo.toml
index 349a2d2..9cce925 100644
--- a/demo/app/rust/Cargo.toml
+++ b/demo/sql-datafusion/Cargo.toml
@@ -19,11 +19,11 @@
# keep this empty such that it won't be linked to the repo workspace
[package]
-name = "app"
+name = "demo-sql-datafusion"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = "^1"
datafusion = "^43"
-hudi = { path = "../../../crates/hudi", features = ["datafusion"] }
+hudi = { path = "../../crates/hudi", features = ["datafusion"] }
diff --git a/demo/app/.gitignore b/demo/sql-datafusion/run.sh
old mode 100644
new mode 100755
similarity index 96%
copy from demo/app/.gitignore
copy to demo/sql-datafusion/run.sh
index afcc356..241a60b
--- a/demo/app/.gitignore
+++ b/demo/sql-datafusion/run.sh
@@ -1,3 +1,5 @@
+#!/bin/bash
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -14,7 +16,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+#
-venv/
-Cargo.lock
-**/target/
+cargo run
diff --git a/demo/app/rust/src/main.rs b/demo/sql-datafusion/src/main.rs
similarity index 61%
copy from demo/app/rust/src/main.rs
copy to demo/sql-datafusion/src/main.rs
index 1bfbfd3..c9623ba 100644
--- a/demo/app/rust/src/main.rs
+++ b/demo/sql-datafusion/src/main.rs
@@ -29,37 +29,38 @@ async fn main() -> Result<()> {
let hudi = HudiDataSource::new("s3://hudi-demo/cow/v6_complexkeygen_hivestyle").await?;
ctx.register_table("cow_v6_table", Arc::new(hudi))?;
let df: DataFrame = ctx.sql("SELECT * from cow_v6_table").await?;
- assert!(
+ assert_eq!(
df.schema()
.columns()
.iter()
.map(|c| c.name())
- .collect::<Vec<_>>()
- == vec![
- "_hoodie_commit_time",
- "_hoodie_commit_seqno",
- "_hoodie_record_key",
- "_hoodie_partition_path",
- "_hoodie_file_name",
- "id",
- "name",
- "isActive",
- "intField",
- "longField",
- "floatField",
- "doubleField",
- "decimalField",
- "dateField",
- "timestampField",
- "binaryField",
- "arrayField",
- "mapField",
- "structField",
- "byteField",
- "shortField",
- ]
+ .collect::<Vec<_>>(),
+ vec![
+ "_hoodie_commit_time",
+ "_hoodie_commit_seqno",
+ "_hoodie_record_key",
+ "_hoodie_partition_path",
+ "_hoodie_file_name",
+ "id",
+ "name",
+ "isActive",
+ "intField",
+ "longField",
+ "floatField",
+ "doubleField",
+ "decimalField",
+ "dateField",
+ "timestampField",
+ "binaryField",
+ "arrayField",
+ "mapField",
+ "structField",
+ "byteField",
+ "shortField",
+ ]
);
- assert!(df.count().await.unwrap() == 4);
- println!("Rust API: read snapshot successfully!");
+ assert_eq!(df.count().await?, 4);
+
+ println!("SQL (DataFusion): read snapshot successfully!");
Ok(())
}
diff --git a/demo/app/.gitignore b/demo/table-api-python/run.sh
old mode 100644
new mode 100755
similarity index 95%
copy from demo/app/.gitignore
copy to demo/table-api-python/run.sh
index afcc356..e80a057
--- a/demo/app/.gitignore
+++ b/demo/table-api-python/run.sh
@@ -1,3 +1,5 @@
+#!/bin/bash
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -14,7 +16,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+#
-venv/
-Cargo.lock
-**/target/
+python -m src.main
diff --git a/demo/app/python/src/__init__.py b/demo/table-api-python/src/__init__.py
similarity index 100%
rename from demo/app/python/src/__init__.py
rename to demo/table-api-python/src/__init__.py
diff --git a/demo/app/python/src/main.py b/demo/table-api-python/src/main.py
similarity index 91%
rename from demo/app/python/src/main.py
rename to demo/table-api-python/src/main.py
index cc87068..826584f 100644
--- a/demo/app/python/src/main.py
+++ b/demo/table-api-python/src/main.py
@@ -15,17 +15,18 @@
# specific language governing permissions and limitations
# under the License.
-from hudi import HudiTableBuilder
import pyarrow as pa
+from hudi import HudiTableBuilder
+
for url in [
"s3://hudi-demo/cow/v6_complexkeygen_hivestyle",
"s3://hudi-demo/mor/v6_complexkeygen_hivestyle",
]:
hudi_table = HudiTableBuilder.from_base_uri(url).build()
- records = hudi_table.read_snapshot()
+ batches = hudi_table.read_snapshot()
- arrow_table = pa.Table.from_batches(records)
+ arrow_table = pa.Table.from_batches(batches)
assert arrow_table.schema.names == [
"_hoodie_commit_time",
"_hoodie_commit_seqno",
@@ -51,4 +52,4 @@ for url in [
]
assert arrow_table.num_rows == 4
-print("Python API: read snapshot successfully!")
+print("Table API (Python): read snapshot successfully!")
diff --git a/demo/app/rust/Cargo.toml b/demo/table-api-rust/Cargo.toml
similarity index 88%
rename from demo/app/rust/Cargo.toml
rename to demo/table-api-rust/Cargo.toml
index 349a2d2..06fd623 100644
--- a/demo/app/rust/Cargo.toml
+++ b/demo/table-api-rust/Cargo.toml
@@ -19,11 +19,12 @@
# keep this empty such that it won't be linked to the repo workspace
[package]
-name = "app"
+name = "demo-table-api-rust"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = "^1"
-datafusion = "^43"
-hudi = { path = "../../../crates/hudi", features = ["datafusion"] }
+arrow = { version = "= 53.3.0", features = ["pyarrow"] }
+
+hudi = { path = "../../crates/hudi" }
diff --git a/demo/app/.gitignore b/demo/table-api-rust/run.sh
old mode 100644
new mode 100755
similarity index 96%
rename from demo/app/.gitignore
rename to demo/table-api-rust/run.sh
index afcc356..241a60b
--- a/demo/app/.gitignore
+++ b/demo/table-api-rust/run.sh
@@ -1,3 +1,5 @@
+#!/bin/bash
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -14,7 +16,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+#
-venv/
-Cargo.lock
-**/target/
+cargo run
diff --git a/demo/app/rust/src/main.rs b/demo/table-api-rust/src/main.rs
similarity index 66%
rename from demo/app/rust/src/main.rs
rename to demo/table-api-rust/src/main.rs
index 1bfbfd3..e9209dc 100644
--- a/demo/app/rust/src/main.rs
+++ b/demo/table-api-rust/src/main.rs
@@ -17,25 +17,28 @@
* under the License.
*/
-use std::sync::Arc;
-
-use datafusion::error::Result;
-use datafusion::prelude::{DataFrame, SessionContext};
-use hudi::HudiDataSource;
+use arrow::compute::concat_batches;
+use hudi::error::Result;
+use hudi::table::builder::TableBuilder as HudiTableBuilder;
#[tokio::main]
async fn main() -> Result<()> {
- let ctx = SessionContext::new();
- let hudi = HudiDataSource::new("s3://hudi-demo/cow/v6_complexkeygen_hivestyle").await?;
- ctx.register_table("cow_v6_table", Arc::new(hudi))?;
- let df: DataFrame = ctx.sql("SELECT * from cow_v6_table").await?;
- assert!(
- df.schema()
- .columns()
- .iter()
- .map(|c| c.name())
- .collect::<Vec<_>>()
- == vec![
+ for url in [
+ "s3://hudi-demo/cow/v6_complexkeygen_hivestyle",
+ "s3://hudi-demo/mor/v6_complexkeygen_hivestyle",
+ ] {
+ let hudi_table = HudiTableBuilder::from_base_uri(url).build().await?;
+ let batches = hudi_table.read_snapshot(&[]).await?;
+
+ let batch = concat_batches(&batches[0].schema(), &batches)?;
+ assert_eq!(
+ batch
+ .schema()
+ .fields()
+ .iter()
+ .map(|f| f.name())
+ .collect::<Vec<_>>(),
+ vec![
"_hoodie_commit_time",
"_hoodie_commit_seqno",
"_hoodie_record_key",
@@ -58,8 +61,10 @@ async fn main() -> Result<()> {
"byteField",
"shortField",
]
- );
- assert!(df.count().await.unwrap() == 4);
- println!("Rust API: read snapshot successfully!");
+ );
+ assert_eq!(batch.num_rows(), 4);
+ }
+
+ println!("Table API (Rust): read snapshot successfully!");
Ok(())
}
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index eb4b110..d6f6623 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -109,36 +109,36 @@ class HudiTable:
options (Optional[Dict[str, str]]): Additional configuration options (optional).
"""
...
- def get_schema(self) -> "pyarrow.Schema":
+ def hudi_options(self) -> Dict[str, str]:
"""
- Returns the schema of the Hudi table.
+ Get hudi options for table.
Returns:
- pyarrow.Schema: The schema of the table.
+ Dict[str, str]: A dictionary of hudi options.
"""
...
- def get_partition_schema(self) -> "pyarrow.Schema":
+ def storage_options(self) -> Dict[str, str]:
"""
- Returns the partition schema of the Hudi table.
+ Get storage options set for table instance.
Returns:
- pyarrow.Schema: The schema used for partitioning the table.
+ Dict[str, str]: A dictionary of storage options.
"""
...
- def hudi_options(self) -> Dict[str, str]:
+ def get_schema(self) -> "pyarrow.Schema":
"""
- Get hudi options for table.
+ Returns the schema of the Hudi table.
Returns:
- Dict[str, str]: A dictionary of hudi options.
+ pyarrow.Schema: The schema of the table.
"""
...
- def storage_options(self) -> Dict[str, str]:
+ def get_partition_schema(self) -> "pyarrow.Schema":
"""
- Get storage options set for table instance.
+ Returns the partition schema of the Hudi table.
Returns:
- Dict[str, str]: A dictionary of storage options.
+ pyarrow.Schema: The schema used for partitioning the table.
"""
...
def get_file_slices_splits(
@@ -155,6 +155,13 @@ class HudiTable:
List[List[HudiFileSlice]]: A list of file slice groups, each group being a list of HudiFileSlice objects.
"""
...
+ def get_file_slices_splits_as_of(
+ self, n: int, timestamp: str, filters: Optional[List[Tuple[str, str, str]]]
+ ) -> List[List[HudiFileSlice]]:
+ """
+ Retrieves all file slices in the Hudi table as of a timestamp in 'n' splits, optionally filtered by given filters.
+ """
+ ...
def get_file_slices(
self, filters: Optional[List[Tuple[str, str, str]]]
) -> List[HudiFileSlice]:
@@ -168,6 +175,13 @@ class HudiTable:
List[HudiFileSlice]: A list of file slices matching the filters.
"""
...
+ def get_file_slices_as_of(
+ self, timestamp: str, filters: Optional[List[Tuple[str, str, str]]]
+ ) -> List[HudiFileSlice]:
+ """
+ Retrieves all file slices in the Hudi table as of a timestamp, optionally filtered by the provided filters.
+ """
+ ...
def create_file_group_reader(self) -> HudiFileGroupReader:
"""
Creates a HudiFileGroupReader for reading records from file groups in the Hudi table.
@@ -189,9 +203,27 @@ class HudiTable:
List[pyarrow.RecordBatch]: A list of record batches from the snapshot of the table.
"""
...
+ def read_snapshot_as_of(
+ self, timestamp: str, filters: Optional[List[Tuple[str, str, str]]]
+ ) -> List["pyarrow.RecordBatch"]:
+ """
+ Reads the snapshot of the Hudi table as of a timestamp, optionally filtered by the provided filters.
+ """
+ ...
def read_incremental_records(
self, start_timestamp: str, end_timestamp: Optional[str]
- ) -> List["pyarrow.RecordBatch"]: ...
+ ) -> List["pyarrow.RecordBatch"]:
+ """
+ Reads incremental records from the Hudi table between the given timestamps.
+
+ Parameters:
+ start_timestamp (str): The start timestamp (exclusive).
+ end_timestamp (Optional[str]): The end timestamp (inclusive).
+
+ Returns:
+ List[pyarrow.RecordBatch]: A list of record batches containing incremental records.
+ """
+ ...
def build_hudi_table(
base_uri: str,
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 1bfb953..b09ea12 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -17,12 +17,11 @@
* under the License.
*/
+use arrow::pyarrow::ToPyArrow;
use std::collections::HashMap;
use std::convert::From;
use std::path::PathBuf;
use std::sync::OnceLock;
-
-use arrow::pyarrow::ToPyArrow;
use tokio::runtime::Runtime;
use hudi::error::CoreError;
@@ -208,6 +207,30 @@ impl HudiTable {
})
}
+ #[pyo3(signature = (n, timestamp, filters=None))]
+ fn get_file_slices_splits_as_of(
+ &self,
+ n: usize,
+ timestamp: &str,
+ filters: Option<Vec<(String, String, String)>>,
+ py: Python,
+ ) -> PyResult<Vec<Vec<HudiFileSlice>>> {
+ let filters = filters.unwrap_or_default();
+
+ py.allow_threads(|| {
+ let file_slices = rt()
+ .block_on(
+ self.inner
+ .get_file_slices_splits_as_of(n, timestamp, &filters.as_strs()),
+ )
+ .map_err(PythonError::from)?;
+ Ok(file_slices
+ .iter()
+ .map(|inner_vec| inner_vec.iter().map(convert_file_slice).collect())
+ .collect())
+ })
+ }
+
#[pyo3(signature = (filters=None))]
fn get_file_slices(
&self,
@@ -224,6 +247,26 @@ impl HudiTable {
})
}
+ #[pyo3(signature = (timestamp, filters=None))]
+ fn get_file_slices_as_of(
+ &self,
+ timestamp: &str,
+ filters: Option<Vec<(String, String, String)>>,
+ py: Python,
+ ) -> PyResult<Vec<HudiFileSlice>> {
+ let filters = filters.unwrap_or_default();
+
+ py.allow_threads(|| {
+ let file_slices = rt()
+ .block_on(
+ self.inner
+ .get_file_slices_as_of(timestamp, &filters.as_strs()),
+ )
+ .map_err(PythonError::from)?;
+ Ok(file_slices.iter().map(convert_file_slice).collect())
+ })
+ }
+
fn create_file_group_reader(&self) -> PyResult<HudiFileGroupReader> {
let fg_reader = self.inner.create_file_group_reader();
Ok(HudiFileGroupReader { inner: fg_reader })
@@ -242,6 +285,23 @@ impl HudiTable {
.to_pyarrow(py)
}
+ #[pyo3(signature = (timestamp, filters=None))]
+ fn read_snapshot_as_of(
+ &self,
+ timestamp: &str,
+ filters: Option<Vec<(String, String, String)>>,
+ py: Python,
+ ) -> PyResult<PyObject> {
+ let filters = filters.unwrap_or_default();
+
+ rt().block_on(
+ self.inner
+ .read_snapshot_as_of(timestamp, &filters.as_strs()),
+ )
+ .map_err(PythonError::from)?
+ .to_pyarrow(py)
+ }
+
#[pyo3(signature = (start_timestamp, end_timestamp=None))]
fn read_incremental_records(
&self,
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index c35be59..c2931aa 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.
+from itertools import chain
+
import pyarrow as pa
from hudi import HudiTable
@@ -153,13 +155,24 @@ def test_read_table_for_partition(get_sample_table):
]
-def test_read_table_as_of_timestamp(get_sample_table):
+def test_table_apis_as_of_timestamp(get_sample_table):
table_path = get_sample_table
- table = HudiTable(
- table_path, options={"hoodie.read.as.of.timestamp": "20240402123035233"}
+ table = HudiTable(table_path)
+ timestamp = "20240402123035233"
+
+ file_slices_gen = table.get_file_slices_splits_as_of(2, timestamp)
+ file_slices_base_paths = set(
+ f.base_file_relative_path() for f in chain.from_iterable(file_slices_gen)
)
+ assert file_slices_base_paths == {
+ "san_francisco/780b8586-3ad0-48ef-a6a1-d2217845ce4a-0_0-8-0_20240402123035233.parquet",
+ "san_francisco/d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0_1-9-0_20240402123035233.parquet",
+ "san_francisco/5a226868-2934-4f84-a16f-55124630c68d-0_2-10-0_20240402123035233.parquet",
+ "sao_paulo/ee915c68-d7f8-44f6-9759-e691add290d8-0_3-11-0_20240402123035233.parquet",
+ "chennai/68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0_4-12-0_20240402123035233.parquet",
+ }
- batches = table.read_snapshot()
+ batches = table.read_snapshot_as_of(timestamp)
t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts")
assert t.to_pylist() == [
{