cafelo-pfdrive opened a new issue #4873:
URL: https://github.com/apache/hudi/issues/4873
**Describe the problem you faced**
**Upserts in Hudi take a long time to execute: updates take 4 to 5 times longer than inserts, and about 90% of the data needs to be updated.**

The code below takes around 45 minutes to write new data (300 million records) to an AWS S3 bucket in Hudi format with 21 DPUs using AWS Glue, but it takes more than 3 hours to ingest the same data set again to update it and remove duplicates. Data can be resent multiple times to correct data quality issues, and consumers only need the latest version of each record key.
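For reference, a minimal, self-contained sketch of the upsert/dedup semantics we rely on (table name, path, and columns here are illustrative, not taken from the job below): Hudi keeps, per record key, only the row with the largest precombine value, so re-sent data replaces the earlier version instead of duplicating it.

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
         .getOrCreate())

# Illustrative options only; the real job's configuration is shown further down.
hudi_options = {
    'hoodie.table.name': 'example_table',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.partitionpath.field': 'intervaldate',
    'hoodie.datasource.write.precombine.field': 'ingestionutc',
    'hoodie.datasource.write.operation': 'upsert',
}

# A corrected version of a record that was already written with id=1.
df = spark.createDataFrame(
    [(1, 'corrected value', '2022-02-21', '2022-02-21 05:00:00')],
    ['id', 'value', 'intervaldate', 'ingestionutc'],
)

# Appending with operation=upsert updates the existing row for id=1 in place
# because its precombine value (ingestionutc) is newer.
df.write.format('org.apache.hudi').options(**hudi_options) \
    .mode('append').save('s3://example-bucket/example_table/')
```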
**Additional context**
In the Apache Spark UI, the stage _Building workload profile: mapToPair at SparkHoodieBloomIndex.java:266_, which takes the longest in the execution plan, shuffles the following:

* Shuffle Read Size / Records: 42.6 GiB / 540 000 000
* Shuffle Write Size / Records: 1237.8 GiB / 23 759 659 000
* Spill (Memory): 7.7 TiB
* Spill (Disk): 1241.6 GiB
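For reference, a sketch of the bloom-index options that could be tuned for this stage (the keys come from the Hudi 0.9.0 configuration reference; the values are illustrative only and are not what the job below currently uses):

```python
# Illustrative only: index options that affect the SparkHoodieBloomIndex stage.
# Keys exist in Hudi 0.9.0; the values are examples, not recommendations.
indexTuningConfig = {
    'hoodie.index.type': 'BLOOM',                    # alternatives: SIMPLE, GLOBAL_BLOOM, ...
    'hoodie.bloom.index.prune.by.ranges': 'true',    # prune candidate files by record-key ranges
    'hoodie.bloom.index.filter.type': 'DYNAMIC_V0',  # size bloom filters dynamically
    'hoodie.index.bloom.num_entries': 60000,         # entries per bloom filter
    'hoodie.index.bloom.fpp': 0.000000001,           # false-positive probability
    'hoodie.bloom.index.parallelism': 0,             # 0 lets Hudi auto-compute
}
# These would be merged into finalConf (defined below) before the write.
```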
**Expected behavior**
We have a 1-hour window to execute the ETL process, which includes both inserts and updates. Since inserts alone take 45 minutes, the updates should not take longer than 1 hour to process 300 million records.
**To Reproduce**
Steps to reproduce the behavior:
**Environment Description**
* AWS Glue basic properties:
  * Glue version: Glue 2.0 (supports Spark 2.4, Scala 2, Python)
  * Worker type: G.1x
  * Number of workers: 31 (1 driver and 30 executors) => 160 cores
* Hudi version: connector ver_2.0_hudi_0.9.0_glue_1.0_or_2.0
* Storage: S3
* Running on Docker? (yes/no): no, running in AWS Glue
**Code**
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import year, month, date_format, to_date, col
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.session import SparkSession
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

targetPath = 's3://poc-lake-silver/interval_mor/data/'

# Common write options, shared by inserts and upserts
commonConfig = {
    'className': 'org.apache.hudi',
    'path': 's3://poc-lake-silver/interval_mor/data/',
    'hoodie.bulkinsert.shuffle.parallelism': 320,
    'hoodie.upsert.shuffle.parallelism': 320,
    'hoodie.datasource.write.operation': 'upsert'
}

partitionDataConfig = {
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
}

# Datasource write options: MoR table, composite record key, precombine field
dataSourceWriteConfig = {
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
    'hoodie.datasource.write.precombine.field': 'ingestionutc',
    'hoodie.datasource.write.partitionpath.field': 'intervaldate,plantuid',
    'hoodie.datasource.write.recordkey.field': 'intervalutc,asset,attribute'
}

# Hive sync options
dataSourceHiveConfig = {
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': 'db_swat_lake_silver',
    'hoodie.datasource.hive_sync.table': 'interval_mor',
    'hoodie.datasource.hive_sync.partition_fields': 'intervaldate,plantuid'
}

# Table-level properties
dataTableConfig = {
    'hoodie.table.type': 'MERGE_ON_READ',
    'hoodie.index.type': 'BLOOM',
    'hoodie.table.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
    'hoodie.database.name': 'db_swat_lake_silver',
    'hoodie.table.name': 'interval_mor',
    'hoodie.table.precombine.field': 'ingestionutc',
    'hoodie.table.partition.fields': 'intervaldate,plantuid',
    'hoodie.table.recordkey.fields': 'intervalutc,asset,attribute'
}

finalConf = {**commonConfig, **partitionDataConfig, **dataSourceWriteConfig,
             **dataSourceHiveConfig, **dataTableConfig}

# Read the hourly batch of new/resent records from the bronze layer
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={},
    connection_type="s3",
    format="parquet",
    connection_options={
        "paths": [
            's3://poc-lake-bronze/dc/interval/data/ingest_date=20220221/ingest_hour=04/',
        ],
        "recurse": True
    },
    transformation_ctx="S3bucket_node1",
)

ApplyMapping_node2 = ApplyMapping.apply(
    frame=S3bucket_node1,
    mappings=[
        ("plant", "int", "plant", "int"),
        ("plantuid", "string", "plantuid", "string"),
        ("asset", "int", "asset", "int"),
        ("attribute", "string", "attribute", "string"),
        ("value", "double", "value", "double"),
        ("intervalutc", "timestamp", "intervalutc", "timestamp"),
        ("intervaldate", "string", "intervaldate", "string"),
        ("ingestiontsutc", "timestamp", "ingestionutc", "timestamp"),
    ],
    transformation_ctx="ApplyMapping_node2",
)
S3bucket_node3 = ApplyMapping_node2.toDF()
S3bucket_node3.write.format('org.apache.hudi').options(**finalConf).mode('Append').save(targetPath)
job.commit()
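For completeness, a sketch (not part of the Glue job above) of how the deduplication outcome could be checked with a snapshot read of the target table; column names match the record key fields configured above.

```python
# Illustrative check only: after the upsert, a snapshot read of the target
# table should contain at most one row per record key.
# (Older Hudi versions may require glob paths, e.g. targetPath + '/*/*/*'.)
hudi_df = spark.read.format('org.apache.hudi').load(targetPath)

duplicates = (hudi_df
              .groupBy('intervalutc', 'asset', 'attribute')  # configured record key fields
              .count()
              .filter('count > 1'))

print('record keys with more than one row:', duplicates.count())  # expected: 0
```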
**Stacktrace**
Spark UI views referenced (attached images are not reproduced in this text mirror):

* `Spark Environment`
* `Executors Summary`
* `Stages`
* `countByKey at SparkHoodieBloomIndex.java:114`
* `Building workload profile: mapToPair at SparkHoodieBloomIndex.java:266`
