rangareddy commented on issue #12108:
URL: https://github.com/apache/hudi/issues/12108#issuecomment-2426262408
**Code:**
```sh
HUDI_SPARK_JAR="./packaging/hudi-spark-bundle/target/hudi-spark3.5-bundle_2.12-1.0.0-SNAPSHOT.jar"
$SPARK_HOME/bin/pyspark \
--deploy-mode client \
--jars $HUDI_SPARK_JAR \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' \
--driver-memory 20g \
--executor-memory 20g \
--conf spark.driver.memoryOverhead=4g \
--conf spark.executor.memoryOverhead=4g \
--conf "spark.sql.hive.convertMetastoreParquet=false"
```
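For reference, the same settings can also be applied when building the SparkSession from a standalone PySpark script instead of the interactive shell. This is only a sketch mirroring the command above; the app name is made up and the jar path is an assumption about where the local snapshot build lives.

```python
# Hedged sketch: an equivalent SparkSession built programmatically.
# The config keys/values mirror the pyspark command above; the bundle jar
# path is the same local 1.0.0-SNAPSHOT artifact and is assumed to exist.
from pyspark.sql import SparkSession

HUDI_SPARK_JAR = "./packaging/hudi-spark-bundle/target/hudi-spark3.5-bundle_2.12-1.0.0-SNAPSHOT.jar"

spark = (
    SparkSession.builder
    .appName("hudi-streaming-example")  # hypothetical app name
    .config("spark.jars", HUDI_SPARK_JAR)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
    .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .getOrCreate()
)
```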
```python
from pyspark.sql.functions import *
from pyspark.sql.types import *
import dbldatagen as dg
import json
import uuid
import random
hudi_db = 'default'
hudi_table = 'example-table'
# keep the table name and the on-disk location in separate variables so the
# Hudi table name does not end up as a file:// URI
hudi_base_path = f'file:///tmp/hudi/{hudi_table}'
hudi_checkpoint_path = f'{hudi_base_path}/checkpoint'
hudi_table_path = f'{hudi_base_path}/tablepath'
schema = ArrayType(
StructType([
StructField("domain", StringType(), False),
StructField("risk", StringType(), False),
StructField("timestamp", TimestampType(), False),
])
)
multi_writer_id = 'datagen-writer1'
@udf(returnType=StringType())
def generate_domain():
rand_domain = f"{random.randrange(10000000,20000000)}.com"
return rand_domain
@udf(returnType=LongType())
def generate_timestamp():
return random.randrange(1000000000,2000000000)
@udf(returnType=StringType())
def generate_risk():
return json.dumps({"blaa":str(uuid.uuid4())})
ds = (
dg.DataGenerator(spark, name="test-data-set", partitions=1)
.withColumn("offset", "long", minValue=1, maxValue=9999999, random=True)
.withColumn("timestamp_", "timestamp", random=True)
.build(withStreaming=True, options={'rowsPerSecond': 10000,
'rampUpTimeSeconds':60})
.withColumn("domain", generate_domain())
.withColumn("timestamp", generate_timestamp())
.withColumn("risk", generate_risk())
.withColumnRenamed("timestamp_","kafka_timestamp")
)
df = (ds.select(col("offset"), col("kafka_timestamp"), col("domain"),
col("timestamp"), col("risk")).na.drop())
hudi_precombine_field = 'timestamp'
hudi_recordkey_field = 'domain'
hudi_options = {
'hoodie.archive.async': True,
'hoodie.clean.async.enabled': True,
'hoodie.clean.automatic': True,
'hoodie.cleaner.commits.retained': 10,
'hoodie.keep.min.commits': 21,
'hoodie.keep.max.commits': 30,
'hoodie.clean.policy': 'KEEP_LATEST_COMMITS',
'hoodie.clean.fileversions.retained': '2',
'hoodie.clean.failed.writes.policy': 'LAZY',
'hoodie.clustering.inline': True,
'hoodie.clustering.inline.max.commits': 2,
'hoodie.clustering.execution.strategy.class':
'org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy',
'hoodie.clustering.plan.strategy.class':
'org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy',
'hoodie.clustering.plan.strategy.sort.columns': hudi_recordkey_field,
'hoodie.clustering.preserve.commit.metadata': True,
'hoodie.clustering.rollback.pending.replacecommit.on.conflict': True,
'hoodie.clustering.updates.strategy':
'org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy',
'hoodie.compact.inline.max.delta.commits': 2,
'hoodie.datasource.read.incr.fallback.fulltablescan.enable': True,
'hoodie.datasource.read.use.new.parquet.file.format': True,
'hoodie.datasource.write.hive_style_partitioning': 'true',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.precombine.field': hudi_precombine_field,
'hoodie.datasource.write.reconcile.schema':'true',
'hoodie.datasource.write.record.merger.impls':
'org.apache.hudi.HoodieSparkRecordMerger',
'hoodie.datasource.write.recordkey.field': hudi_recordkey_field,
'hoodie.datasource.write.row.writer.enable': True,
'hoodie.datasource.write.streaming.checkpoint.identifier':
multi_writer_id,
'hoodie.datasource.write.streaming.ignore.failed.batch': 'true',
'hoodie.datasource.write.table.name': hudi_table,
'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
'hoodie.enable.data.skipping': True,
'hoodie.index.type': 'RECORD_INDEX',
'hoodie.logfile.data.block.format':'parquet',
'hoodie.merge.use.record.positions': True,
'hoodie.metadata.auto.initialize': True,
'hoodie.metadata.enable': True,
'hoodie.metadata.clean.async': True,
'hoodie.metadata.index.async': False,  # DO NOT SET TRUE!!! Record and column indexes will not be created!
'hoodie.metadata.index.column.stats.columns': hudi_recordkey_field,
'hoodie.metadata.index.column.stats.column.list': hudi_recordkey_field,
'hoodie.metadata.index.column.stats.enable': True,
'hoodie.metadata.record.index.enable': True,
'hoodie.parquet.avro.write-old-list-structure':'false',
'hoodie.parquet.compression.codec': 'snappy',
'hoodie.record.index.use.caching':True,
'hoodie.schema.on.read.enable': True,
'hoodie.table.name': hudi_table,
'hoodie.table.services.enabled': True,
'checkpointLocation': hudi_checkpoint_path,
'parquet.avro.write-old-list-structure': 'false',
'path': hudi_table_path,
}
print(f"hudi_options={hudi_options}")
df.writeStream \
.format("org.apache.hudi") \
.options(**hudi_options) \
.outputMode("append") \
.start()
spark.streams.awaitAnyTermination()
```
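Once a few micro-batches have committed, a plain snapshot read of the table path can be used to confirm that records are landing. A minimal sketch, reusing the `spark` session and the `hudi_table_path` variable from the script above:

```python
# Minimal verification sketch: snapshot-read the table written by the stream
# above and inspect a few records. hudi_table_path is the same path variable
# passed to writeStream via the 'path' option.
snapshot_df = spark.read.format("hudi").load(hudi_table_path)
snapshot_df.select("domain", "timestamp", "risk").show(10, truncate=False)
print(f"records so far: {snapshot_df.count()}")
```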