amaliujia commented on a change in pull request #1761:
URL: https://github.com/apache/calcite/pull/1761#discussion_r411102505
##########
File path:
core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
##########
@@ -3143,11 +3147,62 @@ private Expression normalize(SqlTypeName typeName,
Expression e) {
return Expressions.call(
BuiltInMethod.TUMBLING.method,
inputEnumerable,
- EnumUtils.windowSelector(
+ EnumUtils.tumblingWindowSelector(
inputPhysType,
outputPhysType,
translatedOperands.get(0),
translatedOperands.get(1)));
}
}
+
+ /** Implements hopping. */
+ private static class HopImplementor implements TableFunctionCallImplementor {
+ @Override public Expression implement(RexToLixTranslator translator,
+ Expression inputEnumerable, RexCall call, PhysType inputPhysType,
PhysType outputPhysType) {
+ Expression intervalExpression =
translator.translate(call.getOperands().get(2));
+ Expression intervalExpression2 =
translator.translate(call.getOperands().get(3));
+ RexCall descriptor = (RexCall) call.getOperands().get(1);
Review comment:
Done.
##########
File path:
core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
##########
@@ -796,4 +803,244 @@ static Expression windowSelector(
outputPhysType.record(expressions),
parameter);
}
+
+ /**
+ * Create enumerable implementation that applies sessionization to elements
from the input
+ * enumerator based on a specified key. Elements are windowed into sessions
separated by
Review comment:
Done.
##########
File path:
core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
##########
@@ -796,4 +803,244 @@ static Expression windowSelector(
outputPhysType.record(expressions),
parameter);
}
+
+ /**
+ * Create enumerable implementation that applies sessionization to elements
from the input
+ * enumerator based on a specified key. Elements are windowed into sessions
separated by
+ * periods with no input for at least the duration specified by gap
parameter.
+ */
+ public static Enumerable<Object[]> sessionize(Enumerator<Object[]>
inputEnumerator,
+ int indexOfWatermarkedColumn, int indexOfKeyColumn, long gap) {
+ return new AbstractEnumerable<Object[]>() {
+ @Override public Enumerator<Object[]> enumerator() {
+ return new SessionizationEnumerator(inputEnumerator,
+ indexOfWatermarkedColumn, indexOfKeyColumn, gap);
+ }
+ };
+ }
+
+ private static class SessionizationEnumerator implements
Enumerator<Object[]> {
+ private final Enumerator<Object[]> inputEnumerator;
+ private final int indexOfWatermarkedColumn;
+ private final int indexOfKeyColumn;
+ private final long gap;
+ private LinkedList<Object[]> list;
+ private boolean initialized;
+
+ SessionizationEnumerator(Enumerator<Object[]> inputEnumerator,
+ int indexOfWatermarkedColumn, int indexOfKeyColumn, long gap) {
+ this.inputEnumerator = inputEnumerator;
Review comment:
Done.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]