arunb2w opened a new issue, #5909:
URL: https://github.com/apache/iceberg/issues/5909
### Apache Iceberg version
0.14.0
### Query engine
EMR
### Please describe the bug 🐞
- Spark version: 3.2.1
- Iceberg version: 0.14.0

I am running the code below on EMR with the Glue catalog.
**Target:**
I have an Iceberg table (EPAYMENT) that contains 457M records and is about 89.4G in size.
It is partitioned on the column `_CONTEXT_ID_` and has 18K partitions.
The table resides in S3 and is registered in the Glue catalog.
**Input:**
The incoming batch to upsert into this Iceberg table contains 335K records, which touch 5K partitions.
The batch arrives as a Spark DataFrame, so I create a temporary view from it and use that view as the source of a MERGE that upserts the records.
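For scale, here is a rough estimate of what those numbers mean per partition. It is a back-of-the-envelope sketch that assumes rows and bytes are spread evenly across the `_CONTEXT_ID_` partitions (real partitions are probably skewed) and reads "89.4G" as decimal gigabytes.

```python
# Rough per-partition averages derived from the figures quoted in this issue.
# Assumptions: rows and bytes are spread evenly across the _CONTEXT_ID_
# partitions (real partitions are probably skewed), and "89.4G" is read as
# decimal gigabytes. Treat the results as orders of magnitude only.

TABLE_ROWS = 457_000_000     # records in EPAYMENT
TABLE_GB = 89.4              # table size on S3
TABLE_PARTITIONS = 18_000    # _CONTEXT_ID_ partitions
BATCH_ROWS = 335_000         # records in the incoming batch
TOUCHED_PARTITIONS = 5_000   # partitions the batch has to update


def estimate():
    """Return average partition size and the rows held by the touched partitions."""
    rows_per_partition = TABLE_ROWS / TABLE_PARTITIONS
    return {
        "rows_per_partition": round(rows_per_partition),
        "mb_per_partition": round(TABLE_GB * 1000 / TABLE_PARTITIONS, 1),
        "touched_fraction": round(TOUCHED_PARTITIONS / TABLE_PARTITIONS, 3),
        # Upper bound on the rows a MERGE that rewrites whole data files would
        # read from the touched partitions (under the even-spread assumption).
        "rows_in_touched_partitions": round(rows_per_partition * TOUCHED_PARTITIONS),
        "batch_rows_per_touched_partition": round(BATCH_ROWS / TOUCHED_PARTITIONS),
    }


if __name__ == "__main__":
    for name, value in estimate().items():
        print(f"{name}: {value}")
```

Under these assumptions each touched partition receives only about 67 of the incoming rows but stores about 25K, so the touched partitions hold roughly 127M rows in total. If the MERGE rewrites whole data files, which is Iceberg's default `copy-on-write` behaviour for `write.merge.mode`, it may read and shuffle far more than 335K records even when partition pruning works.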
**MERGE query I am using to upsert the incoming batch:**
```sql
-- The incoming PySpark DataFrame is first registered as a temp view:
--   deflateDf.createOrReplaceTempView("deflate_table")
merge into glue_dev.epaymentnightly.epayment as target
using (
  select * from deflate_table as deflate
)
on (cast(target._context_id_ as int) = cast(deflate._context_id_ as int)
    and cast(target.id as int) = cast(deflate.id as int))
when matched
then update set
  target.AMOUNT = cast(if(array_contains(deflate.changed_cols, 'AMOUNT'), deflate.AMOUNT, target.AMOUNT) as decimal(12,2)),
  target.AUTHORIZATIONCODE = cast(if(array_contains(deflate.changed_cols, 'AUTHORIZATIONCODE'), deflate.AUTHORIZATIONCODE, target.AUTHORIZATIONCODE) as string),
  target.AUTOMATEDYN = cast(if(array_contains(deflate.changed_cols, 'AUTOMATEDYN'), deflate.AUTOMATEDYN, target.AUTOMATEDYN) as string),
  target.CONTRACTID = cast(if(array_contains(deflate.changed_cols, 'CONTRACTID'), deflate.CONTRACTID, target.CONTRACTID) as decimal(12,0)),
  target.CREATED = cast(if(array_contains(deflate.changed_cols, 'CREATED'), deflate.CREATED, target.CREATED) as timestamp),
  target.CREATEDBY = cast(if(array_contains(deflate.changed_cols, 'CREATEDBY'), deflate.CREATEDBY, target.CREATEDBY) as string),
  target.DELETED = cast(if(array_contains(deflate.changed_cols, 'DELETED'), deflate.DELETED, target.DELETED) as timestamp),
  target.DELETEDBY = cast(if(array_contains(deflate.changed_cols, 'DELETEDBY'), deflate.DELETEDBY, target.DELETEDBY) as string),
  target.DEVICESOFTWAREVERSION = cast(if(array_contains(deflate.changed_cols, 'DEVICESOFTWAREVERSION'), deflate.DEVICESOFTWAREVERSION, target.DEVICESOFTWAREVERSION) as string),
  target.EPAYMENTCARDID = cast(if(array_contains(deflate.changed_cols, 'EPAYMENTCARDID'), deflate.EPAYMENTCARDID, target.EPAYMENTCARDID) as decimal(12,0)),
  target.GATEWAYRESPONSECODE = cast(if(array_contains(deflate.changed_cols, 'GATEWAYRESPONSECODE'), deflate.GATEWAYRESPONSECODE, target.GATEWAYRESPONSECODE) as string),
  target.GATEWAYRESPONSEMESSAGE = cast(if(array_contains(deflate.changed_cols, 'GATEWAYRESPONSEMESSAGE'), deflate.GATEWAYRESPONSEMESSAGE, target.GATEWAYRESPONSEMESSAGE) as string),
  target.HOSTRESPONSECODE = cast(if(array_contains(deflate.changed_cols, 'HOSTRESPONSECODE'), deflate.HOSTRESPONSECODE, target.HOSTRESPONSECODE) as string),
  target.HOSTRESPONSEMESSAGE = cast(if(array_contains(deflate.changed_cols, 'HOSTRESPONSEMESSAGE'), deflate.HOSTRESPONSEMESSAGE, target.HOSTRESPONSEMESSAGE) as string),
  target.ID = cast(if(array_contains(deflate.changed_cols, 'ID'), deflate.ID, target.ID) as decimal(12,0)),
  target.LASTMODIFIED = cast(if(array_contains(deflate.changed_cols, 'LASTMODIFIED'), deflate.LASTMODIFIED, target.LASTMODIFIED) as timestamp),
  target.LASTMODIFIEDBY = cast(if(array_contains(deflate.changed_cols, 'LASTMODIFIEDBY'), deflate.LASTMODIFIEDBY, target.LASTMODIFIEDBY) as string),
  target.LOCKDATE = cast(if(array_contains(deflate.changed_cols, 'LOCKDATE'), deflate.LOCKDATE, target.LOCKDATE) as timestamp),
  target.LOCKID = cast(if(array_contains(deflate.changed_cols, 'LOCKID'), deflate.LOCKID, target.LOCKID) as string),
  target.MANUALLYRESOLVEDYN = cast(if(array_contains(deflate.changed_cols, 'MANUALLYRESOLVEDYN'), deflate.MANUALLYRESOLVEDYN, target.MANUALLYRESOLVEDYN) as string),
  target.ORIGINALDEVICEID = cast(if(array_contains(deflate.changed_cols, 'ORIGINALDEVICEID'), deflate.ORIGINALDEVICEID, target.ORIGINALDEVICEID) as string),
  target.ORIGINALEPAYMENTID = cast(if(array_contains(deflate.changed_cols, 'ORIGINALEPAYMENTID'), deflate.ORIGINALEPAYMENTID, target.ORIGINALEPAYMENTID) as decimal(12,0)),
  target.QUICKPAYCODE = cast(if(array_contains(deflate.changed_cols, 'QUICKPAYCODE'), deflate.QUICKPAYCODE, target.QUICKPAYCODE) as string),
  target.RAWREQUEST = cast(if(array_contains(deflate.changed_cols, 'RAWREQUEST'), deflate.RAWREQUEST, target.RAWREQUEST) as string),
  target.RAWRESPONSE = cast(if(array_contains(deflate.changed_cols, 'RAWRESPONSE'), deflate.RAWRESPONSE, target.RAWRESPONSE) as string),
  target.RECEIPTSENTDATE = cast(if(array_contains(deflate.changed_cols, 'RECEIPTSENTDATE'), deflate.RECEIPTSENTDATE, target.RECEIPTSENTDATE) as timestamp),
  target.REQUESTDATE = cast(if(array_contains(deflate.changed_cols, 'REQUESTDATE'), deflate.REQUESTDATE, target.REQUESTDATE) as timestamp),
  target.REQUESTTYPE = cast(if(array_contains(deflate.changed_cols, 'REQUESTTYPE'), deflate.REQUESTTYPE, target.REQUESTTYPE) as string),
  target.RESPONSEDATE = cast(if(array_contains(deflate.changed_cols, 'RESPONSEDATE'), deflate.RESPONSEDATE, target.RESPONSEDATE) as timestamp),
  target.ROUTE = cast(if(array_contains(deflate.changed_cols, 'ROUTE'), deflate.ROUTE, target.ROUTE) as string),
  target.STATUS = cast(if(array_contains(deflate.changed_cols, 'STATUS'), deflate.STATUS, target.STATUS) as string),
  target.TERMINALID = cast(if(array_contains(deflate.changed_cols, 'TERMINALID'), deflate.TERMINALID, target.TERMINALID) as string),
  target.TYPE = cast(if(array_contains(deflate.changed_cols, 'TYPE'), deflate.TYPE, target.TYPE) as string),
  target._ETL_RUN_ID_ = cast(if(array_contains(deflate.changed_cols, '_ETL_RUN_ID_'), deflate._ETL_RUN_ID_, target._ETL_RUN_ID_) as decimal(38,0)),
  target._ETL_MODIFIED_ = cast(if(array_contains(deflate.changed_cols, '_ETL_MODIFIED_'), deflate._ETL_MODIFIED_, target._ETL_MODIFIED_) as timestamp),
  target._EXTRACTED_ = cast(if(array_contains(deflate.changed_cols, '_EXTRACTED_'), deflate._EXTRACTED_, target._EXTRACTED_) as timestamp),
  target._SOURCE_EXTRACTED_ = cast(if(array_contains(deflate.changed_cols, '_SOURCE_EXTRACTED_'), deflate._SOURCE_EXTRACTED_, target._SOURCE_EXTRACTED_) as timestamp),
  target._LAST_MODIFIED_SEQ_ = cast(if(array_contains(deflate.changed_cols, '_LAST_MODIFIED_SEQ_'), deflate._LAST_MODIFIED_SEQ_, target._LAST_MODIFIED_SEQ_) as decimal(38,0)),
  target._SCHEMA_CLASS_ = cast(if(array_contains(deflate.changed_cols, '_SCHEMA_CLASS_'), deflate._SCHEMA_CLASS_, target._SCHEMA_CLASS_) as string),
  target._CONTEXT_ID_ = cast(if(array_contains(deflate.changed_cols, '_CONTEXT_ID_'), deflate._CONTEXT_ID_, target._CONTEXT_ID_) as decimal(12,0)),
  target._IS_DELETED_ = cast(if(array_contains(deflate.changed_cols, '_IS_DELETED_'), deflate._IS_DELETED_, target._IS_DELETED_) as boolean)
when not matched
then insert
  (AMOUNT,AUTHORIZATIONCODE,AUTOMATEDYN,CONTRACTID,CREATED,CREATEDBY,DELETED,DELETEDBY,DEVICESOFTWAREVERSION,EPAYMENTCARDID,GATEWAYRESPONSECODE,GATEWAYRESPONSEMESSAGE,HOSTRESPONSECODE,HOSTRESPONSEMESSAGE,ID,LASTMODIFIED,LASTMODIFIEDBY,LOCKDATE,LOCKID,MANUALLYRESOLVEDYN,ORIGINALDEVICEID,ORIGINALEPAYMENTID,QUICKPAYCODE,RAWREQUEST,RAWRESPONSE,RECEIPTSENTDATE,REQUESTDATE,REQUESTTYPE,RESPONSEDATE,ROUTE,STATUS,TERMINALID,TYPE,_ETL_RUN_ID_,_ETL_MODIFIED_,_EXTRACTED_,_SOURCE_EXTRACTED_,_LAST_MODIFIED_SEQ_,_SCHEMA_CLASS_,_CONTEXT_ID_,_IS_DELETED_)
values (
  cast(deflate.AMOUNT as decimal(12,2)),
  cast(deflate.AUTHORIZATIONCODE as string),
  cast(deflate.AUTOMATEDYN as string),
  cast(deflate.CONTRACTID as decimal(12,0)),
  cast(deflate.CREATED as timestamp),
  cast(deflate.CREATEDBY as string),
  cast(deflate.DELETED as timestamp),
  cast(deflate.DELETEDBY as string),
  cast(deflate.DEVICESOFTWAREVERSION as string),
  cast(deflate.EPAYMENTCARDID as decimal(12,0)),
  cast(deflate.GATEWAYRESPONSECODE as string),
  cast(deflate.GATEWAYRESPONSEMESSAGE as string),
  cast(deflate.HOSTRESPONSECODE as string),
  cast(deflate.HOSTRESPONSEMESSAGE as string),
  cast(deflate.ID as decimal(12,0)),
  cast(deflate.LASTMODIFIED as timestamp),
  cast(deflate.LASTMODIFIEDBY as string),
  cast(deflate.LOCKDATE as timestamp),
  cast(deflate.LOCKID as string),
  cast(deflate.MANUALLYRESOLVEDYN as string),
  cast(deflate.ORIGINALDEVICEID as string),
  cast(deflate.ORIGINALEPAYMENTID as decimal(12,0)),
  cast(deflate.QUICKPAYCODE as string),
  cast(deflate.RAWREQUEST as string),
  cast(deflate.RAWRESPONSE as string),
  cast(deflate.RECEIPTSENTDATE as timestamp),
  cast(deflate.REQUESTDATE as timestamp),
  cast(deflate.REQUESTTYPE as string),
  cast(deflate.RESPONSEDATE as timestamp),
  cast(deflate.ROUTE as string),
  cast(deflate.STATUS as string),
  cast(deflate.TERMINALID as string),
  cast(deflate.TYPE as string),
  cast(deflate._ETL_RUN_ID_ as decimal(38,0)),
  cast(deflate._ETL_MODIFIED_ as timestamp),
  cast(deflate._EXTRACTED_ as timestamp),
  cast(deflate._SOURCE_EXTRACTED_ as timestamp),
  cast(deflate._LAST_MODIFIED_SEQ_ as decimal(38,0)),
  cast(deflate._SCHEMA_CLASS_ as string),
  cast(deflate._CONTEXT_ID_ as decimal(12,0)),
  cast(deflate._IS_DELETED_ as boolean)
)
```
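The statement above follows one pattern per column, so it is easier to generate than to write by hand. The sketch below is a hypothetical Python helper (`build_merge_sql` is not part of Iceberg or Spark) that produces the same SET and INSERT expressions from a column-to-type map. Unlike the query above, it puts the `deflate` alias on the subquery itself. The resulting string would be passed to `spark.sql(...)` after the temp view is registered.

```python
# Hypothetical helper that generates a MERGE statement following the pattern
# used in this issue: each column is updated only when its name appears in the
# source's changed_cols array, and unmatched rows are inserted with explicit casts.


def build_merge_sql(target, source_view, columns, key_cols, flag_col="changed_cols"):
    """Build a MERGE INTO statement.

    columns  -- ordered mapping of column name -> Spark SQL type, e.g. {"AMOUNT": "decimal(12,2)"}
    key_cols -- join keys, compared as ints exactly like the original ON clause
    """
    on_clause = " and ".join(
        f"cast(target.{k} as int) = cast(deflate.{k} as int)" for k in key_cols
    )
    set_clause = ",\n  ".join(
        f"target.{c} = cast(if(array_contains(deflate.{flag_col}, '{c}'), "
        f"deflate.{c}, target.{c}) as {t})"
        for c, t in columns.items()
    )
    insert_cols = ",".join(columns)
    insert_vals = ",".join(f"cast(deflate.{c} as {t})" for c, t in columns.items())
    return (
        f"merge into {target} as target\n"
        f"using (select * from {source_view}) as deflate\n"
        f"on ({on_clause})\n"
        f"when matched then update set\n  {set_clause}\n"
        f"when not matched then insert ({insert_cols})\n"
        f"values ({insert_vals})"
    )


if __name__ == "__main__":
    # Only three of the table's columns, to keep the output short.
    sample = {"AMOUNT": "decimal(12,2)", "STATUS": "string", "ID": "decimal(12,0)"}
    print(build_merge_sql("glue_dev.epaymentnightly.epayment", "deflate_table",
                          sample, ["_context_id_", "id"]))
```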
**Spark command used to run the job:**
`spark-submit --deploy-mode cluster --packages
org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.14.0,software.amazon.awssdk:bundle:2.17.257,software.amazon.awssdk:url-connection-client:2.17.257
--conf spark.yarn.submit.waitAppCompletion=true --conf
"spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=\"/opt/spark\"" --conf spark.dynamicAllocation.enabled=true
--conf spark.executor.maxMemory=32g --conf
spark.dynamicAllocation.executorIdleTimeout=300 --conf
spark.shuffle.service.enabled=true --driver-memory 8g --num-executors 1
--executor-memory 8g --executor-cores 5 iceberg_main.py`
The problem: in the Spark UI, the shuffle write size and the number of records processed by the upsert are far higher than expected. Based on the incoming batch, only 335K records should be affected, so it looks like partition pruning is not happening as expected. Because of this huge shuffle write, my EMR cluster runs out of memory and the job cannot complete. Please see the attached screenshot:
<img width="1489" alt="Screenshot 2022-10-03 at 4 27 05 PM"
src="https://user-images.githubusercontent.com/38204827/193576500-d3643484-e37e-42d0-9482-565c707db0b3.png">
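A workaround commonly suggested for MERGE jobs that scan too much of the target is to add a literal filter on the target's partition column to the ON clause. The plain-Python sketch below builds such a predicate from the batch's distinct `_CONTEXT_ID_` values. The helper `partition_filter` is hypothetical, and whether it actually reduces the shuffle for this table is an untested assumption.

```python
# Hypothetical helper: builds a literal partition predicate from the distinct
# _CONTEXT_ID_ values in the incoming batch, to be AND-ed into the MERGE ON
# clause. Assumption (not verified on this table): a literal filter on the
# target's partition column, without a cast, gives Iceberg something it can
# prune on at planning time, unlike the join condition alone.
from decimal import Decimal


def partition_filter(column, values, alias="target"):
    """Return e.g. "target._context_id_ in (1, 2, 3)" for the given values."""
    distinct = sorted({int(v) for v in values})  # decimal(12,0) -> plain ints
    if not distinct:
        raise ValueError("the batch contains no partition values")
    literals = ", ".join(str(v) for v in distinct)
    return f"{alias}.{column} in ({literals})"


if __name__ == "__main__":
    # In the PySpark job the values would come from the batch itself, e.g.
    #   values = [row[0] for row in deflateDf.select("_context_id_").distinct().collect()]
    values = [Decimal("42"), Decimal("7"), Decimal("42")]  # made-up sample values
    print(partition_filter("_context_id_", values))  # target._context_id_ in (7, 42)
    # The predicate would then be AND-ed into the existing ON condition.
```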
Could you please provide some insight into what is going wrong here?