amaliujia commented on a change in pull request #1761:
URL: https://github.com/apache/calcite/pull/1761#discussion_r411861776
##########
File path: core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
##########
@@ -796,4 +803,244 @@ static Expression windowSelector(
outputPhysType.record(expressions),
parameter);
}
+
+ /**
+ * Create enumerable implementation that applies sessionization to elements from the input
+ * enumerator based on a specified key. Elements are windowed into sessions separated by
+ * periods with no input for at least the duration specified by gap parameter.
+ */
+ public static Enumerable<Object[]> sessionize(Enumerator<Object[]> inputEnumerator,
+ int indexOfWatermarkedColumn, int indexOfKeyColumn, long gap) {
+ return new AbstractEnumerable<Object[]>() {
+ @Override public Enumerator<Object[]> enumerator() {
+ return new SessionizationEnumerator(inputEnumerator,
+ indexOfWatermarkedColumn, indexOfKeyColumn, gap);
+ }
+ };
+ }
+
+ private static class SessionizationEnumerator implements Enumerator<Object[]> {
+ private final Enumerator<Object[]> inputEnumerator;
+ private final int indexOfWatermarkedColumn;
+ private final int indexOfKeyColumn;
+ private final long gap;
+ private LinkedList<Object[]> list;
+ private boolean initialized;
+
+ SessionizationEnumerator(Enumerator<Object[]> inputEnumerator,
+ int indexOfWatermarkedColumn, int indexOfKeyColumn, long gap) {
+ this.inputEnumerator = inputEnumerator;
+ this.indexOfWatermarkedColumn = indexOfWatermarkedColumn;
+ this.indexOfKeyColumn = indexOfKeyColumn;
+ this.gap = gap;
+ list = new LinkedList<>();
+ initialized = false;
+ }
+
+ @Override public Object[] current() {
+ if (!initialized) {
+ initialize();
+ initialized = true;
+ }
+ return list.pollFirst();
+ }
+
+ @Override public boolean moveNext() {
+ return initialized ? list.size() > 0 : inputEnumerator.moveNext();
+ }
+
+ @Override public void reset() {
+ list.clear();
+ inputEnumerator.reset();
+ initialized = false;
+ }
+
+ @Override public void close() {
+ list.clear();
+ inputEnumerator.close();
+ initialized = false;
+ }
+
+ private void initialize() {
+ List<Object[]> elements = new ArrayList<>();
+ // initialize() will be called when inputEnumerator.moveNext() is true,
+ // thus firstly should take the current element.
+ elements.add(inputEnumerator.current());
+ // sessionization needs to see all data.
+ while (inputEnumerator.moveNext()) {
Review comment:
Sorry, I might have missed something. Which parameter is named watermark? Do you mean the watermarked column, e.g. the timestamp column specified by the descriptor?

To clarify, that column is not named watermark. It is just a timestamp column that we say the watermark "is built upon". In SQL, the watermark is not visible to users: users cannot tell a "watermark" from the query, and the watermark is not part of the algebra.
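
To make the distinction concrete, here is a minimal Java sketch of what "a watermark built upon a timestamp column" could look like inside an engine. It is illustrative only. The class `WatermarkTracker` and the bounded-out-of-orderness heuristic are hypothetical and not taken from Calcite or Beam. Under that heuristic, the watermark is the largest timestamp seen minus a fixed allowed delay. The point is that the watermark is engine state derived from the timestamp column, not a column the query can see.

```java
/**
 * Hypothetical engine-side watermark tracker (illustration only, not a
 * Calcite or Beam API). The watermark is internal state derived from a
 * timestamp column; it is never a column of the row itself.
 */
public class WatermarkTracker {
  private final long maxOutOfOrderness;
  private long maxTimestampSeen = Long.MIN_VALUE;

  public WatermarkTracker(long maxOutOfOrderness) {
    this.maxOutOfOrderness = maxOutOfOrderness;
  }

  /** Current watermark: rows older than this are treated as late. */
  public long currentWatermark() {
    // Before any input arrives there is no estimate yet.
    if (maxTimestampSeen == Long.MIN_VALUE) {
      return Long.MIN_VALUE;
    }
    // Assumed heuristic: largest timestamp seen minus a fixed allowed delay.
    return maxTimestampSeen - maxOutOfOrderness;
  }

  /**
   * Observes the timestamp of one row and reports whether the row is late,
   * i.e. older than the watermark computed before this row arrived.
   */
  public boolean observe(long timestamp) {
    boolean late = timestamp < currentWatermark();
    maxTimestampSeen = Math.max(maxTimestampSeen, timestamp);
    return late;
  }
}
```

A bounded input never needs this object: all rows are available, so nothing can arrive "late".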

The watermark is supposed to be implemented by the SQL engine, and it is usually tied to sources. For example, Apache Beam has two interfaces for sources: BoundedSource [1] and UnboundedSource [2]. A bounded source has no watermark, since it is the batch case, but windowing still applies: windowing is simply applied based on a timestamp column, without considering late data. Calcite's Enumerable interface is like a bounded source in Beam.

[1]: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java
[2]: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
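
To illustrate the bounded case, here is a minimal Java sketch of sessionization over an in-memory list. It uses only a timestamp column and a key column, with no watermark and no late-data handling. It is a hypothetical helper, not the PR's EnumUtils.sessionize.

The following are assumptions:
- the class name BoundedSessionizer
- the List<Object[]> input
- the timestamp column holding a Long such as epoch milliseconds
- appending session start/end columns to each row

Following the javadoc in the diff, a session ends once a key has no input for at least `gap`. Here each row is treated as covering [ts, ts + gap), and overlapping intervals are merged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: batch (bounded) sessionization with no watermark. */
public class BoundedSessionizer {

  /**
   * Returns each input row with two extra columns appended: session start and
   * session end. Assumes row[tsIndex] holds a Long timestamp.
   */
  public static List<Object[]> sessionize(List<Object[]> rows,
      int tsIndex, int keyIndex, long gap) {
    // Group rows by key, keeping keys in first-seen order.
    Map<Object, List<Object[]>> byKey = new LinkedHashMap<>();
    for (Object[] row : rows) {
      byKey.computeIfAbsent(row[keyIndex], k -> new ArrayList<>()).add(row);
    }

    List<Object[]> result = new ArrayList<>();
    for (List<Object[]> group : byKey.values()) {
      // The input is bounded, so we can sort by timestamp up front;
      // there is no late data to account for.
      group.sort(Comparator.comparingLong(r -> (Long) r[tsIndex]));

      int i = 0;
      while (i < group.size()) {
        // Each row covers [ts, ts + gap); keep merging while the next row
        // starts before the current session ends.
        long start = (Long) group.get(i)[tsIndex];
        long end = start + gap;
        int j = i + 1;
        while (j < group.size() && (Long) group.get(j)[tsIndex] < end) {
          end = Math.max(end, (Long) group.get(j)[tsIndex] + gap);
          j++;
        }
        // Emit rows i..j-1, each tagged with the session bounds.
        for (int k = i; k < j; k++) {
          Object[] in = group.get(k);
          Object[] out = Arrays.copyOf(in, in.length + 2);
          out[in.length] = start;
          out[in.length + 1] = end;
          result.add(out);
        }
        i = j;
      }
    }
    return result;
  }
}
```

Because the whole input is available, the sketch can sort each key's rows before merging. An unbounded source would instead need a watermark to decide when a session can be closed and emitted.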
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.