Ah yes, I'm not sure about the workings of the memory sink.

On Fri, Aug 18, 2017 at 11:36 AM, Jacek Laskowski <ja...@japila.pl> wrote:
> Hi Holden,
>
> Thanks a lot for shedding a bit more light on the topic. That however does
> not explain why the memory sink requires Complete for a checkpoint
> location to work. The only reason I used Complete output mode was to meet
> the requirements of the memory sink, and that got me thinking why the
> already-memory-hungry memory sink would require yet another thing to get
> the query working.
>
> On to exploring the bits...
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Fri, Aug 18, 2017 at 6:35 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
> > So performing Complete output without an aggregation would require
> > building up a table of the entire input to write out at each micro-batch.
> > This would get prohibitively expensive quickly. With an aggregation we
> > just need to keep track of the aggregates and update them every batch, so
> > the memory requirement is more reasonable.
> >
> > (Note: I don't do a lot of work in streaming, so there may be additional
> > reasons, but these are the ones I remember from when I was working on
> > looking at integrating ML with Structured Streaming.)
> >
> > On Fri, Aug 18, 2017 at 5:25 AM Jacek Laskowski <ja...@japila.pl> wrote:
> >>
> >> Hi,
> >>
> >> Why is there a requirement for a streaming aggregation in a streaming
> >> query? What would happen if Spark allowed Complete without a single
> >> aggregation? This is the latest master.
> >>
> >> scala> val q = ids.
> >>      |   writeStream.
> >>      |   format("memory").
> >>      |   queryName("dups").
> >>      |   outputMode(OutputMode.Complete). // <-- memory sink supports checkpointing for Complete output mode only
> >>      |   trigger(Trigger.ProcessingTime(30.seconds)).
> >>      |   option("checkpointLocation", "checkpoint-dir"). // <-- use checkpointing to save state between restarts
> >>      |   start
> >> org.apache.spark.sql.AnalysisException: Complete output mode not
> >> supported when there are no streaming aggregations on streaming
> >> DataFrames/Datasets;;
> >> Project [cast(time#10 as bigint) AS time#15L, id#6]
> >> +- Deduplicate [id#6], true
> >>    +- Project [cast(time#5 as timestamp) AS time#10, id#6]
> >>       +- Project [_1#2 AS time#5, _2#3 AS id#6]
> >>          +- StreamingExecutionRelation MemoryStream[_1#2,_2#3], [_1#2, _2#3]
> >>
> >>   at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
> >>   at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:115)
> >>   at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
> >>   at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
> >>   at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:249)
> >>   ... 57 elided
> >>
> >> Pozdrawiam,
> >> Jacek Laskowski
> >> ----
> >> https://medium.com/@jaceklaskowski/
> >> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> >> Follow me at https://twitter.com/jaceklaskowski
> >>
> >> ---------------------------------------------------------------------
> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>
> > --
> > Cell : 425-233-8271
> > Twitter: https://twitter.com/holdenkarau

--
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau
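
[Editor's note: Holden's explanation can be illustrated outside Spark. In Complete mode without an aggregation, the sink would have to retain every input row ever seen to re-emit the full result each micro-batch; with an aggregation, only the running aggregates (one entry per key) are kept. The following is a minimal plain-Scala sketch of that difference — a simulation of micro-batch state, not Spark's actual implementation; all names are illustrative.]

```scala
// Simulate Complete output mode across a sequence of micro-batches.

// Without an aggregation: the retained state is every row seen so far,
// so each batch's complete output grows without bound with the input.
def completeNoAgg(batches: Seq[Seq[(String, Int)]]): Seq[Seq[(String, Int)]] =
  batches.scanLeft(Seq.empty[(String, Int)])(_ ++ _).tail

// With an aggregation (running sum per key): only one value per distinct
// key is kept, so state size is bounded by the number of keys.
def completeWithAgg(batches: Seq[Seq[(String, Int)]]): Seq[Map[String, Int]] =
  batches.scanLeft(Map.empty[String, Int]) { (agg, batch) =>
    batch.foldLeft(agg) { case (m, (k, v)) =>
      m.updated(k, m.getOrElse(k, 0) + v)
    }
  }.tail

val batches = Seq(
  Seq("a" -> 1, "b" -> 2),
  Seq("a" -> 3),
  Seq("b" -> 1, "c" -> 5)
)

// Without an aggregation, the rows to re-emit grow every batch: 2, 3, 5.
println(completeNoAgg(batches).map(_.size))
// With an aggregation, state never exceeds the 3 distinct keys.
println(completeWithAgg(batches).last)
```

This is the trade-off Holden describes: the per-key aggregate state stays small and updatable, while the no-aggregation case forces the sink to buffer the entire input stream.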