rubenada commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r443486637



##########
File path: linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##########
@@ -817,6 +817,112 @@ public void remove() {
         resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a 
time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from 
the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static <TSource, TKey, TAccumulate, TResult> Enumerable<TResult> 
sortedGroupBy(
+      Enumerable<TSource> enumerable,
+      Function1<TSource, TKey> keySelector,
+      Function0<TAccumulate> accumulatorInitializer,
+      Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+      final Function2<TKey, TAccumulate, TResult> resultSelector,
+      final Comparator<TKey> comparator) {
+    return new AbstractEnumerable<TResult>() {
+      public Enumerator<TResult> enumerator() {
+        return new SortedAggregateEnumerator(
+          enumerable, keySelector, accumulatorInitializer,
+          accumulatorAdder, resultSelector, comparator);
+      }
+    };
+  }
+
+  private static class SortedAggregateEnumerator<TSource, TKey, TAccumulate, 
TResult>
+      implements Enumerator<TResult> {
+    private final Enumerable<TSource> enumerable;
+    private final Function1<TSource, TKey> keySelector;
+    private final Function0<TAccumulate> accumulatorInitializer;
+    private final Function2<TAccumulate, TSource, TAccumulate> 
accumulatorAdder;
+    private final Function2<TKey, TAccumulate, TResult> resultSelector;
+    private final Comparator<TKey> comparator;
+    private boolean isInitialized;
+    private TAccumulate curAccumulator;
+    private Enumerator<TSource> enumerator;
+
+    SortedAggregateEnumerator(
+        Enumerable<TSource> enumerable,
+        Function1<TSource, TKey> keySelector,
+        Function0<TAccumulate> accumulatorInitializer,
+        Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+        final Function2<TKey, TAccumulate, TResult> resultSelector,
+        final Comparator<TKey> comparator) {
+      this.enumerable = enumerable;
+      this.keySelector = keySelector;
+      this.accumulatorInitializer = accumulatorInitializer;
+      this.accumulatorAdder = accumulatorAdder;
+      this.resultSelector = resultSelector;
+      this.comparator = comparator;
+      isInitialized = false;
+      curAccumulator = null;
+      enumerator = enumerable.enumerator();
+    }
+
+    @Override public TResult current() {
+      if (curAccumulator == null) {
+        curAccumulator = accumulatorInitializer.apply();
+      }
+      TResult result = null;
+      TSource o = enumerator.current();
+      TKey prevKey = keySelector.apply(o);
+      curAccumulator = accumulatorAdder.apply(curAccumulator, o);
+      while (enumerator.moveNext()) {
+        o = enumerator.current();
+        TKey curKey = keySelector.apply(o);
+        if (comparator.compare(prevKey, curKey) != 0) {
+          // current key is different from previous key, get accumulated 
results and re-create
+          // accumulator for current key.
+          result = resultSelector.apply(prevKey, curAccumulator);
+          curAccumulator = accumulatorInitializer.apply();
+          break;
+        } else {
+          curAccumulator = accumulatorAdder.apply(curAccumulator, o);
+        }
+        prevKey = curKey;
+      }
+
+      if (result == null) {
+        // current key is the last key.
+        result = resultSelector.apply(prevKey, curAccumulator);
+        // no need to keep accumulator for the last key.
+        curAccumulator = null;
+      }
+
+      return result;
+    }
+
+    @Override public boolean moveNext() {
+      if (!isInitialized) {
+        isInitialized = true;
+        return enumerator.moveNext();
+      } else {
+        return curAccumulator != null;
+      }
+    }
+
+    @Override public void reset() {
+      enumerator.reset();

Review comment:
       Maybe we should also set `curAccumulator = null;` as part of `reset()`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to