[ https://issues.apache.org/jira/browse/SPARK-20103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Armbrust updated SPARK-20103: ------------------------------------- Docs Text: (was: object StructuredStreaming { def main(args: Array[String]): Unit = { val db_url = "jdbc:postgresql://dynamic-milestone-dev.crv1otzbekk9.us-east-1.rds.amazonaws.com:5432/DYNAMICPOSTGRES?user=dsdbadmin&password=password" val spark = SparkSession .builder .appName("StructuredKafkaReader") .master("local[*]") .getOrCreate() spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/checkpoint_research/") import spark.implicits._ val server = "10.205.82.113:9092" val topic = "checkpoint" val subscribeType="subscribe" val lines = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", server) .option(subscribeType, topic) .load().selectExpr("CAST(value AS STRING)").as[String] lines.printSchema() import org.apache.spark.sql.ForeachWriter val writer = new ForeachWriter[String] { def open(partitionId: Long, version: Long): Boolean = { println("After db props"); true } def process(value: String) = { val conn = DriverManager.getConnection(db_url) try{ conn.createStatement().executeUpdate("INSERT INTO PUBLIC.checkpoint1 VALUES ('"+value+"')") } finally { conn.close() } } def close(errorOrNull: Throwable) = {} } import scala.concurrent.duration._ val query1 = lines.writeStream .outputMode("append") .queryName("checkpoint1") .trigger(ProcessingTime(30.seconds)) .foreach(writer) .start() val writer2 = new ForeachWriter[String] { def open(partitionId: Long, version: Long): Boolean = { println("After db props"); true } def process(value: String) = { val conn = DriverManager.getConnection(db_url) try{ conn.createStatement().executeUpdate("INSERT INTO PUBLIC.checkpoint2 VALUES ('"+value+"')") } finally { conn.close() } } def close(errorOrNull: Throwable) = {} } import scala.concurrent.duration._ val query2 = lines.writeStream .outputMode("append") .queryName("checkpoint2") .trigger(ProcessingTime(30.seconds)) .foreach(writer2) .start() query2.awaitTermination() 
query1.awaitTermination() }}) > Spark structured streaming from Kafka - last message processed again after > resume from checkpoint > ------------------------------------------------------------------------------------------------ > > Key: SPARK-20103 > URL: https://issues.apache.org/jira/browse/SPARK-20103 > Project: Spark > Issue Type: Bug > Components: Structured Streaming > Affects Versions: 2.1.0 > Environment: Linux, Spark 2.10 > Reporter: Rajesh Mutha > Labels: spark, streaming > > When the application starts after a failure or a graceful shutdown, it is > consistently processing the last message of the previous batch even though it > was already processed correctly without failure. > We are making sure database writes are idempotent using a PostgreSQL 9.6 feature. > Is this the default behavior of Spark? I added a code snippet with 2 > streaming queries. One of the queries is idempotent; since query2 is not > idempotent, we are seeing duplicate entries in the table. > {code} > object StructuredStreaming { > def main(args: Array[String]): Unit = { > val db_url = > "jdbc:postgresql://dynamic-milestone-dev.crv1otzbekk9.us-east-1.rds.amazonaws.com:5432/DYNAMICPOSTGRES?user=dsdbadmin&password=password" > val spark = SparkSession > .builder > .appName("StructuredKafkaReader") > .master("local[*]") > .getOrCreate() > spark.conf.set("spark.sql.streaming.checkpointLocation", > "/tmp/checkpoint_research/") > import spark.implicits._ > val server = "10.205.82.113:9092" > val topic = "checkpoint" > val subscribeType="subscribe" > val lines = spark > .readStream > .format("kafka") > .option("kafka.bootstrap.servers", server) > .option(subscribeType, topic) > .load().selectExpr("CAST(value AS STRING)").as[String] > lines.printSchema() > import org.apache.spark.sql.ForeachWriter > val writer = new ForeachWriter[String] { > def open(partitionId: Long, version: Long): Boolean = { > println("After db props"); true > } > def process(value: String) = { > val conn = 
DriverManager.getConnection(db_url) > try{ > conn.createStatement().executeUpdate("INSERT INTO > PUBLIC.checkpoint1 VALUES ('"+value+"')") > } > finally { > conn.close() > } > } > def close(errorOrNull: Throwable) = {} > } > import scala.concurrent.duration._ > val query1 = lines.writeStream > .outputMode("append") > .queryName("checkpoint1") > .trigger(ProcessingTime(30.seconds)) > .foreach(writer) > .start() > val writer2 = new ForeachWriter[String] { > def open(partitionId: Long, version: Long): Boolean = { > println("After db props"); true > } > def process(value: String) = { > val conn = DriverManager.getConnection(db_url) > try{ > conn.createStatement().executeUpdate("INSERT INTO > PUBLIC.checkpoint2 VALUES ('"+value+"')") > } > finally { > conn.close() > } > } > def close(errorOrNull: Throwable) = {} > } > import scala.concurrent.duration._ > val query2 = lines.writeStream > .outputMode("append") > .queryName("checkpoint2") > .trigger(ProcessingTime(30.seconds)) > .foreach(writer2) > .start() > query2.awaitTermination() > query1.awaitTermination() > }} > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org