danny0405 commented on a change in pull request #1761:
URL: https://github.com/apache/calcite/pull/1761#discussion_r411927333



##########
File path: 
core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
##########
@@ -796,4 +803,244 @@ static Expression windowSelector(
         outputPhysType.record(expressions),
         parameter);
   }
+
+  /**
+   * Create enumerable implementation that applies sessionization to elements 
from the input
+   * enumerator based on a specified key. Elements are windowed into sessions 
separated by
+   * periods with no input for at least the duration specified by gap 
parameter.
+   */
+  public static Enumerable<Object[]> sessionize(Enumerator<Object[]> 
inputEnumerator,
+      int indexOfWatermarkedColumn, int indexOfKeyColumn, long gap) {
+    return new AbstractEnumerable<Object[]>() {
+      @Override public Enumerator<Object[]> enumerator() {
+        return new SessionizationEnumerator(inputEnumerator,
+            indexOfWatermarkedColumn, indexOfKeyColumn, gap);
+      }
+    };
+  }
+
+  private static class SessionizationEnumerator implements 
Enumerator<Object[]> {
+    private final Enumerator<Object[]> inputEnumerator;
+    private final int indexOfWatermarkedColumn;
+    private final int indexOfKeyColumn;
+    private final long gap;
+    private LinkedList<Object[]> list;
+    private boolean initialized;
+
+    SessionizationEnumerator(Enumerator<Object[]> inputEnumerator,
+        int indexOfWatermarkedColumn, int indexOfKeyColumn, long gap) {
+      this.inputEnumerator = inputEnumerator;
+      this.indexOfWatermarkedColumn = indexOfWatermarkedColumn;
+      this.indexOfKeyColumn = indexOfKeyColumn;
+      this.gap = gap;
+      list = new LinkedList<>();
+      initialized = false;
+    }
+
+    @Override public Object[] current() {
+      if (!initialized) {
+        initialize();
+        initialized = true;
+      }
+      return list.pollFirst();
+    }
+
+    @Override public boolean moveNext() {
+      return initialized ? list.size() > 0 : inputEnumerator.moveNext();
+    }
+
+    @Override public void reset() {
+      list.clear();
+      inputEnumerator.reset();
+      initialized = false;
+    }
+
+    @Override public void close() {
+      list.clear();
+      inputEnumerator.close();
+      initialized = false;
+    }
+
+    private void initialize() {
+      List<Object[]> elements = new ArrayList<>();
+      // initialize() will be called when inputEnumerator.moveNext() is true,
+      // thus firstly should take the current element.
+      elements.add(inputEnumerator.current());
+      // sessionization needs to see all data.
+      while (inputEnumerator.moveNext()) {

Review comment:
       Then why there are param named `indexOfWatermarkedColumn ` ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to