[https://issues.apache.org/jira/browse/FLINK-18830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17172797#comment-17172797]
liupengcheng edited comment on FLINK-18830 at 8/7/20, 9:41 AM:
---------------------------------------------------------------
First, I think we can create a new function interface, e.g., ComplexJoin,
which provides more functionality, such as checking the join predicate
result (true/false) and returning the joined result element depending on
the join predicate result.
It may look like this:
{code:java}
public interface ComplexJoin<IN1, IN2, OUT> {
    // Check whether the join predicate holds for this pair.
    boolean joinPredicate(IN1 left, IN2 right);

    // Return the fully joined element when the join predicate returns true.
    OUT full(IN1 left, IN2 right);

    // Return the joined element whose right side is padded with nulls,
    // e.g. for a left outer join when no match is found on the right.
    OUT padLeft(IN1 left);

    // Return the joined element whose left side is padded with nulls.
    OUT padRight(IN2 right);
}
{code}
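To illustrate how the proposed interface might be implemented, here is a minimal plain-Java sketch (no Flink dependency) of an equi-join on a string key. `Row`, `KeyEqualityJoin`, `KeyEqualityJoinDemo` and the string output format are hypothetical and only for illustration; the interface is repeated so the snippet compiles on its own.

```java
import java.util.Objects;

// Copy of the proposed interface so this sketch compiles on its own.
interface ComplexJoin<IN1, IN2, OUT> {
    boolean joinPredicate(IN1 left, IN2 right);
    OUT full(IN1 left, IN2 right);
    OUT padLeft(IN1 left);
    OUT padRight(IN2 right);
}

// Hypothetical record type: a join key plus a payload.
final class Row {
    final String key;
    final String value;

    Row(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical equi-join on Row.key; a missing side is rendered as "null".
class KeyEqualityJoin implements ComplexJoin<Row, Row, String> {
    @Override
    public boolean joinPredicate(Row left, Row right) {
        return Objects.equals(left.key, right.key);
    }

    @Override
    public String full(Row left, Row right) {
        return left.key + ": " + left.value + " | " + right.value;
    }

    @Override
    public String padLeft(Row left) {
        // The right side is missing, so it is padded with null.
        return left.key + ": " + left.value + " | null";
    }

    @Override
    public String padRight(Row right) {
        // The left side is missing, so it is padded with null.
        return right.key + ": null | " + right.value;
    }
}

class KeyEqualityJoinDemo {
    public static void main(String[] args) {
        KeyEqualityJoin join = new KeyEqualityJoin();
        Row l = new Row("k1", "a");
        Row r = new Row("k1", "b");
        System.out.println(join.joinPredicate(l, r)); // true
        System.out.println(join.full(l, r));          // k1: a | b
        System.out.println(join.padLeft(l));          // k1: a | null
        System.out.println(join.padRight(r));         // k1: null | b
    }
}
```

Keeping the null padding inside the user's implementation lets each join decide how a missing side is represented in `OUT`.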
Then, based on this interface, we can create
ComplexFlatJoinFunction/RichComplexFlatJoinFunction so that we are able to
support outer joins in `JoinCoGroupFunction` and `FlatJoinCoGroupFunction`.
For example, for a left outer join, if the second iterator is empty or no
match can be found in it, we can do the following:
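{code:java}
for (T1 val1 : first) {
    boolean emitted = false; // reset the emit state for each left element
    for (T2 val2 : second) {
        // call the real join, e.g. RichComplexFlatJoinFunction
        if (complexJoinFunction.joinPredicate(val1, val2)) {
            out.collect(complexJoinFunction.full(val1, val2));
            emitted = true;
        }
    }
    if (!emitted) {
        // second is empty or nothing matched: emit left padded with nulls
        out.collect(complexJoinFunction.padLeft(val1));
    }
}
{code}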
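The left outer join loop can be exercised outside Flink with a small sketch like the following. It assumes a plain-Java copy of the proposed `ComplexJoin`; `SimpleCollector` is a hypothetical stand-in for Flink's `Collector`, and `LeftOuterJoinCoGroup`, `EqualIntJoin` and `LeftOuterJoinDemo` are illustrative names, not existing Flink classes. With an empty second input, every left element is emitted via `padLeft`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Copy of the proposed interface so this sketch compiles on its own.
interface ComplexJoin<IN1, IN2, OUT> {
    boolean joinPredicate(IN1 left, IN2 right);
    OUT full(IN1 left, IN2 right);
    OUT padLeft(IN1 left);
    OUT padRight(IN2 right);
}

// Hypothetical minimal stand-in for Flink's Collector (avoids a Flink dependency).
interface SimpleCollector<T> {
    void collect(T record);
}

// Hypothetical left outer coGroup built on ComplexJoin.
class LeftOuterJoinCoGroup<T1, T2, T> {
    private final ComplexJoin<T1, T2, T> complexJoinFunction;

    LeftOuterJoinCoGroup(ComplexJoin<T1, T2, T> complexJoinFunction) {
        this.complexJoinFunction = complexJoinFunction;
    }

    void coGroup(Iterable<T1> first, Iterable<T2> second, SimpleCollector<T> out) {
        for (T1 val1 : first) {
            boolean emitted = false; // reset the emit state for each left element
            for (T2 val2 : second) {
                if (complexJoinFunction.joinPredicate(val1, val2)) {
                    out.collect(complexJoinFunction.full(val1, val2));
                    emitted = true;
                }
            }
            if (!emitted) {
                // second is empty or nothing matched: pad the right side with null
                out.collect(complexJoinFunction.padLeft(val1));
            }
        }
    }
}

// Hypothetical ComplexJoin that matches equal integers.
class EqualIntJoin implements ComplexJoin<Integer, Integer, String> {
    public boolean joinPredicate(Integer left, Integer right) { return left.equals(right); }
    public String full(Integer left, Integer right) { return left + "," + right; }
    public String padLeft(Integer left) { return left + ",null"; }
    public String padRight(Integer right) { return "null," + right; }
}

class LeftOuterJoinDemo {
    static List<String> run(List<Integer> first, List<Integer> second) {
        List<String> result = new ArrayList<>();
        new LeftOuterJoinCoGroup<>(new EqualIntJoin()).coGroup(first, second, result::add);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2), List.of(2, 3)));           // [1,null, 2,2]
        System.out.println(run(List.of(1, 2), Collections.emptyList())); // [1,null, 2,null]
    }
}
```

Like the current implementation, the sketch iterates `second` once per left element, so it assumes the right-hand `Iterable` can be traversed repeatedly.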
was (Author: liupengcheng):
First, I think we can create a new function interface, e.g., ComplexJoin,
which provides more functionality, such as checking the join predicate
result (true/false) and returning the joined result element depending on
the join predicate result.
It may look like this:
{code:java}
public interface ComplexJoin<IN1, IN2, OUT> {
    // Check whether the join predicate holds for this pair.
    boolean joinPredicate(IN1 left, IN2 right);

    // Return the fully joined element when the join predicate returns true.
    OUT full(IN1 left, IN2 right);

    // Return the joined element whose right side is padded with nulls,
    // e.g. for a left outer join when no match is found on the right.
    OUT padLeft(IN1 left);

    // Return the joined element whose left side is padded with nulls.
    OUT padRight(IN2 right);
}
{code}
Then, based on this interface, we can create
ComplexJoinFunction/ComplexFlatJoinFunction/RichComplexJoinFunction/RichComplexFlatJoinFunction
so that we are able to support outer joins in `JoinCoGroupFunction` and
`FlatJoinCoGroupFunction`.
For example, for a left outer join, if the second iterator is empty or no
match can be found in it, we can do the following:
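{code:java}
for (T1 val1 : first) {
    boolean emitted = false; // reset the emit state for each left element
    for (T2 val2 : second) {
        // call the real join, e.g. RichComplexJoinFunction
        if (complexJoinFunction.joinPredicate(val1, val2)) {
            out.collect(complexJoinFunction.full(val1, val2));
            emitted = true;
        }
    }
    if (!emitted) {
        // second is empty or nothing matched: emit left padded with nulls
        out.collect(complexJoinFunction.padLeft(val1));
    }
}
{code}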
> JoinCoGroupFunction and FlatJoinCoGroupFunction work incorrectly for outer
> join when one side of coGroup is empty
> -----------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-18830
> URL: https://issues.apache.org/jira/browse/FLINK-18830
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.11.1
> Reporter: liupengcheng
> Priority: Major
>
> Currently, the `JoinCoGroupFunction` and `FlatJoinCoGroupFunction` in
> `JoinedStreams` don't respect the join type; they are implemented as a join
> within a two-level nested loop. However, this is incorrect for an outer join
> when one side of the coGroup is empty.
> ```java
> public void coGroup(Iterable<T1> first, Iterable<T2> second,
>         Collector<T> out) throws Exception {
>     for (T1 val1 : first) {
>         for (T2 val2 : second) {
>             wrappedFunction.join(val1, val2, out);
>         }
>     }
> }
> ```
> The above code is the current implementation. Suppose the first input is
> non-empty and the second input is an empty iterator: then the join function
> (`wrappedFunction`) is never called, so no data is emitted for a left outer
> join.
> So I propose to take the join type into account here and handle this case.
> For example, for a left outer join, we can emit a record with the right side
> set to null if the right side is empty or no match can be found in it.
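The problem described in the issue, together with the proposed null padding for a left outer join, can be reproduced as a minimal plain-Java sketch. `FlatJoin` is a hypothetical, simplified stand-in for Flink's `FlatJoinFunction`, a `java.util.function.Consumer` replaces the `Collector`, and passing `null` as the right element is just one possible way to "emit a record with the right side set to null". The sketch assumes, as in a keyed coGroup, that every element of `second` is a join partner for every element of `first`, so "no match" reduces to "`second` is empty".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for Flink's FlatJoinFunction (no Flink dependency).
interface FlatJoin<IN1, IN2, OUT> {
    void join(IN1 first, IN2 second, Consumer<OUT> out);
}

class CoGroupSketch {
    // Current behaviour: a plain nested loop, so an empty `second` emits nothing.
    static <T1, T2, T> void innerCoGroup(Iterable<T1> first, Iterable<T2> second,
                                         FlatJoin<T1, T2, T> fn, Consumer<T> out) {
        for (T1 val1 : first) {
            for (T2 val2 : second) {
                fn.join(val1, val2, out);
            }
        }
    }

    // Proposed left outer behaviour: if `second` is empty, join each left element with null.
    static <T1, T2, T> void leftOuterCoGroup(Iterable<T1> first, Iterable<T2> second,
                                             FlatJoin<T1, T2, T> fn, Consumer<T> out) {
        boolean secondEmpty = !second.iterator().hasNext();
        for (T1 val1 : first) {
            if (secondEmpty) {
                fn.join(val1, null, out);
            } else {
                for (T2 val2 : second) {
                    fn.join(val1, val2, out);
                }
            }
        }
    }

    // Hypothetical join function; string concatenation renders a null side as "null".
    static final FlatJoin<String, String, String> CONCAT =
            (l, r, out) -> out.accept(l + "|" + r);

    public static void main(String[] args) {
        List<String> inner = new ArrayList<>();
        innerCoGroup(List.of("a", "b"), List.<String>of(), CONCAT, inner::add);
        System.out.println(inner); // []

        List<String> leftOuter = new ArrayList<>();
        leftOuterCoGroup(List.of("a", "b"), List.<String>of(), CONCAT, leftOuter::add);
        System.out.println(leftOuter); // [a|null, b|null]
    }
}
```

With this approach the user's join function must be prepared to receive `null` for the missing side, which is the trade-off the `ComplexJoin` proposal in the comment avoids by moving padding into dedicated methods.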
--
This message was sent by Atlassian Jira
(v8.3.4#803005)