[
https://issues.apache.org/jira/browse/SPARK-31376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17077803#comment-17077803
]
Adam Binford commented on SPARK-31376:
--------------------------------------
You can't do a global sort because you can't sort an infinite list. However,
regardless of batch vs. streaming, a partition is a fairly well-defined unit of
work, I would think. I'm only speaking about the straightforward file-based
cases; I don't know if there are other reasons this wouldn't make sense, and
I'm also ignoring continuous mode for now.

Say you had 5 input files that you process as a batch. Your input size results
in 8 partitions, and therefore 8 output files.

Then let's say you processed those same files in a streaming fashion. The first
micro-batch is the first 3 files, which ends up being 5 partitions, and the
second micro-batch is the last 2 files, resulting in 4 partitions. I don't feel
like working out what math would make that the case, but I'm sure it's a
possibility. So you would end up with 9 total partitions/output files. You
already have a "different result" than you would have achieved via batch
processing, and that doesn't even include any kind of sorting within
partitions. Add in sorting, and you still get that same differing number of
partitions, but each one is potentially better optimized for your use case.
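
To put numbers on the example above, here is a toy calculation. It assumes a
deliberately simplified model in which the files of one job are packed end to
end and cut into partitions of at most 128 MB; this is not Spark's actual
split-planning logic, and the file sizes are made up to reproduce the 8 vs. 9
partition counts.

```python
import math


def partition_count(file_sizes_mb, max_partition_mb=128):
    """Toy model (an assumption, not Spark's algorithm): pack the files of one
    job end to end and cut the result into partitions of max_partition_mb."""
    return math.ceil(sum(file_sizes_mb) / max_partition_mb)


# Hypothetical file sizes in MB, chosen to match the counts in the text.
files = [192, 192, 192, 224, 224]

# Batch: all 5 files are planned together.
batch_partitions = partition_count(files)

# Streaming: the first 3 files and the last 2 files are planned separately.
micro_batches = [files[:3], files[3:]]
streaming_partitions = [partition_count(batch) for batch in micro_batches]

print(batch_partitions)            # 8
print(streaming_partitions)        # [5, 4]
print(sum(streaming_partitions))   # 9
```

The extra partition appears because the leftover half-partition at the end of
the first micro-batch cannot be combined with data from the second one.
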
So it's not that one "makes sense" and the other doesn't; it's that global sort
is not well defined while partition-based sorting is. In theory you could make
"global sort" mean a sort within each micro-batch, and that would be well
defined, but I'm not sure what benefit that would provide.
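
As a rough illustration of the two well-defined options, the plain-Python
sketch below models a micro-batch as a list of partitions, each a list of rows.
The data and function names are invented for the illustration and are not
Spark APIs.

```python
def sort_within_partitions(partitions):
    """Local sort: each partition is sorted using only its own rows."""
    return [sorted(partition) for partition in partitions]


def sort_per_micro_batch(partitions):
    """'Global sort' scoped to one micro-batch: gather its rows, then sort."""
    return sorted(row for partition in partitions for row in partition)


# Two hypothetical micro-batches with two partitions each.
micro_batch_1 = [[7, 2, 9], [4, 1]]
micro_batch_2 = [[3, 8], [6, 5, 0]]

local = [sort_within_partitions(b) for b in (micro_batch_1, micro_batch_2)]
per_batch = [sort_per_micro_batch(b) for b in (micro_batch_1, micro_batch_2)]

print(local)      # [[[2, 7, 9], [1, 4]], [[3, 8], [0, 5, 6]]]
print(per_batch)  # [[1, 2, 4, 7, 9], [0, 3, 5, 6, 8]]
```

Both variants only ever look at rows that have already arrived, which is why
they stay well defined on an unbounded input. Neither gives a total order
across micro-batches.
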
One question I have (this isn't the use case I care about, but it seems
useful): can you order by a column in a Parquet file without using
foreachBatch? That seems like a very useful, well-defined optimization for a
streaming output.
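
For reference, the foreachBatch workaround mentioned in the issue description
could look roughly like the PySpark sketch below. The paths and the sort column
are hypothetical placeholders, and stream_df is assumed to be an existing
streaming DataFrame; this is a sketch of the approach, not the exact code used
with the external source.

```python
# Hypothetical placeholders; adjust for a real job.
OUTPUT_PATH = "/tmp/sorted-output"
CHECKPOINT_PATH = "/tmp/sorted-checkpoint"
SORT_COLUMN = "event_time"


def write_sorted_batch(batch_df, batch_id):
    """Sort each partition of one micro-batch locally, then append as Parquet.

    batch_df is the regular (non-streaming) DataFrame that foreachBatch passes
    in, so sortWithinPartitions is allowed here.
    """
    (batch_df
        .sortWithinPartitions(SORT_COLUMN)
        .write
        .mode("append")
        .parquet(OUTPUT_PATH))


def start_sorted_sink(stream_df):
    """Attach the per-batch writer to a streaming DataFrame and start it."""
    return (stream_df.writeStream
        .foreachBatch(write_sorted_batch)
        .option("checkpointLocation", CHECKPOINT_PATH)
        .start())
```

Calling start_sorted_sink(stream_df) returns the running query. Note that a
foreachBatch writer like this does not by itself give the exactly-once
behavior of the built-in file sink.
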
Also, I don't quite understand how repartitioning is valid on a streaming
query; I was surprised when that worked. That seems more questionable than
local sorting.

Again, I don't know if there's a non-file-based reason this would cause issues,
but a file-based approach seems straightforward and well defined.

> Non-global sort support for structured streaming
> ------------------------------------------------
>
> Key: SPARK-31376
> URL: https://issues.apache.org/jira/browse/SPARK-31376
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 3.1.0
> Reporter: Adam Binford
> Priority: Minor
>
> Currently, all sorting is disallowed with structured streaming queries. Not
> allowing global sorting makes sense, but could non-global sorting (i.e.
> sortWithinPartitions) be allowed? I'm running into this with an external
> source I'm using, but not sure if this would be useful to file sources as
> well. I have to foreachBatch so that I can do a sortWithinPartitions.
> Two main questions:
> * Does a local sort cause issues with any exactly-once guarantees streaming
> queries provide? I can't say I know or understand how these semantics work.
> Or are there other issues this would cause that I can't think of?
> * Is the change as simple as changing the unsupported operations check to
> only look for global sorts instead of all sorts?
> I have built a version that simply changes the unsupported check to only
> disallow global sorts and it seems to be working. Anything I'm missing or is
> it this simple?