As Frances alluded to, it's also really hard to reconcile the notion
of a globally ordered PCollection in the context of a streaming
pipeline. Sorting also imposes conditions on partitioning, which we
intentionally leave unspecified for maximum flexibility in the
runtime. One also gets into the question of whether particular
operations are order-creating, order-preserving, or order-destroying
and how much extra overhead is required to maintain these properties
for intermediate collections.

Your mention of sorting by time is interesting, as this is the
inherent sort dimension in streaming (and we use features like
windowing and triggering to do correct time-based grouping despite
real-time skew). Other than that, all the uses of sorting I've seen
have been limited to portions of data small enough to be produced
(and consumed) by a single machine (so GBs at most, not TBs or PBs).
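
For concreteness, here's roughly what that time-based grouping looks
like with the Beam Java API today (just a minimal sketch; the key and
value types are made up for illustration):

    // (uses org.apache.beam.sdk.transforms.windowing.* and joda-time Duration)
    PCollection<KV<String, Long>> events = ...;  // e.g. (userId, eventValue)

    // Group per key into one-minute event-time windows; the watermark-based
    // trigger handles real-time skew by waiting until the window is believed
    // complete, and allowed lateness lets stragglers still be accounted for.
    PCollection<KV<String, Long>> counts =
        events
            .apply(Window.<KV<String, Long>>into(
                    FixedWindows.of(Duration.standardMinutes(1)))
                .triggering(AfterWatermark.pastEndOfWindow())
                .withAllowedLateness(Duration.standardMinutes(10))
                .discardingFiredPanes())
            .apply(Count.<String, Long>perKey());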

All that aside, I could see a more tractable case being made for
ordering (partitioning, etc.) a particular materialization of a
PCollection, i.e. being sorted would not be a property of the
PCollection itself, but could be provided by a sink (e.g. one could
have a sink that promises to write its records in a particular order
within and across shards). It's not inconceivable that this could be
done in a way that is composable with (a large class of) existing
sinks, e.g. given a FileBasedSink and intra/inter-shard sorting
specifications, one could produce a bounded sink that writes "sorted"
files. Lots of design work TBD...
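
To make the shape of that slightly more concrete, a very rough sketch
of what such a composed sink might look like to a user (every name
below is hypothetical; nothing like this exists yet):

    // Hypothetical API: compose an existing FileBasedSink with sorting specs
    // so that records are ordered within each shard, and shards cover ordered,
    // non-overlapping ranges across files. The PCollection itself stays
    // unordered; only this particular materialization carries the guarantee.
    records.apply(
        SortedWrite.to(myFileBasedSink)                  // any existing FileBasedSink
            .withIntraShardOrder(byTimestampComparator)  // order within a shard
            .withInterShardRanges(myRangePartitioner));  // order across shards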

- Robert




On Thu, May 26, 2016 at 11:32 AM, Jesse Anderson <[email protected]> wrote:
> @frances great analysis. I'm hoping this serves as the starting point for
> the discussion.
>
> It really comes down to: is this a nice-to-have or a show-stopping
> requirement? As you mention, it depends on the use case. I've taught at
> large financial companies where (global) sorting was a real, show-stopping
> use case. Theirs was a large end-of-day report that had to be globally
> sorted and consumed by many other groups. Sorry, I can't be more specific.
>
> Thanks,
>
> Jesse
>
> On Thu, May 26, 2016 at 10:19 AM Frances Perry <[email protected]>
> wrote:
>
>> Currently the Beam model doesn't provide the functionality to do sorting,
>> so this is a pretty deep feature request. Let's separate the discussion
>> into value sorting and global sorting.
>>
>> For value sorting, you need to be able to specify some property of the
>> value (often called a secondary key) and have the GroupByKey/shuffle
>> implementation sort values for a given key by the secondary key. This is a
>> pretty common use case, and I think exposing this in Beam would make a lot
>> of sense. The Hadoop and Cloud Dataflow shuffle implementations support
>> this, for example. So it may just be a matter of figuring out how best to
>> expose it to users. In FlumeJava we had you explicitly ParDo to pair values
>> with a string "sort key", so you'd GroupByKey on a PCollection<KV<Key,
>> KV<String, Value>>> and get back the Values sorted lexicographically by
>> String. It's a bit gross for users to have to express their ordering as
>> something that sorts lexicographically. It looks like Crunch [1] uses a
>> general sort key -- but that likely won't interact cleanly with Beam's use
>> of encoded keys for comparisons. It would be nice to think about whether
>> there's a cleaner way.
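>>
>> For concreteness, the FlumeJava-style pattern in Beam terms would look
>> roughly like this (the sorted-values behavior of GroupByKey is the
>> hypothetical part -- today it makes no ordering promise; the Event type
>> and its getTimestampMillis() are made up):
>>
>>   PCollection<KV<String, Event>> events = ...;
>>
>>   // Pair each value with a string secondary key, e.g. a zero-padded
>>   // timestamp so that lexicographic order matches numeric order.
>>   PCollection<KV<String, KV<String, Event>>> keyed =
>>       events.apply(ParDo.of(
>>           new DoFn<KV<String, Event>, KV<String, KV<String, Event>>>() {
>>             @ProcessElement
>>             public void process(ProcessContext c) {
>>               Event e = c.element().getValue();
>>               String sortKey = String.format("%020d", e.getTimestampMillis());
>>               c.output(KV.of(c.element().getKey(), KV.of(sortKey, e)));
>>             }
>>           }));
>>
>>   // Hypothetically, the values for each key would then come back sorted
>>   // lexicographically by the secondary key.
>>   PCollection<KV<String, Iterable<KV<String, Event>>>> grouped =
>>       keyed.apply(GroupByKey.<String, KV<String, Event>>create());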
>>
>> For global sorting, you need to be able to generate and maintain
>> orderedness across the elements in a PCollection and have a way to know how
>> to partition the PCollection into balanced, sorted subchunks. This would
>> have a pretty large impact on the Beam model and potentially on many of the
>> runners. Looking at the Crunch sort [1], it requires users to provide the
>> partitioning function if they want it to scale beyond a single reduce. I'd
>> love to see if there's a way to do better. It also can have a pretty big
>> impact on the ability to efficiently parallelize execution (things like
>> dynamic work rebalancing [2] become trickier). Within Google [3], we've
>> found that this tends to be something that users ask for, but don't really
>> have a strong use case for. It's usually the case that Top suffices or that
>> they would rather redo the algorithm into something that can parallelize
>> more efficiently without relying on a global sort. Though of course, without
>> this, we can't actually do the TeraSort benchmark in Beam. ;-)
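>>
>> Just to illustrate what that would mean for a user, a purely hypothetical
>> shape for such a transform (nothing named below exists):
>>
>>   // The user-supplied range partitioner is what would let the sort scale
>>   // past a single worker: it has to split the key space into balanced,
>>   // ordered ranges, which generally requires sampling the data.
>>   PCollection<Long> sorted =
>>       values.apply(GloballySort.<Long>byNaturalOrder()
>>           .withRangePartitioner(myRangePartitioner, /* numRanges= */ 100));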
>>
>> And of course there's the impact of the unified model on all this ;-) I
>> think these ideas would translate to windowed PCollections OK, but we'd
>> want to think carefully about it.
>>
>> [1] https://crunch.apache.org/user-guide.html#sorting
>> [2] https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow
>> [3] https://cloud.google.com/blog/big-data/2016/02/history-of-massive-scale-sorting-experiments-at-google
>>
>>
>> On Thu, May 26, 2016 at 8:56 AM, Jesse Anderson <[email protected]>
>> wrote:
>>
>> > This is somewhat the continuation of my thread "Writing Out
>> > List<String>."
>> >
>> > Right now, the only way to do sorting is with the Top class. This works
>> > well, but has the constraint of fitting in memory.
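>> >
>> > For example (using today's API; the element type is just illustrative):
>> >
>> >   PCollection<Long> latencies = ...;
>> >   // Top.largest keeps only the N largest elements, so the whole result
>> >   // list has to fit in memory on a single worker.
>> >   PCollection<List<Long>> top100 = latencies.apply(Top.<Long>largest(100));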
>> >
>> > A common batch use case is to take a large file and sort it. For example,
>> > this would be sorting a large report (several GB) file by timestamp. As of
>> > right now, this isn't built into Beam. I think it should be.
>> >
>> > I'll hold out Crunch's Sort
>> > <https://crunch.apache.org/apidocs/0.11.0/org/apache/crunch/lib/Sort.html>
>> > class as an example of what this class could look like.
>> >
>> > Thanks,
>> >
>> > Jesse
>> >
>>
