Hi,

This is what I could find in Spark's source code about the
`recoverFromCheckpointLocation` flag (that led me to explore the
complete output mode for dropDuplicates operator).

`recoverFromCheckpointLocation` flag is enabled by default and varies
per sink (memory, console and others).

* `memory` sink has the flag enabled for Complete output mode only

* `foreach` sink has the flag always enabled

* `console` sink has the flag always disabled

* all other sinks have the flag always enabled

As agreed with Michael
(https://issues.apache.org/jira/browse/SPARK-21667) is to make console
sink accepting the flag as enabled which would make memory sink the
only one left with the flag enabled for Complete output.

And I thought I've been close to understand Structured Streaming :)

Pozdrawiam,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Fri, Aug 18, 2017 at 9:21 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
> My assumption is it would be similar though, in memory sink of all of your
> records would quickly overwhelm your cluster, but in aggregation it could be
> reasonable. But there might be additional reasons on top of that.
>
> On Fri, Aug 18, 2017 at 11:44 AM Holden Karau <hol...@pigscanfly.ca> wrote:
>>
>> Ah yes I'm not sure about the workings of the memory sink.
>>
>> On Fri, Aug 18, 2017 at 11:36 AM, Jacek Laskowski <ja...@japila.pl> wrote:
>>>
>>> Hi Holden,
>>>
>>> Thanks a lot for a bit more light on the topic. That however does not
>>> explain why memory sink requires Complete for a checkpoint location to
>>> work. The only reason I used Complete output mode was to meet the
>>> requirements of memory sink and that got me thinking why would the
>>> already-memory-hungry memory sink require yet another thing to get the
>>> query working.
>>>
>>> On to exploring the bits...
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> ----
>>> https://medium.com/@jaceklaskowski/
>>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>>
>>> On Fri, Aug 18, 2017 at 6:35 PM, Holden Karau <hol...@pigscanfly.ca>
>>> wrote:
>>> > So performing complete output without an aggregation would require
>>> > building
>>> > up a table of the entire input to write out at each micro batch. This
>>> > would
>>> > get prohibitively expensive quickly. With an aggregation we just need
>>> > to
>>> > keep track of the aggregates and update them every batch, so the memory
>>> > requirement is more reasonable.
>>> >
>>> > (Note: I don't do a lot of work in streaming so there may be additional
>>> > reasons, but these are the ones I remember from when I was working on
>>> > looking at integrating ML with SS).
>>> >
>>> > On Fri, Aug 18, 2017 at 5:25 AM Jacek Laskowski <ja...@japila.pl>
>>> > wrote:
>>> >>
>>> >> Hi,
>>> >>
>>> >> Why is the requirement for a streaming aggregation in a streaming
>>> >> query? What would happen if Spark allowed Complete without a single
>>> >> aggregation? This is the latest master.
>>> >>
>>> >> scala> val q = ids.
>>> >>      |   writeStream.
>>> >>      |   format("memory").
>>> >>      |   queryName("dups").
>>> >>      |   outputMode(OutputMode.Complete).  // <-- memory sink supports
>>> >> checkpointing for Complete output mode only
>>> >>      |   trigger(Trigger.ProcessingTime(30.seconds)).
>>> >>      |   option("checkpointLocation", "checkpoint-dir"). // <-- use
>>> >> checkpointing to save state between restarts
>>> >>      |   start
>>> >> org.apache.spark.sql.AnalysisException: Complete output mode not
>>> >> supported when there are no streaming aggregations on streaming
>>> >> DataFrames/Datasets;;
>>> >> Project [cast(time#10 as bigint) AS time#15L, id#6]
>>> >> +- Deduplicate [id#6], true
>>> >>    +- Project [cast(time#5 as timestamp) AS time#10, id#6]
>>> >>       +- Project [_1#2 AS time#5, _2#3 AS id#6]
>>> >>          +- StreamingExecutionRelation MemoryStream[_1#2,_2#3], [_1#2,
>>> >> _2#3]
>>> >>
>>> >>   at
>>> >>
>>> >> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
>>> >>   at
>>> >>
>>> >> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:115)
>>> >>   at
>>> >>
>>> >> org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
>>> >>   at
>>> >>
>>> >> org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
>>> >>   at
>>> >>
>>> >> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:249)
>>> >>   ... 57 elided
>>> >>
>>> >> Pozdrawiam,
>>> >> Jacek Laskowski
>>> >> ----
>>> >> https://medium.com/@jaceklaskowski/
>>> >> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>>> >> Follow me at https://twitter.com/jaceklaskowski
>>> >>
>>> >> ---------------------------------------------------------------------
>>> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> >>
>>> > --
>>> > Cell : 425-233-8271
>>> > Twitter: https://twitter.com/holdenkarau
>>
>>
>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to