mbutrovich opened a new pull request, #4812: URL: https://github.com/apache/datafusion-comet/pull/4812
## Which issue does this PR close?

Closes #4774.

## Rationale for this change

Comet silently produces wrong results when the same Iceberg table is scanned more than once with different pushed-down filters (for example a `FULL OUTER JOIN` or `UNION ALL` of two differently-filtered reads).

Spark's non-AQE `ReuseExchangeAndSubquery` keys exchange reuse off `Exchange.canonicalized`, which delegates to the scan's canonical form. `CometIcebergNativeScanExec.equals`/`hashCode`/`doCanonicalize` identified a scan by `metadataLocation`, `output`, `serializedPlanOpt`, and `runtimeFilters` only. None of these carry the pushed static filters, which live in the `@transient originalPlan` that canonicalization nulls out. Two scans of the same table+snapshot with different filters therefore canonicalize identically, and reuse collapses them into one, so one branch reads the other branch's data.

Vanilla Spark avoids this because `BatchScanExec.equals` compares `scan.toBatch`, and Iceberg's `SparkBatch.equals` compares `table.name()` plus `SparkScan.hashCode()`, which folds in pushed filters, snapshot, branch, and read schema. Spark also keeps the `scan` reference alive through `doCanonicalize`, so the fingerprint survives.

Comet's V1 Parquet scan (`CometNativeScanExec`) is not affected because its filters are top-level `partitionFilters`/`dataFilters` fields that are already in equality and preserved by canonicalization.

## What changes are included in this PR?

- Add a non-transient `scanHashCode: Int` field to `CometIcebergNativeScanExec`, captured from `scanExec.scan.hashCode()` in the `apply` factory while the transient scan is still available.
- Include `scanHashCode` in `equals` and `hashCode`, and carry it through `doCanonicalize` (it is the only distinguishing field left once `originalPlan` is nulled) and `convertBlock`.

## How are these changes tested?

New test in `CometIcebergNativeSuite`: "exchange reuse must not collapse scans with different pushed filters (#4774)".
It runs a `UNION ALL` of two aggregations over the same partitioned Iceberg table with different partition filters under non-AQE with `spark.sql.exchange.reuse=true`. It fails on `main` (branch B reuses branch A's exchange, yielding A twice and dropping B) and passes with this change. Beyond `checkSparkAnswerAndOperator`, it asserts both `CometIcebergNativeScanExec` nodes survive and that no `ReusedExchangeExec` collapsed the branches. The existing AQE DPP broadcast-reuse tests continue to pass, confirming legitimate reuse (identical scans still share an exchange) is preserved.
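
The failure mode and the fix described in this PR can be illustrated with a toy model. This is a minimal sketch, not the Comet implementation: `ToyScan`, `ScanBefore`, `ScanAfter`, `ReuseDemo`, and the metadata path are hypothetical stand-ins, and "reuse" is approximated by deduplicating canonical forms with `distinct`. It only shows why a fingerprint captured before the transient field is dropped keeps differently-filtered scans apart.

```scala
// Toy stand-in for the Iceberg scan object; NOT a real Spark/Iceberg class.
// A case class hashes its fields, loosely mirroring a scan hash that folds in filters.
final case class ToyScan(table: String, pushedFilters: Seq[String])

// Before the fix: pushed filters are reachable only through `original`,
// which canonicalization drops, so equality sees the metadata location alone.
final class ScanBefore(val metadataLocation: String, val original: Option[ToyScan]) {
  def canonicalized: ScanBefore = new ScanBefore(metadataLocation, None)
  override def equals(other: Any): Boolean = other match {
    case that: ScanBefore => metadataLocation == that.metadataLocation
    case _                => false
  }
  override def hashCode(): Int = metadataLocation.hashCode
}

// After the fix: an Int fingerprint is a plain field that participates in
// equals/hashCode and is carried through canonicalization.
final class ScanAfter(
    val metadataLocation: String,
    val scanHashCode: Int,
    val original: Option[ToyScan]) {
  def canonicalized: ScanAfter = new ScanAfter(metadataLocation, scanHashCode, None)
  override def equals(other: Any): Boolean = other match {
    case that: ScanAfter =>
      metadataLocation == that.metadataLocation && scanHashCode == that.scanHashCode
    case _ => false
  }
  override def hashCode(): Int = 31 * metadataLocation.hashCode + scanHashCode
}

object ScanAfter {
  // Capture the fingerprint in the factory, while the scan is still available.
  def apply(metadataLocation: String, scan: ToyScan): ScanAfter =
    new ScanAfter(metadataLocation, scan.hashCode(), Some(scan))
}

object ReuseDemo {
  // Approximates exchange reuse: plans with equal canonical forms collapse into one.
  def plansAfterReuse[A](plans: Seq[A])(canon: A => A): Int =
    plans.map(canon).distinct.size

  val location = "hypothetical/metadata/v1.metadata.json" // illustrative path only
  val branchA  = ToyScan("tbl", Seq("part = 1"))
  val branchB  = ToyScan("tbl", Seq("part = 2"))

  // Different filters, old equality: the two branches collapse into one plan.
  val before: Int = plansAfterReuse(
    Seq(new ScanBefore(location, Some(branchA)), new ScanBefore(location, Some(branchB)))
  )(_.canonicalized)

  // Different filters, new equality: both branches survive.
  val after: Int = plansAfterReuse(
    Seq(ScanAfter(location, branchA), ScanAfter(location, branchB))
  )(_.canonicalized)

  // Identical scans still collapse, so legitimate reuse is preserved.
  val identical: Int = plansAfterReuse(
    Seq(ScanAfter(location, branchA), ScanAfter(location, branchA))
  )(_.canonicalized)
}
```

In this model `ReuseDemo.before` is 1 while `ReuseDemo.after` is 2 and `ReuseDemo.identical` is 1. A hash is a fingerprint rather than a proof of equality, so two distinct scans could in principle collide; the sketch does not attempt to address that.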
