[
https://issues.apache.org/jira/browse/SPARK-27456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dongjoon Hyun updated SPARK-27456:
----------------------------------
Affects Version/s: 3.1.0 (was: 3.0.0)
> Support commitSync for offsets in DirectKafkaInputDStream
> ---------------------------------------------------------
>
> Key: SPARK-27456
> URL: https://issues.apache.org/jira/browse/SPARK-27456
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.1.0
> Reporter: Jackson Westeen
> Priority: Major
>
> Hello! I left a comment under SPARK-22486 but wasn't sure if it would get
> noticed as that one got closed; x-posting here.
> ----
> I'm trying to achieve "effectively once" semantics with Spark Streaming for
> batch writes to S3. The only way to do this is to partitionBy(startOffsets) in
> some way, so that re-writes on failure/retry are idempotent: they overwrite
> the previous batch if the failure occurred before commitAsync succeeded.
>
> Here's my example:
> {code:scala}
> import org.apache.kafka.clients.consumer.ConsumerRecord
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.SaveMode
> import org.apache.spark.sql.functions.{from_json, from_unixtime, lit}
> import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
> import spark.implicits._
>
> stream.foreachRDD((rdd: RDD[ConsumerRecord[String, Array[Byte]]]) => {
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   // make a dataset for this batch, with its start offsets included as a column
>   spark
>     .createDataset(rdd.map(record => new String(record.value)))
>     .select(from_json($"value", schema).as("record")) // just for example; "schema" is a placeholder for the records' JSON schema
>     .withColumn("dateKey", from_unixtime($"record.timestamp", "yyyyMMdd"))
>     .withColumn("startOffsets",
>       lit(offsetRanges.sortBy(_.partition).map(_.fromOffset).mkString("_")))
>     .write
>     .mode(SaveMode.Overwrite)
>     .option("partitionOverwriteMode", "dynamic")
>     .partitionBy("dateKey", "startOffsets")
>     .parquet("s3://mybucket/kafka-parquet")
>
>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> })
> {code}
> This almost works. The only issue is that I can still end up with
> duplicate/overlapping data if:
> # an initial write to S3 succeeds (batch A)
> # commitAsync takes a long time, eventually fails, *but the job carries on
> to successfully write another batch in the meantime (batch B)*
> # the job fails for any reason; we start back at the last committed offsets,
> but now with more data in Kafka to process than before (batch A', which
> includes A, B, ...)
> # we successfully overwrite the initial batch by startOffsets with batch A'
> and progress as normal. No data is lost; however, batch B is left over in
> S3 and contains partially duplicate data (see the sketch below).
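> To make the overlap concrete, here is a sketch with made-up offsets for a single
> Kafka partition (the numbers below are hypothetical, just to illustrate steps 1-4):
> {code:scala}
> // Hypothetical offsets for one Kafka partition, illustrating the race above.
> val lastCommitted = 100L          // nothing past offset 100 was ever committed
> val batchA        = (100L, 200L)  // written under startOffsets=100; commitAsync failed
> val batchB        = (200L, 300L)  // written under startOffsets=200; then the job dies
> val batchAPrime   = (100L, 350L)  // restart: re-read from 100, overwrites startOffsets=100
> // startOffsets=200 (batch B) is never overwritten, yet its offsets 200-299 are now
> // also present under startOffsets=100, so S3 holds partially duplicated data.
> {code}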
> It would be very nice to have an atomic operation for write and
> commitOffsets, or be able to simulate one with commitSync in Spark Streaming
> :)
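> For concreteness, here is a minimal sketch of the API shape I have in mind,
> mirroring the existing CanCommitOffsets.commitAsync signature (the trait and the
> commitSync method below are hypothetical, not existing Spark API):
> {code:scala}
> import org.apache.kafka.clients.consumer.ConsumerRecord
> import org.apache.spark.rdd.RDD
> import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}
>
> // Hypothetical extension of CanCommitOffsets; this trait does not exist in Spark.
> trait CanCommitOffsetsSync extends CanCommitOffsets {
>   /** Block until the given offset ranges are durably committed to Kafka, or throw on failure. */
>   def commitSync(offsetRanges: Array[OffsetRange]): Unit
> }
>
> // How the example above would change: blocking here means no later batch can be
> // written before this batch's commit is durable, closing the window in steps 1-4.
> stream.foreachRDD((rdd: RDD[ConsumerRecord[String, Array[Byte]]]) => {
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   // ... write the batch to S3, partitioned by startOffsets, as above ...
>   stream.asInstanceOf[CanCommitOffsetsSync].commitSync(offsetRanges)
> })
> {code}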
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]