This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new d90abed fix: simplify partition filter format by taking tuple of strings (#170)
d90abed is described below
commit d90abed1a8d7da54bbde81aad473d4a923b5c597
Author: kazdy <[email protected]>
AuthorDate: Sat Oct 19 21:54:51 2024 +0200
fix: simplify partition filter format by taking tuple of strings (#170)
---
crates/core/src/lib.rs | 1 +
crates/core/src/table/fs_view.rs | 2 +-
crates/core/src/table/mod.rs | 33 ++++++----
crates/core/src/table/partition.rs | 126 +++++++++----------------------------
crates/core/src/util/mod.rs | 59 +++++++++++++++++
python/hudi/_internal.pyi | 16 ++---
python/src/internal.rs | 25 ++++----
python/tests/test_table_read.py | 2 +-
8 files changed, 134 insertions(+), 130 deletions(-)
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 3e15dee..3190dec 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -47,3 +47,4 @@ pub mod config;
pub mod file_group;
pub mod storage;
pub mod table;
+pub mod util;
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index b67534a..f2309e3 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -297,7 +297,7 @@ mod tests {
.unwrap();
let partition_schema =
hudi_table.get_partition_schema().await.unwrap();
let partition_pruner = PartitionPruner::new(
- &["byteField < 20", "shortField = 300"],
+ &[("byteField", "<", "20"), ("shortField", "=", "300")],
&partition_schema,
hudi_table.hudi_configs.as_ref(),
)
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 486e320..0d25db7 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -190,7 +190,7 @@ impl Table {
pub async fn split_file_slices(
&self,
n: usize,
- filters: &[&str],
+ filters: &[(&str, &str, &str)],
) -> Result<Vec<Vec<FileSlice>>> {
let n = std::cmp::max(1, n);
let file_slices = self.get_file_slices(filters).await?;
@@ -205,7 +205,7 @@ impl Table {
/// Get all the [FileSlice]s in the table.
///
/// If the [AsOfTimestamp] configuration is set, the file slices at the specified timestamp will be returned.
- pub async fn get_file_slices(&self, filters: &[&str]) -> Result<Vec<FileSlice>> {
+ pub async fn get_file_slices(&self, filters: &[(&str, &str, &str)]) -> Result<Vec<FileSlice>> {
if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
self.get_file_slices_as_of(timestamp.to::<String>().as_str(),
filters)
.await
@@ -220,7 +220,7 @@ impl Table {
async fn get_file_slices_as_of(
&self,
timestamp: &str,
- filters: &[&str],
+ filters: &[(&str, &str, &str)],
) -> Result<Vec<FileSlice>> {
let excludes = self.timeline.get_replaced_file_groups().await?;
let partition_schema = self.get_partition_schema().await?;
@@ -234,7 +234,7 @@ impl Table {
/// Get all the latest records in the table.
///
/// If the [AsOfTimestamp] configuration is set, the records at the specified timestamp will be returned.
- pub async fn read_snapshot(&self, filters: &[&str]) -> Result<Vec<RecordBatch>> {
+ pub async fn read_snapshot(&self, filters: &[(&str, &str, &str)]) -> Result<Vec<RecordBatch>> {
if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
self.read_snapshot_as_of(timestamp.to::<String>().as_str(),
filters)
.await
@@ -249,7 +249,7 @@ impl Table {
async fn read_snapshot_as_of(
&self,
timestamp: &str,
- filters: &[&str],
+ filters: &[(&str, &str, &str)],
) -> Result<Vec<RecordBatch>> {
let file_slices = self
.get_file_slices_as_of(timestamp, filters)
@@ -267,7 +267,10 @@ impl Table {
}
#[cfg(test)]
- async fn get_file_paths_with_filters(&self, filters: &[&str]) -> Result<Vec<String>> {
+ async fn get_file_paths_with_filters(
+ &self,
+ filters: &[(&str, &str, &str)],
+ ) -> Result<Vec<String>> {
let mut file_paths = Vec::new();
for f in self.get_file_slices(filters).await? {
file_paths.push(f.base_file_path().to_string());
@@ -691,7 +694,7 @@ mod tests {
.collect::<HashSet<_>>();
assert_eq!(actual, expected);
- let partition_filters = &["byteField >= 10", "byteField < 30"];
+ let partition_filters = &[("byteField", ">=", "10"), ("byteField", "<", "30")];
let actual = hudi_table
.get_file_paths_with_filters(partition_filters)
.await
@@ -707,7 +710,7 @@ mod tests {
.collect::<HashSet<_>>();
assert_eq!(actual, expected);
- let partition_filters = &["byteField > 30"];
+ let partition_filters = &[("byteField", ">", "30")];
let actual = hudi_table
.get_file_paths_with_filters(partition_filters)
.await
@@ -741,7 +744,11 @@ mod tests {
.collect::<HashSet<_>>();
assert_eq!(actual, expected);
- let partition_filters = &["byteField >= 10", "byteField < 20", "shortField != 100"];
+ let partition_filters = &[
+ ("byteField", ">=", "10"),
+ ("byteField", "<", "20"),
+ ("shortField", "!=", "100"),
+ ];
let actual = hudi_table
.get_file_paths_with_filters(partition_filters)
.await
@@ -756,7 +763,7 @@ mod tests {
.collect::<HashSet<_>>();
assert_eq!(actual, expected);
- let partition_filters = &["byteField > 20", "shortField = 300"];
+ let partition_filters = &[("byteField", ">", "20"), ("shortField", "=", "300")];
let actual = hudi_table
.get_file_paths_with_filters(partition_filters)
.await
@@ -771,7 +778,11 @@ mod tests {
async fn hudi_table_read_snapshot_for_complex_keygen_hive_style() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
- let partition_filters = &["byteField >= 10", "byteField < 20", "shortField != 100"];
+ let partition_filters = &[
+ ("byteField", ">=", "10"),
+ ("byteField", "<", "20"),
+ ("shortField", "!=", "100"),
+ ];
let records =
hudi_table.read_snapshot(partition_filters).await.unwrap();
assert_eq!(records.len(), 1);
assert_eq!(records[0].num_rows(), 2);
diff --git a/crates/core/src/table/partition.rs b/crates/core/src/table/partition.rs
index 16b9a99..17927eb 100644
--- a/crates/core/src/table/partition.rs
+++ b/crates/core/src/table/partition.rs
@@ -24,7 +24,6 @@ use arrow_array::{ArrayRef, Scalar, StringArray};
use arrow_cast::{cast_with_options, CastOptions};
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
use arrow_schema::{DataType, Field, Schema};
-use once_cell::sync::Lazy;
use std::cmp::PartialEq;
use std::collections::HashMap;
use std::str::FromStr;
@@ -41,7 +40,7 @@ pub struct PartitionPruner {
impl PartitionPruner {
pub fn new(
- and_filters: &[&str],
+ and_filters: &[(&str, &str, &str)],
partition_schema: &Schema,
hudi_configs: &HudiConfigs,
) -> Result<Self> {
@@ -175,19 +174,6 @@ impl Operator {
(">", Operator::Gt),
(">=", Operator::Gte),
];
-
- /// Returns the supported operator tokens. Note that the tokens are sorted by length in descending order to facilitate parsing.
- fn supported_tokens() -> &'static [&'static str] {
- static TOKENS: Lazy<Vec<&'static str>> = Lazy::new(|| {
- let mut tokens: Vec<&'static str> = Operator::TOKEN_OP_PAIRS
- .iter()
- .map(|&(token, _)| token)
- .collect();
- tokens.sort_by_key(|t| std::cmp::Reverse(t.len()));
- tokens
- });
- &TOKENS
- }
}
impl FromStr for Operator {
@@ -209,11 +195,11 @@ pub struct PartitionFilter {
value: Scalar<ArrayRef>,
}
-impl TryFrom<(&str, &Schema)> for PartitionFilter {
+impl TryFrom<((&str, &str, &str), &Schema)> for PartitionFilter {
type Error = anyhow::Error;
- fn try_from((s, partition_schema): (&str, &Schema)) -> Result<Self> {
- let (field_name, operator_str, value_str) = Self::parse_to_parts(s)?;
+ fn try_from((filter, partition_schema): ((&str, &str, &str), &Schema)) -> Result<Self> {
+ let (field_name, operator_str, value_str) = filter;
let field: &Field = partition_schema
.field_with_name(field_name)
@@ -236,36 +222,14 @@ impl TryFrom<(&str, &Schema)> for PartitionFilter {
}
impl PartitionFilter {
- fn parse_to_parts(s: &str) -> Result<(&str, &str, &str)> {
- let s = s.trim();
-
- let (index, op) = Operator::supported_tokens()
- .iter()
- .filter_map(|&op| s.find(op).map(|index| (index, op)))
- .min_by_key(|(index, _)| *index)
- .ok_or_else(|| anyhow!("No valid operator found in the filter string"))?;
-
- let (field, rest) = s.split_at(index);
- let (_, value) = rest.split_at(op.len());
-
- let field = field.trim();
- let value = value.trim();
-
- if field.is_empty() || value.is_empty() {
- return Err(anyhow!(
- "Invalid filter format: missing field name or value"
- ));
- }
-
- Ok((field, op, value))
- }
-
fn cast_value(value: &[&str; 1], data_type: &DataType) ->
Result<Scalar<ArrayRef>> {
let cast_options = CastOptions {
safe: false,
format_options: Default::default(),
};
+
let value = StringArray::from(Vec::from(value));
+
Ok(Scalar::new(cast_with_options(
&value,
data_type,
@@ -281,7 +245,7 @@ mod tests {
IsHiveStylePartitioning, IsPartitionPathUrlencoded,
};
use arrow::datatypes::{DataType, Field, Schema};
- use arrow_array::Datum;
+ use arrow_array::{Array, Datum};
use hudi_tests::assert_not;
use std::str::FromStr;
@@ -296,16 +260,16 @@ mod tests {
#[test]
fn test_partition_filter_try_from_valid() {
let schema = create_test_schema();
- let filter_str = "date = 2023-01-01";
- let filter = PartitionFilter::try_from((filter_str, &schema));
+ let filter_tuple = ("date", "=", "2023-01-01");
+ let filter = PartitionFilter::try_from((filter_tuple, &schema));
assert!(filter.is_ok());
let filter = filter.unwrap();
assert_eq!(filter.field.name(), "date");
assert_eq!(filter.operator, Operator::Eq);
assert_eq!(filter.value.get().0.len(), 1);
- let filter_str = "category!=foo";
- let filter = PartitionFilter::try_from((filter_str, &schema));
+ let filter_tuple = ("category", "!=", "foo");
+ let filter = PartitionFilter::try_from((filter_tuple, &schema));
assert!(filter.is_ok());
let filter = filter.unwrap();
assert_eq!(filter.field.name(), "category");
@@ -320,8 +284,8 @@ mod tests {
#[test]
fn test_partition_filter_try_from_invalid_field() {
let schema = create_test_schema();
- let filter_str = "invalid_field = 2023-01-01";
- let filter = PartitionFilter::try_from((filter_str, &schema));
+ let filter_tuple = ("invalid_field", "=", "2023-01-01");
+ let filter = PartitionFilter::try_from((filter_tuple, &schema));
assert!(filter.is_err());
assert!(filter
.unwrap_err()
@@ -332,60 +296,30 @@ mod tests {
#[test]
fn test_partition_filter_try_from_invalid_operator() {
let schema = create_test_schema();
- let filter_str = "date ?? 2023-01-01";
- let filter = PartitionFilter::try_from((filter_str, &schema));
+ let filter_tuple = ("date", "??", "2023-01-01");
+ let filter = PartitionFilter::try_from((filter_tuple, &schema));
assert!(filter.is_err());
assert!(filter
.unwrap_err()
.to_string()
- .contains("No valid operator found"));
+ .contains("Unsupported operator: ??"));
}
#[test]
fn test_partition_filter_try_from_invalid_value() {
let schema = create_test_schema();
- let filter_str = "count = not_a_number";
- let filter = PartitionFilter::try_from((filter_str, &schema));
+ let filter_tuple = ("count", "=", "not_a_number");
+ let filter = PartitionFilter::try_from((filter_tuple, &schema));
assert!(filter.is_err());
assert!(filter.unwrap_err().to_string().contains("Unable to cast"));
}
- #[test]
- fn test_parse_to_parts_valid() {
- let result = PartitionFilter::parse_to_parts("date = 2023-01-01");
- assert!(result.is_ok());
- let (field, operator, value) = result.unwrap();
- assert_eq!(field, "date");
- assert_eq!(operator, "=");
- assert_eq!(value, "2023-01-01");
- }
-
- #[test]
- fn test_parse_to_parts_no_operator() {
- let result = PartitionFilter::parse_to_parts("date 2023-01-01");
- assert!(result.is_err());
- assert!(result
- .unwrap_err()
- .to_string()
- .contains("No valid operator found"));
- }
-
- #[test]
- fn test_parse_to_parts_multiple_operators() {
- let result = PartitionFilter::parse_to_parts("count >= 10 <= 20");
- assert!(result.is_ok());
- let (field, operator, value) = result.unwrap();
- assert_eq!(field, "count");
- assert_eq!(operator, ">=");
- assert_eq!(value, "10 <= 20");
- }
-
#[test]
fn test_partition_filter_try_from_all_operators() {
let schema = create_test_schema();
- for &op in Operator::supported_tokens() {
- let filter_str = format!("count {} 10", op);
- let filter = PartitionFilter::try_from((filter_str.as_str(), &schema));
+ for (op, _) in Operator::TOKEN_OP_PAIRS {
+ let filter_tuple = ("count", op, "10");
+ let filter = PartitionFilter::try_from((filter_tuple, &schema));
assert!(filter.is_ok(), "Failed for operator: {}", op);
let filter = filter.unwrap();
assert_eq!(filter.field.name(), "count");
@@ -404,14 +338,6 @@ mod tests {
assert!(Operator::from_str("??").is_err());
}
- #[test]
- fn test_operator_supported_tokens() {
- assert_eq!(
- Operator::supported_tokens(),
- &["!=", "<=", ">=", "=", "<", ">"]
- );
- }
-
fn create_hudi_configs(is_hive_style: bool, is_url_encoded: bool) ->
HudiConfigs {
HudiConfigs::new([
(IsHiveStylePartitioning, is_hive_style.to_string()),
@@ -422,7 +348,7 @@ mod tests {
fn test_partition_pruner_new() {
let schema = create_test_schema();
let configs = create_hudi_configs(true, false);
- let filters = vec!["date > 2023-01-01", "category = A"];
+ let filters = vec![("date", ">", "2023-01-01"), ("category", "=", "A")];
let pruner = PartitionPruner::new(&filters, &schema, &configs);
assert!(pruner.is_ok());
@@ -450,7 +376,7 @@ mod tests {
assert!(pruner_empty.is_empty());
let pruner_non_empty =
- PartitionPruner::new(&["date > 2023-01-01"], &schema, &configs).unwrap();
+ PartitionPruner::new(&[("date", ">", "2023-01-01")], &schema, &configs).unwrap();
assert_not!(pruner_non_empty.is_empty());
}
@@ -458,7 +384,11 @@ mod tests {
fn test_partition_pruner_should_include() {
let schema = create_test_schema();
let configs = create_hudi_configs(true, false);
- let filters = vec!["date > 2023-01-01", "category = A", "count <= 100"];
+ let filters = vec![
+ ("date", ">", "2023-01-01"),
+ ("category", "=", "A"),
+ ("count", "<=", "100"),
+ ];
let pruner = PartitionPruner::new(&filters, &schema,
&configs).unwrap();
diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/mod.rs
new file mode 100644
index 0000000..7db2764
--- /dev/null
+++ b/crates/core/src/util/mod.rs
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+pub fn convert_vec_to_slice(vec: &[(String, String, String)]) -> Vec<(&str, &str, &str)> {
+ vec.iter()
+ .map(|(a, b, c)| (a.as_str(), b.as_str(), c.as_str()))
+ .collect()
+}
+
+#[macro_export]
+macro_rules! vec_to_slice {
+ ($vec:expr) => {
+ &convert_vec_to_slice(&$vec)[..]
+ };
+}
+pub use vec_to_slice;
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_convert_vec_of_string_to_vec_of_str_slice() {
+ let vec_of_strings = vec![
+ (
+ String::from("date"),
+ String::from("="),
+ String::from("2022-01-02"),
+ ),
+ (
+ String::from("foo"),
+ String::from("bar"),
+ String::from("baz"),
+ ),
+ ];
+
+ let expected_slice = vec![("date", "=", "2022-01-02"), ("foo", "bar", "baz")];
+
+ let result = vec_to_slice!(&vec_of_strings);
+
+ assert_eq!(result, expected_slice);
+ }
+}
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index ccb8d1c..081a229 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
from dataclasses import dataclass
-from typing import Dict, List, Optional
+from typing import Dict, List, Optional, Tuple
import pyarrow # type: ignore
@@ -142,25 +142,27 @@ class HudiTable:
"""
...
def split_file_slices(
- self, n: int, filters: Optional[List[str]]
+ self, n: int, filters: Optional[List[Tuple[str, str, str]]]
) -> List[List[HudiFileSlice]]:
"""
Splits the file slices into 'n' parts, optionally filtered by given
filters.
Parameters:
n (int): The number of parts to split the file slices into.
- filters (Optional[List[str]]): Optional filters for selecting file slices.
+ filters (Optional[List[Tuple[str, str, str]]]): Optional filters for selecting file slices.
Returns:
List[List[HudiFileSlice]]: A list of file slice groups, each group
being a list of HudiFileSlice objects.
"""
...
- def get_file_slices(self, filters: Optional[List[str]]) -> List[HudiFileSlice]:
+ def get_file_slices(
+ self, filters: Optional[List[Tuple[str, str, str]]]
+ ) -> List[HudiFileSlice]:
"""
Retrieves all file slices in the Hudi table, optionally filtered by
the provided filters.
Parameters:
- filters (Optional[List[str]]): Optional filters for selecting file slices.
+ filters (Optional[List[Tuple[str, str, str]]]): Optional filters for selecting file slices.
Returns:
List[HudiFileSlice]: A list of file slices matching the filters.
@@ -175,13 +177,13 @@ class HudiTable:
"""
...
def read_snapshot(
- self, filters: Optional[List[str]]
+ self, filters: Optional[List[Tuple[str, str, str]]]
) -> List["pyarrow.RecordBatch"]:
"""
Reads the latest snapshot of the Hudi table, optionally filtered by
the provided filters.
Parameters:
- filters (Optional[List[str]]): Optional filters for selecting file slices.
+ filters (Optional[List[Tuple[str, str, str]]]): Optional filters for selecting file slices.
Returns:
List[pyarrow.RecordBatch]: A list of record batches from the
snapshot of the table.
diff --git a/python/src/internal.rs b/python/src/internal.rs
index b26a511..113c6c9 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::OnceLock;
@@ -29,12 +30,8 @@ use hudi::file_group::reader::FileGroupReader;
use hudi::file_group::FileSlice;
use hudi::table::builder::TableBuilder;
use hudi::table::Table;
-
-macro_rules! vec_string_to_slice {
- ($vec:expr) => {
- &$vec.iter().map(AsRef::as_ref).collect::<Vec<_>>()
- };
-}
+use hudi::util::convert_vec_to_slice;
+use hudi::util::vec_to_slice;
#[cfg(not(tarpaulin))]
#[derive(Clone, Debug)]
@@ -163,13 +160,13 @@ impl HudiTable {
fn split_file_slices(
&self,
n: usize,
- filters: Option<Vec<String>>,
+ filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<Vec<Vec<HudiFileSlice>>> {
py.allow_threads(|| {
let file_slices = rt().block_on(
self.inner
- .split_file_slices(n, vec_string_to_slice!(filters.unwrap_or_default())),
+ .split_file_slices(n, vec_to_slice!(filters.unwrap_or_default())),
)?;
Ok(file_slices
.iter()
@@ -181,13 +178,13 @@ impl HudiTable {
#[pyo3(signature = (filters=None))]
fn get_file_slices(
&self,
- filters: Option<Vec<String>>,
+ filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<Vec<HudiFileSlice>> {
py.allow_threads(|| {
let file_slices = rt().block_on(
self.inner
- .get_file_slices(vec_string_to_slice!(filters.unwrap_or_default())),
+ .get_file_slices(vec_to_slice!(filters.unwrap_or_default())),
)?;
Ok(file_slices.iter().map(convert_file_slice).collect())
})
@@ -199,10 +196,14 @@ impl HudiTable {
}
#[pyo3(signature = (filters=None))]
- fn read_snapshot(&self, filters: Option<Vec<String>>, py: Python) -> PyResult<PyObject> {
+ fn read_snapshot(
+ &self,
+ filters: Option<Vec<(String, String, String)>>,
+ py: Python,
+ ) -> PyResult<PyObject> {
rt().block_on(
self.inner
- .read_snapshot(vec_string_to_slice!(filters.unwrap_or_default())),
+ .read_snapshot(vec_to_slice!(filters.unwrap_or_default())),
)?
.to_pyarrow(py)
}
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index cbcfaab..fbed1fd 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -129,7 +129,7 @@ def test_read_table_for_partition(get_sample_table):
table_path = get_sample_table
table = HudiTable(table_path)
- batches = table.read_snapshot(["city = san_francisco"])
+ batches = table.read_snapshot([("city", "=", "san_francisco")])
t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts")
assert t.to_pylist() == [
{