alexone95 opened a new issue, #8261:
URL: https://github.com/apache/hudi/issues/8261
Hello, we are seeing commits get slower and slower over time (from a delta commit of about 160 s on day 1 to about 300 s on day 4). Our deployment conditions are the following:
- We read INSERT, UPDATE and DELETE operations from a Kafka topic and replicate them into a target Hudi table registered in Hive, via a PySpark job running 24/7.
**Expected behavior**
We would like to know whether there is a way to reduce, or at least keep constant, the write latency on the Hudi table, and whether there is anything we can improve in the deployment setup or in the configuration described below.
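For reference, the per-commit duration can be tracked from the completed instants under .hoodie/ on S3. Below is a minimal sketch of that check; the bucket and prefix are placeholders, and treating an instant file's LastModified as the completion time (and the instant timestamp as UTC) is only an approximation:
import boto3
from datetime import datetime, timezone

# Placeholders: point these at the actual table base path on S3
BUCKET = "my-bucket"
TIMELINE_PREFIX = "path/to/table/.hoodie/"

s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")

for page in paginator.paginate(Bucket=BUCKET, Prefix=TIMELINE_PREFIX, Delimiter="/"):
    for obj in page.get("Contents", []):
        name = obj["Key"].rsplit("/", 1)[-1]
        # Completed instants look like <instant_time>.commit (COW) or <instant_time>.deltacommit (MOR)
        if not (name.endswith(".commit") or name.endswith(".deltacommit")):
            continue
        instant = name.split(".")[0]
        fmt = "%Y%m%d%H%M%S%f" if len(instant) > 14 else "%Y%m%d%H%M%S"
        # Assumes the writer's clock is UTC; instant times follow the writer's timezone
        started = datetime.strptime(instant, fmt).replace(tzinfo=timezone.utc)
        finished = obj["LastModified"]  # when the completed instant file was written
        print(name, "took ~%.0f s" % (finished - started).total_seconds())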
**Environment Description**
* Hudi version : 0.12.1-amzn-0
* Spark version : 3.3.0
* Hive version : 3.1.3
* Hadoop version : 3.3.3 amz
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no (EMR 6.9.0)
**Additional context**
HOODIE TABLE PROPERTIES:
'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
'hoodie.datasource.write.hive_style_partitioning': 'true',
'hoodie.index.type': 'GLOBAL_BLOOM',
'hoodie.simple.index.update.partition.path': 'true',
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
'hoodie.datasource.hive_sync.use_jdbc': 'false',
'hoodie.datasource.hive_sync.mode': 'hms',
'hoodie.copyonwrite.record.size.estimate': 285,
'hoodie.parquet.small.file.limit': 104857600,
'hoodie.parquet.max.file.size': 120000000,
'hoodie.cleaner.commits.retained': 1
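Roughly, these properties end up in a single Python dict (hudiOptions_table) that is passed to every write below; the table name, record key, partition path, precombine field and Hive sync database/table in this sketch are placeholders, since the real values are not shown here:
hudiOptions_table = {
    'hoodie.table.name': 'target_table',                        # placeholder
    'hoodie.datasource.write.recordkey.field': 'id,sub_id',     # placeholder (comma-separated, per ComplexKeyGenerator)
    'hoodie.datasource.write.partitionpath.field': 'part_col',  # placeholder
    'hoodie.datasource.write.precombine.field': 'ts',           # placeholder
    'hoodie.datasource.hive_sync.database': 'db_name',          # placeholder
    'hoodie.datasource.hive_sync.table': 'target_table',        # placeholder
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.index.type': 'GLOBAL_BLOOM',
    # ...plus the remaining properties listed above...
}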
KAFKA READ CONFIG:
.readStream \
.format("kafka") \
.option("kafka.security.protocol", "SSL") \
.option("kafka.ssl.enabled.protocols", "TLSv1.2, TLSv1.1, TLSv1") \
.option("kafka.ssl.protocol", "TLS") \
.option("startingOffsets", "latest") \
.option("failOnDataLoss", "true") \
.option("maxOffsetsPerTrigger", 2000) \
.option("kafka.group.id",CG_NAME) \
.load()
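Between the read and the write, the Kafka value is deserialized. A minimal sketch, assuming the chain above is bound to df_kafka and that the messages are JSON CDC records; the variable names, the op/id/ts columns and message_schema are assumptions and are not part of the original job:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Hypothetical CDC payload schema; the real schema is not shown in this issue
message_schema = StructType([
    StructField('op', StringType()),   # 'I' / 'U' / 'D'
    StructField('id', StringType()),
    StructField('ts', TimestampType()),
])

df_source = df_kafka \
    .selectExpr("CAST(value AS STRING) AS json_value") \
    .select(F.from_json('json_value', message_schema).alias('payload')) \
    .select('payload.*')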
PYSPARK WRITE
df_source.writeStream.foreachBatch(foreach_batch_write_function)
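For completeness, the query is presumably started along these lines; S3_CHECKPOINT_PATH and the trigger interval are assumptions and do not appear in the original snippet:
# S3_CHECKPOINT_PATH and the 60 s trigger are placeholders
query = df_source.writeStream \
    .foreachBatch(foreach_batch_write_function) \
    .option('checkpointLocation', S3_CHECKPOINT_PATH) \
    .trigger(processingTime='60 seconds') \
    .start()
query.awaitTermination()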
FOR EACH BATCH FUNCTION:
# management of delete messages
batchDF_deletes.write.format('hudi') \
    .option('hoodie.datasource.write.operation', 'delete') \
    .options(**hudiOptions_table) \
    .mode('append') \
    .save(S3_OUTPUT_PATH)
# management of update and insert messages
batchDF_upserts.write.format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'upsert') \
    .options(**hudiOptions_table) \
    .mode('append') \
    .save(S3_OUTPUT_PATH)
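For context, foreach_batch_write_function splits each micro-batch into deletes and upserts before issuing the two writes above; a minimal sketch, assuming the CDC payload carries an op column with 'I'/'U'/'D' values (the column name and values are placeholders):
def foreach_batch_write_function(batchDF, batch_id):
    # 'op' and its I/U/D values are assumptions about the CDC payload
    batchDF_deletes = batchDF.filter("op = 'D'")
    batchDF_upserts = batchDF.filter("op IN ('I', 'U')")
    # ...the two Hudi writes shown above...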
SPARK SUBMIT
spark-submit --master yarn --deploy-mode cluster \
  --num-executors 1 --executor-memory 1G --executor-cores 2 \
  --conf spark.dynamicAllocation.enabled=false \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --jars /usr/lib/hudi/hudi-spark-bundle.jar <path_to_script>
Thanks