mbutrovich opened a new pull request, #4722:
URL: https://github.com/apache/datafusion-comet/pull/4722

   ## Which issue does this PR close?
   
   Closes #.
   
   ## Rationale for this change
   
   While trying to understand performance regressions in TPC-DS with DF 54.0, I 
noticed that filters that Spark's optimizer pushes to a Comet native scan 
currently never reach DataFusion's `ParquetSource` under the default config. 
This was misleading because the filters show up in the EXPLAIN/Spark UI (note 
they match the associated CometFilter expressions):
   
   ```
   (1) CometNativeScan parquet spark_catalog.default.store_sales
   Output [5]: [ss_quantity#569, ss_wholesale_cost#570, ss_list_price#571, ss_coupon_amt#578, ss_sold_date_sk#582]
   Batched: true
   Location: InMemoryFileIndex [s3a://.../tpcds-parquet/.../store_sales]
   PushedFilters: [IsNotNull(ss_quantity), GreaterThanOrEqual(ss_quantity,0), LessThanOrEqual(ss_quantity,5), Or(Or(And(GreaterThanOrEqual(ss_list_price,8.00),LessThanOrEqual(ss_list_price,18.00)),And(GreaterThanOrEqual(ss_coupon_amt,459.00),LessThanOrEqual(ss_coupon_amt,1459.00))),And(GreaterThanOrEqual(ss_wholesale_cost,57.00),LessThanOrEqual(ss_wholesale_cost,77.00)))]
   ReadSchema: struct<ss_quantity:int,ss_wholesale_cost:decimal(7,2),ss_list_price:decimal(7,2),ss_coupon_amt:decimal(7,2)>

   (2) CometFilter
   Input [5]: [ss_quantity#569, ss_wholesale_cost#570, ss_list_price#571, ss_coupon_amt#578, ss_sold_date_sk#582]
   Condition : (((isnotnull(ss_quantity#569) AND (ss_quantity#569 >= 0)) AND (ss_quantity#569 <= 5)) AND ((((ss_list_price#571 >= 8.00) AND (ss_list_price#571 <= 18.00)) OR ((ss_coupon_amt#578 >= 459.00) AND (ss_coupon_amt#578 <= 1459.00))) OR ((ss_wholesale_cost#570 >= 57.00) AND (ss_wholesale_cost#570 <= 77.00))))
   ```
   
   But the Spark UI shows `CometFilter` outputting far fewer rows than it 
receives from the scan. Those rows should have been eliminated at scan time:
   
   <img width="769" height="882" alt="Screenshot 2026-06-24 at 2 54 23 PM" src="https://github.com/user-attachments/assets/34083cfd-48e7-4b82-86c3-9ee85dfde27d" />
   
   That's a ton of I/O that likely didn't need to happen.
   
   A note on terminology: Spark and most engines use "filter pushdown" to mean 
evaluating filters at the scan, typically against Parquet metadata (row group 
statistics, page index, bloom filters) so that whole row groups or pages can be 
skipped without decoding. DataFusion's `pushdown_filters` config means 
something narrower and stronger: in addition to metadata-based pruning, the 
parquet reader evaluates the predicate on each decoded row from the filter 
columns first, builds a selection mask, and then lazily materializes the 
remaining projected columns only for surviving rows. Format-level pruning runs 
whenever a predicate is attached to the source. The `pushdown_filters` flag 
only controls whether row-level evaluation and late materialization also run.
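
To make the two layers concrete, here is a toy Rust model of a scan over three row groups. It is a sketch under simplifying assumptions, not DataFusion's parquet reader: `RowGroup`, `Between`, `sample`, and `scan` are hypothetical names, statistics are a single min/max pair per row group, and late materialization is modeled as decoding a projected column only at positions where the filter column's mask is true. It also shows that turning on the row-level flag changes nothing when no predicate is attached.

```rust
/// Toy row group: a filter column with min/max statistics plus one projected
/// non-filter column (hypothetical type, not a Parquet API).
struct RowGroup {
    quantity_min: i32,
    quantity_max: i32,
    quantity: Vec<i32>,   // filter column
    list_price: Vec<i32>, // projected non-filter column, in cents
}

/// Predicate `lo <= quantity <= hi`.
struct Between {
    lo: i32,
    hi: i32,
}

impl Between {
    /// Statistics check: could any value in [min, max] satisfy the predicate?
    fn may_match(&self, min: i32, max: i32) -> bool {
        max >= self.lo && min <= self.hi
    }

    /// Row-level check on one decoded value.
    fn matches(&self, v: i32) -> bool {
        v >= self.lo && v <= self.hi
    }
}

/// Returns (list_price values emitted by the scan, number of list_price values decoded).
fn scan(groups: &[RowGroup], pred: Option<&Between>, row_level: bool) -> (Vec<i32>, usize) {
    let mut out = Vec::new();
    let mut decoded = 0;
    for g in groups {
        // Format-level pruning: only possible when a predicate is attached.
        if let Some(p) = pred {
            if !p.may_match(g.quantity_min, g.quantity_max) {
                continue; // whole row group skipped without decoding
            }
        }
        match pred {
            Some(p) if row_level => {
                // Evaluate the filter column first and build a selection mask...
                let mask: Vec<bool> = g.quantity.iter().map(|&q| p.matches(q)).collect();
                // ...then materialize the projected column only for surviving rows.
                for (i, &keep) in mask.iter().enumerate() {
                    if keep {
                        decoded += 1;
                        out.push(g.list_price[i]);
                    }
                }
            }
            _ => {
                // No row-level evaluation: decode everything and leave the
                // row-level reduction to a downstream filter.
                decoded += g.list_price.len();
                out.extend_from_slice(&g.list_price);
            }
        }
    }
    (out, decoded)
}

fn sample() -> Vec<RowGroup> {
    vec![
        RowGroup { quantity_min: 0, quantity_max: 3, quantity: vec![0, 3, 2], list_price: vec![800, 1200, 1800] },
        RowGroup { quantity_min: 10, quantity_max: 20, quantity: vec![10, 20], list_price: vec![900, 950] },
        RowGroup { quantity_min: 4, quantity_max: 9, quantity: vec![4, 9], list_price: vec![1500, 1700] },
    ]
}

fn main() {
    let groups = sample();
    let pred = Between { lo: 0, hi: 5 };
    let (_, none) = scan(&groups, None, true); // row-level flag on, but no predicate
    let (_, pruned) = scan(&groups, Some(&pred), false);
    let (rows, late) = scan(&groups, Some(&pred), true);
    println!("decoded: no predicate={none}, pruning only={pruned}, pruning+row filter={late}");
    println!("emitted with row filter: {rows:?}");
}
```

With the sample data, the number of decoded `list_price` values drops from 7 (no predicate, even with the row-level flag on) to 5 (row-group pruning only) to 4 (pruning plus row-level evaluation).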
   
   The serde for `CometNativeScan` gates its serialization of 
`scan.supportedDataFilters` on `spark.comet.parquet.respectFilterPushdown` 
(added in [#1936](https://github.com/apache/datafusion-comet/pull/1936)), which 
defaults to `false`. With the default, the protobuf message crosses JNI with an 
empty `data_filters` list, so the native side constructs the `ParquetSource` 
with no predicate.
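
A minimal Rust model of that gating, for illustration only: the real serde is Scala code in `CometNativeScan.scala`, and the native side builds a DataFusion physical expression rather than a string. `FilterExpr`, `serialize_before`, `serialize_after`, and `source_predicate` are hypothetical, and joining the filters with `AND` is an assumption about how multiple data filters are combined.

```rust
/// Hypothetical stand-in for a filter expression after Spark-to-protobuf conversion.
#[derive(Clone, Debug, PartialEq)]
struct FilterExpr(String);

/// Before this PR (modeled): filters are copied only when the flag is on.
fn serialize_before(supported: &[FilterExpr], respect_filter_pushdown: bool) -> Vec<FilterExpr> {
    if respect_filter_pushdown {
        supported.to_vec()
    } else {
        Vec::new()
    }
}

/// After this PR (modeled): whatever Spark pushed is always copied.
fn serialize_after(supported: &[FilterExpr]) -> Vec<FilterExpr> {
    supported.to_vec()
}

/// Native side (modeled): an empty list means the source gets no predicate;
/// otherwise the filters are ANDed into one (assumed combination).
fn source_predicate(data_filters: &[FilterExpr]) -> Option<String> {
    if data_filters.is_empty() {
        None
    } else {
        Some(data_filters.iter().map(|f| f.0.as_str()).collect::<Vec<_>>().join(" AND "))
    }
}

fn main() {
    let pushed = vec![
        FilterExpr("isnotnull(ss_quantity)".to_string()),
        FilterExpr("ss_quantity >= 0".to_string()),
    ];
    // Default config before this PR: respectFilterPushdown = false, so no predicate.
    println!("before: {:?}", source_predicate(&serialize_before(&pushed, false)));
    println!("after:  {:?}", source_predicate(&serialize_after(&pushed)));
}
```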
   
   Consequences of having no predicate on the source:
   
   - Row-group statistics filtering does not run.
   - Parquet page index filtering does not run.
   - Bloom filter pruning does not run.
   - Row-level `RowFilter` evaluation has nothing to evaluate.
   - `CometFilter` above the scan does the full row-level reduction on the 
decoded Arrow batches.
   
   The hardcoded `table_parquet_options.global.pushdown_filters = true` in 
`parquet_exec.rs` is therefore dead in the default config: enabling row-level 
eval has no effect when no predicate is attached.
   
   The Spark plan's `PushedFilters: [...]` and `DataFilters: [...]` come from 
the Scala `CometScanExec` operator's fields, which Spark's optimizer populates. 
They reflect what Spark planned, not what crossed JNI.
   
   ## What changes are included in this PR?
   
   - `CometNativeScan.scala`: unconditionally serialize 
`scan.supportedDataFilters` into the protobuf. The serde no longer consults any 
config. When Spark's optimizer did not push filters (e.g. because 
`spark.sql.parquet.filterPushdown=false`), `supportedDataFilters` is empty and 
the loop is a no-op.
   - `parquet_exec.rs`: remove the hardcoded 
`table_parquet_options.global.pushdown_filters = true` and `reorder_filters = true` 
lines. Both now fall back to DataFusion's default of `false`. Users who want
row-level `RowFilter` evaluation set 
`spark.comet.datafusion.execution.parquet.pushdown_filters=true` (and 
optionally `...reorder_filters=true`), which the existing passthrough in 
`jni_api.rs` forwards to `ConfigOptions`. DataFusion's `try_pushdown_filters` 
ORs the table-level and session-level flags.
   - `CometConf.scala`: remove `COMET_RESPECT_PARQUET_FILTER_PUSHDOWN`.
   - `CometTestBase.scala`: remove the line that set the deleted config.
   - `dev/diffs/{3.4.3,3.5.8,4.0.2,4.1.2}.diff`: regenerated against their 
respective Spark tags. The lines that set the deleted config are removed from 
`SharedSparkSession.scala` patches.
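
The opt-in path can be sketched the same way. This Rust sketch assumes the passthrough in `jni_api.rs` forwards a key by stripping the `spark.comet.` prefix (inferred from the key names above, not copied from Comet), and models the OR in `try_pushdown_filters` as a plain boolean OR. `passthrough`, `session_flag`, and `effective_pushdown` are hypothetical helpers.

```rust
use std::collections::HashMap;

/// Hypothetical passthrough: keep only `spark.comet.datafusion.*` entries and
/// drop the `spark.comet.` prefix so they become DataFusion config keys.
fn passthrough(spark_conf: &HashMap<String, String>) -> HashMap<String, String> {
    spark_conf
        .iter()
        .filter_map(|(k, v)| {
            k.strip_prefix("spark.comet.")
                .filter(|rest| rest.starts_with("datafusion."))
                .map(|rest| (rest.to_string(), v.clone()))
        })
        .collect()
}

/// Session-level boolean, treated as false when unset.
fn session_flag(df_conf: &HashMap<String, String>, key: &str) -> bool {
    df_conf.get(key).map(|v| v.as_str() == "true").unwrap_or(false)
}

/// Modeled OR of the table-level and session-level flags.
fn effective_pushdown(table_level: bool, session_level: bool) -> bool {
    table_level || session_level
}

fn main() {
    let mut spark_conf = HashMap::new();
    spark_conf.insert(
        "spark.comet.datafusion.execution.parquet.pushdown_filters".to_string(),
        "true".to_string(),
    );
    // Not a Comet passthrough key, so it is not forwarded.
    spark_conf.insert("spark.sql.parquet.filterPushdown".to_string(), "true".to_string());

    let df_conf = passthrough(&spark_conf);
    let session = session_flag(&df_conf, "datafusion.execution.parquet.pushdown_filters");
    // Table-level flag: no longer hardcoded in parquet_exec.rs, so DataFusion's default applies.
    let table = false;
    println!("forwarded keys: {:?}", df_conf.keys().collect::<Vec<_>>());
    println!("row-level RowFilter evaluation: {}", effective_pushdown(table, session));
}
```

Removing the hardcoded table-level `true` is what makes the session-level key meaningful: with an OR, a table-level `true` would override any user setting.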
   
   ## Result by config
   
   | Spark `filterPushdown` | DF passthrough `pushdown_filters` | Predicate on scan | Format-level pruning | Row-level RowFilter eval |
   | --- | --- | --- | --- | --- |
   | true (default) | unset / false (default) | yes | yes | no |
   | true | true | yes | yes | yes |
   | false | any | no | no | no |
   
   The first row is the change in default behavior introduced by this PR. 
Format-level pruning was previously unreachable in the default config and is 
now active. Row-level evaluation remains opt-in.
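
The table reduces to two boolean rules, encoded in the hypothetical `behavior` function below. It is a simplification: it assumes Spark pushed at least one supported filter whenever `filterPushdown` is on, and it treats the three format-level mechanisms (row-group statistics, page index, bloom filters) as a single flag.

```rust
/// Which scan-time mechanisms are active (toy model of the table).
#[derive(Debug, PartialEq)]
struct ScanBehavior {
    predicate_on_scan: bool,
    format_pruning: bool,
    row_filter: bool,
}

fn behavior(spark_filter_pushdown: bool, df_pushdown_filters: bool) -> ScanBehavior {
    // After this PR the serde always forwards Spark's pushed filters, so the
    // source gets a predicate exactly when Spark pushed some (assumed non-empty).
    let predicate_on_scan = spark_filter_pushdown;
    ScanBehavior {
        predicate_on_scan,
        // Statistics, page index, and bloom filter pruning need only a predicate.
        format_pruning: predicate_on_scan,
        // Row-level RowFilter evaluation needs a predicate AND the opt-in flag.
        row_filter: predicate_on_scan && df_pushdown_filters,
    }
}

fn main() {
    for (spark, df) in [(true, false), (true, true), (false, false), (false, true)] {
        println!("filterPushdown={spark}, pushdown_filters={df}: {:?}", behavior(spark, df));
    }
}
```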
   
   ## How are these changes tested?
   
   Existing tests. I will run TPC-DS at scale.
   