epsilon-akshay opened a new issue, #11820:
URL: https://github.com/apache/hudi/issues/11820

   I have a hudi with glue streaming setup (glue 4) reading from kinesis. 
Config are as follows:
   1. ingestion rate - 5000rps
   2. number of DPU - g2.x 20 workers
   3. kinesis shards - 40 
   4. target - s3 
   
   
   Each record size is 2.5KB and there are around 20000 partitions being 
published. when keeping a window size of 5 min or 15 min in both cases the lag 
is more than half an hour even though we have tried:
   1. increasing workers
   2. removing compression
   3. Tried MOR instead of COW (little better but still >20min) - but for our 
usecase we need to spike out max latency hence doing COW
   4. playing around with kinesis fetch configs
   5. Playing around with shuffle parallelism (increased. from 80 to 160 to 320)
   6. played around with file sizing. (didnt impact much)
   
   Only thing that made the lag >10min < 15 min to reduce the partition size, 
am i missing something here is there some other configuration to tweak ?
   
   
   A clear and concise description of what you expected to happen.
   
   **Environment Description**
   
   * Hudi version : 0.12.1
   
   * Spark version : spark 3.3.0
   
   * Storage (HDFS/S3/GCS..) :s3
   
   * Running on Docker? (yes/no) : no
   
   SCRIPT
   
   from pyspark.sql.functions import col, from_json, from_unixtime, expr, 
regexp_extract, udf
   from pyspark.sql.session import SparkSession
   from awsglue.context import GlueContext
   from awsglue.transforms import *
   from pyspark.sql.types import StructType, StructField, StringType, 
IntegerType, ArrayType, DoubleType, LongType
   import uuid
   from awsglue import DynamicFrame
   
   
   G2.x (4 vcpu and 16gb ram) - 20 workers
   
   # Initialize Spark session
   spark = SparkSession.builder \
       .appName("GlueKinesisToHudi") \
       .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer") \
       .config("spark.sql.hive.convertMetastoreParquet", "false") \
       .getOrCreate()
   
   sparkContext = spark.sparkContext
   glueContext = GlueContext(sparkContext)
   
   
   schema = # schema
   
   
   parsed_df_k = glueContext.create_data_frame.from_options(
       connection_type="kinesis",
       connection_options={
           "typeOfData": "kinesis",
           "streamARN": "# kinesis arn",
           "classification": "json",
           "startingPosition": "latest",
           "inferSchema": "true",
           "emitConsumerLagMetrics": "true",
           "maxFetchTimeInMs": 60000
       },
       transformation_ctx="dataframe_KinesisStream_node21_c",
   )
   
   
   def process_batch(spark_df,batch_id):
         if spark_df.count() > 0:
           dynamicframe = DynamicFrame.fromDF(spark_df, glueContext, 
"from_data_frame")
           parsed_df = dynamicframe.toDF()
           
           #column transformations her
           
           # Define Hudi options
           hudi_options = {
               "hoodie.table.name": "compression_15_run1",
               "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
               "hoodie.datasource.write.operation": "upsert",
               "hoodie.datasource.write.recordkey.field": "id",
               "hoodie.datasource.write.partitionpath.field": 
"container_id,year,month,day",
               "hoodie.datasource.write.precombine.field": "timestamp",
               "hoodie.datasource.write.hive_style.partitioning": "true",
               "hoodie.upsert.shuffle.parallelism": 160,
               "hoodie.insert.shuffle.parallelism": 160,
               "hoodie.bulkinsert.shuffle.parallelism": 160,
               "hoodie.datasource.hive_sync.mode": "hms",
               'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
               'hoodie.datasource.hive_sync.database': 'glue_ctx_db',
               'hoodie.datasource.hive_sync.table': 'compression_15_run1',
               'hoodie.datasource.hive_sync.use_jdbc': 'false',
               'hoodie.datasource.hive_sync.partition_extractor_class': 
'org.apache.hudi.hive.MultiPartKeysValueExtractor',
               'hoodie.datasource.write.hive_style_partitioning': 'true',
               'hoodie.datasource.hive_sync.enable': 'true',
               'hoodie.index.type': 'SIMPLE',
               "hoodie.parquet.compression.codec": "gzip",
               "hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS",
               "hoodie.cleaner.fileversions.retained": "1",
               "hoodie.parquet.compression.ratio": "0.95",
               'hoodie.copyonwrite.record.size.estimate': '2560',
               'hoodie.cleaner.commits.retained': 1,
               'hoodie.combine.before.insert': True,
               
           }
           
           warehouse_path = "s3://hudi/run_1/output"
           parsed_df.write.format("hudi") \
                   .options(**hudi_options) \
                   .mode("append") \
                   .save(warehouse_path)
       
   glueContext.forEachBatch(
       frame=parsed_df_k,
       batch_function=process_batch,
       options={
           "windowSize": "15 minutes",
           "checkpointLocation": "s3://hudi/run_1/checkpoint",
       },
   )
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to