chiragjn opened a new issue, #17221:
URL: https://github.com/apache/datafusion/issues/17221

   ### Describe the bug
   
   Continuing from the thread at 
https://discord.com/channels/885562378132000778/1402989687324414046
   We are trying to add bloom filters to the values of a Map field. While we do 
see the bloom filter in the Parquet file, we don't see it being used when 
querying.
   
   ### To Reproduce
   
   ```rust
   use arrow::array::{ArrayRef, MapArray, StringArray, StructArray};
   use arrow::buffer::OffsetBuffer;
   use arrow::datatypes::{DataType, Field, Fields, Schema};
   use arrow::record_batch::RecordBatch;
   use datafusion::prelude::*;
   use parquet::arrow::arrow_writer::ArrowWriter;
   use parquet::basic::Compression;
   use parquet::file::properties::{EnabledStatistics, WriterProperties};
   use std::collections::HashMap;
   use std::fs::File;
   use std::sync::Arc;
   use tempfile::TempDir;
   
   #[tokio::main]
   async fn main() -> Result<(), Box<dyn std::error::Error>> {
       println!("Creating parquet file with bloom filters...");
   
       let temp_dir = TempDir::new()?;
       let parquet_path = temp_dir.path().join("bloom_filter_data.parquet");
   
       // Create sample data with repetitive patterns to test bloom filter 
efficiency
       let mut ids = Vec::<String>::new();
       let mut maps = Vec::new();
   
       // Generate data with specific patterns
       for i in 0..10000 {
           ids.push(i.to_string());
           maps.push(HashMap::from([(
               "key_1".to_string(),
               format!("value_{}", i),
           )]));
       }
   
       let ids_array = Arc::new(StringArray::from(ids));
       let mut keys = Vec::new();
       let mut values = Vec::new();
       let mut lens = Vec::new();
   
       for map in maps {
           keys.extend(map.keys().cloned().collect::<Vec<_>>());
           values.extend(map.values().cloned().collect::<Vec<_>>());
           lens.push(map.len());
       }
   
       let keys_array = Arc::new(StringArray::from(keys));
       let values_array = Arc::new(StringArray::from(values));
       let entries = StructArray::from(vec![
           (
               Arc::new(Field::new("key", DataType::Utf8, false)),
               keys_array as ArrayRef,
           ),
           (
               Arc::new(Field::new("value", DataType::Utf8, true)),
               values_array as ArrayRef,
           ),
       ]);
   
       let field = Arc::new(Field::new(
           "key_value",
           DataType::Struct(Fields::from(vec![
               Field::new("key", DataType::Utf8, false),
               Field::new("value", DataType::Utf8, true),
           ])),
           false,
       ));
   
       let map_column = Arc::new(MapArray::try_new(
           field,
           OffsetBuffer::from_lengths(lens),
           entries,
           None,
           false,
       )?);
   
       let schema = Arc::new(Schema::new(vec![
           Field::new("id", DataType::Utf8, false),
           Field::new_map(
               "SpanAttributesString",
               "key_value",
               Arc::new(Field::new("key", DataType::Utf8, false)),
               Arc::new(Field::new("value", DataType::Utf8, true)),
               false, // keys are not sorted
               true,  // map itself is nullable
           ),
       ]));
   
       let record_batch = RecordBatch::try_new(schema.clone(), vec![ids_array, 
map_column])?;
   
       let file = File::create(&parquet_path)?;
   
       let id_col_path_value = 
parquet::schema::types::ColumnPath::from(vec!["id".to_string()]);
       let map_col_path_value = parquet::schema::types::ColumnPath::from(vec![
           "SpanAttributesString".to_string(),
           "key_value".to_string(),
           "value".to_string(),
       ]);
   
       let writer_properties = WriterProperties::builder()
           .set_compression(Compression::LZ4_RAW)
           .set_statistics_enabled(EnabledStatistics::Chunk)
           .set_column_bloom_filter_enabled(id_col_path_value.clone(), true)
           .set_column_bloom_filter_ndv(id_col_path_value.clone(), 
record_batch.num_rows() as u64)
           .set_column_bloom_filter_fpp(id_col_path_value, 0.01)
           .set_column_bloom_filter_enabled(map_col_path_value.clone(), true)
           .set_column_bloom_filter_ndv(map_col_path_value.clone(), 
record_batch.num_rows() as u64)
           .set_column_bloom_filter_fpp(map_col_path_value, 0.01)
           .build();
   
       let mut writer = ArrowWriter::try_new(file, schema, 
Some(writer_properties))?;
       writer.write(&record_batch)?;
       writer.close()?;
   
       println!("Parquet file created at: {parquet_path:?}");
       println!("Bloom filters enabled for columns");
   
       let ctx = SessionContext::new_with_config(SessionConfig::from_env()?);
       ctx.register_parquet(
           "test_data",
           parquet_path.to_str().unwrap(),
           ParquetReadOptions::default(),
       )
       .await?;
   
       println!("\nSample data:");
       let df = ctx
           .sql("SELECT \"id\", \"SpanAttributesString\" FROM test_data LIMIT 
10")
           .await?;
       df.show().await?;
   
       println!("\n=== First Query: Test bloom filter for id ===");
       let query1 = "
           EXPLAIN ANALYZE
           SELECT *
           FROM test_data
           WHERE \"id\" = '10'
       ";
       let result1 = ctx.sql(query1).await?;
       result1.show().await?;
   
       println!("\n=== Second Query: Test bloom filter for map ===");
       let query2 = "
           EXPLAIN ANALYZE
           SELECT *
           FROM test_data
           WHERE
           array_contains(map_values(\"SpanAttributesString\"), 'value_7')
       ";
       let result2 = ctx.sql(query2).await?;
       result2.show().await?;
   
       println!("\n=== Third Query: Test bloom filter for map ===");
       let query3 = "
           EXPLAIN ANALYZE
           SELECT *
           FROM test_data
           WHERE
           array_has_any(map_values(\"SpanAttributesString\"), ['value_7', 
'value_5'])
       ";
   
       let result3 = ctx.sql(query3).await?;
       result3.show().await?;
   
       Ok(())
   }
   
   ```
   
   
   ```
   Creating parquet file with bloom filters...
   Parquet file created at: 
"/var/folders/qw/rd2m1w2x4z7ffl2c25ys5gw80000gn/T/.tmp9pRpzS/bloom_filter_data.parquet"
   Bloom filters enabled for columns
   
   Sample data:
   +----+----------------------+
   | id | SpanAttributesString |
   +----+----------------------+
   | 0  | {key_1: value_0}     |
   | 1  | {key_1: value_1}     |
   | 2  | {key_1: value_2}     |
   | 3  | {key_1: value_3}     |
   | 4  | {key_1: value_4}     |
   | 5  | {key_1: value_5}     |
   | 6  | {key_1: value_6}     |
   | 7  | {key_1: value_7}     |
   | 8  | {key_1: value_8}     |
   | 9  | {key_1: value_9}     |
   +----+----------------------+
   
   === First Query: Test bloom filter for id ===
   

   | plan_type         | plan                                                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
         |
   
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192, 
metrics=[output_rows=1, elapsed_compute=67.082µs]                               
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                   |
   |                   |   FilterExec: id@0 = 10, metrics=[output_rows=1, 
elapsed_compute=878.756µs]                                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
               |
   |                   |     RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1, metrics=[fetch_time=15.367917ms, repartition_time=1ns, 
send_time=57.757µs]                                                             
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
               |
   |                   |       DataSourceExec: file_groups={1 group: 
[[var/folders/qw/rd2m1w2x4z7ffl2c25ys5gw80000gn/T/.tmp9pRpzS/bloom_filter_data.parquet]]},
 projection=[id, SpanAttributesString], file_type=parquet, predicate=id@0 = 10, 
pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 10 AND 10 <= 
id_max@1, required_guarantees=[id in (10)]                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
            |
   |                   | , metrics=[output_rows=10000, elapsed_compute=1ns, 
batches_splitted=0, bytes_scanned=133931, file_open_errors=0, 
file_scan_errors=0, files_ranges_pruned_statistics=0, 
num_predicate_creation_errors=0, page_index_rows_matched=10000, 
page_index_rows_pruned=0, predicate_evaluation_errors=0, 
pushdown_rows_matched=0, pushdown_rows_pruned=0, 
row_groups_matched_bloom_filter=1, row_groups_matched_statistics=1, 
row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, 
bloom_filter_eval_time=702.501µs, metadata_load_time=1.357709ms, 
page_index_eval_time=179.96µs, row_pushdown_eval_time=2ns, 
statistics_eval_time=162.126µs, time_elapsed_opening=4.185041ms, 
time_elapsed_processing=14.827709ms, time_elapsed_scanning_total=11.203708ms, 
time_elapsed_scanning_until_data=10.098417ms] |
   |                   |                                                        
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
         |
   

   
   === Second Query: Test bloom filter for map ===
   

   | plan_type         | plan                                                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
             |
   

   | Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192, 
metrics=[output_rows=1, elapsed_compute=23.291µs]                               
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                       |
   |                   |   FilterExec: 
array_has(map_values(SpanAttributesString@1), value_7), metrics=[output_rows=1, 
elapsed_compute=4.172256ms]                                                     
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                      |
   |                   |     RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1, metrics=[fetch_time=12.015292ms, repartition_time=1ns, 
send_time=31.59µs]                                                              
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                   |
   |                   |       DataSourceExec: file_groups={1 group: 
[[var/folders/qw/rd2m1w2x4z7ffl2c25ys5gw80000gn/T/.tmp9pRpzS/bloom_filter_data.parquet]]},
 projection=[id, SpanAttributesString], file_type=parquet, 
metrics=[output_rows=10000, elapsed_compute=1ns, batches_splitted=0, 
bytes_scanned=117475, file_open_errors=0, file_scan_errors=0, 
files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, 
page_index_rows_matched=0, page_index_rows_pruned=0, 
predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, 
row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, 
row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, 
bloom_filter_eval_time=2ns, metadata_load_time=479.959µs, 
page_index_eval_time=2ns, row_pushdown_eval_time=2ns, statistics_eval_time=2ns, 
time_elapsed_opening=548µs, time_elapsed_processing=11.696209ms, 
time_elapsed_scanning_total=11.457166ms, 
time_elapsed_scanning_until_data=10.287958ms] |
   |                   |                                                        
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
             |
   

   
   === Third Query: Test bloom filter for map ===
   
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type         | plan                                                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                 |
   

   | Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192, 
metrics=[output_rows=2, elapsed_compute=34.083µs]                               
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                           |
   |                   |   FilterExec: 
array_has_any(map_values(SpanAttributesString@1), [value_7, value_5]), 
metrics=[output_rows=2, elapsed_compute=23.428965ms]                            
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                   |
   |                   |     RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1, metrics=[fetch_time=11.922374ms, repartition_time=1ns, 
send_time=36.423µs]                                                             
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                       |
   |                   |       DataSourceExec: file_groups={1 group: 
[[var/folders/qw/rd2m1w2x4z7ffl2c25ys5gw80000gn/T/.tmp9pRpzS/bloom_filter_data.parquet]]},
 projection=[id, SpanAttributesString], file_type=parquet, 
metrics=[output_rows=10000, elapsed_compute=1ns, batches_splitted=0, 
bytes_scanned=117475, file_open_errors=0, file_scan_errors=0, 
files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, 
page_index_rows_matched=0, page_index_rows_pruned=0, 
predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, 
row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, 
row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, 
bloom_filter_eval_time=2ns, metadata_load_time=436.167µs, 
page_index_eval_time=2ns, row_pushdown_eval_time=2ns, statistics_eval_time=2ns, 
time_elapsed_opening=503.917µs, time_elapsed_processing=11.706666ms, 
time_elapsed_scanning_total=11.412208ms, 
time_elapsed_scanning_until_data=10.205833ms] |
   |                   |                                                        
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                 |
   

   ```
   
   ### Expected behavior
   
   _No response_
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to