zabetak commented on a change in pull request #1020: [CALCITE-2812] Add
algebraic operators to allow expressing recursive queries (Ruben Quesada Lopez)
URL: https://github.com/apache/calcite/pull/1020#discussion_r255037109
##########
File path:
linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##########
@@ -3329,6 +3329,167 @@ public void reset() {
public void close() {
}
}
+
+ /**
+ * Enumerator that performs a recursive union. Inspired by PostgreSQL's
"WITH RECURSIVE"
+ * implementation. See https://www.postgresql.org/docs/11/queries-with.html
+ *
+ * The general form of a recursive WITH query is always: a non-recursive
term, then UNION [ALL]
+ * (in our case only UNION ALL is supported for the moment), then a
recursive term;
+ * where only the recursive term can contain a reference to the query's own
output.
+ * Such an operation is executed as follows:
+ * - Evaluate the non-recursive term (seed). Include all rows in the
result of the recursive
+ * union, and also place them in a transient work table (aka delta table)
that gets added to
+ * the schema.
+ * - So long as the working table is not empty, repeat:
+ * - Evaluate the recursive term, using the current content of the
transient table for
+ * the recursive self-reference. Include all resulting rows in the
result of the recursive
+ * union, and also place them in the transient table, they will be the
input for the
+ * next iteration.
+ *
+ * Note: strictly speaking, this process is iteration not recursion, but
RECURSIVE is the
+ * terminology chosen by the SQL standards committee.
+ */
+ public static <TSource> Enumerable<TSource> recursiveUnion(
+ Enumerable<TSource> seed,
+ Enumerable<TSource> recursion,
+ Collection<TSource> deltaCollection) {
+ return new AbstractEnumerable<TSource>() {
+ @Override public Enumerator<TSource> enumerator() {
+ return new Enumerator<TSource>() {
+ private TSource current = null;
+ private boolean seedProcessed = false;
+ private final Enumerator<TSource> seedEnumerator = seed.enumerator();
+ private final Enumerator<TSource> recursionEnumerator =
recursion.enumerator();
+ private final Collection<TSource> delta = deltaCollection;
+
+ // flag to avoid adding the current item multiple times to delta
collection
+ // in case of consecutive current() calls
+ private boolean currentAddedToDelta = false;
+
+ @Override public TSource current() {
+ if (this.current == null) {
+ throw new NoSuchElementException();
+ }
+
+ if (!this.currentAddedToDelta) {
+ // add 'current' to the delta work table (so that it will be
fetched
+ // by the delta table scan later)
+ this.delta.add(this.current);
+ this.currentAddedToDelta = true;
+ }
+
+ return this.current;
+ }
+
+ @Override public boolean moveNext() {
+ // if we are not done with the seed (non-recursive term) moveNext
on it
+ if (!this.seedProcessed) {
+ if (this.seedEnumerator.moveNext()) {
+ this.currentAddedToDelta = false;
+ this.current = this.seedEnumerator.current();
+ return true;
+ } else {
+ this.seedProcessed = true;
+ }
+ }
+
+ // if we are done with the seed, moveNext on the recursive part
+ if (this.recursionEnumerator.moveNext()) {
Review comment:
From this line it seems that the recursive part can contain only certain
kind of operators. It seems that all operators above the delta scan table must
allow pipelined execution (i.e., read rows one by one). If for some reason
there is an operator that reads all rows before producing any result then
probably the result of the recursive union may not be correct.
If what I described above is true then we could try the alternative below.
When `this.recursionEnumerator.moveNext()` returns false then we obtain a
new enumerator and keep repeating this procedure till there are not really any
new rows (e.g., the new enumerator returns directly false?).
If `maxDepth` was part of the recursive union then we could stop after
obtaining a new enumerator a certain number of times.
What do you think @rubenada ?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services