rangareddy commented on issue #12108:
URL: https://github.com/apache/hudi/issues/12108#issuecomment-2426262408

   **Code:**
   
   ```sh
   
HUDI_SPARK_JAR="./packaging/hudi-spark-bundle/target/hudi-spark3.5-bundle_2.12-1.0.0-SNAPSHOT.jar"

   # Launch a local pyspark shell with the Hudi bundle plus the Kryo,
   # SQL-extension and catalog settings Hudi requires.
   # (Rejoined the email-wrapped lines: every argument line except the
   # last must end with a backslash continuation.)
   $SPARK_HOME/bin/pyspark \
   --deploy-mode client \
   --jars $HUDI_SPARK_JAR \
   --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
   --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
   --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
   --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' \
   --driver-memory 20g \
   --executor-memory 20g \
   --conf spark.driver.memoryOverhead=4g \
   --conf spark.executor.memoryOverhead=4g \
   --conf "spark.sql.hive.convertMetastoreParquet=false"
   ```
   
   ```
   from pyspark.sql.functions import *
   from pyspark.sql.types import *
   import dbldatagen as dg
   import json
   import uuid
   import random
   
   # Target database name (not visibly used further down in this snippet).
   hudi_db = 'default'
   hudi_table = 'example-table'
   # NOTE(review): hudi_table is immediately overwritten with a local
   # file:// base path, and that path-valued string is later reused as
   # both hoodie.table.name and the write table name — confirm a path is
   # really intended as the table name.
   hudi_table=f'file:///tmp/hudi/{hudi_table}'
   hudi_checkpoint_path = f'{hudi_table}/checkpoint'
   hudi_table_path = f'{hudi_table}/tablepath' 

   # Element schema: array of (domain, risk, timestamp) structs.
   # NOTE(review): this schema is defined but never visibly applied to the
   # generated stream in this snippet; the 'timestamp' UDF below returns a
   # long, not a TimestampType — verify which is intended.
   schema = ArrayType(
       StructType([
           StructField("domain", StringType(), False),
           StructField("risk", StringType(), False),
           StructField("timestamp", TimestampType(), False),
       ])
   )

   # Writer identity for multi-writer streaming
   # (used as hoodie.datasource.write.streaming.checkpoint.identifier).
   multi_writer_id = 'datagen-writer1'
   
   @udf(returnType=StringType())
   def generate_domain():
       """Return a synthetic domain name built from a random 8-digit number."""
       return f"{random.randrange(10000000, 20000000)}.com"
   
   @udf(returnType=LongType())
   def generate_timestamp():
       """Return a random integer in [1000000000, 2000000000) — presumably
       epoch seconds; confirm against downstream use as the precombine field."""
       ts = random.randrange(1000000000, 2000000000)
       return ts
   
   @udf(returnType=StringType())
   def generate_risk():
       """Return a JSON string holding a fresh random UUID under the 'blaa' key."""
       payload = {"blaa": str(uuid.uuid4())}
       return json.dumps(payload)
   
   
   # Build a rate-limited streaming source (10k rows/s after a 60s ramp-up)
   # and attach the synthetic domain / timestamp / risk columns via the UDFs.
   ds = (
       dg.DataGenerator(spark, name="test-data-set", partitions=1)
       .withColumn("offset", "long", minValue=1, maxValue=9999999, random=True)
       .withColumn("timestamp_", "timestamp", random=True)
       .build(withStreaming=True, options={'rowsPerSecond': 10000, 'rampUpTimeSeconds': 60})
       .withColumn("domain", generate_domain())
       .withColumn("timestamp", generate_timestamp())
       .withColumn("risk", generate_risk())
       .withColumnRenamed("timestamp_", "kafka_timestamp")
   )

   # Project the columns of interest and drop rows containing nulls.
   df = (
       ds.select(col("offset"), col("kafka_timestamp"), col("domain"), col("timestamp"), col("risk"))
       .na.drop()
   )
   
   hudi_precombine_field = 'timestamp'
   hudi_recordkey_field = 'domain'

   # Writer configuration: multi-writer MOR streaming upsert with record
   # index, inline clustering and async table services.
   # (Fixed email line-wrapping: the wrapped comment on the
   # hoodie.metadata.index.async entry had spilled its second half outside
   # the '#', producing a SyntaxError inside the dict literal; long values
   # split across lines are rejoined.)
   hudi_options = {
       'hoodie.archive.async': True,
       'hoodie.clean.async.enabled': True,
       'hoodie.clean.automatic': True,
       'hoodie.cleaner.commits.retained': 10,
       'hoodie.keep.min.commits': 21,
       'hoodie.keep.max.commits': 30,
       'hoodie.clean.policy': 'KEEP_LATEST_COMMITS',
       'hoodie.clean.fileversions.retained': '2',
       'hoodie.clean.failed.writes.policy': 'LAZY',
       'hoodie.clustering.inline': True,
       'hoodie.clustering.inline.max.commits': 2,
       'hoodie.clustering.execution.strategy.class': 'org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy',
       'hoodie.clustering.plan.strategy.class': 'org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy',
       'hoodie.clustering.plan.strategy.sort.columns': hudi_recordkey_field,
       'hoodie.clustering.preserve.commit.metadata': True,
       'hoodie.clustering.rollback.pending.replacecommit.on.conflict': True,
       'hoodie.clustering.updates.strategy': 'org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy',
       'hoodie.compact.inline.max.delta.commits': 2,
       'hoodie.datasource.read.incr.fallback.fulltablescan.enable': True,
       'hoodie.datasource.read.use.new.parquet.file.format': True,
       'hoodie.datasource.write.hive_style_partitioning': 'true',
       'hoodie.datasource.write.operation': 'upsert',
       'hoodie.datasource.write.precombine.field': hudi_precombine_field,
       'hoodie.datasource.write.reconcile.schema': 'true',
       'hoodie.datasource.write.record.merger.impls': 'org.apache.hudi.HoodieSparkRecordMerger',
       'hoodie.datasource.write.recordkey.field': hudi_recordkey_field,
       'hoodie.datasource.write.row.writer.enable': True,
       'hoodie.datasource.write.streaming.checkpoint.identifier': multi_writer_id,
       'hoodie.datasource.write.streaming.ignore.failed.batch': 'true',
       # NOTE(review): hudi_table holds a file:// base path at this point —
       # confirm a path-valued table name is intended (here and in
       # hoodie.table.name below).
       'hoodie.datasource.write.table.name': hudi_table,
       'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
       'hoodie.enable.data.skipping': True,
       'hoodie.index.type': 'RECORD_INDEX',
       'hoodie.logfile.data.block.format': 'parquet',
       'hoodie.merge.use.record.positions': True,
       'hoodie.metadata.auto.initialize': True,
       'hoodie.metadata.enable': True,
       'hoodie.metadata.clean.async': True,
       # DO NOT SET TRUE!!! Record and column indexes will not be created!
       'hoodie.metadata.index.async': False,
       'hoodie.metadata.index.column.stats.columns': hudi_recordkey_field,
       'hoodie.metadata.index.column.stats.column.list': hudi_recordkey_field,
       'hoodie.metadata.index.column.stats.enable': True,
       'hoodie.metadata.record.index.enable': True,
       'hoodie.parquet.avro.write-old-list-structure': 'false',
       'hoodie.parquet.compression.codec': 'snappy',
       'hoodie.record.index.use.caching': True,
       'hoodie.schema.on.read.enable': True,
       'hoodie.table.name': hudi_table,
       'hoodie.table.services.enabled': True,
       # Streaming-sink options consumed by writeStream, not by Hudi itself.
       'checkpointLocation': hudi_checkpoint_path,
       'parquet.avro.write-old-list-structure': 'false',
       'path': hudi_table_path,
   }

   print(f"hudi_options={hudi_options}")
   
   # Start the streaming upsert into the Hudi table, then block the driver
   # until any active stream terminates (or fails).
   stream_writer = df.writeStream.format("org.apache.hudi")
   stream_writer = stream_writer.options(**hudi_options)
   stream_writer.outputMode("append").start()

   spark.streams.awaitAnyTermination()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to