[
https://issues.apache.org/jira/browse/SPARK-20103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Michael Armbrust updated SPARK-20103:
-------------------------------------
Fix Version/s: 2.2.0
> Spark structured streaming from kafka - last message processed again after
> resume from checkpoint
> ------------------------------------------------------------------------------------------------
>
> Key: SPARK-20103
> URL: https://issues.apache.org/jira/browse/SPARK-20103
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.1.0
> Environment: Linux, Spark 2.1.0
> Reporter: Rajesh Mutha
> Labels: spark, streaming
> Fix For: 2.2.0
>
>
> When the application starts after a failure or a graceful shutdown, it is
> consistently processing the last message of the previous batch even though it
> was already processed correctly without failure.
> We are making sure database writes are idempotent using a PostgreSQL 9.6 feature.
> Is this the default behavior of spark? I added a code snippet with 2
> streaming queries. One of the query is idempotent; since query2 is not
> idempotent, we are seeing duplicate entries in table.
> {code}
> object StructuredStreaming {
> def main(args: Array[String]): Unit = {
> val db_url =
> "jdbc:postgresql://dynamic-milestone-dev.crv1otzbekk9.us-east-1.rds.amazonaws.com:5432/DYNAMICPOSTGRES?user=dsdbadmin&password=password"
> val spark = SparkSession
> .builder
> .appName("StructuredKafkaReader")
> .master("local[*]")
> .getOrCreate()
> spark.conf.set("spark.sql.streaming.checkpointLocation",
> "/tmp/checkpoint_research/")
> import spark.implicits._
> val server = "10.205.82.113:9092"
> val topic = "checkpoint"
> val subscribeType="subscribe"
> val lines = spark
> .readStream
> .format("kafka")
> .option("kafka.bootstrap.servers", server)
> .option(subscribeType, topic)
> .load().selectExpr("CAST(value AS STRING)").as[String]
> lines.printSchema()
> import org.apache.spark.sql.ForeachWriter
> val writer = new ForeachWriter[String] {
> def open(partitionId: Long, version: Long): Boolean = {
> println("After db props"); true
> }
> def process(value: String) = {
> val conn = DriverManager.getConnection(db_url)
> try{
> conn.createStatement().executeUpdate("INSERT INTO
> PUBLIC.checkpoint1 VALUES ('"+value+"')")
> }
> finally {
> conn.close()
> }
> }
> def close(errorOrNull: Throwable) = {}
> }
> import scala.concurrent.duration._
> val query1 = lines.writeStream
> .outputMode("append")
> .queryName("checkpoint1")
> .trigger(ProcessingTime(30.seconds))
> .foreach(writer)
> .start()
> val writer2 = new ForeachWriter[String] {
> def open(partitionId: Long, version: Long): Boolean = {
> println("After db props"); true
> }
> def process(value: String) = {
> val conn = DriverManager.getConnection(db_url)
> try{
> conn.createStatement().executeUpdate("INSERT INTO
> PUBLIC.checkpoint2 VALUES ('"+value+"')")
> }
> finally {
> conn.close()
> }
> }
> def close(errorOrNull: Throwable) = {}
> }
> import scala.concurrent.duration._
> val query2 = lines.writeStream
> .outputMode("append")
> .queryName("checkpoint2")
> .trigger(ProcessingTime(30.seconds))
> .foreach(writer2)
> .start()
> query2.awaitTermination()
> query1.awaitTermination()
> }}
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]