[ 
https://issues.apache.org/jira/browse/SPARK-20103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-20103:
-------------------------------------
    Docs Text:   (was: object StructuredStreaming {
  def main(args: Array[String]): Unit = {
    val db_url = 
"jdbc:postgresql://dynamic-milestone-dev.crv1otzbekk9.us-east-1.rds.amazonaws.com:5432/DYNAMICPOSTGRES?user=dsdbadmin&password=password"
    val spark = SparkSession
      .builder
      .appName("StructuredKafkaReader")
      .master("local[*]")
      .getOrCreate()
    spark.conf.set("spark.sql.streaming.checkpointLocation", 
"/tmp/checkpoint_research/")
    import spark.implicits._
    val server = "10.205.82.113:9092"
    val topic = "checkpoint"
    val subscribeType="subscribe"
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", server)
      .option(subscribeType, topic)
      .load().selectExpr("CAST(value AS STRING)").as[String]
    lines.printSchema()
    import org.apache.spark.sql.ForeachWriter
    val writer = new ForeachWriter[String] {
       def open(partitionId: Long, version: Long):  Boolean = {
         println("After db props"); true
       }
       def process(value: String) = {
         val conn = DriverManager.getConnection(db_url)
         try{
           conn.createStatement().executeUpdate("INSERT INTO PUBLIC.checkpoint1 
VALUES ('"+value+"')")
         }
         finally {
           conn.close()
         }
      }
       def close(errorOrNull: Throwable) = {}
    }
    import scala.concurrent.duration._
    val query1 = lines.writeStream
                 .outputMode("append")
                 .queryName("checkpoint1")
                 .trigger(ProcessingTime(30.seconds))
                 .foreach(writer)
                 .start()
 val writer2 = new ForeachWriter[String] {
      def open(partitionId: Long, version: Long):  Boolean = {
        println("After db props"); true
      }
      def process(value: String) = {
        val conn = DriverManager.getConnection(db_url)
        try{
          conn.createStatement().executeUpdate("INSERT INTO PUBLIC.checkpoint2 
VALUES ('"+value+"')")
        }
        finally {
          conn.close()
        }
   }
      def close(errorOrNull: Throwable) = {}
    }
    import scala.concurrent.duration._
    val query2 = lines.writeStream
      .outputMode("append")
      .queryName("checkpoint2")
      .trigger(ProcessingTime(30.seconds))
      .foreach(writer2)
      .start()
    query2.awaitTermination()
    query1.awaitTermination()
}})

> Spark structured steaming from kafka - last message processed again after 
> resume from checkpoint
> ------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-20103
>                 URL: https://issues.apache.org/jira/browse/SPARK-20103
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.1.0
>         Environment: Linux, Spark 2.10 
>            Reporter: Rajesh Mutha
>              Labels: spark, streaming
>
> When the application starts after a failure or a graceful shutdown, it is 
> consistently processing the last message of the previous batch even though it 
> was already processed correctly without failure.
> We are making sure database writes are idempotent using postgres 9.6 feature. 
> Is this the default behavior of spark? I added a code snippet with 2 
> streaming queries. One of the query is idempotent; since query2 is not 
> idempotent, we are seeing duplicate entries in table. 
> {code}
> object StructuredStreaming {
>   def main(args: Array[String]): Unit = {
>     val db_url = 
> "jdbc:postgresql://dynamic-milestone-dev.crv1otzbekk9.us-east-1.rds.amazonaws.com:5432/DYNAMICPOSTGRES?user=dsdbadmin&password=password"
>     val spark = SparkSession
>       .builder
>       .appName("StructuredKafkaReader")
>       .master("local[*]")
>       .getOrCreate()
>     spark.conf.set("spark.sql.streaming.checkpointLocation", 
> "/tmp/checkpoint_research/")
>     import spark.implicits._
>     val server = "10.205.82.113:9092"
>     val topic = "checkpoint"
>     val subscribeType="subscribe"
>     val lines = spark
>       .readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", server)
>       .option(subscribeType, topic)
>       .load().selectExpr("CAST(value AS STRING)").as[String]
>     lines.printSchema()
>     import org.apache.spark.sql.ForeachWriter
>     val writer = new ForeachWriter[String] {
>        def open(partitionId: Long, version: Long):  Boolean = {
>          println("After db props"); true
>        }
>        def process(value: String) = {
>          val conn = DriverManager.getConnection(db_url)
>          try{
>            conn.createStatement().executeUpdate("INSERT INTO 
> PUBLIC.checkpoint1 VALUES ('"+value+"')")
>          }
>          finally {
>            conn.close()
>          }
>       }
>        def close(errorOrNull: Throwable) = {}
>     }
>     import scala.concurrent.duration._
>     val query1 = lines.writeStream
>                  .outputMode("append")
>                  .queryName("checkpoint1")
>                  .trigger(ProcessingTime(30.seconds))
>                  .foreach(writer)
>                  .start()
>  val writer2 = new ForeachWriter[String] {
>       def open(partitionId: Long, version: Long):  Boolean = {
>         println("After db props"); true
>       }
>       def process(value: String) = {
>         val conn = DriverManager.getConnection(db_url)
>         try{
>           conn.createStatement().executeUpdate("INSERT INTO 
> PUBLIC.checkpoint2 VALUES ('"+value+"')")
>         }
>         finally {
>           conn.close()
>         }
>    }
>       def close(errorOrNull: Throwable) = {}
>     }
>     import scala.concurrent.duration._
>     val query2 = lines.writeStream
>       .outputMode("append")
>       .queryName("checkpoint2")
>       .trigger(ProcessingTime(30.seconds))
>       .foreach(writer2)
>       .start()
>     query2.awaitTermination()
>     query1.awaitTermination()
> }}
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to