zabetak commented on a change in pull request #1020: [CALCITE-2812] Add
algebraic operators to allow expressing recursive queries (Ruben Quesada Lopez)
URL: https://github.com/apache/calcite/pull/1020#discussion_r258413372
##########
File path:
linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##########
@@ -3329,6 +3329,168 @@ public void reset() {
public void close() {
}
}
+
+ /**
+ * Enumerator that performs a recursive union. Inspired by PostgreSQL's "WITH RECURSIVE"
+ * implementation. See https://www.postgresql.org/docs/11/queries-with.html
+ *
+ * The general form of a recursive WITH query is always: a non-recursive term, then UNION [ALL]
+ * (in our case only UNION ALL is supported for the moment), then a recursive term;
+ * where only the recursive term can contain a reference to the query's own output.
+ * Such an operation is executed as follows:
+ * - Evaluate the non-recursive term (seed). Include all rows in the result of the recursive
+ * union, and also place them in a transient work table (aka delta table) that gets added to
+ * the schema.
+ * - So long as the working table is not empty, repeat:
+ * - Evaluate the recursive term, using the current content of the transient table for
+ * the recursive self-reference. Include all resulting rows in the result of the recursive
+ * union, and also place them in the transient table, they will be the input for the
+ * next iteration.
+ *
+ * Note: strictly speaking, this process is iteration not recursion, but RECURSIVE is the
+ * terminology chosen by the SQL standards committee.
+ */
+ public static <TSource> Enumerable<TSource> recursiveUnion(
+ Enumerable<TSource> seed,
+ Enumerable<TSource> recursion,
+ Collection<TSource> deltaCollection,
+ int maxDepth) {
+ return new AbstractEnumerable<TSource>() {
+ @Override public Enumerator<TSource> enumerator() {
+ return new Enumerator<TSource>() {
+ private TSource current = null;
+ private boolean seedProcessed = false;
+ private int currentDepth = 0;
+ private final Enumerator<TSource> seedEnumerator = seed.enumerator();
+ private Enumerator<TSource> recursionEnumerator = null;
+ private final Collection<TSource> delta = deltaCollection;
+ private final Collection<TSource> nextDelta = new ArrayList<>();
+
+ @Override public TSource current() {
+ if (this.current == null) {
+ throw new NoSuchElementException();
+ }
+
+ return this.current;
+ }
+
+ @Override public boolean moveNext() {
+
+ // if we are not done with the seed (non-recursive term) moveNext on it
+ if (!this.seedProcessed) {
+ if (this.seedEnumerator.moveNext()) {
+ this.current = this.seedEnumerator.current();
+ this.nextDelta.add(this.current);
+ return true;
+ } else {
+ this.seedProcessed = true;
+ this.swapDeltas();
+ }
+ }
+
+ // if we are done with the seed, moveNext on the recursive part
+ while (true) {
+ if (maxDepth != -1 && this.currentDepth == maxDepth) {
+ // maxDepth reached, we are done
+ this.current = null;
+ return false;
+ }
+
+ if (this.recursionEnumerator == null) {
+ this.recursionEnumerator = recursion.enumerator();
+ }
+
+ if (this.recursionEnumerator.moveNext()) {
+ this.current = this.recursionEnumerator.current();
+ this.nextDelta.add(this.current);
+ return true;
+ }
+
+ if (this.nextDelta.isEmpty()) {
+ // recursive part is finished (it did not return any new item), we are done
+ this.current = null;
+ return false;
+ }
+
+ // we have finished the current depth level of the recursive part, go to next level
+ this.swapDeltas();
+ this.recursionEnumerator.close();
+ this.recursionEnumerator = null;
+ this.currentDepth++;
+ }
+ }
+
+ private void swapDeltas() {
+ this.delta.clear();
+ this.delta.addAll(this.nextDelta);
+ this.nextDelta.clear();
+ }
+
+ @Override public void reset() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public void close() {
+ this.seedEnumerator.close();
+ if (this.recursionEnumerator != null) {
+ this.recursionEnumerator.close();
+ this.recursionEnumerator = null;
+ }
+ }
+ };
+ }
+ };
+ }
+
+ /**
+ * Enumerator that implements the scan of a temporary working table (DELTA),
+ * used in recursive union.
+ * See {@link #recursiveUnion(Enumerable, Enumerable, Collection, int)}
+ */
+ public static <TSource> Enumerable<TSource> deltaTableScan(Collection<TSource> delta) {
Review comment:
Do we really need a new method in EnumerableDefaults for this?
Can't we reuse the existing
`org.apache.calcite.linq4j.Linq4j#asEnumerable(java.util.Collection<T>)` method
instead of implementing the enumerator from scratch?
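
For context, here is a minimal plain-Java sketch of the iteration that the Javadoc above describes. It is not the code from this pull request and does not use the linq4j API: the class `RecursiveUnionSketch`, the `Function`-based recursive term, and the sample data are all hypothetical and only illustrate that scanning the delta amounts to reading the current contents of a collection.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration only; names here are not part of Calcite.
class RecursiveUnionSketch {

  /**
   * UNION ALL recursion: re-evaluate the recursive term against the current
   * delta until it yields no rows, or until maxDepth iterations have run
   * (-1 means unbounded, mirroring the parameter in the diff).
   */
  static <T> List<T> recursiveUnion(
      Collection<T> seed,
      Function<Collection<T>, Collection<T>> recursiveTerm,
      int maxDepth) {
    // All seed rows go into the result and into the first delta.
    List<T> result = new ArrayList<>(seed);
    List<T> delta = new ArrayList<>(seed);
    int depth = 0;
    while (!delta.isEmpty() && (maxDepth == -1 || depth < maxDepth)) {
      // The "delta table scan" is simply a read of the collection.
      List<T> next = new ArrayList<>(recursiveTerm.apply(delta));
      result.addAll(next);
      // The rows just produced become the input of the next iteration.
      delta = next;
      depth++;
    }
    return result;
  }

  public static void main(String[] args) {
    // Assumed recursive term for the demo: emit n + 1 while n < 5.
    Function<Collection<Integer>, Collection<Integer>> step = rows -> {
      List<Integer> out = new ArrayList<>();
      for (int n : rows) {
        if (n < 5) {
          out.add(n + 1);
        }
      }
      return out;
    };
    System.out.println(recursiveUnion(List.of(1), step, -1)); // prints [1, 2, 3, 4, 5]
  }
}
```

Since nothing in this loop needs more than a view over the delta collection, an existing collection-to-enumerable adapter may well be enough for the scan.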