[ https://issues.apache.org/jira/browse/SPARK-53343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18015267#comment-18015267 ]

Parth Chandra commented on SPARK-53343:
---------------------------------------

{quote}AdaptiveSparkPlan isFinalPlan=false
+- == Current Plan ==
   CometProject [id#101L, data#102, date#103, ts#104], [id#101L, data#102, date#103, ts#104]
   +- CometBroadcastHashJoin [id#101L, data#102], [id#62L, data#64], Inner, BuildRight
      :- CometBatchScan testhadoop.default.table[id#101L, data#102, date#103, ts#104] testhadoop.default.table (branch=null) [filters=id IS NOT NULL, data IS NOT NULL, groupedBy=] RuntimeFilters: [dynamicpruningexpression(id#101L IN dynamicpruning#118),dynamicpruningexpression(data#102 IN dynamicpruning#119)]
      :     :- SubqueryAdaptiveBroadcast dynamicpruning#118, 0, true, Project [id#62L, data#64], [id#62L, data#64]
      :     :  +- AdaptiveSparkPlan isFinalPlan=false
      :     :     +- CometProject [id#62L, data#64], [id#62L, data#64]
      :     :        +- CometFilter [id#62L, date#63, data#64], (((isnotnull(date#63) AND (date#63 = 1970-01-02)) AND isnotnull(id#62L)) AND isnotnull(data#64))
      :     :           +- CometScan [native_iceberg_compat] parquet spark_catalog.default.dim[id#62L,date#63,data#64] Batched: true, DataFilters: [isnotnull(date#63), (date#63 = 1970-01-02), isnotnull(id#62L), isnotnull(data#64)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/var/folders/bz/gg_fqnmj4c17j2c7mdn8ps1m0000gn/T/hive170392419582..., PartitionFilters: [], PushedFilters: [IsNotNull(date), EqualTo(date,1970-01-02), IsNotNull(id), IsNotNull(data)], ReadSchema: struct<id:bigint,date:date,data:string>
      :     +- SubqueryAdaptiveBroadcast dynamicpruning#119, 1, true, Project [id#62L, data#64], [id#62L, data#64]
      :        +- AdaptiveSparkPlan isFinalPlan=false
      :           +- CometProject [id#62L, data#64], [id#62L, data#64]
      :              +- CometFilter [id#62L, date#63, data#64], (((isnotnull(date#63) AND (date#63 = 1970-01-02)) AND isnotnull(id#62L)) AND isnotnull(data#64))
      :                 +- CometScan [native_iceberg_compat] parquet spark_catalog.default.dim[id#62L,date#63,data#64] Batched: true, DataFilters: [isnotnull(date#63), (date#63 = 1970-01-02), isnotnull(id#62L), isnotnull(data#64)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/var/folders/bz/gg_fqnmj4c17j2c7mdn8ps1m0000gn/T/hive170392419582..., PartitionFilters: [], PushedFilters: [IsNotNull(date), EqualTo(date,1970-01-02), IsNotNull(id), IsNotNull(data)], ReadSchema: struct<id:bigint,date:date,data:string>
      +- BroadcastQueryStage 0
         +- CometBroadcastExchange [id#62L, data#64]
            +- CometProject [id#62L, data#64], [id#62L, data#64]
               +- CometFilter [id#62L, date#63, data#64], (((isnotnull(date#63) AND (date#63 = 1970-01-02)) AND isnotnull(id#62L)) AND isnotnull(data#64))
                  +- CometScan [native_iceberg_compat] parquet spark_catalog.default.dim[id#62L,date#63,data#64] Batched: true, DataFilters: [isnotnull(date#63), (date#63 = 1970-01-02), isnotnull(id#62L), isnotnull(data#64)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/var/folders/bz/gg_fqnmj4c17j2c7mdn8ps1m0000gn/T/hive170392419582..., PartitionFilters: [], PushedFilters: [IsNotNull(date), EqualTo(date,1970-01-02), IsNotNull(id), IsNotNull(data)], ReadSchema: struct<id:bigint,date:date,data:string>
+- == Initial Plan ==
   CometProject [id#101L, data#102, date#103, ts#104], [id#101L, data#102, date#103, ts#104]
   +- CometBroadcastHashJoin [id#101L, data#102], [id#62L, data#64], Inner, BuildRight
      :- CometBatchScan testhadoop.default.table[id#101L, data#102, date#103, ts#104] testhadoop.default.table (branch=null) [filters=id IS NOT NULL, data IS NOT NULL, groupedBy=] RuntimeFilters: [dynamicpruningexpression(id#101L IN dynamicpruning#118),dynamicpruningexpression(data#102 IN dynamicpruning#119)]
      :     :- SubqueryAdaptiveBroadcast dynamicpruning#118, 0, true, Project [id#62L, data#64], [id#62L, data#64]
      :     :  +- AdaptiveSparkPlan isFinalPlan=false
      :     :     +- CometProject [id#62L, data#64], [id#62L, data#64]
      :     :        +- CometFilter [id#62L, date#63, data#64], (((isnotnull(date#63) AND (date#63 = 1970-01-02)) AND isnotnull(id#62L)) AND isnotnull(data#64))
      :     :           +- CometScan [native_iceberg_compat] parquet spark_catalog.default.dim[id#62L,date#63,data#64] Batched: true, DataFilters: [isnotnull(date#63), (date#63 = 1970-01-02), isnotnull(id#62L), isnotnull(data#64)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/var/folders/bz/gg_fqnmj4c17j2c7mdn8ps1m0000gn/T/hive170392419582..., PartitionFilters: [], PushedFilters: [IsNotNull(date), EqualTo(date,1970-01-02), IsNotNull(id), IsNotNull(data)], ReadSchema: struct<id:bigint,date:date,data:string>
      :     +- SubqueryAdaptiveBroadcast dynamicpruning#119, 1, true, Project [id#62L, data#64], [id#62L, data#64]
      :        +- AdaptiveSparkPlan isFinalPlan=false
      :           +- CometProject [id#62L, data#64], [id#62L, data#64]
      :              +- CometFilter [id#62L, date#63, data#64], (((isnotnull(date#63) AND (date#63 = 1970-01-02)) AND isnotnull(id#62L)) AND isnotnull(data#64))
      :                 +- CometScan [native_iceberg_compat] parquet spark_catalog.default.dim[id#62L,date#63,data#64] Batched: true, DataFilters: [isnotnull(date#63), (date#63 = 1970-01-02), isnotnull(id#62L), isnotnull(data#64)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/var/folders/bz/gg_fqnmj4c17j2c7mdn8ps1m0000gn/T/hive170392419582..., PartitionFilters: [], PushedFilters: [IsNotNull(date), EqualTo(date,1970-01-02), IsNotNull(id), IsNotNull(data)], ReadSchema: struct<id:bigint,date:date,data:string>
      +- CometBroadcastExchange [id#62L, data#64]
         +- CometProject [id#62L, data#64], [id#62L, data#64]
            +- CometFilter [id#62L, date#63, data#64], (((isnotnull(date#63) AND (date#63 = 1970-01-02)) AND isnotnull(id#62L)) AND isnotnull(data#64))
               +- CometScan [native_iceberg_compat] parquet spark_catalog.default.dim[id#62L,date#63,data#64] Batched: true, DataFilters: [isnotnull(date#63), (date#63 = 1970-01-02), isnotnull(id#62L), isnotnull(data#64)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/var/folders/bz/gg_fqnmj4c17j2c7mdn8ps1m0000gn/T/hive170392419582..., PartitionFilters: [], PushedFilters: [IsNotNull(date), EqualTo(date,1970-01-02), IsNotNull(id), IsNotNull(data)], ReadSchema: struct<id:bigint,date:date,data:string>
{quote}

> Dynamic pruning filters get removed with custom BroadcastHashJoin 
> implementations
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-53343
>                 URL: https://issues.apache.org/jira/browse/SPARK-53343
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.5.6
>            Reporter: Parth Chandra
>            Priority: Major
>
> The Iceberg unit test 
> [TestRuntimeFiltering.testMultipleRuntimeFilters|https://github.com/apache/iceberg/blob/07c088fce9c54369864dcb6da16006e78206048b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRuntimeFiltering.java#L203]
> creates runtime filters that skip reading some of the files in the dataset. 
> The test _deletes_ those files, so it fails if the runtime filtering still 
> tries to read them.
> When running with Comet, this test fails because the dynamic filter 
> expression gets erased.
> This happens in 
> [PlanAdaptiveDynamicPruningFilters|https://github.com/apache/spark/blob/5660dbadf90ed08faef6dc883fd98f55b098e96a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala#L54],
> where canReuseExchange explicitly checks for a BroadcastHashJoin. Because 
> Comet is enabled, the BroadcastHashJoin has been replaced by a 
> CometBroadcastHashJoin, which fails the check, so the pruning expression is 
> set to Literal.TrueLiteral.
> Is there a way around this, apart from enhancing Spark to allow a 
> BroadcastHashJoinLike trait (see the sketches below)? What would a custom 
> implementation need to have for this dynamic pruning to happen correctly?
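
To make the failure mode concrete, here is a minimal, self-contained Scala sketch of the shape of that check. The class names mirror the real operators but are simplified stand-ins, not Spark's planner classes, and the real rule additionally verifies with sameResult that the join's build side matches the broadcast exchange it wants to reuse.

{code:scala}
// Toy, self-contained model (not Spark's actual classes) of the shape of the
// canReuseExchange check in PlanAdaptiveDynamicPruningFilters.
object DppReuseCheckSketch extends App {
  sealed trait Plan { def children: Seq[Plan] }
  case class Scan(table: String) extends Plan { val children: Seq[Plan] = Nil }
  case class BroadcastHashJoinExec(left: Plan, right: Plan) extends Plan {
    val children: Seq[Plan] = Seq(left, right)
  }
  // Stand-in for the Comet operator that replaces BroadcastHashJoinExec.
  case class CometBroadcastHashJoinExec(left: Plan, right: Plan) extends Plan {
    val children: Seq[Plan] = Seq(left, right)
  }

  // Depth-first search over the plan tree, analogous to TreeNode.find.
  def find(p: Plan)(f: Plan => Boolean): Option[Plan] =
    if (f(p)) Some(p) else p.children.flatMap(c => find(c)(f)).headOption

  // Only the concrete BroadcastHashJoinExec pattern matches (the real rule also
  // compares the build side with sameResult), so any custom join operator falls
  // through to `case _ => false`.
  def canReuseExchange(rootPlan: Plan): Boolean =
    find(rootPlan) {
      case BroadcastHashJoinExec(_, _) => true
      case _                           => false
    }.isDefined

  assert(canReuseExchange(BroadcastHashJoinExec(Scan("fact"), Scan("dim"))))
  // With Comet the join is a CometBroadcastHashJoinExec: the check fails and the
  // rule falls back to DynamicPruningExpression(Literal.TrueLiteral).
  assert(!canReuseExchange(CometBroadcastHashJoinExec(Scan("fact"), Scan("dim"))))
}
{code}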
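
And a purely hypothetical sketch of the BroadcastHashJoinLike idea raised in the last question. None of these names exist in Spark today, and the toy Plan model is the same simplification as above; it only illustrates how matching on a trait instead of one concrete join class could let a custom operator (such as Comet's) participate in the reuse check.

{code:scala}
// Hypothetical only: a trait-based variant of the reuse check.
object BroadcastHashJoinLikeSketch extends App {
  sealed trait Plan { def children: Seq[Plan] }
  case class Scan(table: String) extends Plan { val children: Seq[Plan] = Nil }

  // Anything that behaves like a broadcast hash join exposes its build side so
  // the DPP rule can compare it against the planned broadcast exchange.
  trait BroadcastHashJoinLike extends Plan {
    def buildPlan: Plan
  }

  // A custom engine's join could mix the trait in.
  case class CustomBroadcastHashJoin(streamed: Plan, build: Plan)
      extends BroadcastHashJoinLike {
    val children: Seq[Plan] = Seq(streamed, build)
    val buildPlan: Plan = build
  }

  def find(p: Plan)(f: Plan => Boolean): Option[Plan] =
    if (f(p)) Some(p) else p.children.flatMap(c => find(c)(f)).headOption

  // The rule would match on the trait instead of a single concrete class; the
  // sameResult comparison against the exchange is reduced to a caller-supplied
  // predicate in this sketch.
  def canReuseExchange(root: Plan, sameResultAsExchange: Plan => Boolean): Boolean =
    find(root) {
      case j: BroadcastHashJoinLike => sameResultAsExchange(j.buildPlan)
      case _                        => false
    }.isDefined

  val dim = Scan("dim")
  assert(canReuseExchange(CustomBroadcastHashJoin(Scan("fact"), dim), _ == dim))
}
{code}

Whether such a hook belongs in Spark (a trait the rule matches on) or in the extension (re-planning the pruning subquery itself) is exactly the open question above.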


