[
https://issues.apache.org/jira/browse/CALCITE-5559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17697311#comment-17697311
]
Ruben Q L commented on CALCITE-5559:
------------------------------------
I have tried both approaches:
A) Deal with duplicates inside TableSpool. This would require either enlarging
the existing Logical & Enumerable TableSpool operators to support a flag à la
Union (e.g. all=true/false, default true) to discard duplicates; or creating a
new operator (SetTableSpool?) to support this duplicate removal. Either way,
when creating a RepeatUnion with all=false, the "optimized" TableSpool should
be used to discard duplicates earlier. This would work, but it seems a rather
ad-hoc solution: dealing with duplicates does not seem like a "natural
responsibility" of a TableSpool.
B) Use aggregations (distinct) before each TableSpool, i.e. in
{{RelBuilder#repeatUnion}} do something like:
{code:java}
public RelBuilder repeatUnion(String tableName, boolean all, int iterationLimit) {
  ...
  RelNode iterative = tableSpool(Spool.Type.LAZY, Spool.Type.LAZY,
      finder.relOptTable, all).build();
  RelNode seed = tableSpool(Spool.Type.LAZY, Spool.Type.LAZY,
      finder.relOptTable, all).build();
  RelNode repeatUnion = struct.repeatUnionFactory.createRepeatUnion(seed,
      iterative, all, iterationLimit, finder.relOptTable);
==>
public RelBuilder repeatUnion(String tableName, boolean all, int iterationLimit) {
  ...
  if (!all)
    this.distinct();
  RelNode iterative = tableSpool(Spool.Type.LAZY, Spool.Type.LAZY,
      finder.relOptTable, all).build();
  if (!all)
    this.distinct();
  RelNode seed = tableSpool(Spool.Type.LAZY, Spool.Type.LAZY,
      finder.relOptTable, all).build();
  RelNode repeatUnion = struct.repeatUnionFactory.createRepeatUnion(seed,
      iterative, all, iterationLimit, finder.relOptTable);
{code}
This seems a more logical approach, where we reuse existing operators, we use
the "proper operator" (aggregate) to deal with duplicates and we get the
benefits of it too (e.g. if the subplan before the {{distinct}} contains a
projection of the PK field of a table, it can be removed by the existing
Calcite logic since it is unnecessary).
However, the big issue with approach B is that Aggregate is currently not
"compatible" with RepeatUnion's iterative mechanism, where each iteration
requires the "re-evaluation" of the input enumerator. The Aggregate's
enumerator gets eagerly computed on creation, so it will always have the same
content, unaware of the fact that the underlying source (transient scan) has
changed. Basically, we have the same issue that we used to have with the Sort
operator's implementation in {{EnumerableDefaults#orderBy}}, which was changed
via CALCITE-3820; see that ticket's description for more info about this
problem.
So, if we decide to go with approach B, a prerequisite would be changing all
distinct / groupBy implementations in EnumerableDefaults from eager
evaluation to lazy evaluation (re-evaluated if required), e.g.:
{code}
public static <TSource, TKey> Enumerable<Grouping<TKey, TSource>> groupBy(
    final Enumerable<TSource> enumerable,
    final Function1<TSource, TKey> keySelector) {
  return enumerable.toLookup(keySelector);
}
==>
public static <TSource, TKey> Enumerable<Grouping<TKey, TSource>> groupBy(
    final Enumerable<TSource> enumerable,
    final Function1<TSource, TKey> keySelector) {
  return new AbstractEnumerable<Grouping<TKey, TSource>>() {
    @Override public Enumerator<Grouping<TKey, TSource>> enumerator() {
      return enumerable.toLookup(keySelector).enumerator();
    }
  };
}
{code}
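To illustrate the eager vs. lazy difference outside Calcite, here is a minimal
plain-Java sketch. It is only an analogy, not Calcite code: java.lang.Iterable
stands in for Enumerable, a mutable list stands in for the transient table, and
all the names (LazyDistinctSketch, eagerDistinct, lazyDistinct, transientTable)
are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;

/** Plain-Java analogy (no Calcite types) of eager vs. lazy duplicate removal. */
public class LazyDistinctSketch {

  /** Eager: the distinct set is computed once, when the Iterable is created. */
  static <T> Iterable<T> eagerDistinct(Iterable<T> source) {
    final LinkedHashSet<T> snapshot = new LinkedHashSet<>();
    for (T t : source) {
      snapshot.add(t);
    }
    return snapshot;
  }

  /** Lazy: the source is read again on every call to iterator(). */
  static <T> Iterable<T> lazyDistinct(final Iterable<T> source) {
    return new Iterable<T>() {
      @Override public Iterator<T> iterator() {
        final LinkedHashSet<T> fresh = new LinkedHashSet<>();
        for (T t : source) {
          fresh.add(t);
        }
        return fresh.iterator();
      }
    };
  }

  /** Helper that copies an Iterable into a List. */
  static <T> List<T> toList(Iterable<T> iterable) {
    final List<T> result = new ArrayList<>();
    for (T t : iterable) {
      result.add(t);
    }
    return result;
  }

  public static void main(String[] args) {
    // Hypothetical stand-in for the transient table of a RepeatUnion.
    final List<Integer> transientTable = new ArrayList<>(Arrays.asList(1, 1, 2));
    final Iterable<Integer> eager = eagerDistinct(transientTable);
    final Iterable<Integer> lazy = lazyDistinct(transientTable);

    // Simulate the next iteration replacing the transient table's content.
    transientTable.clear();
    transientTable.addAll(Arrays.asList(3, 3, 4));

    System.out.println(toList(eager)); // [1, 2]  (stale snapshot)
    System.out.println(toList(lazy));  // [3, 4]  (re-evaluated)
  }
}
```

The eager variant keeps returning the content of the first iteration, which is
the incompatibility described above. The lazy variant sees the new content, at
the cost of recomputing the set every time iterator() is called.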
The advantages of moving to lazy evaluation (as described in CALCITE-3820)
would be avoiding computation in some cases where it is not required, and
making the enumerator "re-evaluable" and hence compatible with RepeatUnion.
Also, quoting Julian Hyde: "When you create an Enumerable, the work (in
particular calling an input's enumerator() method) should not happen until
Enumerable.enumerator() is called".
The main disadvantage is that the current code computes the enumerator
(eagerly) just once and then re-uses it, so in the (rare?) scenario of
accessing the Aggregate enumerator multiple times, the new code would need to
re-compute it each time. However, since this change was approved for Sort
(CALCITE-3820), I guess the same could be applied to Aggregate.
So, if we decide to go with B, I guess we would need a separate ticket (that
would block the current one) to move EnumerableDefaults' distinct and groupBy
methods from eager to lazy enumerator computation (and maybe also other
operators' methods in the same situation, such as union, except, intersect,
and others?).
> Improve RepeatUnion by discarding duplicates at TableSpool level
> ----------------------------------------------------------------
>
> Key: CALCITE-5559
> URL: https://issues.apache.org/jira/browse/CALCITE-5559
> Project: Calcite
> Issue Type: Improvement
> Components: core
> Reporter: Ruben Q L
> Assignee: Ruben Q L
> Priority: Major
>
> Currently, RepeatUnion operator with all=false keeps track of the elements
> that it has returned in order to discard duplicates. However, the TableSpool
> operators that are right below it do not have such control. In certain
> scenarios, duplicates are returned by the TableSpool current iteration,
> discarded by the RepeatUnion, but have been already "fed back" by the
> TableSpool into the next iteration, causing unnecessary processing.
> We can optimize this scenario by keeping track of the duplicates
> inside/before the TableSpool too (note: we still need to keep track of
> duplicates at RepeatUnion level, because that is the only place where we can
> detect a potential "global duplicate" of an element: returned by the LHS and
> then also by the RHS, or by two different iterations of the RHS).
> A PoC testing this improvement on a downstream project showed that certain
> queries can go from ~40s down to ~1s.