Oh wow. I never thought this would be up for debate. I use complete mode
VERY frequently for all my dashboarding use cases. Here are some of my
thoughts on the points raised:

> 1. It destroys the purpose of watermark and forces Spark to maintain all
> of state rows, growing incrementally. It only works when all keys are
> bounded to the limited set.

Yes, this is a conscious architectural decision users need to make. There
are many cases where state is finite and small.

> 2. It has to provide all state rows as outputs per batch, hence the size
> of outputs is also growing incrementally.

Not if you add a filter on the output of the aggregation ;) For example, I
have aggregations that keep only the last week of data, and I use them in
Complete mode without any issues. I don't remember if we drop filtered
values from the state as well, but I haven't faced a memory issue yet
(streams have been running since the creation of Structured Streaming).
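
To make the point about filtering the aggregation output concrete, here is
a toy model in plain Python. It is not Spark code: the class and its names
are hypothetical, and it assumes that rows removed by the filter stay in
state, which is exactly the part I said I don't remember. It only shows
that the per-batch output can stay bounded while the state does not.

```python
from collections import defaultdict
from datetime import date, timedelta


class CompleteModeModel:
    """Toy model of a count-per-day aggregation emitted in complete mode."""

    def __init__(self, keep_days=7):
        self.state = defaultdict(int)  # one entry per day ever seen
        self.keep_days = keep_days

    def process_batch(self, events, today):
        # Fold the new micro-batch into the running aggregate.
        for day in events:
            self.state[day] += 1
        # Complete mode: the full result is rebuilt from state every batch.
        # The filter is applied to that result only (assumption: the state
        # itself is left untouched).
        cutoff = today - timedelta(days=self.keep_days)
        return {d: c for d, c in self.state.items() if d > cutoff}


model = CompleteModeModel(keep_days=7)
start = date(2020, 5, 1)
for i in range(30):
    today = start + timedelta(days=i)
    output = model.process_batch([today, today], today)

# 30 days of state are retained, but only the last 7 days are emitted.
print(len(model.state), len(output))
```

With one key per day the state grows by one row per day, so even under
this assumption it stays small for a very long time.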

> 3. It has to truncate the target before putting rows which might not be
> trivial for external storage if it should be executed per batch.

This is trivial and super cheap for certain data sources such as JDBC,
Delta, and Iceberg. Again, it becomes a question of using the right tools
and the right architecture to solve your problem. IMHO it's not a problem
of the execution engine or the mode.
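
As a rough sketch of why truncate-then-write is cheap on a transactional
store, the snippet below replaces a small result table inside a single
transaction. It uses Python's built-in sqlite3 as a stand-in for a JDBC
target; the table name daily_counts and the helper overwrite_table are
hypothetical, and this is not how Spark's JDBC, Delta, or Iceberg sinks are
implemented.

```python
import sqlite3


def overwrite_table(conn, rows):
    """Replace the full contents of daily_counts with one batch's result."""
    with conn:  # one transaction: commit on success, roll back on error
        conn.execute("DELETE FROM daily_counts")
        conn.executemany(
            "INSERT INTO daily_counts (day, cnt) VALUES (?, ?)", rows
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_counts (day TEXT PRIMARY KEY, cnt INTEGER)")

# Each call plays the role of one complete-mode batch: the whole result set.
overwrite_table(conn, [("2020-05-19", 10), ("2020-05-20", 4)])
overwrite_table(conn, [("2020-05-19", 10), ("2020-05-20", 9), ("2020-05-21", 1)])

result = conn.execute("SELECT day, cnt FROM daily_counts ORDER BY day").fetchall()
```

Because the delete and the inserts commit together, a reader never sees an
empty or half-written table between batches.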

> 4. It enables some operations like sort on streaming query or couple of
> more things. But it will not work cleanly (state won't keep up) under
> reasonably high input rate, and we have to consider how the operation will
> work for streaming output mode hence non-trivial amount of consideration
> has to be added to maintain the mode.

I mean, I would want all pipelines that I build to work magically without
me having to put any thought into it, but then I feel most people in this
email list would be out of jobs. These are typical considerations that you
need to put into how you architect data pipelines. If someone doesn't put
thought into the scalability of their system then ¯\_(ツ)_/¯

Let me know what you think!


On Wed, May 20, 2020 at 10:29 PM Jungtaek Lim <kabhwan.opensou...@gmail.com>
wrote:

> Thanks for the voice, Shixiong!
>
> Thanks for sharing the use case of complete mode in practice. I agree
> that's a valid use case where complete mode would help, but I'm unsure
> enabling complete mode is the only way to deal with the use case.
>
> 1. Given that it assumes a fairly small output cardinality, using
> "update" mode with external storage that can handle fast reads and
> updates (e.g. Redis) would also work smoothly. The downside of this
> approach is that it requires installing and maintaining external storage
> (and assumes there's a data source implementation for that storage).
> 2. I think "queryable state" (interactive queries) is the widely adopted
> technology for addressing such a use case. It doesn't need to be accessed
> within the same driver, and it doesn't even need to assume the set of keys
> is bounded and small enough to fit in driver memory. It would probably
> require major effort to implement, and may need more effort to wrap as a
> table.
>
> Also worth noting that there's a data loss issue in complete mode if end
> users try to union the result of a streaming aggregation and a
> non-streaming aggregation. While we may be able to restrict queries from
> applying union after streaming aggregation, we already have lots of rules
> to restrict the problematic cases for arbitrary plans. It seems to be time
> to revisit this theoretically (which is what SPARK-31724 is for).
>
> We may need to provide an alternative for the possible use cases even if
> we decide to drop complete mode. But before then it's more important to
> build a consensus that complete mode is only used for a few use cases (we
> need to collect these use cases, of course) and that the cost of
> maintenance exceeds the benefit. For sure I'm open to disagreement.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, May 21, 2020 at 9:45 AM Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>> Hey Jungtaek,
>>
>> I totally agree with you about the issues of the complete mode you raised
>> here. However, not all streaming queries have unbounded state that
>> quickly grows out of hand.
>>
>> Actually, I have found complete mode pretty useful when the state is
>> bounded and small. For example, a user can build a realtime dashboard
>> based on daily aggregation results (only 365 or 366 keys in one year, so
>> less than 40k keys in 100 years) using the memory sink in the following
>> steps:
>>
>> - Write a streaming query to poll data from Kafka, calculate the
>> aggregation results, and save to the memory sink in the complete mode.
>> - In the same Spark application, start a thrift server with
>> "spark.sql.hive.thriftServer.singleSession=true" to expose the temp table
>> created by the memory sink through JDBC/ODBC.
>> - Connect a BI tool using JDBC/ODBC to query the temp table created by
>> the memory sink.
>> - Use the BI tool to build a realtime dashboard by polling the results at
>> a specified interval.
>>
>> Best Regards,
>> Ryan
>>
>> On Mon, May 18, 2020 at 8:44 PM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>> Hi devs,
>>>
>>> While dealing with SPARK-31706 [1] we figured out that the streaming
>>> output mode is only effective for stateful aggregation and is not
>>> guaranteed on the sink, which could expose a data loss issue.
>>> SPARK-31724 [2] is filed to track the efforts on improving the streaming
>>> output mode.
>>>
>>> Before we revisit the streaming output mode, I'd like to initiate a
>>> discussion around the "complete" streaming output mode first, because I
>>> have no idea how it works for production use cases. To me, it's only
>>> useful for niche cases, and no other streaming framework has such a
>>> concept.
>>>
>>> 1. It destroys the purpose of watermark and forces Spark to maintain all
>>> of state rows, growing incrementally. It only works when all keys are
>>> bounded to the limited set.
>>> 2. It has to provide all state rows as outputs per batch, hence the size
>>> of outputs is also growing incrementally.
>>> 3. It has to truncate the target before putting rows which might not be
>>> trivial for external storage if it should be executed per batch.
>>> 4. It enables some operations like sort on streaming query or couple of
>>> more things. But it will not work cleanly (state won't keep up) under
>>> reasonably high input rate, and we have to consider how the operation will
>>> work for streaming output mode hence non-trivial amount of consideration
>>> has to be added to maintain the mode.
>>>
>>> It would be a headache to retain complete mode if we consider improving
>>> the modes, as someone might be concerned about compatibility. It would
>>> be nice if we could reach a consensus on complete mode and drop support
>>> for it if we agree.
>>>
>>> Would like to hear everyone's opinions. It would be great if someone
>>> could bring valid cases where complete mode is being used in production.
>>>
>>> Thanks,
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>> 1. https://issues.apache.org/jira/browse/SPARK-31706
>>> 2. https://issues.apache.org/jira/browse/SPARK-31724

