chiragjn opened a new issue, #17221: URL: https://github.com/apache/datafusion/issues/17221
### Describe the bug Continuing from the thread at https://discord.com/channels/885562378132000778/1402989687324414046 We are trying to add bloom filters to values of a Map field, and while we do see it available in the parquet file, we don't see it getting used while querying. ### To Reproduce ```rust use arrow::array::{ArrayRef, MapArray, StringArray, StructArray}; use arrow::buffer::OffsetBuffer; use arrow::datatypes::{DataType, Field, Fields, Schema}; use arrow::record_batch::RecordBatch; use datafusion::prelude::*; use parquet::arrow::arrow_writer::ArrowWriter; use parquet::basic::Compression; use parquet::file::properties::{EnabledStatistics, WriterProperties}; use std::collections::HashMap; use std::fs::File; use std::sync::Arc; use tempfile::TempDir; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { println!("Creating parquet file with bloom filters..."); let temp_dir = TempDir::new()?; let parquet_path = temp_dir.path().join("bloom_filter_data.parquet"); // Create sample data with repetitive patterns to test bloom filter efficiency let mut ids = Vec::<String>::new(); let mut maps = Vec::new(); // Generate data with specific patterns for i in 0..10000 { ids.push(i.to_string()); maps.push(HashMap::from([( "key_1".to_string(), format!("value_{}", i), )])); } let ids_array = Arc::new(StringArray::from(ids)); let mut keys = Vec::new(); let mut values = Vec::new(); let mut lens = Vec::new(); for map in maps { keys.extend(map.keys().cloned().collect::<Vec<_>>()); values.extend(map.values().cloned().collect::<Vec<_>>()); lens.push(map.len()); } let keys_array = Arc::new(StringArray::from(keys)); let values_array = Arc::new(StringArray::from(values)); let entries = StructArray::from(vec![ ( Arc::new(Field::new("key", DataType::Utf8, false)), keys_array as ArrayRef, ), ( Arc::new(Field::new("value", DataType::Utf8, true)), values_array as ArrayRef, ), ]); let field = Arc::new(Field::new( "key_value", DataType::Struct(Fields::from(vec![ 
Field::new("key", DataType::Utf8, false), Field::new("value", DataType::Utf8, true), ])), false, )); let map_column = Arc::new(MapArray::try_new( field, OffsetBuffer::from_lengths(lens), entries, None, false, )?); let schema = Arc::new(Schema::new(vec![ Field::new("id", DataType::Utf8, false), Field::new_map( "SpanAttributesString", "key_value", Arc::new(Field::new("key", DataType::Utf8, false)), Arc::new(Field::new("value", DataType::Utf8, true)), false, // keys are not sorted true, // map itself is nullable ), ])); let record_batch = RecordBatch::try_new(schema.clone(), vec![ids_array, map_column])?; let file = File::create(&parquet_path)?; let id_col_path_value = parquet::schema::types::ColumnPath::from(vec!["id".to_string()]); let map_col_path_value = parquet::schema::types::ColumnPath::from(vec![ "SpanAttributesString".to_string(), "key_value".to_string(), "value".to_string(), ]); let writer_properties = WriterProperties::builder() .set_compression(Compression::LZ4_RAW) .set_statistics_enabled(EnabledStatistics::Chunk) .set_column_bloom_filter_enabled(id_col_path_value.clone(), true) .set_column_bloom_filter_ndv(id_col_path_value.clone(), record_batch.num_rows() as u64) .set_column_bloom_filter_fpp(id_col_path_value, 0.01) .set_column_bloom_filter_enabled(map_col_path_value.clone(), true) .set_column_bloom_filter_ndv(map_col_path_value.clone(), record_batch.num_rows() as u64) .set_column_bloom_filter_fpp(map_col_path_value, 0.01) .build(); let mut writer = ArrowWriter::try_new(file, schema, Some(writer_properties))?; writer.write(&record_batch)?; writer.close()?; println!("Parquet file created at: {parquet_path:?}"); println!("Bloom filters enabled for columns"); let ctx = SessionContext::new_with_config(SessionConfig::from_env()?); ctx.register_parquet( "test_data", parquet_path.to_str().unwrap(), ParquetReadOptions::default(), ) .await?; println!("\nSample data:"); let df = ctx .sql("SELECT \"id\", \"SpanAttributesString\" FROM test_data LIMIT 10") .await?; 
df.show().await?; println!("\n=== First Query: Test bloom filter for id ==="); let query1 = " EXPLAIN ANALYZE SELECT * FROM test_data WHERE \"id\" = '10' "; let result1 = ctx.sql(query1).await?; result1.show().await?; println!("\n=== Second Query: Test bloom filter for map ==="); let query2 = " EXPLAIN ANALYZE SELECT * FROM test_data WHERE array_contains(map_values(\"SpanAttributesString\"), 'value_7') "; let result2 = ctx.sql(query2).await?; result2.show().await?; println!("\n=== Third Query: Test bloom filter for map ==="); let query3 = " EXPLAIN ANALYZE SELECT * FROM test_data WHERE array_has_any(map_values(\"SpanAttributesString\"), ['value_7', 'value_5']) "; let result3 = ctx.sql(query3).await?; result3.show().await?; Ok(()) } ``` ``` Creating parquet file with bloom filters... Parquet file created at: "/var/folders/qw/rd2m1w2x4z7ffl2c25ys5gw80000gn/T/.tmp9pRpzS/bloom_filter_data.parquet" Bloom filters enabled for columns Sample data: +----+----------------------+ | id | SpanAttributesString | +----+----------------------+ | 0 | {key_1: value_0} | | 1 | {key_1: value_1} | | 2 | {key_1: value_2} | | 3 | {key_1: value_3} | | 4 | {key_1: value_4} | | 5 | {key_1: value_5} | | 6 | {key_1: value_6} | | 7 | {key_1: value_7} | | 8 | {key_1: value_8} | | 9 | {key_1: value_9} | +----+----------------------+ === First Query: Test bloom filter for id| plan_type | plan || Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=67.082µs] | | | FilterExec: id@0 = 10, metrics=[output_rows=1, elapsed_compute=878.756µs] | | | RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, metrics=[fetch_time=15.367917ms, repartition_time=1ns, send_time=57.757µs] | | | DataSourceExec: file_groups={1 group: [[var/folders/qw/rd2m1w2x4z7ffl2c25ys5gw80000gn/T/.tmp9pRpzS/bloom_filter_data.parquet]]}, projection=[id, SpanAttributesString], file_type=parquet, predicate=id@0 = 10, pruning_predicate=id_null_count@2 != row_count@3 AND 
id_min@0 <= 10 AND 10 <= id_max@1, required_guarantees=[id in (10)] | | | , metrics=[output_rows=10000, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=133931, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=10000, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=1, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=702.501µs, metadata_load_time=1.357709ms, page_index_eval_time=179.96µs, row_pushdown_eval_time=2ns, statistics_eval_time=162.126µs, time_elapsed_opening=4.185041ms, time_elapsed_processing=14.827709ms, time_elapsed_scanning_total=11.203708ms, time_elapsed_scanning_until_data=10.098417ms] | | | | === Second Query: Test bloom filter for map === | plan_type | plan || Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=23.291µs] | | | FilterExec: array_has(map_values(SpanAttributesString@1), value_7), metrics=[output_rows=1, elapsed_compute=4.172256ms] | | | RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, metrics=[fetch_time=12.015292ms, repartition_time=1ns, send_time=31.59µs] | | | DataSourceExec: file_groups={1 group: [[var/folders/qw/rd2m1w2x4z7ffl2c25ys5gw80000gn/T/.tmp9pRpzS/bloom_filter_data.parquet]]}, projection=[id, SpanAttributesString], file_type=parquet, metrics=[output_rows=10000, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=117475, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, 
bloom_filter_eval_time=2ns, metadata_load_time=479.959µs, page_index_eval_time=2ns, row_pushdown_eval_time=2ns, statistics_eval_time=2ns, time_elapsed_opening=548µs, time_elapsed_processing=11.696209ms, time_elapsed_scanning_total=11.457166ms, time_elapsed_scanning_until_data=10.287958ms] | | | | +-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ === Third Query: Test bloom filter for map| plan_type | plan || Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=2, elapsed_compute=34.083µs] | | | FilterExec: array_has_any(map_values(SpanAttributesString@1), [value_7, value_5]), metrics=[output_rows=2, elapsed_compute=23.428965ms] | | | RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, metrics=[fetch_time=11.922374ms, repartition_time=1ns, send_time=36.423µs] | | | DataSourceExec: file_groups={1 group: [[var/folders/qw/rd2m1w2x4z7ffl2c25ys5gw80000gn/T/.tmp9pRpzS/bloom_filter_data.parquet]]}, projection=[id, SpanAttributesString], file_type=parquet, metrics=[output_rows=10000, elapsed_compute=1ns, 
batches_splitted=0, bytes_scanned=117475, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=2ns, metadata_load_time=436.167µs, page_index_eval_time=2ns, row_pushdown_eval_time=2ns, statistics_eval_time=2ns, time_elapsed_opening=503.917µs, time_elapsed_processing=11.706666ms, time_elapsed_scanning_total=11.412208ms, time_elapsed_scanning_until_data=10.205833ms] | | | | ``` ### Expected behavior _No response_ ### Additional context _No response_ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org