Hi Holden,

Thanks a lot for shedding a bit more light on the topic. That, however, does not explain why the memory sink requires Complete output mode for a checkpoint location to work. The only reason I used Complete output mode was to meet the memory sink's requirements, and that got me thinking: why would the already-memory-hungry memory sink require yet another thing to get the query working?
On to exploring the bits...

Pozdrawiam,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Fri, Aug 18, 2017 at 6:35 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
> So performing complete output without an aggregation would require building
> up a table of the entire input to write out at each micro-batch. This would
> get prohibitively expensive quickly. With an aggregation we just need to
> keep track of the aggregates and update them every batch, so the memory
> requirement is more reasonable.
>
> (Note: I don't do a lot of work in streaming, so there may be additional
> reasons, but these are the ones I remember from when I was looking at
> integrating ML with SS.)
>
> On Fri, Aug 18, 2017 at 5:25 AM Jacek Laskowski <ja...@japila.pl> wrote:
>>
>> Hi,
>>
>> Why is there a requirement for a streaming aggregation in a streaming
>> query? What would happen if Spark allowed Complete without a single
>> aggregation? This is the latest master.
>>
>> scala> val q = ids.
>>      |   writeStream.
>>      |   format("memory").
>>      |   queryName("dups").
>>      |   outputMode(OutputMode.Complete). // <-- memory sink supports checkpointing for Complete output mode only
>>      |   trigger(Trigger.ProcessingTime(30.seconds)).
>>      |   option("checkpointLocation", "checkpoint-dir"). // <-- use checkpointing to save state between restarts
>>      |   start
>> org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;
>> Project [cast(time#10 as bigint) AS time#15L, id#6]
>> +- Deduplicate [id#6], true
>>    +- Project [cast(time#5 as timestamp) AS time#10, id#6]
>>       +- Project [_1#2 AS time#5, _2#3 AS id#6]
>>          +- StreamingExecutionRelation MemoryStream[_1#2,_2#3], [_1#2, _2#3]
>>
>>   at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
>>   at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:115)
>>   at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
>>   at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
>>   at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:249)
>>   ... 57 elided
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> ----
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
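P.S. Holden's memory argument can be sketched in plain Scala (no Spark involved; the object and method names below are purely illustrative, not Spark APIs):

```scala
// A minimal sketch of the memory argument, assuming nothing about
// Spark internals: it only models what state each approach must keep.
object CompleteModeState {

  // Complete output without an aggregation: every micro-batch must
  // re-emit the entire input seen so far, so the retained state grows
  // linearly with the stream.
  def withoutAggregation(batches: Seq[Seq[String]]): Seq[String] =
    batches.flatten

  // Complete output with an aggregation (here: a running count per key):
  // only one aggregate per distinct key is retained, regardless of how
  // many rows have been processed.
  def withAggregation(batches: Seq[Seq[String]]): Map[String, Int] =
    batches.flatten.groupBy(identity).map { case (k, rows) => (k, rows.size) }
}
```

In the failing query above, the remedy the exception points at would be adding a streaming aggregation (e.g. a groupBy followed by count) before writeStream, which is the case Complete output mode is designed for.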