So performing complete output without an aggregation would require building
up a table of the entire input to write out at each micro batch. This would
get prohibitively expensive quickly. With an aggregation we just need to
keep track of the aggregates and update them every batch, so the memory
requirement is more reasonable.

(Note: I don't do a lot of work in streaming so there may be additional
reasons, but these are the ones I remember from when I was working on
looking at integrating ML with SS).

On Fri, Aug 18, 2017 at 5:25 AM Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> Why is the requirement for a streaming aggregation in a streaming
> query? What would happen if Spark allowed Complete without a single
> aggregation? This is the latest master.
>
> scala> val q = ids.
>      |   writeStream.
>      |   format("memory").
>      |   queryName("dups").
>      |   outputMode(OutputMode.Complete).  // <-- memory sink supports
> checkpointing for Complete output mode only
>      |   trigger(Trigger.ProcessingTime(30.seconds)).
>      |   option("checkpointLocation", "checkpoint-dir"). // <-- use
> checkpointing to save state between restarts
>      |   start
> org.apache.spark.sql.AnalysisException: Complete output mode not
> supported when there are no streaming aggregations on streaming
> DataFrames/Datasets;;
> Project [cast(time#10 as bigint) AS time#15L, id#6]
> +- Deduplicate [id#6], true
>    +- Project [cast(time#5 as timestamp) AS time#10, id#6]
>       +- Project [_1#2 AS time#5, _2#3 AS id#6]
>          +- StreamingExecutionRelation MemoryStream[_1#2,_2#3], [_1#2,
> _2#3]
>
>   at
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
>   at
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:115)
>   at
> org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
>   at
> org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
>   at
> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:249)
>   ... 57 elided
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau

Reply via email to