Hi Reuven,

I'm not really complaining about the DirectRunner. In fact, it seems to me
that what was previously considered part of the "expensive extra checks" done
by the DirectRunner is now done within the beam-runners-core-java library.
Given that all the objects involved are immutable (in our case, at least)
and simple assignment is sufficient, the serialization-deserialization
really seems like an unwanted and hugely expensive correctness check. If there
were a problem with the identity copy, wasn't the DirectRunner supposed to
reveal it?

Regards,
Vojta

On Mon, Jul 9, 2018 at 4:46 PM, Reuven Lax <re...@google.com> wrote:

> Hi Vojta,
>
> One problem is that the DirectRunner is designed for testing, not for
> performance. The DirectRunner currently does many purposely inefficient
> things, the point of which is to better expose potential bugs in tests. For
> example, the DirectRunner will randomly shuffle the order of elements in
> PCollections to ensure that your code does not rely on ordering. All of this
> adds cost. There have been requests in the past for an "optimized" local
> runner; however, we don't currently have such a thing.
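A small hypothetical Java sketch of why shuffling exposes bugs (this is not DirectRunner code; `OrderSensitivity` and its methods are invented for illustration): an order-independent reduction such as a sum gives the same result for every permutation of its input, whereas a "take the first element" reduction does not, so running the same logic over randomly permuted inputs quickly reveals a hidden dependence on ordering.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.function.Function;

class OrderSensitivity {
    // Order-independent: the same total for any permutation of the input.
    static int sum(List<Integer> xs) {
        int total = 0;
        for (int x : xs) {
            total += x;
        }
        return total;
    }

    // Order-dependent: the result depends on which element happens to come first.
    static int first(List<Integer> xs) {
        return xs.get(0);
    }

    // Returns true if fn gives a different result for at least one random permutation.
    static boolean isOrderSensitive(Function<List<Integer>, Integer> fn,
                                    List<Integer> input, int trials, long seed) {
        Random rnd = new Random(seed);
        Integer expected = fn.apply(input);
        for (int i = 0; i < trials; i++) {
            List<Integer> shuffled = new ArrayList<>(input);
            Collections.shuffle(shuffled, rnd);
            if (!fn.apply(shuffled).equals(expected)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println("sum order-sensitive?   "
                + isOrderSensitive(OrderSensitivity::sum, input, 50, 42L));
        System.out.println("first order-sensitive? "
                + isOrderSensitive(OrderSensitivity::first, input, 50, 42L));
    }
}
```

The fixed seed keeps the check reproducible; the price is that every trial re-runs the whole computation, which mirrors the kind of deliberate overhead described above.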
>
> In this case, using coders to clone values is more correct. In a
> distributed environment, encoding and decoding is the only way to copy
> values, and the DirectRunner is trying to ensure that your code is correct
> in a distributed environment.
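A minimal sketch of the distinction drawn here, using plain Java serialization as a stand-in for a Beam coder (an assumption for illustration only; Beam uses its own Coder classes, and `AliasingDemo`/`roundTrip` are hypothetical names): copying a mutable value by assignment leaves the "copy" aliased to the original, so a later mutation silently shows up in both, whereas an encode/decode round trip yields an independent value, which is what effectively happens when a value crosses a machine boundary. For truly immutable values the two behave identically and the round trip is pure overhead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

class AliasingDemo {
    // Encode to bytes and decode back: a stand-in for cloning through a coder.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        ArrayList<String> state = new ArrayList<>();
        state.add("a");

        ArrayList<String> assigned = state;           // identity "copy": the same object
        ArrayList<String> cloned = roundTrip(state);  // independent copy via encode/decode

        state.add("b"); // mutate the original after copying

        System.out.println("assigned = " + assigned); // [a, b]: the mutation leaked
        System.out.println("cloned   = " + cloned);   // [a]: unaffected
    }
}
```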
>
> Reuven
>
> On Mon, Jul 9, 2018 at 7:22 AM Vojtech Janota <vojta.jan...@gmail.com>
> wrote:
>
>> Hi,
>>
>> We have been using Apache Beam in our project for some time now. Since our
>> datasets are of modest size, we have so far used the DirectRunner, as the
>> computation easily fits onto a single machine. Recently we upgraded Beam
>> from 2.2 to 2.4 and found that the performance of our pipelines drastically
>> deteriorated. Pipelines that took ~3 minutes with 2.2 now do not finish
>> within hours. We tried to isolate the change that causes the slowdown and
>> traced it to these commits to the "InMemoryStateInternals" class:
>>
>> * https://github.com/apache/beam/commit/32a427c
>> * https://github.com/apache/beam/commit/8151d82
>>
>> In a nutshell, where the copy() method previously simply assigned:
>>
>>   that.value = this.value
>>
>> There is now a coder encode/decode combination hidden behind:
>>
>>   that.value = uncheckedClone(coder, this.value)
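A rough, self-contained sketch of what a coder-based clone amounts to compared with plain assignment. The `SimpleCoder` interface, `StringListCoder`, and `cloneViaCoder` below are hypothetical stand-ins, not Beam's `Coder` API, and `cloneViaCoder` only approximates the encode/decode combination described above: every copy pays for a full encode into a byte buffer plus a full decode, instead of a single reference store.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CoderCloneSketch {
    // Hypothetical minimal coder interface (NOT Beam's org.apache.beam.sdk.coders.Coder).
    interface SimpleCoder<T> {
        void encode(T value, OutputStream out) throws IOException;
        T decode(InputStream in) throws IOException;
    }

    // Hypothetical coder for List<String>: a count, then each string as length-prefixed UTF-8.
    static final class StringListCoder implements SimpleCoder<List<String>> {
        @Override
        public void encode(List<String> value, OutputStream out) throws IOException {
            DataOutputStream data = new DataOutputStream(out);
            data.writeInt(value.size());
            for (String s : value) {
                byte[] b = s.getBytes(StandardCharsets.UTF_8);
                data.writeInt(b.length);
                data.write(b);
            }
            data.flush();
        }

        @Override
        public List<String> decode(InputStream in) throws IOException {
            DataInputStream data = new DataInputStream(in);
            int n = data.readInt();
            List<String> result = new ArrayList<>(n);
            for (int i = 0; i < n; i++) {
                byte[] b = new byte[data.readInt()];
                data.readFully(b);
                result.add(new String(b, StandardCharsets.UTF_8));
            }
            return result;
        }
    }

    // Clone by encoding to bytes and decoding back; IOException is rethrown unchecked.
    static <T> T cloneViaCoder(SimpleCoder<T> coder, T value) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            coder.encode(value, buffer);
            return coder.decode(new ByteArrayInputStream(buffer.toByteArray()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> original = new ArrayList<>(Arrays.asList("x", "y", "z"));
        List<String> viaAssignment = original;                                  // one reference store
        List<String> viaCoder = cloneViaCoder(new StringListCoder(), original); // encode + decode
        System.out.println(viaAssignment == original); // true
        System.out.println(viaCoder == original);      // false
        System.out.println(viaCoder.equals(original)); // true
    }
}
```

The assignment line corresponds to the old copy(); `cloneViaCoder` corresponds to the new one, whose cost grows with the size of the state value and with how often copy() is invoked.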
>>
>> Can somebody explain the purpose of this change? Is it meant as an
>> additional "enforcement" point, similar to the DirectRunner's
>> enforceImmutability and enforceEncodability? Or is it genuinely needed
>> for the pipeline to behave correctly?
>>
>> Any hints or thoughts are appreciated.
>>
>> Regards,
>> Vojta
>>
