Hi,

This is what I could find in Spark's source code about the `recoverFromCheckpointLocation` flag (which is what led me to explore the Complete output mode for the dropDuplicates operator).
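To summarize the findings up front, the per-sink defaults can be modeled as a tiny pattern match (a toy sketch of mine, not Spark's actual code -- the sink names here are just the `format` strings):

```scala
// Toy model of the per-sink recoverFromCheckpointLocation defaults.
// This is my own sketch for illustration, not code from Spark's source.
object RecoverFlag {
  sealed trait OutputMode
  case object Append extends OutputMode
  case object Complete extends OutputMode
  case object Update extends OutputMode

  def recoverFromCheckpointLocation(sink: String, mode: OutputMode): Boolean =
    sink match {
      case "memory"  => mode == Complete // enabled for Complete output mode only
      case "foreach" => true             // always enabled
      case "console" => false            // always disabled
      case _         => true             // all other sinks: always enabled
    }
}
```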
The `recoverFromCheckpointLocation` flag is enabled by default, but the actual value varies per sink:

* `memory` sink has the flag enabled for Complete output mode only
* `foreach` sink has the flag always enabled
* `console` sink has the flag always disabled
* all other sinks have the flag always enabled

As agreed with Michael (https://issues.apache.org/jira/browse/SPARK-21667), the plan is to make the console sink accept the flag as enabled, which would leave the memory sink as the only one with the flag enabled for Complete output mode only.

And I thought I was close to understanding Structured Streaming :)

Pozdrawiam,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Fri, Aug 18, 2017 at 9:21 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
> My assumption is it would be similar though: an in-memory sink of all of your
> records would quickly overwhelm your cluster, but with an aggregation it could
> be reasonable. But there might be additional reasons on top of that.
>
> On Fri, Aug 18, 2017 at 11:44 AM Holden Karau <hol...@pigscanfly.ca> wrote:
>>
>> Ah yes, I'm not sure about the workings of the memory sink.
>>
>> On Fri, Aug 18, 2017 at 11:36 AM, Jacek Laskowski <ja...@japila.pl> wrote:
>>>
>>> Hi Holden,
>>>
>>> Thanks a lot for shedding a bit more light on the topic. That, however,
>>> does not explain why the memory sink requires Complete for a checkpoint
>>> location to work. The only reason I used the Complete output mode was to
>>> meet the requirements of the memory sink, and that got me thinking about
>>> why the already-memory-hungry memory sink would require yet another thing
>>> to get the query working.
>>>
>>> On to exploring the bits...
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>>
>>> On Fri, Aug 18, 2017 at 6:35 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>> > So performing Complete output without an aggregation would require
>>> > building up a table of the entire input to write out at each micro-batch.
>>> > This would get prohibitively expensive quickly. With an aggregation we
>>> > just need to keep track of the aggregates and update them every batch,
>>> > so the memory requirement is more reasonable.
>>> >
>>> > (Note: I don't do a lot of work in streaming, so there may be additional
>>> > reasons, but these are the ones I remember from when I was looking at
>>> > integrating ML with SS.)
>>> >
>>> > On Fri, Aug 18, 2017 at 5:25 AM Jacek Laskowski <ja...@japila.pl> wrote:
>>> >>
>>> >> Hi,
>>> >>
>>> >> Why is there a requirement for a streaming aggregation in a streaming
>>> >> query? What would happen if Spark allowed Complete without a single
>>> >> aggregation? This is the latest master.
>>> >>
>>> >> scala> val q = ids.
>>> >>      |   writeStream.
>>> >>      |   format("memory").
>>> >>      |   queryName("dups").
>>> >>      |   outputMode(OutputMode.Complete). // <-- memory sink supports checkpointing for Complete output mode only
>>> >>      |   trigger(Trigger.ProcessingTime(30.seconds)).
>>> >>      |   option("checkpointLocation", "checkpoint-dir"). // <-- use checkpointing to save state between restarts
>>> >>      |   start
>>> >> org.apache.spark.sql.AnalysisException: Complete output mode not
>>> >> supported when there are no streaming aggregations on streaming
>>> >> DataFrames/Datasets;;
>>> >> Project [cast(time#10 as bigint) AS time#15L, id#6]
>>> >> +- Deduplicate [id#6], true
>>> >>    +- Project [cast(time#5 as timestamp) AS time#10, id#6]
>>> >>       +- Project [_1#2 AS time#5, _2#3 AS id#6]
>>> >>          +- StreamingExecutionRelation MemoryStream[_1#2,_2#3], [_1#2, _2#3]
>>> >>
>>> >>   at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
>>> >>   at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:115)
>>> >>   at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
>>> >>   at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
>>> >>   at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:249)
>>> >>   ... 57 elided
>>> >>
>>> >> Pozdrawiam,
>>> >> Jacek Laskowski
>>> >>
>>> > --
>>> > Cell : 425-233-8271
>>> > Twitter: https://twitter.com/holdenkarau

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
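Holden's point about why Complete needs an aggregation can be illustrated with a toy model (my own sketch, nothing from Spark's code): under Complete mode the sink must be handed the full result at every trigger, so without an aggregation the retained state grows with the input, while with an aggregation it is bounded by the number of distinct keys:

```scala
// Toy illustration (my own sketch, not Spark code): how much state must be
// retained across micro-batches to produce Complete output at each trigger.
object CompleteModeState {
  // Without an aggregation, the complete result is every row ever seen.
  def stateWithoutAgg(batches: Seq[Seq[String]]): Int =
    batches.flatten.size

  // With an aggregation (e.g. a running count per key), only one
  // aggregate row per distinct key needs to be kept and updated.
  def stateWithAgg(batches: Seq[Seq[String]]): Int =
    batches.flatten.groupBy(identity).size
}
```

With, say, a thousand batches of a thousand rows over ten distinct keys, the first number grows to a million while the second stays at ten, which is why Complete without an aggregation gets prohibitively expensive.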