amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r445733718



##########
File path: linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##########
@@ -817,6 +817,112 @@ public void remove() {
         resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static <TSource, TKey, TAccumulate, TResult> Enumerable<TResult> sortedGroupBy(
+      Enumerable<TSource> enumerable,
+      Function1<TSource, TKey> keySelector,
+      Function0<TAccumulate> accumulatorInitializer,
+      Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+      final Function2<TKey, TAccumulate, TResult> resultSelector,
+      final Comparator<TKey> comparator) {
+    return new AbstractEnumerable<TResult>() {
+      public Enumerator<TResult> enumerator() {
+        return new SortedAggregateEnumerator(
+          enumerable, keySelector, accumulatorInitializer,
+          accumulatorAdder, resultSelector, comparator);
+      }
+    };
+  }
+
+  private static class SortedAggregateEnumerator<TSource, TKey, TAccumulate, TResult>
+      implements Enumerator<TResult> {
+    private final Enumerable<TSource> enumerable;
+    private final Function1<TSource, TKey> keySelector;
+    private final Function0<TAccumulate> accumulatorInitializer;
+    private final Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder;
+    private final Function2<TKey, TAccumulate, TResult> resultSelector;
+    private final Comparator<TKey> comparator;
+    private boolean isInitialized;
+    private TAccumulate curAccumulator;
+    private Enumerator<TSource> enumerator;
+
+    SortedAggregateEnumerator(
+        Enumerable<TSource> enumerable,
+        Function1<TSource, TKey> keySelector,
+        Function0<TAccumulate> accumulatorInitializer,
+        Function2<TAccumulate, TSource, TAccumulate> accumulatorAdder,
+        final Function2<TKey, TAccumulate, TResult> resultSelector,
+        final Comparator<TKey> comparator) {
+      this.enumerable = enumerable;
+      this.keySelector = keySelector;
+      this.accumulatorInitializer = accumulatorInitializer;
+      this.accumulatorAdder = accumulatorAdder;
+      this.resultSelector = resultSelector;
+      this.comparator = comparator;
+      isInitialized = false;
+      curAccumulator = null;
+      enumerator = enumerable.enumerator();
+    }
+
+    @Override public TResult current() {
+      if (curAccumulator == null) {
+        curAccumulator = accumulatorInitializer.apply();
+      }
+      TResult result = null;
+      TSource o = enumerator.current();
+      TKey prevKey = keySelector.apply(o);
+      curAccumulator = accumulatorAdder.apply(curAccumulator, o);
+      while (enumerator.moveNext()) {
+        o = enumerator.current();
+        TKey curKey = keySelector.apply(o);

Review comment:
Added this requirement to both the Javadoc of `sortedGroupBy` and the relevant lines in `EnumerableSortedAggregate`.
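
For readers following the thread, here is a minimal standalone sketch of the approach the Javadoc in the hunk describes: walk the input once, fold elements into one accumulator, and emit a result whenever the key changes. It assumes the input is already sorted (or at least clustered) by key, as the Javadoc states. The class name `SortedGroupBySketch` and the use of `java.util.function` types in place of linq4j's `Function0`/`Function1`/`Function2` are illustrative choices, not the code in this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration; not part of Calcite or linq4j.
public class SortedGroupBySketch {
  /**
   * Groups consecutive elements whose keys compare equal.
   * Assumption: the input is already sorted by key; unsorted input
   * would yield several results for the same key.
   */
  static <S, K, A, R> List<R> sortedGroupBy(
      Iterable<S> source,
      Function<S, K> keySelector,
      Supplier<A> accumulatorInitializer,
      BiFunction<A, S, A> accumulatorAdder,
      BiFunction<K, A, R> resultSelector,
      Comparator<K> comparator) {
    List<R> results = new ArrayList<>();
    Iterator<S> it = source.iterator();
    if (!it.hasNext()) {
      return results; // empty input produces no groups
    }
    S first = it.next();
    K prevKey = keySelector.apply(first);
    A acc = accumulatorAdder.apply(accumulatorInitializer.get(), first);
    while (it.hasNext()) {
      S o = it.next();
      K curKey = keySelector.apply(o);
      if (comparator.compare(prevKey, curKey) != 0) {
        // Key changed: emit the finished group, then start a fresh accumulator.
        results.add(resultSelector.apply(prevKey, acc));
        acc = accumulatorInitializer.get();
        prevKey = curKey;
      }
      acc = accumulatorAdder.apply(acc, o);
    }
    results.add(resultSelector.apply(prevKey, acc)); // emit the last group
    return results;
  }

  public static void main(String[] args) {
    // Count words per first letter; the list is already sorted by that key.
    List<String> sorted =
        Arrays.asList("apple", "avocado", "banana", "cherry", "cranberry");
    List<String> counts =
        SortedGroupBySketch.<String, Character, Integer, String>sortedGroupBy(
            sorted,
            s -> s.charAt(0),
            () -> 0,
            (acc, s) -> acc + 1,
            (k, acc) -> k + "=" + acc,
            Comparator.<Character>naturalOrder());
    System.out.println(counts); // prints [a=2, b=1, c=2]
  }
}
```

Unlike the enumerator in the hunk, this sketch materializes all results eagerly into a list, which keeps the key-change logic easy to read at the cost of streaming behavior.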





