[ 
https://issues.apache.org/jira/browse/FLINK-3659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-3659:
------------------------------------
    Description: 
We should add a new operation that has a main input that can be keyed (but 
doesn't have to be) and a second input that is always broadcast. This is 
similar to a {{CoFlatMap}} or {{CoMap}}, except that with those operations 
both inputs have to be either keyed or non-keyed.
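
To make the routing difference concrete, here is a minimal sketch in plain Java, not Flink code. The class {{BroadcastRoutingSketch}} and its {{route}} method are hypothetical, and choosing an instance via {{hashCode}} is a simplification of Flink's actual partitioning. It also delivers all broadcast elements before the main elements, whereas in a real job the two inputs would interleave.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical routing model for illustration only; none of this is Flink API.
final class BroadcastRoutingSketch {

    // Returns, for each parallel instance, the elements it would receive.
    static List<List<String>> route(int parallelism,
                                    List<String> mainInput,
                                    List<String> broadcastInput) {
        List<List<String>> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new ArrayList<>());
        }
        // Broadcast input: every instance sees every element.
        for (String b : broadcastInput) {
            for (List<String> instance : instances) {
                instance.add("broadcast:" + b);
            }
        }
        // Keyed main input: each element goes to exactly one instance,
        // chosen here by a simple hash of the key (an assumption).
        for (String m : mainInput) {
            int target = Math.floorMod(m.hashCode(), parallelism);
            instances.get(target).add("main:" + m);
        }
        return instances;
    }
}
```

With a parallelism of 2, every instance receives each broadcast element, while each main element ends up on exactly one instance.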

This builds on FLINK-4940, which aims at adding broadcast/global state. When 
processing an element from the broadcast input, only the broadcast state can 
be accessed. When processing an element from the main input, both the regular 
keyed state and the broadcast state can be accessed.
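
The access rule can be illustrated with a small in-memory model. This is only a sketch under assumptions: {{StateAccessSketch}} and its methods are hypothetical names, and plain maps stand in for whatever state abstraction FLINK-4940 ends up providing.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the state access rule; none of these types are Flink API.
final class StateAccessSketch {
    // Broadcast state: the same contents on every parallel instance.
    private final Map<String, String> broadcastState = new HashMap<>();
    // Keyed state: one value per key of the main input.
    private final Map<String, Long> keyedState = new HashMap<>();

    // Broadcast side: only the broadcast state is touched.
    void onBroadcastElement(String name, String value) {
        broadcastState.put(name, value);
    }

    // Main side: uses the keyed state for the current key and the broadcast state.
    String onMainElement(String key) {
        long count = keyedState.merge(key, 1L, Long::sum);
        String prefix = broadcastState.getOrDefault("prefix", "");
        return prefix + key + "=" + count;
    }
}
```

The broadcast-side method has no key in scope, which is why it cannot reasonably touch keyed state, while the main-side method has a current key and can combine both kinds of state.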

I'm proposing this as an intermediate/low-level operation because it will 
probably take a while until we add support for side-inputs in the API. This new 
operation would allow expressing patterns that cannot be expressed with the 
currently available operations.

This is the new proposed API (names are non-final): 

1) Add {{DataStream.connectWithBroadcast(DataStream)}} and 
{{KeyedStream.connectWithBroadcast(DataStream)}}
2) Add {{ConnectedWithBroadcastStream}}, akin to {{ConnectedStreams}}
3) Add {{BroadcastFlatMap}} and {{TimelyBroadcastFlatMap}} as the user 
functions.

Sketch of the user function:
{code}
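interface BroadcastFlatMapFunction<IN, BIN, OUT> {
  // Called for each element of the main (possibly keyed) input.
  public void flatMap(IN in, Collector<OUT> out);
  // Called for each element of the broadcast input.
  public void processBroadcastInput(BIN in);
}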
{code}
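
As an illustration of how a user function might implement this interface, here is a self-contained sketch. It rests on several assumptions: the type parameters {{IN}}, {{BIN}} and {{OUT}} are spelled out explicitly, a local {{Collector}} interface stands in for Flink's own so that the example compiles without Flink on the classpath, and {{ThresholdFilter}} and {{ThresholdFilterDemo}} are hypothetical names. A plain field stands in for broadcast state.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for Flink's Collector so the sketch compiles on its own.
interface Collector<T> {
    void collect(T record);
}

// The sketched interface, with type parameters added as an assumption.
interface BroadcastFlatMapFunction<IN, BIN, OUT> {
    void flatMap(IN in, Collector<OUT> out);
    void processBroadcastInput(BIN in);
}

// Hypothetical user function: forwards values above a broadcast threshold.
final class ThresholdFilter
        implements BroadcastFlatMapFunction<Integer, Integer, Integer> {
    // A plain field stands in for broadcast state in this sketch.
    private int threshold = Integer.MAX_VALUE;

    @Override
    public void flatMap(Integer in, Collector<Integer> out) {
        if (in > threshold) {
            out.collect(in);
        }
    }

    @Override
    public void processBroadcastInput(Integer in) {
        threshold = in;
    }
}

final class ThresholdFilterDemo {
    static List<Integer> run() {
        List<Integer> out = new ArrayList<>();
        ThresholdFilter f = new ThresholdFilter();
        f.flatMap(5, out::add);      // dropped: no threshold broadcast yet
        f.processBroadcastInput(3);  // threshold becomes 3
        f.flatMap(5, out::add);      // forwarded
        f.flatMap(2, out::add);      // dropped
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints [5]
    }
}
```

The main-input method only reads what the broadcast side has written, which matches the intended use of the broadcast input for rules or configuration that every parallel instance needs.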

The API and function names are a bit verbose, and we have to add two new, 
different user functions, but I don't see a way around this given how the 
Flink API currently works.

  was:
We should add a new operation that has a main input that can be keyed (but 
doesn't have to be) and a second input that is always broadcast. This is 
similar to a {{CoFlatMap}} or {{CoMap}} but there either both inputs have to be 
keyed or non-keyed.

This builds on FLINK-4940 which aims at adding broadcast/global state. When 
processing an element from the broadcast input only access to broadcast state 
is allowed. When processing an element from the main input access both the 
regular keyed state and the broadcast state can be accessed.

I'm proposing this as an intermediate/low-level operation because it will 
probably take a while until we add support for side-inputs in the API. This new 
operation would allow expressing new patterns that cannot be expressed with the 
currently expressed operations.

This is the new proposed API (names are non-final): 

1) Add {{DataStream.connectWithBroadcast(DataStream)}} and 
{{KeyedStream.connectWithBroadcast(DataStream)}}
2) Add {{ConnectedWithBroadcastStream}}, akin to {{ConnectedStreams}}/
3) Add {{BroadcastFlatMap}} and {{TimelyBroadcastFlatMap}} as the user 
functions.

Sketch of the user function:

{{code}}
interface BroadcastFlatMapFunction {
  public void flatMap(IN in, Collector out);
  public void processBroadcastInput(BIN in);
}
{{code}}

The API names, function names are a bit verbose and we have to add two new 
different ones but I don't see a way around this with the current way the Flink 
API works.


> Add ConnectWithBroadcast Operation
> ----------------------------------
>
>                 Key: FLINK-3659
>                 URL: https://issues.apache.org/jira/browse/FLINK-3659
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.0.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> We should add a new operation that has a main input that can be keyed (but 
> doesn't have to be) and a second input that is always broadcast. This is 
> similar to a {{CoFlatMap}} or {{CoMap}}, except that with those operations 
> both inputs have to be either keyed or non-keyed.
> This builds on FLINK-4940, which aims at adding broadcast/global state. When 
> processing an element from the broadcast input, only the broadcast state can 
> be accessed. When processing an element from the main input, both the regular 
> keyed state and the broadcast state can be accessed.
> I'm proposing this as an intermediate/low-level operation because it will 
> probably take a while until we add support for side-inputs in the API. This 
> new operation would allow expressing patterns that cannot be expressed 
> with the currently available operations.
> This is the new proposed API (names are non-final): 
> 1) Add {{DataStream.connectWithBroadcast(DataStream)}} and 
> {{KeyedStream.connectWithBroadcast(DataStream)}}
> 2) Add {{ConnectedWithBroadcastStream}}, akin to {{ConnectedStreams}}
> 3) Add {{BroadcastFlatMap}} and {{TimelyBroadcastFlatMap}} as the user 
> functions.
> Sketch of the user function:
> {code}
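> interface BroadcastFlatMapFunction<IN, BIN, OUT> {
>   // Called for each element of the main (possibly keyed) input.
>   public void flatMap(IN in, Collector<OUT> out);
>   // Called for each element of the broadcast input.
>   public void processBroadcastInput(BIN in);
> }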
> {code}
> The API and function names are a bit verbose, and we have to add two new, 
> different user functions, but I don't see a way around this given how the 
> Flink API currently works.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
