Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3364#discussion_r104919295
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
---
@@ -312,6 +320,108 @@ class DataSetWindowAggregate(
}
}
+ private def createEventTimeSlidingWindowDataSet(
+ inputDS: DataSet[Row],
+ isTimeWindow: Boolean,
+ size: Long,
+ slide: Long,
+ isParserCaseSensitive: Boolean)
+ : DataSet[Row] = {
+
+ // create MapFunction for initializing the aggregations
+ // it aligns the rowtime for pre-tumbling in case of a time-window for
partial aggregates
+ val mapFunction = createDataSetWindowPrepareMapFunction(
+ window,
+ namedAggregates,
+ grouping,
+ inputType,
+ isParserCaseSensitive)
+
+ val mappedDataSet = inputDS
+ .map(mapFunction)
+ .name(prepareOperatorName)
+
+ val mapReturnType = mappedDataSet.getType
+
+ val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+ val groupingKeys = grouping.indices.toArray
+
+ // do partial aggregation if possible
+ val isPartial = doAllSupportPartialMerge(
+ namedAggregates.map(_.getKey),
+ inputType,
+ grouping.length)
+
+ // only pre-tumble if it is worth it
+ val littleTumblingSize = determineLargestTumblingSize(size, slide) <= 1
--- End diff --
`isLittleTumblingSize`
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---