dramaticlly commented on issue #5224:
URL: https://github.com/apache/iceberg/issues/5224#issuecomment-1178418612
Thank you @aokolnychyi for jumping in quickly. Here is the detailed explain
output I got on Spark 3.2.
So this is the particular delete query we are looking at:
```sql
DELETE FROM privacy_dev.ussr_interaction_f_test_spark32 tgt
WHERE tgt.request_dateint >= '20220320' AND tgt.request_dateint <= '20220321'
AND upper(tgt.instance_id) IN (SELECT * FROM trigger_speech_ids)
```
- the table `privacy_dev.ussr_interaction_f_test_spark32` is aliased as `tgt`
and is partitioned by column `request_dateint`
- the secondary table `trigger_speech_ids` in the subquery is not partitioned
and has only a single column
I do observe that a `dynamicpruning` filter is used, but I don't see the one
you specifically mentioned.
I also double-checked that `spark.sql.optimizer.dynamicPartitionPruning.enabled`
is not explicitly set in our application; it defaults to enabled in both OSS
Spark and our distribution, so I assume it's turned on. I also tried to
reproduce with this conf explicitly set to enabled, but it still generates
TB-level shuffle writes, so this might need a second pair of eyes.
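For reference, this is roughly how the conf can be checked and forced from a SQL session before re-running the explain (a sketch; `SET` and `EXPLAIN EXTENDED` are standard Spark SQL, the query is the one from this issue):

```sql
-- inspect the current value (defaults to true in OSS Spark 3.2)
SET spark.sql.optimizer.dynamicPartitionPruning.enabled;

-- force it on explicitly, then re-check the plan
SET spark.sql.optimizer.dynamicPartitionPruning.enabled=true;

EXPLAIN EXTENDED
DELETE FROM privacy_dev.ussr_interaction_f_test_spark32 tgt
WHERE tgt.request_dateint >= '20220320' AND tgt.request_dateint <= '20220321'
AND upper(tgt.instance_id) IN (SELECT * FROM trigger_speech_ids);
```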
I redacted some of the irrelevant columns below, hope it helps!
---
## == Parsed Logical Plan ==
```
'DeleteFromTable ((('tgt.request_dateint >= 20220320) AND
('tgt.request_dateint <= 20220321)) AND 'upper('tgt.instance_id) IN (list#267
[]))
: +- 'Project [*]
: +- 'UnresolvedRelation [trigger_speech_ids], [], false
+- 'SubqueryAlias tgt
+- 'UnresolvedRelation [privacy_dev, ussr_interaction_f_test_spark32],
[], false
```
## == Analyzed Logical Plan ==
```
DeleteFromTable (((request_dateint#670 >= cast(20220320 as int)) AND
(request_dateint#670 <= cast(20220321 as int))) AND upper(instance_id#682) IN
(list#267 []))
: +- Project [speech_id#258]
: +- SubqueryAlias trigger_speech_ids
: +- View (`trigger_speech_ids`, [speech_id#258])
: +- Deduplicate [speech_id#258]
: +- Project [speech_id#258]
: +- Filter ((date#741 >= 20220605) AND (date#741 <=
20220705))
: +- Project [upper(speech_id)#252 AS speech_id#258,
date#741]
: +- Project [upper(speech_id#739) AS
upper(speech_id)#252, date#741]
: +- RelationV2[id#738, speech_id#739, ]
spark_catalog.privacy_management.deletion_trigger_prod
:- RelationV2[request_dateint#670, instance_id#682, ]
spark_catalog.privacy_dev.ussr_interaction_f_test_spark32
+- ReplaceData RelationV2[request_dateint#670, instance_id#682, ]
spark_catalog.privacy_dev.ussr_interaction_f_test_spark32
+- Filter NOT ((((request_dateint#670 >= cast(20220320 as int)) AND
(request_dateint#670 <= cast(20220321 as int))) AND upper(instance_id#682) IN
(list#267 [])) <=> true)
: +- Project [speech_id#258]
: +- SubqueryAlias trigger_speech_ids
: +- View (`trigger_speech_ids`, [speech_id#258])
: +- Deduplicate [speech_id#258]
: +- Project [speech_id#258]
: +- Filter ((date#239 >= 20220605) AND (date#239 <=
20220705))
: +- Project [upper(speech_id)#252 AS
speech_id#258, date#239]
: +- Project [upper(speech_id#237) AS
upper(speech_id)#252, date#239]
: +- RelationV2[id#236, speech_id#237, ]
spark_catalog.privacy_management.deletion_trigger_prod
+- RelationV2[request_dateint#670, instance_id#682, _file#736,
_pos#737L] spark_catalog.privacy_dev.ussr_interaction_f_test_spark32
```
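One note on how I read the `ReplaceData` branch above (my understanding, not authoritative): the copy-on-write delete keeps every row where the delete condition is *not* true under the null-safe comparison `NOT (cond <=> true)`, which means rows where the condition evaluates to NULL are retained rather than dropped. A minimal Python sketch of that keep-predicate:

```python
# Sketch of Spark's NOT (delete_cond <=> true) keep-predicate for
# copy-on-write DELETE. The comparison is null-safe, so a NULL (None)
# delete condition is NOT equal to true, and the row is kept.

def keep_row(delete_cond):
    """Return True when the row survives the delete (NOT (cond <=> true))."""
    return not (delete_cond is True)

# condition True  -> row is deleted
# condition False -> row is kept
# condition NULL  -> row is kept (null-safe semantics)
assert keep_row(True) is False
assert keep_row(False) is True
assert keep_row(None) is True
```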
## == Optimized Logical Plan ==
```
ReplaceData RelationV2[request_dateint#670, instance_id#682, ]
spark_catalog.privacy_dev.ussr_interaction_f_test_spark32,
IcebergWrite(table=spark_catalog.privacy_dev.ussr_interaction_f_test_spark32,
format=PARQUET)
+- Project [request_dateint#670, instance_id#682, ]
+- Sort [request_dateint#670 ASC NULLS FIRST, _file#736 ASC NULLS FIRST,
_pos#737L ASC NULLS FIRST], false
+- RepartitionByExpression [_file#736], 960
+- Project [request_dateint#670, instance_id#682, _file#736,
_pos#737L]
+- Filter NOT ((((request_dateint#670 >= 20220320) AND
(request_dateint#670 <= 20220321)) AND exists#967) <=> true)
+- Join ExistenceJoin(exists#967), (upper(instance_id#682) =
speech_id#258)
:- Filter dynamicpruning#1034 [_file#736]
: : +- Project [_file#1032]
: : +- Join LeftSemi, (upper(instance_id#980) =
speech_id#258)
: : :- Project [instance_id#980, _file#1032]
: : : +- Filter ((isnotnull(request_dateint#968)
AND (request_dateint#968 >= 20220320)) AND (request_dateint#968 <= 20220321))
: : : +- RelationV2[request_dateint#968,
instance_id#980, ] spark_catalog.privacy_dev.ussr_interaction_f_test_spark32
: : +- InMemoryRelation [speech_id#258],
StorageLevel(disk, memory, deserialized, 1 replicas)
: : +- *(2)
HashAggregate(keys=[speech_id#258], functions=[], output=[speech_id#258])
: : +- Exchange
hashpartitioning(speech_id#258, 960), ENSURE_REQUIREMENTS, [id=#89]
: : +- *(1)
HashAggregate(keys=[speech_id#258], functions=[], output=[speech_id#258])
: : +- *(1) Project
[upper(speech_id#237) AS speech_id#258]
: : +- *(1) Filter
((isnotnull(date#239) AND (date#239 >= 20220605)) AND (date#239 <= 20220705))
: : +-
BatchScan[speech_id#237, date#239]
spark_catalog.privacy_management.deletion_trigger_prod [filters=date IS NOT
NULL, date >= '20220605', date <= '20220705'] RuntimeFilters: []
: +- RelationV2[request_dateint#670, instance_id#682,
_file#736, _pos#737L] spark_catalog.privacy_dev.ussr_interaction_f_test_spark32
+- InMemoryRelation [speech_id#258], StorageLevel(disk,
memory, deserialized, 1 replicas)
+- *(2) HashAggregate(keys=[speech_id#258],
functions=[], output=[speech_id#258])
+- Exchange hashpartitioning(speech_id#258, 960),
ENSURE_REQUIREMENTS, [id=#89]
+- *(1) HashAggregate(keys=[speech_id#258],
functions=[], output=[speech_id#258])
+- *(1) Project [upper(speech_id#237) AS
speech_id#258]
+- *(1) Filter ((isnotnull(date#239) AND
(date#239 >= 20220605)) AND (date#239 <= 20220705))
+- BatchScan[speech_id#237, date#239]
spark_catalog.privacy_management.deletion_trigger_prod [filters=date IS NOT
NULL, date >= '20220605', date <= '20220705'] RuntimeFilters: []
```
## == Physical Plan ==
```
ReplaceData
IcebergWrite(table=spark_catalog.privacy_dev.ussr_interaction_f_test_spark32,
format=PARQUET)
+- *(2) Project [request_dateint#670, instance_id#682, ]
+- *(2) Sort [request_dateint#670 ASC NULLS FIRST, _file#736 ASC NULLS
FIRST, _pos#737L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(_file#736, 960), REPARTITION_BY_NUM,
[id=#469]
+- *(1) Project [request_dateint#670, instance_id#682, _file#736,
_pos#737L]
+- *(1) Filter NOT ((((request_dateint#670 >= 20220320) AND
(request_dateint#670 <= 20220321)) AND exists#967) <=> true)
+- *(1) BroadcastHashJoin [upper(instance_id#682)],
[speech_id#258], ExistenceJoin(exists#967), BuildRight, false
:- *(1) Project [request_dateint#670, instance_id#682,
_file#736, _pos#737L]
: +- BatchScan[request_dateint#670, instance_id#682,
_file#736, _pos#737L] spark_catalog.privacy_dev.ussr_interaction_f_test_spark32
[filters=request_dateint >= 20220320, request_dateint <= 20220321]
RuntimeFilters: [dynamicpruningexpression(_file#736 IN subquery#1051)]
: +- Subquery subquery#1051, [id=#408]
: +- *(2) HashAggregate(keys=[_file#1032#1050],
functions=[], output=[_file#1032#1050])
: +- Exchange
hashpartitioning(_file#1032#1050, 960), ENSURE_REQUIREMENTS, [id=#404]
: +- *(1) HashAggregate(keys=[_file#1032
AS _file#1032#1050], functions=[], output=[_file#1032#1050])
: +- *(1) Project [_file#1032]
: +- *(1) BroadcastHashJoin
[upper(instance_id#980)], [speech_id#258], LeftSemi, BuildRight, false
: :- *(1) Project
[instance_id#980, _file#1032]
: : +- *(1) Filter
((isnotnull(request_dateint#968) AND (request_dateint#968 >= 20220320)) AND
(request_dateint#968 <= 20220321))
: : +-
BatchScan[request_dateint#968, instance_id#980, ]
spark_catalog.privacy_dev.ussr_interaction_f_test_spark32
[filters=request_dateint >= 20220320, request_dateint <= 20220321]
RuntimeFilters: []
: +- BroadcastExchange
HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#389]
: +- InMemoryTableScan
[speech_id#258]
: +- InMemoryRelation
[speech_id#258], StorageLevel(disk, memory, deserialized, 1 replicas)
: +- *(2)
HashAggregate(keys=[speech_id#258], functions=[], output=[speech_id#258])
: +- Exchange
hashpartitioning(speech_id#258, 960), ENSURE_REQUIREMENTS, [id=#89]
: +- *(1)
HashAggregate(keys=[speech_id#258], functions=[], output=[speech_id#258])
: +- *(1)
Project [upper(speech_id#237) AS speech_id#258]
: +-
*(1) Filter ((isnotnull(date#239) AND (date#239 >= 20220605)) AND (date#239 <=
20220705))
:
+- BatchScan[speech_id#237, date#239]
spark_catalog.privacy_management.deletion_trigger_prod [filters=date IS NOT
NULL, date >= '20220605', date <= '20220705'] RuntimeFilters: []
+- ReusedExchange [speech_id#258], BroadcastExchange
HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#389]
```
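If I read the physical plan right, the `dynamicpruningexpression(_file#736 IN subquery#1051)` runtime filter should restrict the main `BatchScan` to only the data files that contain at least one matching row, which is why I'd expect the rewrite (and its shuffle) to stay small. A pure-Python sketch of that file-level pruning, with entirely made-up sample data:

```python
# Hypothetical illustration of the "_file IN (subquery)" runtime filter:
# first compute the set of data files holding rows that match the delete
# predicate (the LeftSemi join in the subquery), then scan only those
# files when rewriting. Data and file names below are invented.

rows = [  # (file, request_dateint, instance_id)
    ("f1.parquet", 20220320, "abc"),
    ("f2.parquet", 20220320, "xyz"),
    ("f3.parquet", 20220325, "abc"),  # outside the date range
]
trigger_speech_ids = {"ABC"}  # upper-cased ids to delete

def in_range(dateint):
    return 20220320 <= dateint <= 20220321

# subquery side: files containing at least one matching row
files_to_rewrite = {
    f for (f, d, sid) in rows
    if in_range(d) and sid.upper() in trigger_speech_ids
}

# main scan side: only rows from the pruned file set are read and rewritten
scanned = [r for r in rows if r[0] in files_to_rewrite]

assert files_to_rewrite == {"f1.parquet"}
assert len(scanned) == 1
```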
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]