alexone95 opened a new issue, #8261:
URL: https://github.com/apache/hudi/issues/8261

   Hello, we are seeing that commits get slower and slower over time (from a 
delta commit of 160 s on day 1 to a delta commit of 300 s on day 4). Our 
deployment conditions are the following:
   - We read INSERT, UPDATE and DELETE operations from a Kafka topic and 
replicate them into a target Hudi table stored on Hive, via a PySpark job 
running 24/7
   
   **Expected behavior**
   
   We would like to know whether there is a way to reduce, or at least keep 
constant, the write latency on the Hudi table, and to understand whether there 
is anything we can improve in the deployment setup or in the other 
configurations described below.
   
   **Environment Description**
   
   * Hudi version : 0.12.1-amzn-0
   * Spark version : 3.3.0
   * Hive version : 3.1.3
   * Hadoop version : 3.3.3 amz
   * Storage (HDFS/S3/GCS..) : S3
   * Running on Docker? (yes/no) : no (EMR 6.9.0)
   
   **Additional context**
   
   HOODIE TABLE PROPERTIES:
       'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
       'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
       'hoodie.datasource.write.hive_style_partitioning': 'true',
       'hoodie.index.type': 'GLOBAL_BLOOM',
       'hoodie.simple.index.update.partition.path': 'true',
       'hoodie.datasource.hive_sync.enable': 'true',
       'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
       'hoodie.datasource.hive_sync.use_jdbc': 'false',
       'hoodie.datasource.hive_sync.mode': 'hms',
       'hoodie.copyonwrite.record.size.estimate': 285,
       'hoodie.parquet.small.file.limit': 104857600,
       'hoodie.parquet.max.file.size': 120000000,
       'hoodie.cleaner.commits.retained': 1
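
   For completeness, a minimal sketch of how these properties could be assembled 
into the `hudiOptions_table` dict that the writers below unpack via 
`.options(**hudiOptions_table)`; the table name, record key, precombine and 
partition fields are placeholders, not our real values:

       # Sketch of the options dict passed to the Hudi writers below.
       # Table name, record key, precombine and partition fields are placeholders.
       hudiOptions_table = {
           'hoodie.table.name': 'target_table',                        # placeholder
           'hoodie.datasource.write.recordkey.field': 'pk_col',        # placeholder
           'hoodie.datasource.write.precombine.field': 'ts_col',       # placeholder
           'hoodie.datasource.write.partitionpath.field': 'part_col',  # placeholder
           'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
           'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
           'hoodie.datasource.write.hive_style_partitioning': 'true',
           'hoodie.index.type': 'GLOBAL_BLOOM',
           'hoodie.simple.index.update.partition.path': 'true',
           'hoodie.datasource.hive_sync.enable': 'true',
           'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
           'hoodie.datasource.hive_sync.use_jdbc': 'false',
           'hoodie.datasource.hive_sync.mode': 'hms',
           'hoodie.copyonwrite.record.size.estimate': 285,
           'hoodie.parquet.small.file.limit': 104857600,
           'hoodie.parquet.max.file.size': 120000000,
           'hoodie.cleaner.commits.retained': 1,
       }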
   
   KAFKA READ CONFIG:
       .readStream \
       .format("kafka") \
       .option("kafka.security.protocol", "SSL") \
       .option("kafka.ssl.enabled.protocols", "TLSv1.2, TLSv1.1, TLSv1") \
       .option("kafka.ssl.protocol", "TLS") \
       .option("startingOffsets", "latest") \
       .option("failOnDataLoss", "true") \
       .option("maxOffsetsPerTrigger", 2000) \
       .option("kafka.group.id",CG_NAME) \
       .load()
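
   For context, a rough sketch of the complete Kafka source definition, assuming 
`KAFKA_BROKERS` and `TOPIC_NAME` as placeholders for the broker list and topic 
name that are omitted above:

       # Sketch of the full Kafka read; KAFKA_BROKERS and TOPIC_NAME are
       # placeholders for values not shown in the snippet above.
       df_source = spark.readStream \
           .format("kafka") \
           .option("kafka.bootstrap.servers", KAFKA_BROKERS) \
           .option("subscribe", TOPIC_NAME) \
           .option("kafka.security.protocol", "SSL") \
           .option("kafka.ssl.enabled.protocols", "TLSv1.2, TLSv1.1, TLSv1") \
           .option("kafka.ssl.protocol", "TLS") \
           .option("startingOffsets", "latest") \
           .option("failOnDataLoss", "true") \
           .option("maxOffsetsPerTrigger", 2000) \
           .option("kafka.group.id", CG_NAME) \
           .load()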
   
   PYSPARK WRITE
       df_source.writeStream.foreachBatch(foreach_batch_write_function)

       FOR EACH BATCH FUNCTION:
           # management of delete messages
           batchDF_deletes.write.format('hudi') \
               .option('hoodie.datasource.write.operation', 'delete') \
               .options(**hudiOptions_table) \
               .mode('append') \
               .save(S3_OUTPUT_PATH)

           # management of update and insert messages
           batchDF_upserts.write.format('org.apache.hudi') \
               .option('hoodie.datasource.write.operation', 'upsert') \
               .options(**hudiOptions_table) \
               .mode('append') \
               .save(S3_OUTPUT_PATH)
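
   Putting the pieces together, the batch function and the stream wiring look 
roughly like the sketch below; the `op` column used to split deletes from 
upserts and the `S3_CHECKPOINT_PATH` location are assumptions, not our exact 
code:

       # Sketch of the foreachBatch callback and the streaming query wiring.
       # The 'op' column and S3_CHECKPOINT_PATH are assumptions, not the exact
       # production code.
       def foreach_batch_write_function(batchDF, batch_id):
           # management of delete messages
           batchDF_deletes = batchDF.filter(batchDF.op == 'DELETE')
           batchDF_deletes.write.format('hudi') \
               .option('hoodie.datasource.write.operation', 'delete') \
               .options(**hudiOptions_table) \
               .mode('append') \
               .save(S3_OUTPUT_PATH)

           # management of update and insert messages
           batchDF_upserts = batchDF.filter(batchDF.op != 'DELETE')
           batchDF_upserts.write.format('org.apache.hudi') \
               .option('hoodie.datasource.write.operation', 'upsert') \
               .options(**hudiOptions_table) \
               .mode('append') \
               .save(S3_OUTPUT_PATH)

       query = df_source.writeStream \
           .foreachBatch(foreach_batch_write_function) \
           .option('checkpointLocation', S3_CHECKPOINT_PATH) \
           .start()
       query.awaitTermination()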
   
   SPARK SUBMIT
       spark-submit --master yarn --deploy-mode cluster \
           --num-executors 1 --executor-memory 1G --executor-cores 2 \
           --conf spark.dynamicAllocation.enabled=false \
           --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
           --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
           --conf spark.sql.hive.convertMetastoreParquet=false \
           --jars /usr/lib/hudi/hudi-spark-bundle.jar <path_to_script>
   
   Thanks

