Hi Holden,

Thanks a lot for shedding a bit more light on the topic. That still
does not explain why the memory sink requires Complete output mode for
a checkpoint location to work, though. The only reason I used Complete
output mode was to meet the requirements of the memory sink, and that
got me wondering why the already-memory-hungry memory sink would
require yet another thing to get the query working.
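
If I read your explanation right, the same query should be accepted
once there's a streaming aggregation in it. A minimal sketch of what I
have in mind (untested; ids is the same streaming Dataset as in the
query quoted below):

    import org.apache.spark.sql.streaming.{OutputMode, Trigger}
    import scala.concurrent.duration._

    // Counting rows per id is a streaming aggregation, so Complete
    // output mode (and with it the memory sink's checkpointLocation)
    // should be allowed here; the state is bounded by the number of
    // distinct ids rather than by the entire input.
    val q = ids.
      groupBy("id").
      count().
      writeStream.
      format("memory").
      queryName("dups").
      outputMode(OutputMode.Complete).
      trigger(Trigger.ProcessingTime(30.seconds)).
      option("checkpointLocation", "checkpoint-dir").
      start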

On to exploring the bits...
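
For instance, the analyzed plans seem to tell the story (a quick
spark-shell sketch, same ids as below): dropDuplicates becomes a
Deduplicate node, exactly as in the plan printed with the exception
quoted below, while groupBy(...).count() becomes an Aggregate, which
is presumably what UnsupportedOperationChecker.checkForStreaming looks
for before allowing Complete output mode.

    // Deduplicate is not an Aggregate, which would explain why
    // dropDuplicates alone does not satisfy the Complete-mode check.
    println(ids.dropDuplicates("id").queryExecution.analyzed.numberedTreeString)
    println(ids.groupBy("id").count().queryExecution.analyzed.numberedTreeString)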

Pozdrawiam,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Fri, Aug 18, 2017 at 6:35 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
> So performing complete output without an aggregation would require building
> up a table of the entire input to write out at each micro batch. This would
> get prohibitively expensive quickly. With an aggregation we just need to
> keep track of the aggregates and update them every batch, so the memory
> requirement is more reasonable.
>
> (Note: I don't do a lot of work in streaming so there may be additional
> reasons, but these are the ones I remember from when I was looking at
> integrating ML with SS.)
>
> On Fri, Aug 18, 2017 at 5:25 AM Jacek Laskowski <ja...@japila.pl> wrote:
>>
>> Hi,
>>
>> Why is there a requirement for a streaming aggregation in a streaming
>> query? What would happen if Spark allowed Complete output mode without
>> a single aggregation? This is on the latest master.
>>
>> scala> val q = ids.
>>      |   writeStream.
>>      |   format("memory").
>>      |   queryName("dups").
>>      |   outputMode(OutputMode.Complete). // <-- memory sink supports checkpointing for Complete output mode only
>>      |   trigger(Trigger.ProcessingTime(30.seconds)).
>>      |   option("checkpointLocation", "checkpoint-dir"). // <-- use checkpointing to save state between restarts
>>      |   start
>> org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;
>> Project [cast(time#10 as bigint) AS time#15L, id#6]
>> +- Deduplicate [id#6], true
>>    +- Project [cast(time#5 as timestamp) AS time#10, id#6]
>>       +- Project [_1#2 AS time#5, _2#3 AS id#6]
>>          +- StreamingExecutionRelation MemoryStream[_1#2,_2#3], [_1#2, _2#3]
>>
>>   at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
>>   at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:115)
>>   at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
>>   at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
>>   at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:249)
>>   ... 57 elided
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> ----
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
