[
https://issues.apache.org/jira/browse/FLINK-18830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17172797#comment-17172797
]
liupengcheng edited comment on FLINK-18830 at 8/9/20, 1:01 PM:
---------------------------------------------------------------
First, I think we can create a new function interface, e.g.
DeclarativeJoinFunction, which provides more functionality, such as checking the
join predicate result (true/false) and returning the joined element that
corresponds to that result.
It might look like this:
{code:java}
public interface DeclarativeJoinFunction<IN1, IN2, OUT> {
    // Check the join predicate for a pair of elements.
    boolean joinPredicate(IN1 left, IN2 right);

    // Return the full joined element when the join predicate returns true.
    OUT full(IN1 left, IN2 right);

    // Return the joined element whose right side is padded with nulls,
    // e.g. for a left outer join when no match is found on the right.
    OUT padLeft(IN1 left);

    // Return the joined element whose left side is padded with nulls.
    OUT padRight(IN2 right);
}
{code}
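To make the contract concrete, here is a minimal sketch of one possible implementation. The class `IdPrefixJoin` and the `"id:value"` record format are hypothetical and exist only for illustration; the interface is repeated (spelled `DeclarativeJoinFunction`) so that the sketch compiles on its own.

```java
// Proposed interface, repeated here so the sketch is self-contained.
interface DeclarativeJoinFunction<IN1, IN2, OUT> {
    boolean joinPredicate(IN1 left, IN2 right);
    OUT full(IN1 left, IN2 right);
    OUT padLeft(IN1 left);
    OUT padRight(IN2 right);
}

// Hypothetical implementation: both inputs are "id:value" strings joined on id.
// Assumes every record contains a ':' separator.
class IdPrefixJoin implements DeclarativeJoinFunction<String, String, String> {

    private static String id(String record) {
        return record.substring(0, record.indexOf(':'));
    }

    private static String value(String record) {
        return record.substring(record.indexOf(':') + 1);
    }

    @Override
    public boolean joinPredicate(String left, String right) {
        // Two records match when their ids are equal.
        return id(left).equals(id(right));
    }

    @Override
    public String full(String left, String right) {
        return id(left) + "," + value(left) + "," + value(right);
    }

    @Override
    public String padLeft(String left) {
        // No match on the right: the right-hand column is rendered as "null".
        return id(left) + "," + value(left) + ",null";
    }

    @Override
    public String padRight(String right) {
        // No match on the left: the left-hand column is rendered as "null".
        return id(right) + ",null," + value(right);
    }
}
```

With this sketch, `padLeft("1:alice")` produces `1,alice,null`, which is the record a left outer join would emit when nothing on the right matches.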
Then, based on this interface, we can also create RichDeclarativeJoinFunction and
DeclarativeJoinCoGroupFunction, so that we are able to support outer joins in
JoinedStreams.
E.g. for a left outer join, if the second iterator is empty or no match can be
found, we can do the following:
{code:java}
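for (T1 val1 : first) {
    // reset rightMatched for every left element
    boolean rightMatched = false;
    for (T2 val2 : second) {
        // call the real join, e.g. RichDeclarativeJoinFunction
        if (declarativeJoinFunction.joinPredicate(val1, val2)) {
            out.collect(declarativeJoinFunction.full(val1, val2));
            // a match was found on the right side
            rightMatched = true;
        }
    }
    if (!rightMatched) {
        out.collect(declarativeJoinFunction.padLeft(val1));
    }
}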
{code}
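The loop above can be exercised outside Flink with a small stand-alone sketch. This is only an illustration under stated assumptions: `java.util.function.Consumer` stands in for Flink's `Collector`, and `LeftOuterCoGroup`, `EQ_JOIN`, and `run` are hypothetical names that are not part of Flink or of the proposal. The interface is repeated so that the sketch compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Proposed interface, repeated here so the sketch is self-contained.
interface DeclarativeJoinFunction<IN1, IN2, OUT> {
    boolean joinPredicate(IN1 left, IN2 right);
    OUT full(IN1 left, IN2 right);
    OUT padLeft(IN1 left);
    OUT padRight(IN2 right);
}

class LeftOuterCoGroup {

    // Left outer join over one co-group.
    // Consumer<OUT> is an assumption standing in for Flink's Collector<OUT>.
    static <T1, T2, OUT> void coGroup(
            Iterable<T1> first,
            Iterable<T2> second,
            DeclarativeJoinFunction<T1, T2, OUT> fn,
            Consumer<OUT> out) {
        for (T1 val1 : first) {
            boolean rightMatched = false;
            for (T2 val2 : second) {
                if (fn.joinPredicate(val1, val2)) {
                    out.accept(fn.full(val1, val2));
                    rightMatched = true;
                }
            }
            // Empty right side or no matching element: emit the padded record.
            if (!rightMatched) {
                out.accept(fn.padLeft(val1));
            }
        }
    }

    // Hypothetical join on equal integers; a missing side is rendered as "null".
    static final DeclarativeJoinFunction<Integer, Integer, String> EQ_JOIN =
            new DeclarativeJoinFunction<Integer, Integer, String>() {
                @Override
                public boolean joinPredicate(Integer l, Integer r) {
                    return l.equals(r);
                }

                @Override
                public String full(Integer l, Integer r) {
                    return l + "|" + r;
                }

                @Override
                public String padLeft(Integer l) {
                    return l + "|null";
                }

                @Override
                public String padRight(Integer r) {
                    return "null|" + r;
                }
            };

    // Convenience helper that collects the join output into a list.
    static List<String> run(List<Integer> first, List<Integer> second) {
        List<String> result = new ArrayList<>();
        coGroup(first, second, EQ_JOIN, result::add);
        return result;
    }
}
```

With an empty second input, `run` on the first input `[1, 2]` returns `[1|null, 2|null]`, whereas the current two-level loop would emit nothing. A right outer join could be handled symmetrically with `padRight`, at the cost of tracking which right elements were matched.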
was (Author: liupengcheng):
First, I think we can create a new function interface, e.g.
DeclarativeJoinFunction, which provides more functionality, such as checking the
join predicate result (true/false) and returning the joined element that
corresponds to that result.
It might look like this:
{code:java}
public interface DeclarativeJoinFunction<IN1, IN2, OUT> {
    // Check the join predicate for a pair of elements.
    boolean joinPredicate(IN1 left, IN2 right);

    // Return the full joined element when the join predicate returns true.
    OUT full(IN1 left, IN2 right);

    // Return the joined element whose right side is padded with nulls,
    // e.g. for a left outer join when no match is found on the right.
    OUT padLeft(IN1 left);

    // Return the joined element whose left side is padded with nulls.
    OUT padRight(IN2 right);
}
{code}
Then, based on this interface, we can also create RichDeclarativeJoinFunction,
so that we are able to support outer joins in `JoinCoGroupFunction` and
`FlatJoinCoGroupFunction`.
E.g. for a left outer join, if the second iterator is empty or no match can be
found, we can do the following:
{code:java}
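for (T1 val1 : first) {
    // reset rightMatched for every left element
    boolean rightMatched = false;
    for (T2 val2 : second) {
        // call the real join, e.g. RichDeclarativeJoinFunction
        if (declarativeJoinFunction.joinPredicate(val1, val2)) {
            out.collect(declarativeJoinFunction.full(val1, val2));
            // a match was found on the right side
            rightMatched = true;
        }
    }
    if (!rightMatched) {
        out.collect(declarativeJoinFunction.padLeft(val1));
    }
}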
{code}
> JoinCoGroupFunction and FlatJoinCoGroupFunction work incorrectly for outer
> join when one side of coGroup is empty
> -----------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-18830
> URL: https://issues.apache.org/jira/browse/FLINK-18830
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.11.1
> Reporter: liupengcheng
> Priority: Major
>
> Currently, the `JoinCoGroupFunction` and `FlatJoinCoGroupFunction` in
> JoinedStreams do not respect the join type; they are implemented as a join
> within a two-level loop. However, this is incorrect for an outer join when
> one side of the coGroup is empty.
> ```
> public void coGroup(Iterable<T1> first, Iterable<T2> second,
>         Collector<T> out) throws Exception {
>     for (T1 val1 : first) {
>         for (T2 val2 : second) {
>             wrappedFunction.join(val1, val2, out);
>         }
>     }
> }
> ```
> The above code is the current implementation. Suppose the first input is
> non-empty and the second input is an empty iterator; then the join
> function (`wrappedFunction`) will never be called, so no data is
> emitted for a left outer join.
> I therefore propose to take the join type into account here and handle this
> case. E.g., for a left outer join, we can emit a record with the right side
> set to null if the right side is empty or no match can be found on the right
> side.