[ 
https://issues.apache.org/jira/browse/FLINK-20491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17246383#comment-17246383
 ] 

Dawid Wysakowicz edited comment on FLINK-20491 at 12/9/20, 9:22 AM:
--------------------------------------------------------------------

Hey [~aljoscha]
Thanks for starting the work on this. Overall I think the approach of 
consuming non-keyed inputs before the keyed ones is good. I have a couple of 
questions/concerns about the current PoC that I think we should resolve 
before diving into the review:
* Do we need to sort the "broadcast" side? It has no keys, so sorting is not 
strictly necessary for correctness. On the other hand, without sorting we lose 
the nice characteristic that events arrive in timestamp order.
* For multi-input operators, if we do sort the non-keyed inputs, we might 
consume them in an interleaved fashion in order to sort them by timestamp. Is 
that desired?
* We are introducing a limitation that no user key can be serialized as an 
empty array, otherwise a keyed stream with such a key might be consumed before 
the non-keyed input.
* I think the trick with an empty byte array might have problems if the key 
uses a serializer with a fixed-length representation. Have you tried it with 
e.g. Integer? This is probably just a problem of the implementation.
* Have you thought of completely separating the non-keyed inputs, not sorting 
them, and consuming them first? We could enforce that in the InputSelector and 
CommonContext, ensuring that a non-keyed stream is consumed entirely before 
going to the next one. We would not introduce a special meaning for an empty 
byte array. The downsides are that the non-keyed input would not be sorted by 
timestamp and that it would require slightly more changes in the DataInput.
* As for the changes in BatchExecutionKeyedStateBackend, theoretically you can 
use these functions on the keyed side as well. Simply doing nothing/returning 
empty results is wrong in that context.
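
To make the two concerns about the empty byte array concrete, here is a 
minimal sketch in plain Java. It does not use any Flink classes: the 
NON_KEYED marker, compareKeys, serializeInt and serializeRaw are hypothetical 
stand-ins, and it assumes serialized keys are compared lexicographically as 
unsigned bytes. The PoC may well do this differently.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class EmptyKeyOrdering {

    // Hypothetical marker: records of a non-keyed input get an empty serialized key.
    public static final byte[] NON_KEYED = new byte[0];

    // Assumption: keys are ordered lexicographically by unsigned byte value.
    // An empty array is a prefix of every array, so it sorts first.
    public static int compareKeys(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }

    // Stand-in for a fixed-length serializer: always exactly four bytes.
    public static byte[] serializeInt(int key) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(key).array();
    }

    // Stand-in for a serializer without a length prefix: "" becomes zero bytes.
    public static byte[] serializeRaw(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // The marker sorts before any non-empty key, which is the intended effect.
        System.out.println(compareKeys(NON_KEYED, serializeInt(Integer.MIN_VALUE)) < 0);

        // Fixed-length case: every Integer key occupies four bytes, so a sorter
        // that reserves fixed-size key slots has no room for a zero-length marker.
        System.out.println(serializeInt(42).length == NON_KEYED.length);

        // Collision case: a user key that serializes to zero bytes is
        // indistinguishable from the marker under this ordering.
        System.out.println(compareKeys(NON_KEYED, serializeRaw("")) == 0);
    }
}
```

The last check is the limitation from the third bullet: once a user key can 
serialize to nothing, the ordering alone can no longer tell a keyed record 
from a non-keyed one.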


> Support Broadcast State in BATCH execution mode
> -----------------------------------------------
>
>                 Key: FLINK-20491
>                 URL: https://issues.apache.org/jira/browse/FLINK-20491
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Major
>              Labels: pull-request-available
>
> Right now, we don't support {{DataStream.connect(BroadcastStream)}} in 
> {{BATCH}} execution mode. I believe we can add support for this with not too 
> much work.
> The key insight is that we can process the broadcast side before the 
> non-broadcast side. Initially, we were shying away from this because of 
> concerns about {{ctx.applyToKeyedState()}} which allows the broadcast side of 
> the user function to access/iterate over state from the keyed side. We 
> thought that we couldn't support this. However, since we know that we process 
> the broadcast side first we know that the keyed side will always be empty 
> when doing so. We can thus just make this "keyed iteration" call a no-op, 
> instead of throwing an exception as we do now.


