Ah yes, I'm not sure about the workings of the memory sink.

On Fri, Aug 18, 2017 at 11:36 AM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi Holden,
>
> Thanks a lot for shedding a bit more light on the topic. That, however, does
> not explain why the memory sink requires Complete for a checkpoint location
> to work. The only reason I used Complete output mode was to meet the
> requirements of the memory sink, and that got me thinking about why the
> already-memory-hungry memory sink would require yet another thing to get the
> query working.
>
> On to exploring the bits...
>
> Regards,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Fri, Aug 18, 2017 at 6:35 PM, Holden Karau <hol...@pigscanfly.ca>
> wrote:
> > So performing complete output without an aggregation would require building
> > up a table of the entire input to write out at each micro batch. This would
> > get prohibitively expensive quickly. With an aggregation we just need to
> > keep track of the aggregates and update them every batch, so the memory
> > requirement is more reasonable.
> >
> > (Note: I don't do a lot of work in streaming, so there may be additional
> > reasons, but these are the ones I remember from when I was looking at
> > integrating ML with Structured Streaming.)
> >
> > On Fri, Aug 18, 2017 at 5:25 AM Jacek Laskowski <ja...@japila.pl> wrote:
> >>
> >> Hi,
> >>
> >> Why is there a requirement for a streaming aggregation in a streaming
> >> query that uses Complete output mode? What would happen if Spark allowed
> >> Complete without a single aggregation? This is the latest master.
> >>
> >> scala> val q = ids.
> >>      |   writeStream.
> >>      |   format("memory").
> >>      |   queryName("dups").
> >>      |   outputMode(OutputMode.Complete).  // <-- memory sink supports checkpointing for Complete output mode only
> >>      |   trigger(Trigger.ProcessingTime(30.seconds)).
> >>      |   option("checkpointLocation", "checkpoint-dir"). // <-- use checkpointing to save state between restarts
> >>      |   start
> >> org.apache.spark.sql.AnalysisException: Complete output mode not
> >> supported when there are no streaming aggregations on streaming
> >> DataFrames/Datasets;;
> >> Project [cast(time#10 as bigint) AS time#15L, id#6]
> >> +- Deduplicate [id#6], true
> >>    +- Project [cast(time#5 as timestamp) AS time#10, id#6]
> >>       +- Project [_1#2 AS time#5, _2#3 AS id#6]
> >>          +- StreamingExecutionRelation MemoryStream[_1#2,_2#3], [_1#2, _2#3]
> >>
> >>   at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
> >>   at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:115)
> >>   at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
> >>   at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
> >>   at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:249)
> >>   ... 57 elided
> >>
> >> Regards,
> >> Jacek Laskowski
> >> ----
> >> https://medium.com/@jaceklaskowski/
> >> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> >> Follow me at https://twitter.com/jaceklaskowski
> >>
> > --
> > Cell : 425-233-8271
> > Twitter: https://twitter.com/holdenkarau
>
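
To make the memory argument quoted above a bit more concrete, here is a rough
sketch in plain Scala. It is only a toy model under stated assumptions, not how
Spark implements state: ordinary collections stand in for whatever a query
would have to retain between micro-batches, and every name in it
(CompleteModeStateSketch, retainAllRows, retainCounts) is made up for
illustration. The records are (time, id) pairs, loosely mirroring the stream in
the original question.

```scala
// Toy model only: plain Scala collections stand in for streaming state.
// None of these names are Spark APIs; they are hypothetical helpers.
object CompleteModeStateSketch {
  // One micro-batch of (time, id) records.
  type Batch = Seq[(Long, Int)]

  // Complete output with no aggregation: every row seen so far has to be
  // retained, because the whole result is written out again after each batch.
  // Returns the number of retained rows after each batch.
  def retainAllRows(batches: Seq[Batch]): Seq[Int] =
    batches.scanLeft(Vector.empty[(Long, Int)])(_ ++ _).tail.map(_.size)

  // Complete output with a count-per-id aggregation: only one counter per
  // distinct id is retained and updated as each batch arrives.
  // Returns the number of retained counters after each batch.
  def retainCounts(batches: Seq[Batch]): Seq[Int] =
    batches
      .scanLeft(Map.empty[Int, Long]) { (counts, batch) =>
        batch.foldLeft(counts) { case (acc, (_, id)) =>
          acc.updated(id, acc.getOrElse(id, 0L) + 1L)
        }
      }
      .tail
      .map(_.size)

  def main(args: Array[String]): Unit = {
    // Three batches of four records each, all drawn from ids 0 and 1.
    val batches: Seq[Batch] =
      (0 until 3).map(b => (0 until 4).map(i => ((b * 4 + i).toLong, i % 2)))
    println(s"rows kept without aggregation: ${retainAllRows(batches)}")
    println(s"counters kept with aggregation: ${retainCounts(batches)}")
  }
}
```

With these three batches the retained row count grows to 4, 8 and then 12,
while the number of counters stays at 2 after every batch. That is the gap the
quoted explanation points at: the first grows with the total input, the second
only with the number of distinct keys.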



-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau
