cafelo-pfdrive opened a new issue #4873:
URL: https://github.com/apache/hudi/issues/4873


   
   **Describe the problem you faced**
   
   **Upserts in Hudi take a long time: updates run 4–5x slower than inserts, and ~90% of the data needs to be updated.**
     
   The code below takes around 45 minutes to write new data (300 million records) to an AWS S3 bucket in Hudi format with 21 DPUs on AWS Glue, but it takes more than 3 hours to ingest the same data set as an upsert. Previously inserted data can be resent multiple times to correct data quality, and consumers only need the latest version of each record key, so the re-ingest must update existing records and remove duplicates.
   
   **Additional context**
   
   In the Apache Spark UI, the stage `Building workload profile: mapToPair at SparkHoodieBloomIndex.java:266`, which takes the longest in the execution plan, shuffles the following:
   
   Shuffle Read Size / Records:  42.6 GiB / 540,000,000
   Shuffle Write Size / Records: 1,237.8 GiB / 23,759,659,000
   Spill (Memory): 7.7 TiB
   Spill (Disk): 1,241.6 GiB
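   For reference, these are the Bloom-index options that drive this stage (option names per the Hudi 0.9 configuration docs; the values shown are illustrative assumptions, not settings we have validated):

   ```python
   # Hudi index options relevant to the SparkHoodieBloomIndex stage
   # (names from Hudi 0.9 configs; values here are example assumptions).
   bloomIndexConfig = {
       # Index type currently in use; SIMPLE is an alternative to BLOOM.
       'hoodie.index.type': 'BLOOM',
       # Parallelism of the index lookup shuffle (auto-computed when unset).
       'hoodie.bloom.index.parallelism': 320,
       # Prune candidate files by record-key ranges; only helps when keys
       # are somewhat ordered (e.g. timestamp-prefixed keys).
       'hoodie.bloom.index.prune.by.ranges': 'true',
       # Dynamic bloom filters grow with the number of keys per file.
       'hoodie.bloom.index.filter.type': 'DYNAMIC_V0',
   }
   ```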
   
   **Expected behavior**
   
    We have a window of 1 hour to execute the ETL process, which includes both inserts and updates. If inserts alone take 45 minutes, the upserts should not take longer than 1 hour to process 300 million records.
   
    
   **To Reproduce** 
   
     Steps to reproduce the behavior: 
   
   **Environment Description**
   
   * AWS Glue basic properties:
     * Glue version: Glue 2.0 (supports Spark 2.4, Scala 2, Python 3)
     * Worker type: G.1X
     * Number of workers: 31 (1 driver and 30 executors) => 160 cores
   * Hudi version: 0.9.0 (connector: ver_2.0_hudi_0.9.0_glue_1.0_or_2.0)
   * Storage: S3
   * Running on Docker? (yes/no): no, running in AWS Glue
   
   **Code**
   
   ```python
   import sys
   from awsglue.transforms import *
   from awsglue.utils import getResolvedOptions
   from pyspark.context import SparkContext
   from awsglue.context import GlueContext
   from awsglue.job import Job
   from pyspark.sql.functions import year, month, date_format, to_date, col
   from awsglue.dynamicframe import DynamicFrame
   from pyspark.sql.session import SparkSession
   
   args = getResolvedOptions(sys.argv, ['JOB_NAME'])
   spark = SparkSession.builder \
       .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
       .config('spark.sql.hive.convertMetastoreParquet', 'false') \
       .getOrCreate()
   sc = spark.sparkContext
   glueContext = GlueContext(sc)
   job = Job(glueContext)
   job.init(args['JOB_NAME'], args)
   targetPath = 's3://poc-lake-silver/interval_mor/data/'
   
   commonConfig = {
       'className': 'org.apache.hudi',
       'path': 's3://poc-lake-silver/interval_mor/data/',
       'hoodie.bulkinsert.shuffle.parallelism': 320,
       'hoodie.upsert.shuffle.parallelism': 320,
       'hoodie.datasource.write.operation': 'upsert'
   }
   
   partitionDataConfig = {
       'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
   }
   
   dataSourceWriteConfig = {
       'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
       'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
       'hoodie.datasource.write.precombine.field': 'ingestionutc',
       'hoodie.datasource.write.partitionpath.field': 'intervaldate,plantuid',
       'hoodie.datasource.write.recordkey.field': 'intervalutc,asset,attribute'
   }
   
   dataSourceHiveConfig = {
       'hoodie.datasource.hive_sync.use_jdbc': 'false',
       'hoodie.datasource.hive_sync.enable': 'true',
       'hoodie.datasource.hive_sync.database': 'db_swat_lake_silver',
       'hoodie.datasource.hive_sync.table': 'interval_mor',
       'hoodie.datasource.hive_sync.partition_fields': 'intervaldate,plantuid'
   }
   
   dataTableConfig = {
       'hoodie.table.type': 'MERGE_ON_READ',
       'hoodie.index.type': 'BLOOM',
       'hoodie.table.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
       'hoodie.database.name': 'db_swat_lake_silver',
       'hoodie.table.name': 'interval_mor',
       'hoodie.table.precombine.field': 'ingestionutc',
       'hoodie.table.partition.fields': 'intervaldate,plantuid',
       'hoodie.table.recordkey.fields': 'intervalutc,asset,attribute'
   }
   
   finalConf = {**commonConfig, **partitionDataConfig, **dataSourceWriteConfig,
                **dataSourceHiveConfig, **dataTableConfig}
   
   S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
       format_options={},
       connection_type='s3',
       format='parquet',
       connection_options={
           'paths': [
               's3://poc-lake-bronze/dc/interval/data/ingest_date=20220221/ingest_hour=04/',
           ],
           'recurse': True
       },
       transformation_ctx='S3bucket_node1',
   )
   
   ApplyMapping_node2 = ApplyMapping.apply(
       frame=S3bucket_node1,
       mappings=[
           ('plant', 'int', 'plant', 'int'),
           ('plantuid', 'string', 'plantuid', 'string'),
           ('asset', 'int', 'asset', 'int'),
           ('attribute', 'string', 'attribute', 'string'),
           ('value', 'double', 'value', 'double'),
           ('intervalutc', 'timestamp', 'intervalutc', 'timestamp'),
           ('intervaldate', 'string', 'intervaldate', 'string'),
           ('ingestiontsutc', 'timestamp', 'ingestionutc', 'timestamp'),
       ],
       transformation_ctx='ApplyMapping_node2',
   )
   
   S3bucket_node3 = ApplyMapping_node2.toDF()
   
   S3bucket_node3.write.format('org.apache.hudi').options(**finalConf).mode('Append').save(targetPath)
   
   job.commit()
   ```
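   To be explicit about the semantics we expect: since consumers only need the latest version of each record key, the upsert relies on the precombine field (`ingestionutc`) so that the newest resent record wins. In plain Python terms (a sketch of the intended "latest record per key" behavior, not Hudi's actual implementation):

   ```python
   # Sketch of precombine semantics: for duplicate record keys, keep only
   # the record with the greatest precombine value. Illustrative only.
   def precombine_latest(records, key_fields, precombine_field):
       """Keep the record with the largest precombine value per key."""
       latest = {}
       for rec in records:
           key = tuple(rec[f] for f in key_fields)
           if key not in latest or rec[precombine_field] > latest[key][precombine_field]:
               latest[key] = rec
       return list(latest.values())

   # Two versions of the same record key (intervalutc, asset, attribute);
   # the resent one has a later ingestionutc and should replace the first.
   batch = [
       {'intervalutc': 't1', 'asset': 1, 'attribute': 'a', 'value': 1.0, 'ingestionutc': 100},
       {'intervalutc': 't1', 'asset': 1, 'attribute': 'a', 'value': 2.0, 'ingestionutc': 200},
   ]
   deduped = precombine_latest(batch, ['intervalutc', 'asset', 'attribute'], 'ingestionutc')
   # Only the record with ingestionutc=200 survives.
   ```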
   
   
   **Stacktrace**
   
   ```Spark Environment```
   
![image](https://user-images.githubusercontent.com/93156892/155244925-fa102c22-c7e6-4a55-9ee7-d0228630364b.png)
   
   ```Executors Summary```
   
![image](https://user-images.githubusercontent.com/93156892/155245061-04b248da-d639-466e-bc2e-c7c59abeb036.png)
   
   ```Stages```
   
![image](https://user-images.githubusercontent.com/93156892/155245202-fa9d33b9-d446-4e21-8081-3936cbf262ff.png)
   
   ```countByKey at SparkHoodieBloomIndex.java:114```
   
![image](https://user-images.githubusercontent.com/93156892/155245275-ff193797-e88b-49e6-b497-5251b7171eec.png)
   
   ```Building workload profilemapToPair at SparkHoodieBloomIndex.java:266```
   
   
![image](https://user-images.githubusercontent.com/93156892/155245391-7a22d3df-2ec7-466a-b76a-2107038ed157.png)
   
   
   

