[
https://issues.apache.org/jira/browse/SPARK-20103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Michael Armbrust updated SPARK-20103:
-------------------------------------
Docs Text: (was: object StructuredStreaming {
  import java.sql.DriverManager
  import org.apache.spark.sql.{ForeachWriter, SparkSession}
  import org.apache.spark.sql.streaming.ProcessingTime
  import scala.concurrent.duration._

  /** Builds a sink that appends every incoming string as one row of `table` over JDBC.
    *
    * Each call to `process` opens its own connection, inserts one row, and closes the
    * statement and the connection in `finally` blocks. The value is bound as a statement
    * parameter instead of being concatenated into the SQL text, so values containing
    * quotes cannot break or alter the statement.
    *
    * @param dbUrl JDBC URL (including credentials) passed to `DriverManager.getConnection`
    * @param table target table name; it is spliced into the SQL verbatim, so pass only
    *              trusted, fixed names
    * @return a writer whose `open` always accepts the partition and whose `close` is a no-op
    */
  private def jdbcInsertWriter(dbUrl: String, table: String): ForeachWriter[String] =
    new ForeachWriter[String] {
      private val insertSql = s"INSERT INTO $table VALUES (?)"

      def open(partitionId: Long, version: Long): Boolean = {
        println("After db props"); true
      }

      def process(value: String): Unit = {
        val conn = DriverManager.getConnection(dbUrl)
        try {
          val stmt = conn.prepareStatement(insertSql)
          try {
            stmt.setString(1, value)
            // Plain INSERT: a value handed to `process` twice is stored twice unless
            // the table itself rejects duplicates.
            stmt.executeUpdate()
          } finally stmt.close()
        } finally conn.close()
      }

      def close(errorOrNull: Throwable): Unit = {}
    }

  /** Reproduction entry point: reads the Kafka topic `checkpoint` as strings and writes
    * it with two streaming queries into PUBLIC.checkpoint1 and PUBLIC.checkpoint2, each
    * triggered every 30 seconds, with checkpoints under /tmp/checkpoint_research/.
    */
  def main(args: Array[String]): Unit = {
    val db_url =
      "jdbc:postgresql://dynamic-milestone-dev.crv1otzbekk9.us-east-1.rds.amazonaws.com:5432/DYNAMICPOSTGRES?user=dsdbadmin&password=password"
    val spark = SparkSession
      .builder
      .appName("StructuredKafkaReader")
      .master("local[*]")
      .getOrCreate()
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/checkpoint_research/")
    import spark.implicits._

    val server = "10.205.82.113:9092"
    val topic = "checkpoint"
    val subscribeType = "subscribe"
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", server)
      .option(subscribeType, topic)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
    lines.printSchema()

    // Two queries over the same Kafka stream, differing only in query name and target table.
    val query1 = lines.writeStream
      .outputMode("append")
      .queryName("checkpoint1")
      .trigger(ProcessingTime(30.seconds))
      .foreach(jdbcInsertWriter(db_url, "PUBLIC.checkpoint1"))
      .start()
    val query2 = lines.writeStream
      .outputMode("append")
      .queryName("checkpoint2")
      .trigger(ProcessingTime(30.seconds))
      .foreach(jdbcInsertWriter(db_url, "PUBLIC.checkpoint2"))
      .start()

    // Blocks until query2 stops, then until query1 stops.
    query2.awaitTermination()
    query1.awaitTermination()
  }
})
> Spark Structured Streaming from Kafka - last message processed again after
> resume from checkpoint
> ------------------------------------------------------------------------------------------------
>
> Key: SPARK-20103
> URL: https://issues.apache.org/jira/browse/SPARK-20103
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.1.0
> Environment: Linux, Spark 2.10
> Reporter: Rajesh Mutha
> Labels: spark, streaming
>
> When the application starts after a failure or a graceful shutdown, it
> consistently reprocesses the last message of the previous batch, even though
> that message was already processed successfully.
> We make our database writes idempotent using a PostgreSQL 9.6 feature.
> Is this the default behavior of Spark? I added a code snippet with two
> streaming queries. One of the queries is idempotent; because query2 is not
> idempotent, we see duplicate entries in its table.
> {code}
> object StructuredStreaming {
>   import java.sql.DriverManager
>   import org.apache.spark.sql.{ForeachWriter, SparkSession}
>   import org.apache.spark.sql.streaming.ProcessingTime
>   import scala.concurrent.duration._
>
>   /** Builds a sink that appends every incoming string as one row of `table` over JDBC.
>     *
>     * Each call to `process` opens its own connection, inserts one row, and closes the
>     * statement and the connection in `finally` blocks. The value is bound as a statement
>     * parameter instead of being concatenated into the SQL text, so values containing
>     * quotes cannot break or alter the statement.
>     *
>     * @param dbUrl JDBC URL (including credentials) passed to `DriverManager.getConnection`
>     * @param table target table name; it is spliced into the SQL verbatim, so pass only
>     *              trusted, fixed names
>     * @return a writer whose `open` always accepts the partition and whose `close` is a no-op
>     */
>   private def jdbcInsertWriter(dbUrl: String, table: String): ForeachWriter[String] =
>     new ForeachWriter[String] {
>       private val insertSql = s"INSERT INTO $table VALUES (?)"
>
>       def open(partitionId: Long, version: Long): Boolean = {
>         println("After db props"); true
>       }
>
>       def process(value: String): Unit = {
>         val conn = DriverManager.getConnection(dbUrl)
>         try {
>           val stmt = conn.prepareStatement(insertSql)
>           try {
>             stmt.setString(1, value)
>             // Plain INSERT: a value handed to `process` twice is stored twice unless
>             // the table itself rejects duplicates.
>             stmt.executeUpdate()
>           } finally stmt.close()
>         } finally conn.close()
>       }
>
>       def close(errorOrNull: Throwable): Unit = {}
>     }
>
>   /** Reproduction entry point: reads the Kafka topic `checkpoint` as strings and writes
>     * it with two streaming queries into PUBLIC.checkpoint1 and PUBLIC.checkpoint2, each
>     * triggered every 30 seconds, with checkpoints under /tmp/checkpoint_research/.
>     */
>   def main(args: Array[String]): Unit = {
>     val db_url =
>       "jdbc:postgresql://dynamic-milestone-dev.crv1otzbekk9.us-east-1.rds.amazonaws.com:5432/DYNAMICPOSTGRES?user=dsdbadmin&password=password"
>     val spark = SparkSession
>       .builder
>       .appName("StructuredKafkaReader")
>       .master("local[*]")
>       .getOrCreate()
>     spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/checkpoint_research/")
>     import spark.implicits._
>
>     val server = "10.205.82.113:9092"
>     val topic = "checkpoint"
>     val subscribeType = "subscribe"
>     val lines = spark
>       .readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", server)
>       .option(subscribeType, topic)
>       .load()
>       .selectExpr("CAST(value AS STRING)")
>       .as[String]
>     lines.printSchema()
>
>     // Two queries over the same Kafka stream, differing only in query name and target table.
>     val query1 = lines.writeStream
>       .outputMode("append")
>       .queryName("checkpoint1")
>       .trigger(ProcessingTime(30.seconds))
>       .foreach(jdbcInsertWriter(db_url, "PUBLIC.checkpoint1"))
>       .start()
>     val query2 = lines.writeStream
>       .outputMode("append")
>       .queryName("checkpoint2")
>       .trigger(ProcessingTime(30.seconds))
>       .foreach(jdbcInsertWriter(db_url, "PUBLIC.checkpoint2"))
>       .start()
>
>     // Blocks until query2 stops, then until query1 stops.
>     query2.awaitTermination()
>     query1.awaitTermination()
>   }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]