[
https://issues.apache.org/jira/browse/FLINK-6011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956126#comment-15956126
]
ASF GitHub Bot commented on FLINK-6011:
---------------------------------------
Github user haohui commented on a diff in the pull request:
https://github.com/apache/flink/pull/3665#discussion_r109810006
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
---
@@ -117,46 +119,86 @@ class LogicalWindowAggregateRule
}
private def identifyWindow(field: RexNode): Option[Window] = {
- // Detects window expressions by pattern matching
- // supported patterns: FLOOR(time AS xxx) and CEIL(time AS xxx),
- // with time being equal to proctime() or rowtime()
field match {
case call: RexCall =>
call.getOperator match {
- case _: SqlFloorFunction =>
- val operand = call.getOperands.get(1).asInstanceOf[RexLiteral]
- val unit: TimeUnitRange =
operand.getValue.asInstanceOf[TimeUnitRange]
- val w =
LogicalWindowAggregateRule.timeUnitRangeToTumbleWindow(unit)
- call.getType match {
- case TimeModeTypes.PROCTIME =>
- return Some(w)
- case TimeModeTypes.ROWTIME =>
- return Some(w.on("rowtime"))
- case _ =>
- }
- case _ =>
+ case _: SqlFloorFunction => FloorWindowTranslator(call).toWindow
+ case SqlStdOperatorTable.TUMBLE =>
TumbleWindowTranslator(call).toWindow
+ case SqlStdOperatorTable.HOP =>
SlidingWindowTranslator(call).toWindow
+ case SqlStdOperatorTable.SESSION =>
SessionWindowTranslator(call).toWindow
+ case _ => None
}
- case _ =>
+ case _ => None
}
- None
}
-
}
-object LogicalWindowAggregateRule {
+private abstract class WindowTranslator {
+ val call: RexCall
- private[flink] val LOGICAL_WINDOW_PREDICATE =
RelOptRule.operand(classOf[LogicalAggregate],
- RelOptRule.operand(classOf[LogicalProject], RelOptRule.none()))
+ protected def unwrapLiteral[T](node: RexNode): T =
+ node.asInstanceOf[RexLiteral].getValue.asInstanceOf[T]
- private[flink] val INSTANCE = new LogicalWindowAggregateRule
+ protected def getOperandAsLong(idx: Int): Long =
+ unwrapLiteral[BigDecimal](call.getOperands.get(idx)).longValue()
--- End diff --
I just tried out Calcite did not stop you from passing something like a
{{RexCall}}. So yes, it can be dynamic.
One question: whether Flink actually supports `GroupWindow` that has a
dynamic size? Maybe I'm wrong but it does not seem so.
If the answer is no maybe we should check that whether it is a `RexLiteral`?
> Support TUMBLE, HOP, SESSION window in streaming SQL
> ----------------------------------------------------
>
> Key: FLINK-6011
> URL: https://issues.apache.org/jira/browse/FLINK-6011
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Haohui Mai
> Assignee: Haohui Mai
>
> CALCITE-1603 and CALCITE-1615 introduces the support of the {{TUMBLE}} /
> {{HOP}} / {{SESSION}} windows in the parser.
> This jira tracks the efforts of adding the corresponding supports on the
> planners / optimizers in Flink.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)