[ https://issues.apache.org/jira/browse/SPARK-53343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18015267#comment-18015267 ]
Parth Chandra commented on SPARK-53343:
---------------------------------------

{quote}
AdaptiveSparkPlan isFinalPlan=false
+- == Current Plan ==
   CometProject [id#101L, data#102, date#103, ts#104], [id#101L, data#102, date#103, ts#104]
   +- CometBroadcastHashJoin [id#101L, data#102], [id#62L, data#64], Inner, BuildRight
      :- CometBatchScan testhadoop.default.table[id#101L, data#102, date#103, ts#104] testhadoop.default.table (branch=null) [filters=id IS NOT NULL, data IS NOT NULL, groupedBy=] RuntimeFilters: [dynamicpruningexpression(id#101L IN dynamicpruning#118),dynamicpruningexpression(data#102 IN dynamicpruning#119)]
      :  :- SubqueryAdaptiveBroadcast dynamicpruning#118, 0, true, Project [id#62L, data#64], [id#62L, data#64]
      :  :  +- AdaptiveSparkPlan isFinalPlan=false
      :  :     +- CometProject [id#62L, data#64], [id#62L, data#64]
      :  :        +- CometFilter [id#62L, date#63, data#64], (((isnotnull(date#63) AND (date#63 = 1970-01-02)) AND isnotnull(id#62L)) AND isnotnull(data#64))
      :  :           +- CometScan [native_iceberg_compat] parquet spark_catalog.default.dim[id#62L,date#63,data#64] Batched: true, DataFilters: [isnotnull(date#63), (date#63 = 1970-01-02), isnotnull(id#62L), isnotnull(data#64)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/var/folders/bz/gg_fqnmj4c17j2c7mdn8ps1m0000gn/T/hive170392419582..., PartitionFilters: [], PushedFilters: [IsNotNull(date), EqualTo(date,1970-01-02), IsNotNull(id), IsNotNull(data)], ReadSchema: struct<id:bigint,date:date,data:string>
      :  +- SubqueryAdaptiveBroadcast dynamicpruning#119, 1, true, Project [id#62L, data#64], [id#62L, data#64]
      :     +- AdaptiveSparkPlan isFinalPlan=false
      :        +- CometProject [id#62L, data#64], [id#62L, data#64]
      :           +- CometFilter [id#62L, date#63, data#64], (((isnotnull(date#63) AND (date#63 = 1970-01-02)) AND isnotnull(id#62L)) AND isnotnull(data#64))
      :              +- CometScan [native_iceberg_compat] parquet spark_catalog.default.dim[id#62L,date#63,data#64] Batched: true, DataFilters: [isnotnull(date#63), (date#63 = 1970-01-02), isnotnull(id#62L), isnotnull(data#64)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/var/folders/bz/gg_fqnmj4c17j2c7mdn8ps1m0000gn/T/hive170392419582..., PartitionFilters: [], PushedFilters: [IsNotNull(date), EqualTo(date,1970-01-02), IsNotNull(id), IsNotNull(data)], ReadSchema: struct<id:bigint,date:date,data:string>
      +- BroadcastQueryStage 0
         +- CometBroadcastExchange [id#62L, data#64]
            +- CometProject [id#62L, data#64], [id#62L, data#64]
               +- CometFilter [id#62L, date#63, data#64], (((isnotnull(date#63) AND (date#63 = 1970-01-02)) AND isnotnull(id#62L)) AND isnotnull(data#64))
                  +- CometScan [native_iceberg_compat] parquet spark_catalog.default.dim[id#62L,date#63,data#64] Batched: true, DataFilters: [isnotnull(date#63), (date#63 = 1970-01-02), isnotnull(id#62L), isnotnull(data#64)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/var/folders/bz/gg_fqnmj4c17j2c7mdn8ps1m0000gn/T/hive170392419582..., PartitionFilters: [], PushedFilters: [IsNotNull(date), EqualTo(date,1970-01-02), IsNotNull(id), IsNotNull(data)], ReadSchema: struct<id:bigint,date:date,data:string>
+- == Initial Plan ==
   CometProject [id#101L, data#102, date#103, ts#104], [id#101L, data#102, date#103, ts#104]
   +- CometBroadcastHashJoin [id#101L, data#102], [id#62L, data#64], Inner, BuildRight
      :- CometBatchScan testhadoop.default.table[id#101L, data#102, date#103, ts#104] testhadoop.default.table (branch=null) [filters=id IS NOT NULL, data IS NOT NULL, groupedBy=] RuntimeFilters: [dynamicpruningexpression(id#101L IN dynamicpruning#118),dynamicpruningexpression(data#102 IN dynamicpruning#119)]
      :  :- SubqueryAdaptiveBroadcast dynamicpruning#118, 0, true, Project [id#62L, data#64], [id#62L, data#64]
      :  :  +- AdaptiveSparkPlan isFinalPlan=false
      :  :     +- CometProject [id#62L, data#64], [id#62L, data#64]
      :  :        +- CometFilter [id#62L, date#63, data#64], (((isnotnull(date#63) AND (date#63 = 1970-01-02)) AND isnotnull(id#62L)) AND isnotnull(data#64))
      :  :           +- CometScan [native_iceberg_compat] parquet spark_catalog.default.dim[id#62L,date#63,data#64] Batched: true, DataFilters: [isnotnull(date#63), (date#63 = 1970-01-02), isnotnull(id#62L), isnotnull(data#64)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/var/folders/bz/gg_fqnmj4c17j2c7mdn8ps1m0000gn/T/hive170392419582..., PartitionFilters: [], PushedFilters: [IsNotNull(date), EqualTo(date,1970-01-02), IsNotNull(id), IsNotNull(data)], ReadSchema: struct<id:bigint,date:date,data:string>
      :  +- SubqueryAdaptiveBroadcast dynamicpruning#119, 1, true, Project [id#62L, data#64], [id#62L, data#64]
      :     +- AdaptiveSparkPlan isFinalPlan=false
      :        +- CometProject [id#62L, data#64], [id#62L, data#64]
      :           +- CometFilter [id#62L, date#63, data#64], (((isnotnull(date#63) AND (date#63 = 1970-01-02)) AND isnotnull(id#62L)) AND isnotnull(data#64))
      :              +- CometScan [native_iceberg_compat] parquet spark_catalog.default.dim[id#62L,date#63,data#64] Batched: true, DataFilters: [isnotnull(date#63), (date#63 = 1970-01-02), isnotnull(id#62L), isnotnull(data#64)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/var/folders/bz/gg_fqnmj4c17j2c7mdn8ps1m0000gn/T/hive170392419582..., PartitionFilters: [], PushedFilters: [IsNotNull(date), EqualTo(date,1970-01-02), IsNotNull(id), IsNotNull(data)], ReadSchema: struct<id:bigint,date:date,data:string>
      +- CometBroadcastExchange [id#62L, data#64]
         +- CometProject [id#62L, data#64], [id#62L, data#64]
            +- CometFilter [id#62L, date#63, data#64], (((isnotnull(date#63) AND (date#63 = 1970-01-02)) AND isnotnull(id#62L)) AND isnotnull(data#64))
               +- CometScan [native_iceberg_compat] parquet spark_catalog.default.dim[id#62L,date#63,data#64] Batched: true, DataFilters: [isnotnull(date#63), (date#63 = 1970-01-02), isnotnull(id#62L), isnotnull(data#64)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/var/folders/bz/gg_fqnmj4c17j2c7mdn8ps1m0000gn/T/hive170392419582..., PartitionFilters: [], PushedFilters: [IsNotNull(date), EqualTo(date,1970-01-02), IsNotNull(id), IsNotNull(data)], ReadSchema: struct<id:bigint,date:date,data:string>
{quote}

> Dynamic pruning filters get removed with custom BroadcastHashJoin implementations
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-53343
>                 URL: https://issues.apache.org/jira/browse/SPARK-53343
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.5.6
>            Reporter: Parth Chandra
>            Priority: Major
>
> The Iceberg unit test [TestRuntimeFiltering.testMultipleRuntimeFilters|https://github.com/apache/iceberg/blob/07c088fce9c54369864dcb6da16006e78206048b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRuntimeFiltering.java#L203] creates a runtime filter that will skip reading some of the files in the dataset. The test _deletes_ those files so that the test fails if the runtime filtering still tries to read those files.
> When running with Comet, this test fails because the dynamic filter expression gets erased.
> This happens in [PlanAdaptiveDynamicPruningFilters|https://github.com/apache/spark/blob/5660dbadf90ed08faef6dc883fd98f55b098e96a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala#L54], where canReuseExchange explicitly checks for a BroadcastHashJoin. Because Comet is enabled, the BroadcastHashJoin has been replaced by a CometBroadcastHashJoin, so the check fails and the pruning expression is set to Literal.TrueLiteral.
> Is there a way around this, apart from enhancing Spark to allow a BroadcastHashJoinLike trait? What would a custom implementation need to provide to allow this dynamic pruning to happen correctly?
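For context, a minimal sketch of the generalization the question points at, assuming a hypothetical BroadcastHashJoinLike marker trait. The trait and the canReuseExchangeSketch helper below are illustrative only and do not exist in Spark today; the actual rule matches a concrete BroadcastHashJoin node, per the link above.

{code:scala}
// Hypothetical sketch, not actual Spark code. Idea: if both Spark's broadcast
// hash join and a Comet replacement exposed a common marker trait, the reuse
// check in PlanAdaptiveDynamicPruningFilters could match on the trait instead
// of one concrete class, and the dynamic pruning expression would not be
// collapsed to Literal.TrueLiteral when Comet swaps in its own join operator.
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical marker trait exposing just what the reuse check would need.
trait BroadcastHashJoinLike extends SparkPlan {
  def left: SparkPlan
  def right: SparkPlan
}

// Illustrative stand-in for the check that today only accepts Spark's own
// broadcast hash join: any operator mixing in the trait whose build side
// produces the same result as the broadcast subquery plan would qualify.
def canReuseExchangeSketch(root: SparkPlan, buildPlan: SparkPlan): Boolean =
  root.exists {
    case j: BroadcastHashJoinLike =>
      j.left.sameResult(buildPlan) || j.right.sameResult(buildPlan)
    case _ => false
  }
{code}

Whether matching on such a trait (plus sameResult on the build side) is sufficient for exchange reuse, or whether a custom join would need to expose more, is exactly the open question here.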