liupengcheng created FLINK-18830:
------------------------------------
Summary: JoinCoGroupFunction and FlatJoinCoGroupFunction work
incorrectly for outer join when one side of coGroup is empty
Key: FLINK-18830
URL: https://issues.apache.org/jira/browse/FLINK-18830
Project: Flink
Issue Type: Bug
Components: API / DataStream
Affects Versions: 1.11.1
Reporter: liupengcheng
Currently, the `JoinCoGroupFunction` and `FlatJoinCoGroupFunction` in
JoinedStreams do not respect the join type: both implement the join as a
two-level nested loop. This is incorrect for an outer join when one
side of the coGroup is empty.
```
public void coGroup(Iterable<T1> first, Iterable<T2> second,
        Collector<T> out) throws Exception {
    for (T1 val1: first) {
        for (T2 val2: second) {
            wrappedFunction.join(val1, val2, out);
        }
    }
}
```
The code above is the current implementation. Suppose the first input is
non-empty and the second input is an empty iterator: the inner loop body
never runs, so the join function (`wrappedFunction`) is never called. As a
result, a left outer join emits no data.
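The effect can be reproduced outside Flink. The following is a minimal sketch, not Flink code: `NestedLoopJoinDemo` and `nestedLoopJoin` are hypothetical names, and a plain `BiConsumer` stands in for `wrappedFunction`. It only counts how often the join callback runs when the second input is empty.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;

public class NestedLoopJoinDemo {

    // Same loop shape as the coGroup method above, with Flink types replaced
    // by plain Java ones (assumption: BiConsumer stands in for the join function).
    public static <T1, T2> int nestedLoopJoin(
            Iterable<T1> first, Iterable<T2> second, BiConsumer<T1, T2> join) {
        int calls = 0;
        for (T1 val1 : first) {
            for (T2 val2 : second) {
                join.accept(val1, val2);
                calls++;
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        List<String> first = Arrays.asList("a", "b");
        List<Integer> second = Collections.emptyList();
        int calls = nestedLoopJoin(first, second,
                (l, r) -> System.out.println(l + "," + r));
        // The inner loop never executes, so the callback is never invoked.
        System.out.println("join calls: " + calls); // prints "join calls: 0"
    }
}
```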
I therefore propose taking the join type into account here and handling this
case. For example, for a left outer join, if the right side is empty we can
call the join function once per left element with the right side set to null,
so that the join function itself can decide how to handle the unmatched records.
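A rough sketch of the left outer case, written against plain Java types so it runs standalone. Everything here is an assumption for illustration: `LeftOuterCoGroupSketch`, the `Join` interface and `leftOuterCoGroup` are hypothetical names, not existing Flink API, and the sketch assumes the second input can be iterated more than once.

```java
import java.util.ArrayList;
import java.util.List;

public class LeftOuterCoGroupSketch {

    /** Hypothetical stand-in for the wrapped join function. */
    public interface Join<T1, T2, T> {
        T join(T1 left, T2 right);
    }

    // Left outer variant of the nested loop: if the right side is empty,
    // the join function is still called once per left element with null.
    public static <T1, T2, T> List<T> leftOuterCoGroup(
            Iterable<T1> first, Iterable<T2> second, Join<T1, T2, T> fn) {
        List<T> out = new ArrayList<>();
        // Assumption: 'second' is re-iterable (e.g. backed by a List).
        boolean secondEmpty = !second.iterator().hasNext();
        for (T1 val1 : first) {
            if (secondEmpty) {
                out.add(fn.join(val1, null));
            } else {
                for (T2 val2 : second) {
                    out.add(fn.join(val1, val2));
                }
            }
        }
        return out;
    }
}
```

With this shape the join function must tolerate a null right-hand argument. A right or full outer join would need the symmetric check on the first input.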