Thanks! I've created BEAM-5392
<https://issues.apache.org/jira/browse/BEAM-5392> to track the issue.

On Fri, Sep 14, 2018 at 4:46 PM Robert Bradshaw <[email protected]> wrote:

> On Fri, Sep 14, 2018 at 4:22 PM David Morávek <[email protected]>
> wrote:
>
>> Hello Robert,
>>
>> Thanks for the answer! Spark allows us to sort a single partition (after
>> a repartition) by a user-provided comparator, so it is definitely possible
>> to do a secondary sort by timestamp. Is the "more intelligent
>> ReduceFnRunner" you are talking about already part of the Beam codebase?
>> (I guess that would lower the contribution investment if we tried to fix
>> this.)
>>
>
> No, but it is part of the Dataflow worker codebase we're trying to donate
> (being discussed in the other thread on this very list today), so hopefully
> soon.
>
>> This would definitely work for our use case (we used the exact same
>> approach in our custom SparkRunner).
>>
>> Although, there is one thing to consider. This approach would solve the
>> scaling issue, but it would probably be less efficient at smaller scale. I
>> think this could be solved by supporting multiple translators for a single
>> operator and letting the user "hint" the translation layer about which one
>> to use. What do you think?
>>
>
> If at all possible, I generally prefer to avoid providing hints like this
> that the user needs to supply to get decent performance, as that simply
> doesn't scale (in many directions). Fortunately, in this case, though you
> have to be a bit more careful about things, it is not less efficient.
>
> On Fri, Sep 14, 2018 at 4:10 PM Robert Bradshaw <[email protected]>
>> wrote:
>>
>>>
>>> If Spark supports producing grouped elements in timestamp order, a more
>>> intelligent ReduceFnRunner can be used. (We take advantage of that in
>>> Dataflow for example.)
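[A standalone Python sketch, not Beam or Spark code, of why timestamp-ordered input enables a smarter ReduceFnRunner: with values for a key sorted by timestamp, a fixed-window reduce can run in a single pass, finalizing each window as soon as a later element shows it is complete. The 60-unit window size and the `reduce_sorted` helper are hypothetical illustrations.]

```python
WINDOW_SIZE = 60  # hypothetical fixed-window size

def reduce_sorted(timestamped_values):
    """Single-pass windowed sum over (timestamp, value) pairs sorted by timestamp.

    Because input arrives in timestamp order, once an element falls into a
    later window, every earlier fixed window is complete and can be emitted
    immediately; nothing is buffered beyond the current window's accumulator.
    """
    results = []
    current_window = None
    acc = 0
    for ts, v in timestamped_values:
        window = ts // WINDOW_SIZE
        if window != current_window:
            if current_window is not None:
                results.append((current_window, acc))  # finalize completed window
            current_window, acc = window, 0
        acc += v
    if current_window is not None:
        results.append((current_window, acc))
    return results

print(reduce_sorted([(5, 1), (10, 2), (65, 3), (130, 4)]))
# [(0, 3), (1, 3), (2, 4)]
```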
>>>
>>> For non-merging windows, you could also put the window itself (or some
>>> subset thereof) into the key, resulting in smaller groupings. I'm not
>>> sure I understand your output requirements well enough to know whether
>>> this would work.
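[To illustrate the effect of folding the window into the key, here is a standalone Python sketch (not Beam code; the element tuples are hypothetical): grouping by (key, window) splits one large per-key group into several smaller ones, each of which must fit in memory independently.]

```python
from collections import defaultdict

# Hypothetical elements: (key, window_start, value). With non-merging
# windowing (e.g. fixed windows), the window is known when the element is
# produced, so it can be folded into the shuffle key.
elements = [
    ("user1", 0, "a"), ("user1", 0, "b"),
    ("user1", 60, "c"),
    ("user2", 0, "d"), ("user2", 60, "e"),
]

# Grouping by key alone: one (possibly huge) group per key.
by_key = defaultdict(list)
for key, window, value in elements:
    by_key[key].append(value)

# Grouping by (key, window): the same data, split into smaller groups.
by_key_and_window = defaultdict(list)
for key, window, value in elements:
    by_key_and_window[(key, window)].append(value)

print(max(len(v) for v in by_key.values()))             # largest group: 3
print(max(len(v) for v in by_key_and_window.values()))  # largest group: 2
```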
>>>
>>> On Fri, Sep 14, 2018 at 3:28 PM David Morávek <[email protected]>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> Currently, we are trying to move one of our large-scale batch jobs
>>>> (~100TB of input) from our Euphoria <http://github.com/seznam/euphoria>
>>>> based SparkRunner to Beam's Spark runner, and we came across the
>>>> following issue.
>>>>
>>>> Because we rely on the Hadoop ecosystem, we need to group outputs by
>>>> TaskAttemptID in order to use OutputFormats based on FileOutputFormat.
>>>>
>>>> We do this using *GroupByKey*, but we ran into the known problem that
>>>> all values for any single key need to fit in memory at once.
>>>>
>>>> I did some quick research, and I think the following needs to be
>>>> addressed:
>>>> a) We cannot use Spark's *groupByKey*, because it requires all values
>>>> for a single key to fit in memory (it is implemented as a "list
>>>> combiner").
>>>> b) *ReduceFnRunner* iterates over the values multiple times in order to
>>>> also group by window.
>>>>
>>>
>>>> In the Euphoria-based runner, we solved this for *non-merging* windowing
>>>> by using Spark's *repartitionAndSortWithinPartitions*: we sorted the
>>>> output by key and window, so it could be processed sequentially.
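[The secondary-sort trick can be sketched outside Spark (the records and the sort key below are hypothetical): sorting by (key, window) and then streaming with `itertools.groupby` mimics the per-partition ordering that `repartitionAndSortWithinPartitions` provides, so no group is ever fully materialized.]

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical records: (key, window_start, value), in arbitrary order as
# they would arrive at a partition before sorting.
records = [
    ("k1", 60, "c"), ("k2", 0, "d"),
    ("k1", 0, "a"), ("k1", 0, "b"), ("k2", 60, "e"),
]

# Sort by (key, window), mirroring the user-provided comparator that
# repartitionAndSortWithinPartitions would apply within each partition.
records.sort(key=itemgetter(0, 1))

# Because records now arrive in (key, window) order, each group can be
# consumed as a stream: only one group's state is held at a time.
results = []
for (key, window), group in groupby(records, key=itemgetter(0, 1)):
    count = sum(1 for _ in group)  # stand-in for a real reduce function
    results.append(((key, window), count))

print(results)
```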
>>>>
>>>> Has anyone run into the same issue? Is there currently any workaround?
>>>> How should we approach this?
>>>>
>>>> Thanks,
>>>> David
>>>>
>>>>
