[
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15977676#comment-15977676
]
ASF GitHub Bot commented on FLINK-6228:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3743#discussion_r112499806
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
---
@@ -93,28 +96,43 @@ class DataStreamOverAggregate(
val orderKeys = overWindow.orderKeys.getFieldCollations
- if (orderKeys.size() != 1) {
- throw new TableException(
- "Unsupported use of OVER windows. The window can only be ordered
by a single time column.")
- }
- val orderKey = orderKeys.get(0)
+ val timeType = if (!orderKeys.isEmpty) {
+ if (orderKeys.size() != 1) {
+ throw new TableException(
+ "Unsupported use of OVER windows. The window can only be ordered
by a single time " +
+ "column.")
+ }
+ val orderKey = orderKeys.get(0)
- if (!orderKey.direction.equals(ASCENDING)) {
- throw new TableException(
- "Unsupported use of OVER windows. The window can only be ordered
in ASCENDING mode.")
+ if (!orderKey.direction.equals(ASCENDING)) {
+ throw new TableException(
+ "Unsupported use of OVER windows. The window can only be ordered
in ASCENDING mode.")
+ }
+ inputType
+ .getFieldList
+ .get(orderKey.getFieldIndex)
+ .getValue.asInstanceOf[TimeModeType]
+ } else {
+ val it = logicWindow.constants.listIterator()
+ if (it.hasNext) {
+ val item = it.next().getValue
+ if (item.isInstanceOf[NlsString]) {
+ val value = item.asInstanceOf[NlsString].getValue
+ if (value.equalsIgnoreCase("rowtime")) {
+ new RowTimeType
+ } else {
+ new ProcTimeType
+ }
+ }
+ }
}
val inputDS =
input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
val generator = new CodeGenerator(
- tableEnv.getConfig,
- false,
- inputDS.getType)
-
- val timeType = inputType
- .getFieldList
- .get(orderKey.getFieldIndex)
- .getValue
+ tableEnv.getConfig,
--- End diff --
indent
> Integrating the OVER windows in the Table API
> ---------------------------------------------
>
> Key: FLINK-6228
> URL: https://issues.apache.org/jira/browse/FLINK-6228
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> Syntax:
> {code}
> table
> .overWindows(
> (Rows|Range [ partitionBy value_expression , ... [ n ]] [ orderBy
> order_by_expression]
> (preceding
> UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW)
> [following
> UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW]
> as alias,...[n])
> )
> .select( [col1,...[n]], (agg(col1) OVER overWindowAlias, … [n])
> {code}
> Implement restrictions:
> * All OVER clauses in the same SELECT clause must be exactly the same.
> * The PARTITION BY clause is optional (no partitioning results in single
> threaded execution).
> * The ORDER BY Before the
> [FLINK-5884|https://issues.apache.org/jira/browse/FLINK-5884] implementation
> orderBy may only have ‘rowtime/’proctime(for stream)/‘specific-time-field(for
> batch).
> * FOLLOWING is not supported.
> User interface design document [See |
> https://docs.google.com/document/d/13Z-Ovx3jwtmzkSweJyGkMy751BouhuJ29Y1CTNZt2DI/edit#]
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)