I can’t think of a good solution.
Consider what is involved in (properly) adding a predicate to the Correlate
operator. You would need to modify all the rules that involve Correlate, add
new rules to push predicates into, and through, that filter, and add tests for
all of this. Rewrite rules will then produce the new form of Correlate
automatically, even for “ordinary” queries, so if you haven’t tested it
extensively, those queries will break.
I don’t think adding a filter to Correlate is a bad idea. It’s just a lot of
work.
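To make the behavior under discussion concrete (the original report is in the
quoted messages below), here is a minimal in-memory sketch in plain Java, not
Calcite code. It compares a left correlate followed by a separate filter with a
left correlate that evaluates the ON predicate itself. The class, method, and
data names are hypothetical, and the sketch assumes the ON predicate is
null-rejecting, as e.age > d.deptno is.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.Function;

/** Toy model of a left correlate over in-memory lists; hypothetical, not Calcite code. */
public class LeftCorrelateDemo {
  /** A joined row; right is null when the outer row was padded. */
  record Joined<L, R>(L left, R right) {}

  /** Hypothetical table function: employee ages for a department number. */
  static final Function<Integer, List<Integer>> AGES =
      deptno -> deptno == 10 ? List.of(25, 5) : List.of(18);

  /** Hypothetical ON clause: e.age > d.deptno. */
  static final BiPredicate<Integer, Integer> OLDER = (deptno, age) -> age > deptno;

  /** Plan shape from the thread: left correlate, then a separate filter. */
  static <L, R> List<Joined<L, R>> correlateThenFilter(
      List<L> outer, Function<L, List<R>> fn, BiPredicate<L, R> on) {
    List<Joined<L, R>> rows = new ArrayList<>();
    for (L l : outer) {
      List<R> inner = fn.apply(l);
      if (inner.isEmpty()) {
        rows.add(new Joined<>(l, null)); // pad only when fn returns nothing
      }
      for (R r : inner) {
        rows.add(new Joined<>(l, r));
      }
    }
    // Assumes a null-rejecting predicate: it is UNKNOWN on a null-padded row,
    // so padded rows are dropped along with the non-matching ones.
    rows.removeIf(j -> j.right() == null || !on.test(j.left(), j.right()));
    return rows;
  }

  /** Correlate that evaluates the ON predicate itself and pads if nothing matched. */
  static <L, R> List<Joined<L, R>> correlateWithPredicate(
      List<L> outer, Function<L, List<R>> fn, BiPredicate<L, R> on) {
    List<Joined<L, R>> rows = new ArrayList<>();
    for (L l : outer) {
      boolean matched = false;
      for (R r : fn.apply(l)) {
        if (on.test(l, r)) {
          rows.add(new Joined<>(l, r));
          matched = true;
        }
      }
      if (!matched) {
        rows.add(new Joined<>(l, null)); // the outer row is preserved
      }
    }
    return rows;
  }

  public static void main(String[] args) {
    List<Integer> depts = List.of(10, 20);
    System.out.println(correlateThenFilter(depts, AGES, OLDER));
    // [Joined[left=10, right=25]]
    System.out.println(correlateWithPredicate(depts, AGES, OLDER));
    // [Joined[left=10, right=25], Joined[left=20, right=null]]
  }
}
```

With the filter applied afterwards, department 20 disappears entirely; with
the predicate inside the correlate, it survives as a null-padded row, which is
what LEFT JOIN semantics require.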
Also, Correlate isn’t an easy operator to implement efficiently. The obvious
implementation requires the right-hand side of the tree to be restarted for
each row on the left: expensive if the query is distributed, and impossible if
it is streaming. This is why we take such great pains to de-correlate queries.
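As a rough illustration of the restart cost, the toy nested-loop correlate
below re-evaluates its right-hand side once per left row. A
java.util.stream.Stream, which can be consumed only once, stands in (by
analogy only) for a streaming input that cannot be rewound. All names are
hypothetical; this is not Calcite's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;

/** Toy nested-loop correlate; hypothetical, not Calcite's implementation. */
public class NestedLoopCorrelate {
  /** How many times the right-hand side has been (re)started. */
  static int restarts = 0;

  /** For every outer row, restart the right-hand side and drain it. */
  static <L, R> List<String> correlate(List<L> outer, Function<L, Stream<R>> rightSide) {
    List<String> rows = new ArrayList<>();
    for (L l : outer) {
      restarts++; // one restart per outer row
      rightSide.apply(l).forEach(r -> rows.add(l + ":" + r));
    }
    return rows;
  }

  public static void main(String[] args) {
    List<Integer> depts = List.of(10, 20, 30);

    // Re-evaluable right side: creating a fresh Stream per outer row works.
    System.out.println(correlate(depts, d -> Stream.of(d + 1, d + 2)));
    System.out.println("restarts = " + restarts);

    // A one-shot source cannot be restarted: the second outer row fails,
    // because a Stream may only be consumed once.
    Stream<Integer> oneShot = Stream.of(1, 2);
    try {
      correlate(depts, d -> oneShot);
    } catch (IllegalStateException e) {
      System.out.println("cannot restart: " + e.getMessage());
    }
  }
}
```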
I wonder whether it would be better to implement APPLY[1] (and its variants
CROSS APPLY and LEFT APPLY) as a first-class relational operator. It captures
the fact that the right-hand side is a leaf node (a call to a function), and
it would also perform an UNNEST when the function returns several rows. In
other words, it is like selectMany(), described as “LINQ’s most powerful
operator”[2] because it can implement Project, Filter, and more besides. As a
single-input operator, it is easier to reason about.
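The claim that a selectMany-style operator subsumes Project and Filter can be
sketched in a few lines of Java. This is a hypothetical single-input APPLY over
in-memory lists (essentially flatMap), not Calcite's API: Project is APPLY with
a function that returns exactly one row, Filter is APPLY with a function that
returns zero or one row, and UNNEST falls out when the function returns several
rows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

/** Toy single-input APPLY (selectMany / flatMap); hypothetical, not Calcite's API. */
public class ApplyOperator {
  /** CROSS APPLY: emit every row the function returns for each input row. */
  static <T, R> List<R> crossApply(List<T> input, Function<T, List<R>> fn) {
    List<R> out = new ArrayList<>();
    for (T t : input) {
      out.addAll(fn.apply(t));
    }
    return out;
  }

  /** Project is APPLY with a function that returns exactly one row. */
  static <T, R> List<R> project(List<T> input, Function<T, R> expr) {
    return crossApply(input, t -> List.of(expr.apply(t)));
  }

  /** Filter is APPLY with a function that returns zero or one row. */
  static <T> List<T> filter(List<T> input, Predicate<T> cond) {
    return crossApply(input, t -> cond.test(t) ? List.of(t) : List.<T>of());
  }

  public static void main(String[] args) {
    List<Integer> nums = List.of(1, 2, 3, 4);
    System.out.println(project(nums, n -> n * 10));    // [10, 20, 30, 40]
    System.out.println(filter(nums, n -> n % 2 == 0)); // [2, 4]
    // UNNEST-like: the function returns several rows per input row.
    System.out.println(crossApply(List.of("ab", "c"), s -> Arrays.asList(s.split("")))); // [a, b, c]
  }
}
```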
Consider the query to find all employees whose age is greater than their
department number, using a function to generate the employees, and printing
each department at least once even if it has no employees old enough.
SELECT d.deptno, d.name, e.name
FROM dept AS d
LEFT JOIN LATERAL TABLE(Employees(d.deptno)) AS e ON e.age > d.deptno
becomes
Apply(let e = filter(Employees($0), e -> e.age > $0) in
        if empty(e) then {($0, $1, null)}
        else map(e, e -> ($0, $1, e.name)) end
      end)
  Scan(dept)
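A minimal Java sketch of the LEFT APPLY evaluation for this query follows. It
assumes a hypothetical in-memory stand-in for Employees(deptno) and
hypothetical department data. The key point is that the ON predicate
e.age > d.deptno is applied to the function's output before the emptiness
test, so every department is emitted at least once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Toy LEFT APPLY over in-memory data; hypothetical, not Calcite code. */
public class LeftApplyExample {
  record Dept(int deptno, String name) {}
  record Emp(String name, int age) {}
  record Row(int deptno, String dname, String ename) {}

  // Hypothetical stand-in for the table function Employees(deptno).
  static final Map<Integer, List<Emp>> EMPLOYEES = Map.of(
      10, List.of(new Emp("Alice", 30), new Emp("Bob", 8)),
      20, List.of(new Emp("Carol", 19)));

  static List<Emp> employees(int deptno) {
    return EMPLOYEES.getOrDefault(deptno, List.of());
  }

  /** LEFT APPLY: filter by the ON predicate first, then pad if nothing is left. */
  static List<Row> leftApply(List<Dept> depts) {
    List<Row> out = new ArrayList<>();
    for (Dept d : depts) {
      // Apply the ON predicate e.age > d.deptno before the emptiness check.
      List<Emp> e = employees(d.deptno()).stream()
          .filter(emp -> emp.age() > d.deptno())
          .toList();
      if (e.isEmpty()) {
        out.add(new Row(d.deptno(), d.name(), null)); // department still emitted
      } else {
        for (Emp emp : e) {
          out.add(new Row(d.deptno(), d.name(), emp.name()));
        }
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Dept> depts = List.of(
        new Dept(10, "Sales"), new Dept(20, "Research"), new Dept(30, "Ops"));
    leftApply(depts).forEach(System.out::println);
    // Row[deptno=10, dname=Sales, ename=Alice]
    // Row[deptno=20, dname=Research, ename=null]
    // Row[deptno=30, dname=Ops, ename=null]
  }
}
```

Departments 20 and 30 have no employee older than their department number, so
each appears once with a null employee name.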
Julian
[1] https://issues.apache.org/jira/browse/CALCITE-1472
[2] https://dzone.com/articles/selectmany-probably-the-most-p
> On Oct 10, 2017, at 7:56 AM, Fabian Hueske wrote:
>
> Thanks Julian,
> Xingcan created CALCITE-2004 to track the issue.
>
> I think applying a local predicate that is defined in the ON clause to the
> inner input before the join solves one part of the issue.
> Another problem is posed by join predicates in the ON clause that reference
> both the inner and the outer table. These would need to be applied by the
> Correlate operator itself to ensure that outer rows are preserved.
>
> We might need to add a predicate to the Correlate operator. Or do you have
> another idea?
>
> Thanks, Fabian
>
>
> 2017-10-07 20:53 GMT+02:00 Julian Hyde:
>
>> You are correct. Applying the Filter after the Correlate doesn’t give the
>> right behavior. Can you log a bug?
>>
>> I’d rather not add filters to the Correlate operator unless absolutely
>> necessary. In this case, is it sufficient to apply the Filter to the right
>> input of the Correlate?
>>
>> Julian
>>
>>
>>> On Oct 4, 2017, at 2:20 PM, Fabian Hueske wrote:
>>>
>>> Thanks for the quick reply, Julian.
>>>
>>> It's great that the first issue is already fixed in 1.14!
>>>
>>> Regarding the second issue:
>>> The problem with the logical plan that Xingcan posted is not the handling
>>> of outer rows for which the right side is empty.
>>> Our concern is that the outer join predicate is pushed into a LogicalFilter
>>> and not kept together with the left join. Shouldn't an outer join predicate
>>> be evaluated by the join itself rather than by a subsequent filter?
>>> Otherwise an outer row might be filtered out completely (instead of being
>>> padded with nulls) if the filter rejects all of the join results that the
>>> correlate join produced for that outer row.
>>>
>>> Of course it is possible to add a rule that converts a
>>> LogicalFilter(joinPredicate) on top of a LogicalCorrelate into a
>>> FlinkCorrelate(joinPredicate). However, I think it would be good to support
>>> join predicates directly in LogicalCorrelate, because this is likely an
>>> issue for all systems that support outer joins with table functions.
>>>
>>> Does that make sense?
>>>
>>> Thanks, Fabian
>>>
>>> 2017-10-04 22:24 GMT+02:00 Julian Hyde:
>>>
>>>> Many thanks for raising these issues. See comments inline.
>>>>
>>>>> 1. Using a NULL literal causes an NPE.
>>>>> It seems that the constant NULL in Calcite is represented as a RexLiteral
>>>>> with a (null: Comparable) value. In RexUtil.gatherConstraint(), the
>>>>> equals() method is invoked on the value returned by (NULL: RexLiteral),
>>>>> which is null, and that causes the NPE.
>>>>
>>>> The latest RexUtil.gatherConstraint does not call Object.equals(Object);
>>>> it calls Objects.equals(Object, Object). See
>>>> https://issues.apache.org/jira/browse/CALCITE-1860, which is fixed in 1.14
>>>> (to be released in the next day or two).
>>>>
>>>>>
>>>>> 2. The TableFunction left outer join works incorrectly.
>>>>> For instance, given a simple table {WordCount(word:String, frequency:Int)},
>>>>> a table function {split: word:String => (letter:String, length:Int)},
>>>>> and the query "SELECT word, letter, length FROM WordCount LEFT JOIN LATERAL
>>>>> TABLE(split(word)) AS T (letter, length) ON frequency = length OR length < 5",
>>>>> the query is translated to the logical plan below.
>>>>>
>>>>> LogicalProject(word=[$0], name=[$2], length=[$3])
>>>>>   LogicalFilter(condition=[OR(=($1, CAST($3):BIGINT), <($3, 5))])
>>>>>     LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0}])
>>>>>       LogicalTableScan(table=[[WordCount]])
>>>>>       LogicalTableFunctionScan(invocation=[split($cor0.word)], rowType=[RecordType(VARCHAR(65536) _1, INTEGER _2)], elementType=[class [Ljava.lang.Object;])
>>>>>
>>>>> This logical plan may lead to an improper physical plan, which first
>>>>> correlates each row with its table function results (just like performing
>>>>> a Cartesian product) and then filters the rows. IMO, this only works for an
>>>>> inner join, not for a left outer join.
>>>>
>>>> Note that LogicalCorrelate has joinType=left. This should cause a NULL
>>>> record to be emitted on the right if no records are generated, which is the
>>>> right behavior.
>>>>
>>>> Now, maybe that logical plan is being incorrectly converted to a physical
>>>> plan. If so, please log a JIRA case.
>>>>
>>>> Julian
>>>>
>>>>
>>
>>