I had a similar thought, but wasn't sure if that violated a tenet of Beam. I'm thinking an ordered sink could wrap around another sink. I could see something like: collection.apply(OrderedSink.Timestamp.write(TextIO.Write.To(...)));
On Thu, May 26, 2016 at 12:26 PM Robert Bradshaw <[email protected]> wrote: > As Frances alluded to, it's also really hard to reconcile the notion > of a globally ordered PCollection in the context of a streaming > pipeline. Sorting also imposes conditions on partitioning, which we > intentionally leave unspecified for maximum flexibility in the > runtime. One also gets into the question of whether particular > operations are order-creating, order-preserving, or order-destroying > and how much extra overhead is required to maintain these properties > for intermediate collections. > > Your mention of sorting by time is interesting, as this is the > inherent sort dimension is streaming (and we use features like > windowing and triggering to do correct time-based grouping despite > real-time skew). Other than that, all the uses of sorting I've seen > have been limited to portions of data small enough to be produced by > (and consumed by) a single machine (so tops GBs, not TBs or PBs). > > All that aside, I could see more tractable case being made for > ordering (partitioning, etc.) a particular materialization of a > PCollection, i.e. being sorted would not be a property of a > PCollection itself, but could be provided by a sink (e.g. one could > have a sink that promises to write its records in a particular order > within and across shards). It's not inconceivable that this could be > done in a way that is composible with (a large class of) existing > sinks, e.g. given a FileBasedSink and intra/inter-shard-sorting > specifications, one could produce a bounded sink that writes "sorted" > files. Lots of design work TBD... > > - Robert > > > > > On Thu, May 26, 2016 at 11:32 AM, Jesse Anderson <[email protected]> > wrote: > > @frances great analysis. I'm hoping this serves as the starting point for > > the discussion. > > > > It really comes down to: is this a nice to have or a show stopping > > requirement? As you mention, it comes down to the use case. I've taught > at > > large financial companies where (global) sorting was a real and show > > stopping use case. Theirs was for a large end of day report that had to > be > > globally sorted and consumed by many other groups. Sorry, I can't be more > > specific. > > > > Thanks, > > > > Jesse > > > > On Thu, May 26, 2016 at 10:19 AM Frances Perry <[email protected]> > > wrote: > > > >> Currently the Beam model doesn't provide the functionality to do > sorting, > >> so this is a pretty deep feature request. Let's separate the discussion > >> into value sorting and global sorting. > >> > >> For value sorting, you need to be able to specify some property of the > >> value (often called a secondary key) and have the GroupByKey/shuffle > >> implementation sort values for a given key by the secondary key. This > is a > >> pretty common use case, and I think exposing this in Beam would make a > lot > >> of sense. The Hadoop and the Cloud Dataflow shuffle implementation > supports > >> this, for example. So it may just be a matter of figuring out how best > to > >> expose it to users. In FlumeJava we had you explicitly ParDo to pair > values > >> with a string "sort key" so you'd GroupByKey on a PCollection<KV<Key, > >> KV<String, Value>> and get back the Values sorted lexicographically by > >> String. It's a bit gross for users to think about a way to order things > >> that sorts lexicographically. Looks like Crunch[1] uses a general sort > key > >> -- but that likely won't interact cleanly with Beam's use of encoded > keys > >> for comparisons. Would be nice to think about if there's a cleaner way. > >> > >> For global sorting, you need to be able to be able to generate and > maintain > >> orderedness across the elements in a PCollection and have a way to know > how > >> to partition the PCollection into balanced, sorted subchunks. This would > >> have a pretty large impact on the Beam model and potentially on many of > the > >> runners. Looking at the Crunch sort [1], it requires users to provide > the > >> partitioning function if they want it to scale beyond a single reduce. > I'd > >> love to see if there's a way to do better. It also can have a pretty big > >> impact on the ability to efficiently parallelize execution (things like > >> dynamic work rebalancing [2] become trickier). Within Google [3], we've > >> found that this tends to be something that users ask for, but don't > really > >> have a strong use case for. It's usually the case that Top suffices or > that > >> they would rather redo the algorithm into something that can parallelize > >> more efficiently without relying on a global sort. Though of course, > with > >> out this, we can't actually do the TeraSort benchmark in Beam. ;-) > >> > >> And of course there's the impact of the unified model on all this ;-) I > >> think these ideas would translated to windowed PCollections ok, but > would > >> want to think carefully about it. > >> > >> [1] https://crunch.apache.org/user-guide.html#sorting > >> [2] > >> > >> > https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow > >> > >> [3] > >> > >> > https://cloud.google.com/blog/big-data/2016/02/history-of-massive-scale-sorting-experiments-at-google > >> > >> > >> On Thu, May 26, 2016 at 8:56 AM, Jesse Anderson <[email protected]> > >> wrote: > >> > >> > This is somewhat the continuation of my thread "Writing Out > >> List<String>." > >> > > >> > Right now, the only way to do sorting is with the Top class. This > works > >> > well, but has the constraint of fitting in memory. > >> > > >> > A common batch use case is to take a large file and sort it. For > example, > >> > this would be sorting a large report (several GB) file by timestamp. > As > >> of > >> > right now, this isn't built into Beam. I think it should be. > >> > > >> > I'll hold out Crunch's Sort > >> > < > >> > https://crunch.apache.org/apidocs/0.11.0/org/apache/crunch/lib/Sort.html> > >> > class as an example of what this class could look like. > >> > > >> > Thanks, > >> > > >> > Jesse > >> > > >> >
