Saisai Shao created SPARK-23844: ----------------------------------- Summary: Socket Stream recovering from checkpoint will throw exception Key: SPARK-23844 URL: https://issues.apache.org/jira/browse/SPARK-23844 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Saisai Shao
When we specified checkpoint location as well as using socket streaming, it will throw exception after rerun: {noformat} 18/04/02 14:11:28 ERROR MicroBatchExecution: Query test [id = c5ca82b2-550b-4c3d-9127-869f1aeae477, runId = 552d5bd4-a7e7-44e5-a85a-2f04f666ff6a] terminated with error java.lang.RuntimeException: Offsets committed out of order: 0 followed by -1 at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.execution.streaming.sources.TextSocketMicroBatchReader.commit(socket.scala:196) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(MicroBatchExecution.scala:373) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(MicroBatchExecution.scala:370) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcV$sp(MicroBatchExecution.scala:370) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:353) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:353) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:353) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:142) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:135) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:135) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:135) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:131) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189){noformat} Basically it means that {{TextSocketMicroBatchReader}} is honoring the offsets recovered from checkpoint, this is not correct for socket source, as it doesn't support recovering from checkpoint. Even though the offset is recovered, the real data is still unmatched from this offset. To reproduce this issue, {code:java} val socket = spark.readStream.format("socket").options(Map("host" -> "localhost", "port" -> "9999")).load spark.conf.set("spark.sql.streaming.checkpointLocation", "./checkpoint") socket.writeStream.format("parquet").option("path", "./result").queryName("test").start {code} This will be failed in the second run. Though this source is not supported from in production envs, I think still we should make sure it can be worked in test env without throwing runtime exception. -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org