amaliujia commented on a change in pull request #1761:
URL: https://github.com/apache/calcite/pull/1761#discussion_r411089272
##########
File path: core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
##########
@@ -796,4 +803,244 @@ static Expression windowSelector(
outputPhysType.record(expressions),
parameter);
}
+
+ /**
+ * Creates an enumerable implementation that applies sessionization to elements from the input
+ * enumerator based on a specified key. Elements are windowed into sessions separated by
+ * periods with no input for at least the duration specified by the gap parameter.
+ */
+ public static Enumerable<Object[]> sessionize(Enumerator<Object[]> inputEnumerator,
+ int indexOfWatermarkedColumn, int indexOfKeyColumn, long gap) {
+ return new AbstractEnumerable<Object[]>() {
+ @Override public Enumerator<Object[]> enumerator() {
+ return new SessionizationEnumerator(inputEnumerator,
+ indexOfWatermarkedColumn, indexOfKeyColumn, gap);
+ }
+ };
+ }
+
+ private static class SessionizationEnumerator implements Enumerator<Object[]> {
+ private final Enumerator<Object[]> inputEnumerator;
+ private final int indexOfWatermarkedColumn;
+ private final int indexOfKeyColumn;
+ private final long gap;
+ private LinkedList<Object[]> list;
+ private boolean initialized;
+
+ SessionizationEnumerator(Enumerator<Object[]> inputEnumerator,
+ int indexOfWatermarkedColumn, int indexOfKeyColumn, long gap) {
+ this.inputEnumerator = inputEnumerator;
+ this.indexOfWatermarkedColumn = indexOfWatermarkedColumn;
+ this.indexOfKeyColumn = indexOfKeyColumn;
+ this.gap = gap;
+ list = new LinkedList<>();
+ initialized = false;
+ }
+
+ @Override public Object[] current() {
+ if (!initialized) {
+ initialize();
+ initialized = true;
+ }
+ return list.pollFirst();
+ }
+
+ @Override public boolean moveNext() {
+ return initialized ? list.size() > 0 : inputEnumerator.moveNext();
+ }
+
+ @Override public void reset() {
+ list.clear();
+ inputEnumerator.reset();
+ initialized = false;
+ }
+
+ @Override public void close() {
+ list.clear();
+ inputEnumerator.close();
+ initialized = false;
+ }
+
+ private void initialize() {
+ List<Object[]> elements = new ArrayList<>();
+ // initialize() will be called when inputEnumerator.moveNext() is true,
+ // thus firstly should take the current element.
+ elements.add(inputEnumerator.current());
+ // sessionization needs to see all data.
+ while (inputEnumerator.moveNext()) {
Review comment:
That's correct: this is a batch-only implementation.
For streaming, the operator would need to rely on a watermark to know when it is ready to emit data. For batch, the watermark technically advances to positive infinity at the very beginning, marking that all data has arrived (because the input data set is bounded and known in advance). So this implementation is correct for the batch case: "watermark = positive infinity" means all data has arrived and must be processed.
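To illustrate the point above, here is a minimal Java sketch. It is not Calcite's API: the class `SessionSketch`, the `Event` type, and `closedSessions` are hypothetical names, and it assumes Java 9+ (for `List.of`), non-negative timestamps, and a non-negative gap. It applies the same gap rule as the Javadoc in the diff (a key's session ends after at least `gap` with no input) but only emits a session once the watermark has passed that session's last event plus `gap`. Passing `Long.MAX_VALUE` as the watermark models the batch case, where every session is emitted.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of watermark-gated session windows (not Calcite API). */
public class SessionSketch {
  /** Hypothetical input row: a key plus an event timestamp (assumed non-negative). */
  static final class Event {
    final String key;
    final long ts;

    Event(String key, long ts) {
      this.key = key;
      this.ts = ts;
    }
  }

  /**
   * Splits events into per-key sessions: a new session starts when the time since the
   * previous event of the same key is at least {@code gap}. Only sessions the watermark
   * has closed (last event + gap <= watermark) are returned.
   */
  static List<List<Event>> closedSessions(List<Event> input, long gap, long watermark) {
    // Group events by key, preserving first-seen key order.
    Map<String, List<Event>> byKey = new LinkedHashMap<>();
    for (Event e : input) {
      byKey.computeIfAbsent(e.key, k -> new ArrayList<>()).add(e);
    }
    List<List<Event>> result = new ArrayList<>();
    for (List<Event> events : byKey.values()) {
      events.sort(Comparator.comparingLong((Event e) -> e.ts));
      List<Event> current = new ArrayList<>();
      for (Event e : events) {
        // A gap of at least `gap` since the previous event ends the current session.
        if (!current.isEmpty() && e.ts - current.get(current.size() - 1).ts >= gap) {
          emitIfClosed(current, gap, watermark, result);
          current = new ArrayList<>();
        }
        current.add(e);
      }
      if (!current.isEmpty()) {
        emitIfClosed(current, gap, watermark, result);
      }
    }
    return result;
  }

  private static void emitIfClosed(
      List<Event> session, long gap, long watermark, List<List<Event>> out) {
    long lastTs = session.get(session.size() - 1).ts;
    // Written as lastTs <= watermark - gap so Long.MAX_VALUE (batch) cannot overflow.
    if (lastTs <= watermark - gap) {
      out.add(session);
    }
  }

  public static void main(String[] args) {
    List<Event> input = List.of(
        new Event("a", 1), new Event("a", 3), new Event("a", 20), new Event("b", 5));
    // Batch: the watermark is at "positive infinity", so every session is closed.
    System.out.println(closedSessions(input, 10, Long.MAX_VALUE).size()); // prints 3
    // Streaming: with watermark 15, only sessions ending at or before 15 - 10 = 5 are closed.
    System.out.println(closedSessions(input, 10, 15).size()); // prints 2
  }
}
```

In the batch call all three sessions (`a:[1,3]`, `a:[20]`, `b:[5]`) are emitted; with a watermark of 15, `a:[20]` stays open because an event for key `a` with a timestamp of 15 or later could still extend it.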
I do plan to introduce watermarks into Calcite, and along with them, a test unbounded/streaming input source should also be built. That would let us migrate the current batch-only implementation to a unified batch/streaming implementation that depends on watermarks. However, it will take some time (and community support) to reach that point.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.