arunb2w opened a new issue, #5909:
URL: https://github.com/apache/iceberg/issues/5909

   ### Apache Iceberg version
   
   0.14.0
   
   ### Query engine
   
   EMR
   
   ### Please describe the bug 🐞
   
   spark version : 3.2.1
   iceberg version: 0.14.0
   Running the below code in EMR with Glue catalog
   **Target:**
   I have an Iceberg table (EPAYMENT) that contains 457M records and is about 
89.4G in size.
   It is partitioned on the column _CONTEXT_ID_ and has 18K partitions.
   The table resides in S3 and uses the Glue catalog.
   **Input:**
   My incoming batch contains 335K records, and updating them in this Iceberg 
table touches 5K partitions.
   The incoming batch is a Spark DataFrame, so I create a temp view from it 
and use that view in the merge to upsert the records.
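
As a rough sanity check on these numbers, the sketch below estimates how many target records sit in the partitions the batch touches. It assumes records are spread evenly across partitions, which is an assumption for illustration and not something measured on this table; the function name `estimate` and the dictionary keys are likewise made up here. It only shows the scale a scan limited to the touched partitions might reach, not what the job actually did.

```python
# Back-of-envelope estimate. Every derived figure assumes evenly sized
# partitions, which is an illustrative assumption, not a measured fact.
TABLE_RECORDS = 457_000_000   # records in EPAYMENT (from the description above)
TABLE_PARTITIONS = 18_000     # partitions on _CONTEXT_ID_
BATCH_RECORDS = 335_000       # records in the incoming batch
BATCH_PARTITIONS = 5_000      # partitions the batch touches


def estimate(table_records, table_partitions, batch_records, batch_partitions):
    """Return rough scale figures for a scan limited to the touched partitions."""
    records_per_partition = table_records / table_partitions
    touched_records = records_per_partition * batch_partitions
    return {
        "records_per_partition": records_per_partition,
        "touched_fraction": batch_partitions / table_partitions,
        "touched_records": touched_records,
        # How many target records sit in touched partitions per incoming record.
        "amplification": touched_records / batch_records,
    }


result = estimate(TABLE_RECORDS, TABLE_PARTITIONS, BATCH_RECORDS, BATCH_PARTITIONS)
for name, value in result.items():
    print(f"{name}: {value:,.2f}")
```

Under that even-distribution assumption, the touched partitions hold on the order of 127M records, several hundred times the batch size, so a record count above 335K on the target side would not by itself prove that pruning failed.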
   **Merge query I am using to upsert the incoming batch:**
   
   ```
   deflateDf.createOrReplaceTempView("deflate_table")
   
   merge into glue_dev.epaymentnightly.epayment as target
           using (
               select * from deflate_table as deflate
           )
           on (cast(target._context_id_ as int) = cast(deflate._context_id_ as 
int) and cast(target.id as int) = cast(deflate.id as int) )
           when matched 
           then update set 
           target.AMOUNT = cast(if(array_contains(deflate.changed_cols, 
'AMOUNT'), deflate.AMOUNT, target.AMOUNT) as 
decimal(12,2)),target.AUTHORIZATIONCODE = 
cast(if(array_contains(deflate.changed_cols, 'AUTHORIZATIONCODE'), 
deflate.AUTHORIZATIONCODE, target.AUTHORIZATIONCODE) as 
string),target.AUTOMATEDYN = cast(if(array_contains(deflate.changed_cols, 
'AUTOMATEDYN'), deflate.AUTOMATEDYN, target.AUTOMATEDYN) as 
string),target.CONTRACTID = cast(if(array_contains(deflate.changed_cols, 
'CONTRACTID'), deflate.CONTRACTID, target.CONTRACTID) as 
decimal(12,0)),target.CREATED = cast(if(array_contains(deflate.changed_cols, 
'CREATED'), deflate.CREATED, target.CREATED) as timestamp),target.CREATEDBY = 
cast(if(array_contains(deflate.changed_cols, 'CREATEDBY'), deflate.CREATEDBY, 
target.CREATEDBY) as string),target.DELETED = 
cast(if(array_contains(deflate.changed_cols, 'DELETED'), deflate.DELETED, 
target.DELETED) as timestamp),target.DELETEDBY = 
cast(if(array_contains(deflate.changed_cols, 'DELETEDBY'), 
deflate.DELETEDBY, target.DELETEDBY) as 
string),target.DEVICESOFTWAREVERSION = 
cast(if(array_contains(deflate.changed_cols, 'DEVICESOFTWAREVERSION'), 
deflate.DEVICESOFTWAREVERSION, target.DEVICESOFTWAREVERSION) as 
string),target.EPAYMENTCARDID = cast(if(array_contains(deflate.changed_cols, 
'EPAYMENTCARDID'), deflate.EPAYMENTCARDID, target.EPAYMENTCARDID) as 
decimal(12,0)),target.GATEWAYRESPONSECODE = 
cast(if(array_contains(deflate.changed_cols, 'GATEWAYRESPONSECODE'), 
deflate.GATEWAYRESPONSECODE, target.GATEWAYRESPONSECODE) as 
string),target.GATEWAYRESPONSEMESSAGE = 
cast(if(array_contains(deflate.changed_cols, 'GATEWAYRESPONSEMESSAGE'), 
deflate.GATEWAYRESPONSEMESSAGE, target.GATEWAYRESPONSEMESSAGE) as 
string),target.HOSTRESPONSECODE = cast(if(array_contains(deflate.changed_cols, 
'HOSTRESPONSECODE'), deflate.HOSTRESPONSECODE, target.HOSTRESPONSECODE) as 
string),target.HOSTRESPONSEMESSAGE = 
cast(if(array_contains(deflate.changed_cols, 'HOSTRESPONSEMESSAGE'), 
deflate.HOSTRESPONSEMESSAGE, target.HOSTRESPONSEMESSAGE) as string),target.ID = 
cast(if(array_contains(deflate.changed_cols, 'ID'), deflate.ID, target.ID) as 
decimal(12,0)),target.LASTMODIFIED = 
cast(if(array_contains(deflate.changed_cols, 'LASTMODIFIED'), 
deflate.LASTMODIFIED, target.LASTMODIFIED) as timestamp),target.LASTMODIFIEDBY 
= cast(if(array_contains(deflate.changed_cols, 'LASTMODIFIEDBY'), 
deflate.LASTMODIFIEDBY, target.LASTMODIFIEDBY) as string),target.LOCKDATE = 
cast(if(array_contains(deflate.changed_cols, 'LOCKDATE'), deflate.LOCKDATE, 
target.LOCKDATE) as timestamp),target.LOCKID = 
cast(if(array_contains(deflate.changed_cols, 'LOCKID'), deflate.LOCKID, 
target.LOCKID) as string),target.MANUALLYRESOLVEDYN = 
cast(if(array_contains(deflate.changed_cols, 'MANUALLYRESOLVEDYN'), 
deflate.MANUALLYRESOLVEDYN, target.MANUALLYRESOLVEDYN) as 
string),target.ORIGINALDEVICEID = cast(if(array_contains(deflate.changed_cols, 
'ORIGINALDEVICEID'), deflate.ORIGINALDEVICEID, target.ORIGINALDEVICEID) as 
string)
 ,target.ORIGINALEPAYMENTID = cast(if(array_contains(deflate.changed_cols, 
'ORIGINALEPAYMENTID'), deflate.ORIGINALEPAYMENTID, target.ORIGINALEPAYMENTID) 
as decimal(12,0)),target.QUICKPAYCODE = 
cast(if(array_contains(deflate.changed_cols, 'QUICKPAYCODE'), 
deflate.QUICKPAYCODE, target.QUICKPAYCODE) as string),target.RAWREQUEST = 
cast(if(array_contains(deflate.changed_cols, 'RAWREQUEST'), deflate.RAWREQUEST, 
target.RAWREQUEST) as string),target.RAWRESPONSE = 
cast(if(array_contains(deflate.changed_cols, 'RAWRESPONSE'), 
deflate.RAWRESPONSE, target.RAWRESPONSE) as string),target.RECEIPTSENTDATE = 
cast(if(array_contains(deflate.changed_cols, 'RECEIPTSENTDATE'), 
deflate.RECEIPTSENTDATE, target.RECEIPTSENTDATE) as 
timestamp),target.REQUESTDATE = cast(if(array_contains(deflate.changed_cols, 
'REQUESTDATE'), deflate.REQUESTDATE, target.REQUESTDATE) as 
timestamp),target.REQUESTTYPE = cast(if(array_contains(deflate.changed_cols, 
'REQUESTTYPE'), deflate.REQUESTTYPE, target.REQUESTTYPE) as string),
target.RESPONSEDATE = cast(if(array_contains(deflate.changed_cols, 
'RESPONSEDATE'), deflate.RESPONSEDATE, target.RESPONSEDATE) as 
timestamp),target.ROUTE = cast(if(array_contains(deflate.changed_cols, 
'ROUTE'), deflate.ROUTE, target.ROUTE) as string),target.STATUS = 
cast(if(array_contains(deflate.changed_cols, 'STATUS'), deflate.STATUS, 
target.STATUS) as string),target.TERMINALID = 
cast(if(array_contains(deflate.changed_cols, 'TERMINALID'), deflate.TERMINALID, 
target.TERMINALID) as string),target.TYPE = 
cast(if(array_contains(deflate.changed_cols, 'TYPE'), deflate.TYPE, 
target.TYPE) as string),target._ETL_RUN_ID_ = 
cast(if(array_contains(deflate.changed_cols, '_ETL_RUN_ID_'), 
deflate._ETL_RUN_ID_, target._ETL_RUN_ID_) as 
decimal(38,0)),target._ETL_MODIFIED_ = 
cast(if(array_contains(deflate.changed_cols, '_ETL_MODIFIED_'), 
deflate._ETL_MODIFIED_, target._ETL_MODIFIED_) as timestamp),target._EXTRACTED_ 
= cast(if(array_contains(deflate.changed_cols, '_EXTRACTED_'), 
deflate._EXTRACTED_, 
 target._EXTRACTED_) as timestamp),target._SOURCE_EXTRACTED_ = 
cast(if(array_contains(deflate.changed_cols, '_SOURCE_EXTRACTED_'), 
deflate._SOURCE_EXTRACTED_, target._SOURCE_EXTRACTED_) as 
timestamp),target._LAST_MODIFIED_SEQ_ = 
cast(if(array_contains(deflate.changed_cols, '_LAST_MODIFIED_SEQ_'), 
deflate._LAST_MODIFIED_SEQ_, target._LAST_MODIFIED_SEQ_) as 
decimal(38,0)),target._SCHEMA_CLASS_ = 
cast(if(array_contains(deflate.changed_cols, '_SCHEMA_CLASS_'), 
deflate._SCHEMA_CLASS_, target._SCHEMA_CLASS_) as string),target._CONTEXT_ID_ = 
cast(if(array_contains(deflate.changed_cols, '_CONTEXT_ID_'), 
deflate._CONTEXT_ID_, target._CONTEXT_ID_) as 
decimal(12,0)),target._IS_DELETED_ = 
cast(if(array_contains(deflate.changed_cols, '_IS_DELETED_'), 
deflate._IS_DELETED_, target._IS_DELETED_) as boolean)
           when not matched
           then insert 
           
(AMOUNT,AUTHORIZATIONCODE,AUTOMATEDYN,CONTRACTID,CREATED,CREATEDBY,DELETED,DELETEDBY,DEVICESOFTWAREVERSION,EPAYMENTCARDID,GATEWAYRESPONSECODE,GATEWAYRESPONSEMESSAGE,HOSTRESPONSECODE,HOSTRESPONSEMESSAGE,ID,LASTMODIFIED,LASTMODIFIEDBY,LOCKDATE,LOCKID,MANUALLYRESOLVEDYN,ORIGINALDEVICEID,ORIGINALEPAYMENTID,QUICKPAYCODE,RAWREQUEST,RAWRESPONSE,RECEIPTSENTDATE,REQUESTDATE,REQUESTTYPE,RESPONSEDATE,ROUTE,STATUS,TERMINALID,TYPE,_ETL_RUN_ID_,_ETL_MODIFIED_,_EXTRACTED_,_SOURCE_EXTRACTED_,_LAST_MODIFIED_SEQ_,_SCHEMA_CLASS_,_CONTEXT_ID_,_IS_DELETED_)
 values (cast(deflate.AMOUNT as decimal(12,2)),cast(deflate.AUTHORIZATIONCODE 
as string),cast(deflate.AUTOMATEDYN as string),cast(deflate.CONTRACTID as 
decimal(12,0)),cast(deflate.CREATED as timestamp),cast(deflate.CREATEDBY as 
string),cast(deflate.DELETED as timestamp),cast(deflate.DELETEDBY as 
string),cast(deflate.DEVICESOFTWAREVERSION as 
string),cast(deflate.EPAYMENTCARDID as 
decimal(12,0)),cast(deflate.GATEWAYRESPONSECODE as string),
cast(deflate.GATEWAYRESPONSEMESSAGE as string),cast(deflate.HOSTRESPONSECODE as 
string),cast(deflate.HOSTRESPONSEMESSAGE as string),cast(deflate.ID as 
decimal(12,0)),cast(deflate.LASTMODIFIED as 
timestamp),cast(deflate.LASTMODIFIEDBY as string),cast(deflate.LOCKDATE as 
timestamp),cast(deflate.LOCKID as string),cast(deflate.MANUALLYRESOLVEDYN as 
string),cast(deflate.ORIGINALDEVICEID as 
string),cast(deflate.ORIGINALEPAYMENTID as 
decimal(12,0)),cast(deflate.QUICKPAYCODE as string),cast(deflate.RAWREQUEST as 
string),cast(deflate.RAWRESPONSE as string),cast(deflate.RECEIPTSENTDATE as 
timestamp),cast(deflate.REQUESTDATE as timestamp),cast(deflate.REQUESTTYPE as 
string),cast(deflate.RESPONSEDATE as timestamp),cast(deflate.ROUTE as 
string),cast(deflate.STATUS as string),cast(deflate.TERMINALID as 
string),cast(deflate.TYPE as string),cast(deflate._ETL_RUN_ID_ as 
decimal(38,0)),cast(deflate._ETL_MODIFIED_ as 
timestamp),cast(deflate._EXTRACTED_ as 
timestamp),cast(deflate._SOURCE_EXTRACTED_ as timestamp),
cast(deflate._LAST_MODIFIED_SEQ_ as 
decimal(38,0)),cast(deflate._SCHEMA_CLASS_ as string),cast(deflate._CONTEXT_ID_ 
as decimal(12,0)),cast(deflate._IS_DELETED_ as boolean))
   ```
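
Every assignment in the `update set` list above follows the same per-column pattern, so the clause can be generated from a column-to-type mapping instead of being written by hand. The sketch below is a minimal illustration: `COLUMN_TYPES` is a hypothetical subset of the real column list, and the helper names `set_clause`, `build_set_list` and `build_insert` are made up for this example.

```python
# Hypothetical subset of the table's columns and their target types,
# copied from the casts in the merge query above.
COLUMN_TYPES = {
    "AMOUNT": "decimal(12,2)",
    "AUTHORIZATIONCODE": "string",
    "CREATED": "timestamp",
    "_IS_DELETED_": "boolean",
}


def set_clause(column, sql_type, source="deflate", target="target"):
    """Build one assignment: take the source value only if the column changed."""
    return (
        f"{target}.{column} = cast(if(array_contains({source}.changed_cols, "
        f"'{column}'), {source}.{column}, {target}.{column}) as {sql_type})"
    )


def build_set_list(column_types):
    """Join the per-column assignments for the WHEN MATCHED branch."""
    return ",\n".join(set_clause(c, t) for c, t in column_types.items())


def build_insert(column_types, source="deflate"):
    """Build the column list and VALUES list for the WHEN NOT MATCHED branch."""
    columns = ", ".join(column_types)
    values = ", ".join(
        f"cast({source}.{c} as {t})" for c, t in column_types.items()
    )
    return f"({columns})\nvalues ({values})"


print(build_set_list(COLUMN_TYPES))
print(build_insert(COLUMN_TYPES))
```

Generating the text this way also avoids the mid-identifier line breaks that creep in when a statement of this length is pasted and re-wrapped.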
   
   **Spark command used to run:**
   `spark-submit --deploy-mode cluster --packages 
org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.14.0,software.amazon.awssdk:bundle:2.17.257,software.amazon.awssdk:url-connection-client:2.17.257
 --conf spark.yarn.submit.waitAppCompletion=true --conf 
"spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=\"/opt/spark\"" --conf spark.dynamicAllocation.enabled=true 
--conf spark.executor.maxMemory=32g --conf 
spark.dynamicAllocation.executorIdleTimeout=300 --conf 
spark.shuffle.service.enabled=true --driver-memory 8g --num-executors 1 
--executor-memory 8g --executor-cores 5 iceberg_main.py`
   
   The problem is that when I view the job in the Spark UI, the shuffle write 
size and the number of records being upserted are far higher than expected, 
given that the incoming batch contains only 335K records. So it looks like 
partition pruning is not happening as expected. Because of this huge shuffle 
write, my EMR cluster runs out of memory and cannot complete the job. Please 
see the attached image.
   <img width="1489" alt="Screenshot 2022-10-03 at 4 27 05 PM" 
src="https://user-images.githubusercontent.com/38204827/193576500-d3643484-e37e-42d0-9482-565c707db0b3.png">
   
   Please provide some insight into what went wrong here.

