Hello Robert, thanks for the answer! Spark allows us to sort a single partition (after repartition) with a user-provided comparator, so it is definitely possible to do a secondary sort by timestamp. The "more intelligent ReduceFnRunner" you mention, is it already part of the Beam codebase? (I guess that would lower the contribution effort if we tried to fix this.)
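For concreteness, here is a minimal, Spark-independent sketch of what the sorted partition buys us. All names are hypothetical and plain Scala collections stand in for an RDD partition; the point is only that once records arrive sorted by (key, timestamp), each key's values can be consumed sequentially without materializing the whole group:

```scala
object SecondarySortSketch {
  // Hypothetical record shape; the field names are illustrative only.
  final case class Event(key: String, ts: Long, value: String)

  // Sort by (key, timestamp) -- the comparator Spark would apply inside a
  // partition for repartitionAndSortWithinPartitions -- then collapse
  // consecutive runs of the same key in one sequential pass, so only the
  // current group is ever touched.
  def sortedGroups(events: Seq[Event]): List[(String, List[Long])] =
    events.sortBy(e => (e.key, e.ts))
      .foldLeft(List.empty[(String, List[Long])]) {
        case ((k, tss) :: rest, e) if k == e.key => (k, tss :+ e.ts) :: rest
        case (acc, e)                            => (e.key, List(e.ts)) :: acc
      }
      .reverse

  def main(args: Array[String]): Unit = {
    val partition = Seq(
      Event("userA", 3, "c"), Event("userB", 1, "x"),
      Event("userA", 1, "a"), Event("userA", 2, "b"))
    println(sortedGroups(partition))
    // List((userA,List(1, 2, 3)), (userB,List(1)))
  }
}
```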
This would definitely work for our use case (we used the exact same approach in our custom SparkRunner). There is one thing to consider, though: this approach would solve the scaling issue, but it would probably be less efficient at smaller scale. I think this could be solved by supporting multiple translators for a single operator and letting the user "hint" the translation layer about which one to use. What do you think?

Thanks,
D.

On Fri, Sep 14, 2018 at 4:10 PM Robert Bradshaw <[email protected]> wrote:

> If Spark supports producing grouped elements in timestamp order, a more
> intelligent ReduceFnRunner can be used. (We take advantage of that in
> Dataflow, for example.)
>
> For non-merging windows, you could also put the window itself (or some
> subset thereof) into the key, resulting in smaller groupings. I'm not sure
> I understand your output requirements well enough to know whether this
> would work.
>
> On Fri, Sep 14, 2018 at 3:28 PM David Morávek <[email protected]>
> wrote:
>
>> Hello,
>>
>> currently we are trying to move one of our large-scale batch jobs
>> (~100TB of input) from our Euphoria <http://github.com/seznam/euphoria>
>> based SparkRunner to Beam's Spark runner, and we came across the
>> following issue.
>>
>> Because we rely on the Hadoop ecosystem, we need to group outputs by
>> TaskAttemptID in order to use OutputFormats based on FileOutputFormat.
>>
>> We do this using *GroupByKey*, but we ran into the known problem that
>> all values for any single key need to fit in memory at once.
>>
>> I did some quick research and I think the following needs to be
>> addressed:
>> a) We cannot use Spark's *groupByKey*, because it requires all values
>> for a single key to fit in memory (it is implemented as a "list
>> combiner").
>> b) *ReduceFnRunner* iterates over the values multiple times in order to
>> also group by window.
>>
>> In our Euphoria-based runner, we solved this for *non-merging* windowing
>> by using Spark's *repartitionAndSortWithinPartitions*, where we sorted
>> the output by key and window so that it could be processed sequentially.
>>
>> Has anyone run into the same issue? Is there currently any workaround
>> for this? How should we approach this?
>>
>> Thanks,
>> David
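For the archives, a minimal, Spark-independent sketch of the window-into-the-key idea discussed above. It assumes non-merging fixed windows, where the window is a pure function of the timestamp; all names are hypothetical:

```scala
object WindowInKeySketch {
  // Hypothetical fixed windowing: for non-merging windows the window is a
  // pure function of the timestamp, so it can be folded into the key.
  def windowStart(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs)

  // Grouping by the composite (key, window) instead of the key alone bounds
  // each group by a single window's data rather than the key's full history.
  def groupByKeyAndWindow(
      events: Seq[(String, Long)],
      sizeMs: Long): Map[(String, Long), Seq[Long]] =
    events
      .groupBy { case (k, ts) => (k, windowStart(ts, sizeMs)) }
      .map { case (kw, es) => kw -> es.map(_._2) }

  def main(args: Array[String]): Unit = {
    val events = Seq(("userA", 5L), ("userA", 15L), ("userA", 7L), ("userB", 3L))
    // Three smaller groupings instead of one per-key grouping.
    println(groupByKeyAndWindow(events, sizeMs = 10L).toList.sortBy(_._1))
  }
}
```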
