This is an automated email from the ASF dual-hosted git repository.
jiayuliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 3c45926b25 Make parquet-show-bloom-filter work with integer typed columns (#7529)
3c45926b25 is described below
commit 3c45926b25290c754564ee72003dad4224e40fe7
Author: Adam Reeve <[email protected]>
AuthorDate: Tue May 20 15:12:05 2025 +1200
Make parquet-show-bloom-filter work with integer typed columns (#7529)
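
This lets the tool probe bloom filters on INT32 and INT64 columns by parsing the supplied values to the column's physical type instead of always hashing them as strings. As an illustration (the file name and values here are hypothetical; the output format comes from the code below):

    parquet-show-bloom-filter data.parquet int32 0 2 5
    Row group #0
    ================================================================================
    Value 0 is present in bloom filter
    Value 2 is present in bloom filter
    Value 5 is absent in bloom filter

Since bloom filters can return false positives, "present" should be read as "probably present", while "absent" is definitive.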
---
parquet/pytest/test_parquet_integration.py | 84 ++++++++++++++++++++++------
parquet/src/bin/parquet-show-bloom-filter.rs | 50 +++++++++++++----
2 files changed, 105 insertions(+), 29 deletions(-)
diff --git a/parquet/pytest/test_parquet_integration.py b/parquet/pytest/test_parquet_integration.py
index e0846d4e77..e7e6c64b04 100755
--- a/parquet/pytest/test_parquet_integration.py
+++ b/parquet/pytest/test_parquet_integration.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
import pyspark.sql
+from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType
import pandas as pd
from tempfile import NamedTemporaryFile, TemporaryDirectory
import subprocess
@@ -27,23 +28,30 @@ def create_data_and_spark_df(n):
spark.conf.set("parquet.bloom.filter.enabled", True)
spark.conf.set("parquet.bloom.filter.expected.ndv", 10)
spark.conf.set("parquet.bloom.filter.max.bytes", 32)
- data = [(f"id-{i % 10}", f"name-{i%10}") for i in range(n)]
- df = spark.createDataFrame(data, ["id", "name"]).repartition(1)
+ data = [(f"id-{i % 10}", f"name-{i%10}", i * 2, i * 2 + 1) for i in
range(n)]
+ schema = StructType(
+ [
+ StructField("id", StringType(), True),
+ StructField("name", StringType(), True),
+ StructField("int32", IntegerType(), True),
+ StructField("int64", LongType(), True),
+ ]
+ )
+ df = spark.createDataFrame(data, schema).repartition(1)
return data, df
def create_data_and_pandas_df(n):
- data = [(f"id-{i % 10}", f"name-{i%10}") for i in range(n)]
- df = pd.DataFrame(data, columns=["id", "name"])
+ data = [(f"id-{i % 10}", f"name-{i%10}", i * 2, i * 2 + 1) for i in
range(n)]
+ df = pd.DataFrame(data, columns=["id", "name", "int32", "int64"])
return data, df
-def get_expected_output(data):
+def get_expected_output(expected_results):
expected = ["Row group #0", "=" * 80]
- for v in data:
- expected.append(f"Value {v[0]} is present in bloom filter")
- for v in data:
- expected.append(f"Value {v[1]} is absent in bloom filter")
+ for value, result in expected_results:
+ result_str = "present" if result else "absent"
+ expected.append(f"Value {value} is {result_str} in bloom filter")
expected = "\n".join(expected) + "\n"
return expected.encode("utf-8")
@@ -53,6 +61,7 @@ def get_from_csv_cli_output(schema_file, output_file, csv_file):
"parquet-fromcsv",
"--schema",
schema_file,
+ "--has-header",
"--enable-bloom-filter",
"true",
"--input-file",
@@ -63,7 +72,7 @@ def get_from_csv_cli_output(schema_file, output_file, csv_file):
return subprocess.check_output(args)
-def get_show_filter_cli_output(output_dir, data, col_name="id"):
+def get_show_filter_cli_output(output_dir, col_name, test_values):
# take the first (and only) parquet file
(parquet_file,) = sorted(pathlib.Path(output_dir).glob("*.parquet"))
args = [
@@ -71,16 +80,16 @@ def get_show_filter_cli_output(output_dir, data, col_name="id"):
parquet_file,
col_name,
]
- for v in data:
- args.extend([v[0]])
- for v in data:
- args.extend([v[1]])
+ for v in test_values:
+ args.append(str(v))
return subprocess.check_output(args)
SCHEMA = b"""message schema {
required binary id (UTF8);
required binary name (UTF8);
+ required int32 int32;
+ required int64 int64;
}"""
@@ -90,8 +99,7 @@ class TestParquetIntegration:
data, df = create_data_and_spark_df(n)
with TemporaryDirectory() as output_dir:
df.write.parquet(output_dir, mode="overwrite")
- cli_output = get_show_filter_cli_output(output_dir, data)
- assert cli_output == get_expected_output(data)
+ self._test_column_filters(output_dir, data)
def test_bloom_filter_round_trip(self, n):
data, df = create_data_and_pandas_df(n)
@@ -106,5 +114,45 @@ class TestParquetIntegration:
schema_file.name, parquet_file, csv_file.name
)
assert cli_output == b""
- cli_output = get_show_filter_cli_output(output_dir, data)
- assert cli_output == get_expected_output(data)
+ self._test_column_filters(output_dir, data)
+
+ def _test_column_filters(self, output_dir, data):
+ self._test_string_col_filter(output_dir, data)
+ self._test_int32_col_filter(output_dir, data)
+ self._test_int64_col_filter(output_dir, data)
+
+ def _test_string_col_filter(self, output_dir, data):
+ test_values = []
+ expected_results = []
+ for v in data:
+ test_values.append(v[0])
+ expected_results.append((v[0], True))
+ for v in data:
+ test_values.append(v[1])
+ expected_results.append((v[1], False))
+ cli_output = get_show_filter_cli_output(output_dir, "id", test_values)
+ assert cli_output == get_expected_output(expected_results)
+
+ def _test_int32_col_filter(self, output_dir, data):
+ test_values = []
+ expected_results = []
+ for v in data:
+ test_values.append(v[2])
+ expected_results.append((v[2], True))
+ for v in data:
+ test_values.append(v[3])
+ expected_results.append((v[3], False))
+        cli_output = get_show_filter_cli_output(output_dir, "int32", test_values)
+ assert cli_output == get_expected_output(expected_results)
+
+ def _test_int64_col_filter(self, output_dir, data):
+ test_values = []
+ expected_results = []
+ for v in data:
+ test_values.append(v[3])
+ expected_results.append((v[3], True))
+ for v in data:
+ test_values.append(v[2])
+ expected_results.append((v[2], False))
+        cli_output = get_show_filter_cli_output(output_dir, "int64", test_values)
+ assert cli_output == get_expected_output(expected_results)
diff --git a/parquet/src/bin/parquet-show-bloom-filter.rs b/parquet/src/bin/parquet-show-bloom-filter.rs
index b1b332590a..41e3ac9b52 100644
--- a/parquet/src/bin/parquet-show-bloom-filter.rs
+++ b/parquet/src/bin/parquet-show-bloom-filter.rs
@@ -34,6 +34,9 @@
//! ```
use clap::Parser;
+use parquet::basic::Type;
+use parquet::bloom_filter::Sbbf;
+use parquet::file::metadata::ColumnChunkMetaData;
use parquet::file::{
properties::ReaderProperties,
reader::{FileReader, SerializedFileReader},
@@ -46,11 +49,11 @@ use std::{fs::File, path::Path};
struct Args {
#[clap(help("Path to the parquet file"))]
file_name: String,
- #[clap(help("Check the bloom filter indexes for the given column"))]
+    #[clap(help("Check the bloom filter indexes for the given column. Only string-typed columns or columns with an Int32 or Int64 physical type are supported"))]
column: String,
#[clap(
help(
-        "Check if the given values match bloom filter, the values will be evaluated as strings"
+        "Check if the given values match the bloom filter; the values will be parsed to the physical type of the column"
),
required = true
)]
@@ -78,7 +81,7 @@ fn main() {
for (ri, row_group) in metadata.row_groups().iter().enumerate() {
println!("Row group #{ri}");
println!("{}", "=".repeat(80));
- if let Some((column_index, _)) = row_group
+ if let Some((column_index, column)) = row_group
.columns()
.iter()
.enumerate()
@@ -89,15 +92,18 @@ fn main() {
.expect("Unable to read row group");
if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
args.values.iter().for_each(|value| {
- println!(
- "Value {} is {} in bloom filter",
- value,
- if sbbf.check(&value.as_str()) {
- "present"
- } else {
- "absent"
+                    match check_filter(sbbf, value, column) {
+ Ok(present) => {
+ println!(
+ "Value {} is {} in bloom filter",
+ value,
+ if present { "present" } else { "absent" }
+ )
}
- )
+ Err(err) => {
+ println!("{err}");
+ }
+ };
});
} else {
println!("No bloom filter found for column {}", args.column);
@@ -116,3 +122,25 @@ fn main() {
}
}
}
+
+fn check_filter(sbbf: &Sbbf, value: &String, column: &ColumnChunkMetaData) -> Result<bool, String> {
+ match column.column_type() {
+ Type::INT32 => {
+ let value: i32 = value
+ .parse()
+            .map_err(|e| format!("Unable to parse value '{}' to i32: {}", value, e))?;
+ Ok(sbbf.check(&value))
+ }
+ Type::INT64 => {
+ let value: i64 = value
+ .parse()
+            .map_err(|e| format!("Unable to parse value '{}' to i64: {}", value, e))?;
+ Ok(sbbf.check(&value))
+ }
+ Type::BYTE_ARRAY => Ok(sbbf.check(&value.as_str())),
+ _ => Err(format!(
+ "Unsupported column type for checking bloom filter: {}",
+ column.column_type()
+ )),
+ }
+}
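
For programmatic use, a minimal sketch along the same lines, built on the parquet crate APIs that appear in the diff above (the file name, row group index, and column index are assumptions for illustration):

    use parquet::file::reader::{FileReader, SerializedFileReader};
    use std::fs::File;

    fn main() {
        // Hypothetical input; any parquet file written with bloom filters
        // enabled will do.
        let file = File::open("data.parquet").expect("Unable to open file");
        let reader =
            SerializedFileReader::new(file).expect("Unable to read parquet file");
        // Probe the first row group's first column, mirroring what the tool
        // does for each row group and the matching column.
        let row_group_reader = reader.get_row_group(0).expect("Unable to read row group");
        if let Some(sbbf) = row_group_reader.get_column_bloom_filter(0) {
            // With this change, integer values are checked using the column's
            // physical type rather than their string representation.
            let value: i32 = 42;
            println!(
                "Value {value} is {} in bloom filter",
                if sbbf.check(&value) { "present" } else { "absent" }
            );
        }
    }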