[ https://issues.apache.org/jira/browse/SPARK-49127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17879657#comment-17879657 ]
Wei Liu commented on SPARK-49127:
---------------------------------
A checkpoint defines the streaming query's progress. If you restart with the
same checkpoint location as the last run
(.writeStream.option("checkpointLocation", loc)), the query resumes from where
it left off.
Note that if you don't set this option, Spark creates a random checkpoint
location for you and tries its best to delete it afterwards, so it is nearly
impossible to restart from the last committed offsets in that case.
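For example, a minimal sketch of a restartable query (the broker, topic, and
checkpoint path below are placeholders, not values from the original report):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("restartable-etl").getOrCreate()

streamDF = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")  # placeholder broker
    .option("subscribe", "input-topic")                # placeholder topic
    .load())

# Reusing the same fixed checkpointLocation on every (re)start lets the query
# resume from the last committed offsets instead of reprocessing from scratch.
query = (streamDF.writeStream
    .format("console")
    .option("checkpointLocation", "gs://my-bucket/checkpoints/etl")  # placeholder path
    .start())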
> How to restart failed spark stream job from the failure point
> -------------------------------------------------------------
>
> Key: SPARK-49127
> URL: https://issues.apache.org/jira/browse/SPARK-49127
> Project: Spark
> Issue Type: Request
> Components: PySpark
> Affects Versions: 3.5.0
> Environment: GCP Dataproc cluster
> Reporter: Rajdeepak
> Priority: Blocker
>
> I am setting up an ETL process using PySpark. My input is a Kafka stream and
> I am writing output to multiple sinks (one to Kafka and another to cloud
> storage). I am writing checkpoints to cloud storage. The issue I am facing is
> that whenever my application fails for some reason and I restart it, my
> PySpark application reprocesses some (not all) of the input stream data,
> causing data redundancy. Is there any way I can avoid this? I am using Spark
> 3.5.0 and Python 3.11.
> Below is some of my application code:
> Spark session:
> spark = SparkSession \
>     .builder \
>     .appName("ETL") \
>     .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2") \
>     .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \
>     .config('spark.driver.extraJavaOptions', '-Duser.timezone=GMT') \
>     .config('spark.executor.extraJavaOptions', '-Duser.timezone=GMT') \
>     .config('spark.sql.session.timeZone', 'UTC') \
>     .config('spark.hadoop.fs.s3a.buffer.dir', '/tmp,/mnt/tmp') \
>     .config('spark.hadoop.fs.s3a.fast.upload.buffer', 'bytebuffer') \
>     .config('spark.hadoop.fs.s3a.fast.upload.active.blocks', 1) \
>     .config('spark.streaming.backpressure.enabled', True) \
>     .config("spark.redis.host", conf["nosql-host"]) \
>     .config("spark.redis.port", conf["nosql-port"]) \
>     .config("spark.redis.db", conf["nosql-db"]) \
>     .config("spark.redis.auth", __REDIS_CREDENTIAL__) \
>     .getOrCreate()
>
> Kafka read stream:
> streamDF = (spark
>     .readStream
>     .format("kafka")
>     .option("kafka.bootstrap.servers", kafka_bootstrap_server_consumer)
>     .option("subscribe", kafka_topic_name)
>     .option("mode", "PERMISSIVE")
>     .option("startingOffsets", "earliest")
>     .option("failOnDataLoss", "false")
>     .load()
>     .withColumn('fixedValue', fn.expr("substring(value, 6, length(value)-5)"))
>     .select('fixedValue'))
>
> Write stream to multiple sinks:
> write_stream = extractionDF \
>     .writeStream \
>     .trigger(processingTime='2 seconds') \
>     .outputMode("append") \
>     .foreachBatch(lambda df, epochId: write_to_multiple_sinks(df, epochId, processed_cloud_storage_path, kafka_bootstrap_server_producer)) \
>     .option("truncate", "false") \
>     .option("checkpointLocation", cloud_storage_path) \
>     .start()
> write_to_multiple_sinks function:
> def write_to_multiple_sinks(dataframe: DataFrame, epochId, cloud_storage_path, kafka_bootstrap_server):
>     dataframe = dataframe.cache()
>     druidDF = dataframe.select(druidSchema())
>     druidDF.selectExpr(producerTopic, "to_json(struct(*)) AS value").write \
>         .format("kafka") \
>         .option("kafka.bootstrap.servers", kafka_bootstrap_server) \
>         .save()
>     processedDF = dataframe.select(processedSchema())
>     processedDF.write.format("csv").mode("append").option("sep", "^") \
>         .option("compression", "gzip").option("path", cloud_storage_path).save()
>     dataframe.unpersist()  # release the cached micro-batch after both sinks are written
>
>