[
https://issues.apache.org/jira/browse/SPARK-31724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17108513#comment-17108513
]
Xingcan Cui commented on SPARK-31724:
-------------------------------------
Copying some comments from [https://github.com/apache/spark/pull/28523]
----
xccui commented 23 hours ago
Hi all, I am not quite familiar with the version history of the streaming sink,
but I would like to share some of my thoughts here. Please correct me if I have
misunderstood anything.
I think SupportsStreamingUpdate should NOT be a sink-specific feature. We
concentrate on the sink now only because the current SS implementation doesn't
allow chaining operators (other than the sink) that produce updating results.
The only reason we provide update mode should be to produce the "correct"
result table, i.e., to make the result table identical to the one produced by
applying the same query to the materialized input rows.
The semantics of the update mode we provide lack delete support, which
sometimes makes it unreliable. I suppose we all agree on offering a better
design in the future. But for now, it's better to keep it consistent with
previous versions (which also carries the lowest risk and effort).
I'll try to make some improvements to the related issues.
----
HeartSaVioR commented 17 hours ago
Thanks for the great input, @xccui.
Basically I agree with your input - it matches my understanding, as I
commented before (#28523 (comment)).
To summarize my previous comment, I also don't know how the streaming output
mode was designed, but from my understanding it only takes effect on the result
table of stateful aggregation operators. It doesn't even apply to all stateful
operators, e.g. the mode doesn't affect stream-stream join. Since it doesn't
guarantee that the final output respects the semantics, there's no point in
applying the same mode on the sink side.
Another concern that comes to mind is complete mode. Complete mode also takes
effect on the result table. It may seem to make sense to support complete mode
in the sink as truncate-and-insert, but that leads to data loss when the result
table is unioned with another stream that doesn't create a "result table". (I
haven't seen such a query, but it's technically possible.) Complete mode
doesn't care about the other stream, so in every batch the previous output from
the other stream will be lost. I think complete mode is a weird one for
streaming and it's better to discontinue supporting it; I wouldn't expect any
production query to use this mode, but please let me know if there is one.
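A minimal sketch of the data-loss scenario above, written as a plain Python simulation rather than Spark code. The function name, the word-count query and the row layout are hypothetical; it assumes a complete-mode sink implemented as truncate-and-insert, fed by the union of an aggregation result table and an append-only stream.

```python
# Hypothetical simulation (not Spark code): a complete-mode sink that
# implements "truncate and insert" on every micro-batch.
from collections import Counter


def run_complete_mode_union(batches):
    """Each batch is a list of words.
    Stream A: running word count (stateful aggregation, full result table
    re-emitted every batch).
    Stream B: the raw words, which only carry the rows new in that batch.
    The union of A and B is written to a truncate-and-insert sink."""
    counts = Counter()
    sink = []  # contents of the destination table
    for words in batches:
        counts.update(words)
        agg_rows = [("count", w, c) for w, c in sorted(counts.items())]
        raw_rows = [("raw", w, None) for w in words]  # only this batch's rows
        sink = agg_rows + raw_rows  # truncate, then insert
    return sink


final = run_complete_mode_union([["a", "b"], ["a"]])
raw_in_sink = [r for r in final if r[0] == "raw"]
print(raw_in_sink)  # [('raw', 'a', None)]; 'b' from batch 1 is gone
```

The aggregation rows stay correct because they are re-emitted in full, while the raw rows from earlier batches are silently dropped by each truncation.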
Anyway, I think the streaming update mode is not technically coupled with the
capabilities of the sink. It should be left as it is, though we'll probably
have to fix the guide doc, since it says the mode applies to the result table
"as well as" to the sink. The description of the streaming output mode in the
sink should be corrected as well - sinks don't depend on the streaming output
mode, and as of now only append is possible.
ps. We may need to revisit the operators and streaming output modes to find any
flaws, similar to what I did via the discussion thread and #24890. One
candidate would be flatMapGroupsWithState with append mode.
----
xccui commented 12 hours ago
@HeartSaVioR Yes. It seems the output mode option was mainly designed for
stateful aggregations, which means it actually works in a restricted way.
Ideally, to support complete mode, all operators must be capable of outputting
the "complete" result seen so far for each epoch. Personally, I'm in favor of
removing this mode in a future version. But for now, I propose adding more
restrictions to the plan check (e.g., disallowing the union case you
mentioned) and also adding a note to the documentation.
IMO, the mode of the result table should be decided only by the operators in
the plan, and it could be either "append" or "update" (which would include the
current "complete" mode). Basically, the designated sink should match the mode
of the result table. Supporting "update" usually needs more effort, which means
only some of the sinks could be chosen for a plan containing an aggregation or
certain kinds of joins.
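To illustrate why the sink has to match the mode of the result table, here is a minimal sketch in plain Python; it uses no Spark APIs and all names are hypothetical. A running count per key emits "update" rows (only the keys changed in the batch). An append-only sink keeps stale values, while a sink that overwrites by key reproduces the result table.

```python
# Hypothetical simulation (not Spark code): a running count per key that
# emits "update" rows, written to two kinds of sinks.
from collections import Counter


def update_rows(batches):
    counts = Counter()
    for keys in batches:
        counts.update(keys)
        # emit only the keys touched by this batch, with their new totals
        yield [(k, counts[k]) for k in sorted(set(keys))]


def append_sink(batches):
    table = []
    for rows in update_rows(batches):
        table.extend(rows)  # no key awareness: superseded values remain
    return table


def upsert_sink(batches):
    table = {}
    for rows in update_rows(batches):
        for k, v in rows:
            table[k] = v  # overwrite the previous value for the same key
    return sorted(table.items())


batches = [["x", "y"], ["x"]]
print(append_sink(batches))  # [('x', 1), ('y', 1), ('x', 2)]
print(upsert_sink(batches))  # [('x', 2), ('y', 1)]
```

Only the upsert sink ends up identical to the result of running the same count over all input rows, which is the "correct result table" requirement discussed earlier.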
----
HeartSaVioR commented 5 hours ago
I'm curious what the SS committers think of the comments above - if they agree,
then the issue is really a design issue of the streaming output mode, and I
think the right fix is to decouple the streaming output mode from the sink.
Would it break backward compatibility? If you look back at branch-2.4, you may
be surprised to find that most built-in sink implementations "ignore" the
output mode. The output mode provided by StreamWriteSupport.createStreamWriter
is not used at all, with only one exception.
The only built-in exception is the memory sink, and it doesn't distinguish
between append mode and update mode. It only uses the mode to truncate the
memory for complete mode. Given that the sink most likely exists for testing,
this only makes tests easier to implement, nothing else. Also, I'm strongly in
favor of dropping complete mode, given the data loss issue I already described;
I don't think it's production-ready.
I don't even think custom sinks have been respecting the output mode, as the
API lacks information about update mode, and complete mode is not
production-ready.
The major problem is time. I totally understand that such a change may feel too
big for a release that is already at RC1, though...
I hope we address it in the next major release (that's a good rationale), but
if we really want to minimize the changes for now, what about adding
SupportsStreamingTruncate as an internal trait as well, so that we avoid
coupling SupportsTruncate with complete mode and let the streaming write take a
different path from the batch one?
(Even better if we simply don't support complete mode, or restrict it to
internal use right now, but...)
As both SupportsStreamingUpdate and SupportsStreamingTruncate would be
internal, we would have time to revisit the streaming path and change it
without affecting the public API.
That would leave custom sinks able only to append, since we won't expose these
abilities, but that's what I expect anyway. Even with the new DSv2,
implementing truncate in a streaming sink looks very limited: truncation has to
take place at commit time, which means write tasks cannot write to the
destination directly, and that is not scalable.
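A minimal sketch of the constraint above, as a plain Python simulation; the class and method names are hypothetical and are not the DSv2 API. It assumes a truncate-style sink where tasks stage their rows and the commit step swaps in the new contents, so readers never observe a half-truncated table.

```python
# Hypothetical sketch (not the DSv2 API): why truncation has to wait for commit.


class StagingTruncateSink:
    def __init__(self, destination):
        self.destination = list(destination)  # rows currently visible to readers
        self.staged = {}  # task id -> rows, invisible until commit

    def write_task(self, task_id, rows):
        # Tasks stage their output instead of writing to the destination:
        # writing directly would require truncating before all tasks finish,
        # and a failed or retried task would leave a partially written table.
        self.staged[task_id] = list(rows)

    def commit(self):
        # Truncate and insert in one step. In this sketch the commit step
        # handles every staged row itself, which is the scalability concern.
        self.destination = [r for tid in sorted(self.staged) for r in self.staged[tid]]
        self.staged = {}

    def abort(self):
        self.staged = {}  # the destination keeps the previous epoch's contents


sink = StagingTruncateSink(destination=[("old", 1)])
sink.write_task(0, [("a", 1)])
sink.write_task(1, [("b", 2)])
print(sink.destination)  # [('old', 1)]: readers still see the previous epoch
sink.commit()
print(sink.destination)  # [('a', 1), ('b', 2)]
```

A real sink might make the commit a cheaper metadata operation, but either way the write tasks cannot target the final destination directly.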
Does my proposal make sense?
cc @tdas @zsxwing @jose-torres @brkyvz @jerryshao @gaborgsomogyi to hear
their voices as well. Please cc more people if you know anyone who could help
take a look.
> Improve the output mode for structured streaming
> ------------------------------------------------
>
> Key: SPARK-31724
> URL: https://issues.apache.org/jira/browse/SPARK-31724
> Project: Spark
> Issue Type: Umbrella
> Components: Structured Streaming
> Affects Versions: 3.0.0
> Reporter: Xingcan Cui
> Priority: Major
>
> The current design of output mode in structured streaming is restricted and
> needs some improvements. This umbrella issue is used to track all the updates
> we are going to make.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)