[
https://issues.apache.org/jira/browse/FLINK-5498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940804#comment-15940804
]
Fabian Hueske commented on FLINK-5498:
--------------------------------------
Hi [~lincoln.86xy], I thought about this problem and I think I found a
memory-safe way to address it, i.e., without a {{CoGroupFunction}}. The idea is
to filter out invalid {{null}} join results in a {{GroupReduceFunction}}. The
overhead for this is another sort, but the operator becomes memory-safe.
I think we should prefer a less-efficient memory-safe implementation if
possible.
I made a prototype implementation for a LEFT OUTER JOIN (see below) but haven't
thought about whether it would work for FULL OUTER JOINs as well.
What do you think?
Best, Fabian
{code}
import java.util.Arrays;
import java.util.Iterator;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class OuterJoin {

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(2);

    Row[] dataOuter = new Row[]{Row.of(1, 100), Row.of(1, 100), Row.of(2, 200),
      Row.of(3, 300), Row.of(4, 400), Row.of(5, 500), Row.of(6, 600), Row.of(6, 600)};
    Row[] dataInner = new Row[]{Row.of(1, 10), Row.of(1, 110), Row.of(2, 220),
      Row.of(3, 30), Row.of(4, 40), Row.of(4, 41)};

    RowTypeInfo rowType = new RowTypeInfo(
      BasicTypeInfo.INT_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO
    );

    DataSet<Row> outer = env.fromCollection(Arrays.asList(dataOuter), rowType);
    DataSet<Row> inner = env.fromCollection(Arrays.asList(dataInner), rowType);

    DataSet<Row> joined = outer
      .leftOuterJoin(inner)
      // define join keys
      .where(0).equalTo(0)
      // join function adds a flag indicating whether the result is a null join
      .with(new JoinFunc())
      // group by all fields of the outer table (partitioning is reused)
      .groupBy(1, 2)
      // filter out all null joins if there was any matched join
      .reduceGroup(new NullFilter());

    joined.print();
  }

  @FunctionAnnotation.ForwardedFieldsFirst({"f0->f1; f1->f2"})
  @FunctionAnnotation.ForwardedFieldsSecond({"f0->f3"})
  public static class JoinFunc implements JoinFunction<Row, Row, Row>,
      ResultTypeQueryable<Row> {

    @Override
    public Row join(Row outer, Row inner) throws Exception {
      if (inner == null) {
        // no inner row with the same key: emit a flagged null join
        return Row.of(true, outer.getField(0), outer.getField(1), null, null);
      } else {
        // non-equality predicate: outer.f1 > inner.f1
        if (((int) outer.getField(1)) > ((int) inner.getField(1))) {
          // remains
          return Row.of(false, outer.getField(0), outer.getField(1),
            inner.getField(0), inner.getField(1));
        } else {
          // filtered out
          return Row.of(true, outer.getField(0), outer.getField(1), null, null);
        }
      }
    }

    @Override
    public TypeInformation<Row> getProducedType() {
      return new RowTypeInfo(
        BasicTypeInfo.BOOLEAN_TYPE_INFO, // flag to indicate null
        BasicTypeInfo.INT_TYPE_INFO, // first field of outer table
        BasicTypeInfo.INT_TYPE_INFO, // second field of outer table
        BasicTypeInfo.INT_TYPE_INFO, // first field of inner table
        BasicTypeInfo.INT_TYPE_INFO // second field of inner table
      );
    }
  }

  @FunctionAnnotation.ForwardedFields({"f1->f0; f2->f1"})
  public static class NullFilter implements GroupReduceFunction<Row, Row>,
      ResultTypeQueryable<Row> {

    @Override
    public void reduce(Iterable<Row> rows, Collector<Row> out) throws Exception {
      boolean needsNull = true;
      int nullCnt = 0;
      Row r = null;

      Iterator<Row> rowsIt = rows.iterator();
      while (rowsIt.hasNext()) {
        r = rowsIt.next();
        boolean isNull = (Boolean) r.getField(0);
        if (!isNull) {
          // non nulls are directly forwarded
          out.collect(Row.of(r.getField(1), r.getField(2), r.getField(3),
            r.getField(4)));
          needsNull = false;
        } else {
          // nulls are not forwarded but counted. Let's see if there were some
          // join matches
          nullCnt++;
        }
      }

      if (needsNull) {
        // no join matches found. Forward null joins
        for (int i = 0; i < nullCnt; i++) {
          out.collect(Row.of(r.getField(1), r.getField(2), null, null));
        }
      }
    }

    @Override
    public TypeInformation<Row> getProducedType() {
      return new RowTypeInfo(
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO
      );
    }
  }
}
{code}
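For comparison, the result expected from the LEFT OUTER JOIN in the prototype (equality on the first field, outer.f1 > inner.f1 as the non-equality predicate) can be computed with a plain nested loop over in-memory collections. The following is only a minimal reference sketch and is not part of the prototype or of Flink; the class name {{ReferenceLeftOuterJoin}} and the method {{leftOuterJoin}} are hypothetical, and it is of course not memory-safe for large inputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReferenceLeftOuterJoin {

    // Nested-loop LEFT OUTER JOIN with the condition
    // outer[0] == inner[0] AND outer[1] > inner[1].
    public static List<Integer[]> leftOuterJoin(List<Integer[]> outer, List<Integer[]> inner) {
        List<Integer[]> result = new ArrayList<>();
        for (Integer[] o : outer) {
            boolean matched = false;
            for (Integer[] i : inner) {
                if (o[0].equals(i[0]) && o[1] > i[1]) {
                    // the full join condition holds: emit the joined row
                    result.add(new Integer[]{o[0], o[1], i[0], i[1]});
                    matched = true;
                }
            }
            if (!matched) {
                // no inner row satisfied the condition: pad with nulls exactly once
                result.add(new Integer[]{o[0], o[1], null, null});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // same sample data as in the prototype
        List<Integer[]> outer = Arrays.asList(
            new Integer[]{1, 100}, new Integer[]{1, 100}, new Integer[]{2, 200},
            new Integer[]{3, 300}, new Integer[]{4, 400}, new Integer[]{5, 500},
            new Integer[]{6, 600}, new Integer[]{6, 600});
        List<Integer[]> inner = Arrays.asList(
            new Integer[]{1, 10}, new Integer[]{1, 110}, new Integer[]{2, 220},
            new Integer[]{3, 30}, new Integer[]{4, 40}, new Integer[]{4, 41});

        for (Integer[] row : leftOuterJoin(outer, inner)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

For the sample data this yields nine rows, four of which are null-padded: one for (2, 200), one for (5, 500), and two for the duplicate (6, 600) rows.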
> Add support for left/right outer joins with non-equality predicates (and 1+
> equality predicates)
> ------------------------------------------------------------------------------------------------
>
> Key: FLINK-5498
> URL: https://issues.apache.org/jira/browse/FLINK-5498
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Affects Versions: 1.3.0
> Reporter: lincoln.lee
> Assignee: lincoln.lee
> Priority: Minor
>
> I found that the expected result of a unit test case is incorrect compared to
> that of an RDBMS, see
> flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
> {code:title=JoinITCase.scala}
> def testRightJoinWithNotOnlyEquiJoin(): Unit = {
> ...
> val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b,
> 'c)
> val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e,
> 'f, 'g, 'h)
> val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
>
> val expected = "Hello world,BCD\n"
> val results = joinT.toDataSet[Row].collect()
> TestBaseUtils.compareResultAsText(results.asJava, expected)
> }
> {code}
> Then I took some time to learn about outer joins in relational databases.
> The correct result of the above case should be (tested in SQL Server and
> MySQL, with identical results):
> {code}
> > select c, g from tuple3 right outer join tuple5 on a=f and b<h;
> c g
> -------------------------------- --------------------------------
> NULL Hallo
> NULL Hallo Welt
> NULL Hallo Welt wie
> NULL Hallo Welt wie gehts?
> NULL ABC
> Hello world BCD
> NULL CDE
> NULL DEF
> NULL EFG
> NULL FGH
> NULL GHI
> NULL HIJ
> NULL IJK
> NULL JKL
> NULL KLM
> {code}
> The join condition {{rightOuterJoin('a === 'd && 'b < 'h)}} is not equivalent
> to {{rightOuterJoin('a === 'd).where('b < 'h)}}.
> The problem is rooted in the code-generated {{JoinFunction}} (see
> {{DataSetJoin.translateToPlan()}}, line 188). If the join condition does not
> match, we must emit the outer row padded with nulls instead of returning from
> the function without emitting anything.
> The code-generated {{JoinFunction}} also includes the equality predicates.
> These should be removed before generating the code, e.g., in
> {{DataSetJoinRule}} when generating the {{DataSetJoin}} with help of
> {{JoinInfo.getRemaining()}}.
> More details: https://goo.gl/ngekca