So I spent some more time on this today, and noticed something interesting
when trying to reproduce it: it only seems to happen if the ParDo using the
side input is fused with another stage that uses state.  I'm not quite sure
why this is, but I'm fairly certain the fix is simply to clear the state
unconditionally (and thus cache the fact that it's cleared) [1].
I'll open a PR for more discussion.

Also, wrt the cache null vs. missing question above, I was wrong there and
was conflating a couple of problems.  The cache can differentiate between a
state cell that is empty and one that is missing from the cache.
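
To make the distinction concrete, here is a minimal sketch in Java. It is a
simplified in-memory model, not the Dataflow worker's actual state cache: the
class names, `readsForElements`, and the `fetch` stand-in for a remote state
lookup are all hypothetical. A cache that treats "not cached" and "empty" the
same way has to go back to the backend on every read of a cleared cell, while
a cache that records a tombstone on clear can serve those reads locally.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical toy model, NOT the Dataflow worker's real cache or API.
// Contrasts a cache that conflates "not cached" with "cached as empty"
// against one that stores tombstones for cleared cells.
public class TombstoneCacheDemo {

  interface StateCache {
    String read(String key);
    void clear(String key);
    int backendReads();
  }

  // A missing entry and an empty value look the same here: both are misses.
  static final class NullConflatingCache implements StateCache {
    private final Map<String, String> entries = new HashMap<>();
    private int backendReads = 0;

    public String read(String key) {
      String value = entries.get(key);
      if (value == null) {
        backendReads++; // simulated remote state lookup
        value = fetch(key);
        if (value != null) {
          entries.put(key, value); // an empty (null) result cannot be cached
        }
      }
      return value;
    }

    public void clear(String key) {
      entries.remove(key); // forgets that the cell is known to be empty
    }

    public int backendReads() {
      return backendReads;
    }
  }

  // Absent key = not cached; Optional.empty() = cached tombstone (known empty).
  static final class TombstoneCache implements StateCache {
    private final Map<String, Optional<String>> entries = new HashMap<>();
    private int backendReads = 0;

    public String read(String key) {
      Optional<String> cached = entries.get(key);
      if (cached == null) {
        backendReads++; // simulated remote state lookup
        cached = Optional.ofNullable(fetch(key));
        entries.put(key, cached);
      }
      return cached.orElse(null);
    }

    public void clear(String key) {
      entries.put(key, Optional.empty()); // remember that the cell is empty
    }

    public int backendReads() {
      return backendReads;
    }
  }

  // Hypothetical stand-in for persisted state after the clear: always empty.
  static String fetch(String key) {
    return null;
  }

  // Clear the "blocked" cell once (side input became ready), then read it
  // once per element, as a per-element side input check would.
  static int readsForElements(StateCache cache, int elements) {
    cache.clear("blocked");
    for (int i = 0; i < elements; i++) {
      cache.read("blocked");
    }
    return cache.backendReads();
  }

  public static void main(String[] args) {
    System.out.println(readsForElements(new NullConflatingCache(), 5)); // prints 5
    System.out.println(readsForElements(new TombstoneCache(), 5)); // prints 0
  }
}
```

With five elements, the conflating cache issues five backend reads and the
tombstone cache issues none, which mirrors the per-element state lookups
described further down the thread.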

[1]
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java#L238

On Thu, Jul 22, 2021 at 2:41 PM Kenneth Knowles wrote:

> On Thu, Jul 22, 2021 at 4:47 AM Steve Niemitz wrote:
>
>> I don't think I'd call it a bug?  The cache doesn't differentiate between
>> a state cell that existed but was cleared, and one that is missing from the
>> cache (maybe it should?).
>>
>
> Filing this in my collection of problems caused by "nullable" vs
> "Optionable". Conflating "not in cache" and "empty value" is definitely a
> bug. Luckily in this case only a performance bug.
>
> Kenn
>
>
>>   The side input fetcher clears the blocked state when it becomes
>> unblocked:
>>
>>
>> https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java#L241
>>
>> Given that interaction, once the side input becomes ready, every request
>> for the blocked map will result in a cache miss and a state lookup
>> request.  Changing the cache to store tombstones for cleared cells is
>> another possibility.
>>
>> On Thu, Jul 22, 2021 at 2:33 AM Reuven Lax wrote:
>>
>>> So you're saying there's a bug in the caching logic that prevents the
>>> side-input cache from working?
>>>
>>> On Wed, Jul 21, 2021 at 7:07 PM Steve Niemitz wrote:
>>>
>>>> I opened a JIRA issue about this years ago [1], but would like to
>>>> actually fix it now, given that our users have started using
>>>> streaming more and more.
>>>>
>>>> There's more detail in the JIRA issue, but basically side inputs in
>>>> streaming pipelines on Dataflow lead to pretty bad performance because
>>>> they result in a state lookup request for each element.
>>>>
>>>
>>>
>>>> I think the best solution would be to stop storing null for the
>>>> blockedMap and instead store an empty map.  This way there will
>>>> (generally) be a cache hit when looking it up (the cache can't cache a
>>>> null).  The only issue is that something then needs to clean up
>>>> the map.  It seems like the StreamingSideInputDoFnRunner could probably set
>>>> its own cleanup timer here to do that.
>>>>
>>>> I'm curious whether there are other thoughts on better ways to do this.
>>>>
>>>> Also, as a side note, I was looking at the SideInputHandler
>>>> implementation in runners-core, and I can't see where the state
>>>> that it maintains is ever cleaned up.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/BEAM-7745
>>>>
>>>
