wuchong commented on a change in pull request #8462:
[FLINK-12496][table-planner-blink] Support translation from
StreamExecGroupWindowAggregate to StreamTransformation.
URL: https://github.com/apache/flink/pull/8462#discussion_r285169940
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala
##########
@@ -122,7 +135,236 @@ class StreamExecGroupWindowAggregate(
override protected def translateToPlanInternal(
tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
- throw new TableException("Implements this")
+ val config = tableEnv.getConfig
+
+ val inputTransform = getInputNodes.get(0).translateToPlan(tableEnv)
+ .asInstanceOf[StreamTransformation[BaseRow]]
+
+ val inputRowTypeInfo =
inputTransform.getOutputType.asInstanceOf[BaseRowTypeInfo]
+ val outRowType =
FlinkTypeFactory.toInternalRowType(outputRowType).toTypeInfo
+
+ val inputIsAccRetract = StreamExecRetractionRules.isAccRetract(input)
+
+ if (inputIsAccRetract) {
+ throw new TableException(
+ "Group Window Agg: Retraction on windowed GroupBy aggregation is not
supported yet. \n" +
+ "please re-check sql grammar. \n" +
+ "Note: Windowed GroupBy aggregation should not follow a" +
+ "non-windowed GroupBy aggregation.")
+ }
+
+ val isCountWindow = window match {
+ case TumblingGroupWindow(_, _, size) if isRowIntervalType(size.getType)
=> true
+ case SlidingGroupWindow(_, _, size, _) if
isRowIntervalType(size.getType) => true
+ case _ => false
+ }
+
+ if (isCountWindow && grouping.length > 0 &&
config.getMinIdleStateRetentionTime < 0) {
+ LOG.warn(
+ "No state retention interval configured for a query which accumulates
state. " +
+ "Please provide a query configuration with valid retention interval
to prevent " +
+ "excessive state size. You may specify a retention time of 0 to not
clean up the state.")
+ }
+
+ // validation
+ emitStrategy.checkValidation()
+
+ val aggString = RelExplainUtil.streamWindowAggregationToString(
+ inputRowType,
+ grouping,
+ outputRowType,
+ aggCalls,
+ namedProperties)
+
+ val timeIdx = if
(isRowtimeIndicatorType(window.timeAttribute.getResultType)) {
+ if (inputTimestampIndex < 0) {
+ throw new TableException(
+ "Group Window Agg: Time attribute could not be found. \n" +
+ "Time attribute could not be found. This is a bug.\n" +
+ "please contact customer support for this"
Review comment:
Please polish the exception message. For example:
"Group window aggregate must be defined on a time attribute, but the time
attribute can't be found. This should never happen. Please file an issue."
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services