This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 3c45926b25 Make parquet-show-bloom-filter work with integer typed columns (#7529)
3c45926b25 is described below

commit 3c45926b25290c754564ee72003dad4224e40fe7
Author: Adam Reeve <[email protected]>
AuthorDate: Tue May 20 15:12:05 2025 +1200

    Make parquet-show-bloom-filter work with integer typed columns (#7529)
---
 parquet/pytest/test_parquet_integration.py   | 84 ++++++++++++++++++++++------
 parquet/src/bin/parquet-show-bloom-filter.rs | 50 +++++++++++++----
 2 files changed, 105 insertions(+), 29 deletions(-)

diff --git a/parquet/pytest/test_parquet_integration.py b/parquet/pytest/test_parquet_integration.py
index e0846d4e77..e7e6c64b04 100755
--- a/parquet/pytest/test_parquet_integration.py
+++ b/parquet/pytest/test_parquet_integration.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 import pyspark.sql
+from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType
 import pandas as pd
 from tempfile import NamedTemporaryFile, TemporaryDirectory
 import subprocess
@@ -27,23 +28,30 @@ def create_data_and_spark_df(n):
     spark.conf.set("parquet.bloom.filter.enabled", True)
     spark.conf.set("parquet.bloom.filter.expected.ndv", 10)
     spark.conf.set("parquet.bloom.filter.max.bytes", 32)
-    data = [(f"id-{i % 10}", f"name-{i%10}") for i in range(n)]
-    df = spark.createDataFrame(data, ["id", "name"]).repartition(1)
+    data = [(f"id-{i % 10}", f"name-{i%10}", i * 2, i * 2 + 1) for i in range(n)]
+    schema = StructType(
+        [
+            StructField("id", StringType(), True),
+            StructField("name", StringType(), True),
+            StructField("int32", IntegerType(), True),
+            StructField("int64", LongType(), True),
+        ]
+    )
+    df = spark.createDataFrame(data, schema).repartition(1)
     return data, df
 
 
 def create_data_and_pandas_df(n):
-    data = [(f"id-{i % 10}", f"name-{i%10}") for i in range(n)]
-    df = pd.DataFrame(data, columns=["id", "name"])
+    data = [(f"id-{i % 10}", f"name-{i%10}", i * 2, i * 2 + 1) for i in range(n)]
+    df = pd.DataFrame(data, columns=["id", "name", "int32", "int64"])
     return data, df
 
 
-def get_expected_output(data):
+def get_expected_output(expected_results):
     expected = ["Row group #0", "=" * 80]
-    for v in data:
-        expected.append(f"Value {v[0]} is present in bloom filter")
-    for v in data:
-        expected.append(f"Value {v[1]} is absent in bloom filter")
+    for value, result in expected_results:
+        result_str = "present" if result else "absent"
+        expected.append(f"Value {value} is {result_str} in bloom filter")
     expected = "\n".join(expected) + "\n"
     return expected.encode("utf-8")
 
@@ -53,6 +61,7 @@ def get_from_csv_cli_output(schema_file, output_file, csv_file):
         "parquet-fromcsv",
         "--schema",
         schema_file,
+        "--has-header",
         "--enable-bloom-filter",
         "true",
         "--input-file",
@@ -63,7 +72,7 @@ def get_from_csv_cli_output(schema_file, output_file, csv_file):
     return subprocess.check_output(args)
 
 
-def get_show_filter_cli_output(output_dir, data, col_name="id"):
+def get_show_filter_cli_output(output_dir, col_name, test_values):
     # take the first (and only) parquet file
     (parquet_file,) = sorted(pathlib.Path(output_dir).glob("*.parquet"))
     args = [
@@ -71,16 +80,16 @@ def get_show_filter_cli_output(output_dir, data, col_name="id"):
         parquet_file,
         col_name,
     ]
-    for v in data:
-        args.extend([v[0]])
-    for v in data:
-        args.extend([v[1]])
+    for v in test_values:
+        args.append(str(v))
     return subprocess.check_output(args)
 
 
 SCHEMA = b"""message schema {
     required binary id (UTF8);
     required binary name (UTF8);
+    required int32 int32;
+    required int64 int64;
 }"""
 
 
@@ -90,8 +99,7 @@ class TestParquetIntegration:
         data, df = create_data_and_spark_df(n)
         with TemporaryDirectory() as output_dir:
             df.write.parquet(output_dir, mode="overwrite")
-            cli_output = get_show_filter_cli_output(output_dir, data)
-            assert cli_output == get_expected_output(data)
+            self._test_column_filters(output_dir, data)
 
     def test_bloom_filter_round_trip(self, n):
         data, df = create_data_and_pandas_df(n)
@@ -106,5 +114,45 @@ class TestParquetIntegration:
                 schema_file.name, parquet_file, csv_file.name
             )
             assert cli_output == b""
-            cli_output = get_show_filter_cli_output(output_dir, data)
-            assert cli_output == get_expected_output(data)
+            self._test_column_filters(output_dir, data)
+
+    def _test_column_filters(self, output_dir, data):
+        self._test_string_col_filter(output_dir, data)
+        self._test_int32_col_filter(output_dir, data)
+        self._test_int64_col_filter(output_dir, data)
+
+    def _test_string_col_filter(self, output_dir, data):
+        test_values = []
+        expected_results = []
+        for v in data:
+            test_values.append(v[0])
+            expected_results.append((v[0], True))
+        for v in data:
+            test_values.append(v[1])
+            expected_results.append((v[1], False))
+        cli_output = get_show_filter_cli_output(output_dir, "id", test_values)
+        assert cli_output == get_expected_output(expected_results)
+
+    def _test_int32_col_filter(self, output_dir, data):
+        test_values = []
+        expected_results = []
+        for v in data:
+            test_values.append(v[2])
+            expected_results.append((v[2], True))
+        for v in data:
+            test_values.append(v[3])
+            expected_results.append((v[3], False))
+        cli_output = get_show_filter_cli_output(output_dir, "int32", 
test_values)
+        assert cli_output == get_expected_output(expected_results)
+
+    def _test_int64_col_filter(self, output_dir, data):
+        test_values = []
+        expected_results = []
+        for v in data:
+            test_values.append(v[3])
+            expected_results.append((v[3], True))
+        for v in data:
+            test_values.append(v[2])
+            expected_results.append((v[2], False))
+        cli_output = get_show_filter_cli_output(output_dir, "int64", 
test_values)
+        assert cli_output == get_expected_output(expected_results)
diff --git a/parquet/src/bin/parquet-show-bloom-filter.rs b/parquet/src/bin/parquet-show-bloom-filter.rs
index b1b332590a..41e3ac9b52 100644
--- a/parquet/src/bin/parquet-show-bloom-filter.rs
+++ b/parquet/src/bin/parquet-show-bloom-filter.rs
@@ -34,6 +34,9 @@
 //! ```
 
 use clap::Parser;
+use parquet::basic::Type;
+use parquet::bloom_filter::Sbbf;
+use parquet::file::metadata::ColumnChunkMetaData;
 use parquet::file::{
     properties::ReaderProperties,
     reader::{FileReader, SerializedFileReader},
@@ -46,11 +49,11 @@ use std::{fs::File, path::Path};
 struct Args {
     #[clap(help("Path to the parquet file"))]
     file_name: String,
-    #[clap(help("Check the bloom filter indexes for the given column"))]
+    #[clap(help("Check the bloom filter indexes for the given column. Only 
string typed columns or columns with an Int32 or Int64 physical type are 
supported"))]
     column: String,
     #[clap(
         help(
-            "Check if the given values match bloom filter, the values will be 
evaluated as strings"
+            "Check if the given values match bloom filter, the values will be 
parsed to the physical type of the column"
         ),
         required = true
     )]
@@ -78,7 +81,7 @@ fn main() {
     for (ri, row_group) in metadata.row_groups().iter().enumerate() {
         println!("Row group #{ri}");
         println!("{}", "=".repeat(80));
-        if let Some((column_index, _)) = row_group
+        if let Some((column_index, column)) = row_group
             .columns()
             .iter()
             .enumerate()
@@ -89,15 +92,18 @@ fn main() {
                 .expect("Unable to read row group");
            if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
                 args.values.iter().for_each(|value| {
-                    println!(
-                        "Value {} is {} in bloom filter",
-                        value,
-                        if sbbf.check(&value.as_str()) {
-                            "present"
-                        } else {
-                            "absent"
+                    match check_filter(sbbf, value, column) {
+                        Ok(present) => {
+                            println!(
+                                "Value {} is {} in bloom filter",
+                                value,
+                                if present { "present" } else { "absent" }
+                            )
                         }
-                    )
+                        Err(err) => {
+                            println!("{err}");
+                        }
+                    };
                 });
             } else {
                 println!("No bloom filter found for column {}", args.column);
@@ -116,3 +122,25 @@ fn main() {
         }
     }
 }
+
+fn check_filter(sbbf: &Sbbf, value: &String, column: &ColumnChunkMetaData) -> Result<bool, String> {
+    match column.column_type() {
+        Type::INT32 => {
+            let value: i32 = value
+                .parse()
+                .map_err(|e| format!("Unable to parse value '{}' to i32: {}", value, e))?;
+            Ok(sbbf.check(&value))
+        }
+        Type::INT64 => {
+            let value: i64 = value
+                .parse()
+                .map_err(|e| format!("Unable to parse value '{}' to i64: {}", value, e))?;
+            Ok(sbbf.check(&value))
+        }
+        Type::BYTE_ARRAY => Ok(sbbf.check(&value.as_str())),
+        _ => Err(format!(
+            "Unsupported column type for checking bloom filter: {}",
+            column.column_type()
+        )),
+    }
+}
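
A note on why the physical type matters here: Sbbf::check hashes a value's raw bytes, so probing with the string "42" and with the integer 42 touch different filter slots. Before this change the tool always checked the string representation, which made every lookup against an integer column report "absent". Below is a minimal sketch of the same read path the tool uses; the probe helper and the hard-coded row group/column indices are illustrative, not part of this commit:

```
use parquet::file::reader::{FileReader, RowGroupReader, SerializedFileReader};
use std::fs::File;

// Probe the first row group's bloom filter for column 0 with two
// representations of the same logical value.
fn probe(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let reader = SerializedFileReader::new(File::open(path)?)?;
    let row_group = reader.get_row_group(0)?;
    if let Some(sbbf) = row_group.get_column_bloom_filter(0) {
        // Same number, different raw bytes, different filter slots:
        println!("as i32: {}", sbbf.check(&42i32)); // hashes the value's native bytes
        println!("as str: {}", sbbf.check(&"42"));  // hashes the UTF-8 bytes b"42"
    }
    Ok(())
}
```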
