xuyangzhong commented on code in PR #23505:
URL: https://github.com/apache/flink/pull/23505#discussion_r1453280299
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala:
##########
@@ -234,11 +238,71 @@ object WindowUtil {
val step = getOperandAsLong(windowCall.operands(1))
val maxSize = getOperandAsLong(windowCall.operands(2))
new CumulativeWindowSpec(Duration.ofMillis(maxSize),
Duration.ofMillis(step), offset)
+ case FlinkSqlOperatorTable.SESSION =>
+ val gap = getOperandAsLong(windowCall.operands(1))
+ val partitionKeys =
+ exploreSessionWindowPartitionKeys(scanInput)
+ new SessionWindowSpec(Duration.ofMillis(gap), partitionKeys)
}
new TimeAttributeWindowingStrategy(windowSpec, timeAttributeType,
timeIndex)
}
+ /**
+ * If the session window tvf has partition keys, the whole tree is like:
+ *
+ * {{{
+ * TableFunctionScan
+ * |
+ * Project / Calc (optional)
+ * |
+ * Exchange
+ * }}}
+ */
+ private def exploreSessionWindowPartitionKeys(scanInput: RelNode):
Array[Int] = {
+ var input = unwrapHepRelVertexOrRelSubSet(scanInput)
+ // when transpose project or calc, the indices of the partition keys will
change
+ var indexMapper: TargetMapping = null
+ input = input match {
+ case project: Project =>
+ indexMapper = project.getMapping
+ unwrapHepRelVertexOrRelSubSet(input.getInput(0))
+ case calc: Calc =>
+ val calcProgram = calc.getProgram
+ val projects = calcProgram.getProjectList
+ val inputSize = calcProgram.getInputRowType.getFieldNames.size()
+ indexMapper =
+ Project.getMapping(inputSize, projects.map(p =>
calcProgram.expandLocalRef(p)).toList)
+ unwrapHepRelVertexOrRelSubSet(input.getInput(0))
+ case _ => input
+ }
+
+ input match {
+ case exchange: Exchange =>
+ val partitionKey = exchange.getDistribution.getKeys
+ val originalIndices =
JavaScalaConversionUtil.toScala(partitionKey).map(_.toInt).toArray
+ if (indexMapper != null) {
+ originalIndices.map(
+ v => {
+ indexMapper.getTarget(v)
+ })
+ } else {
+ originalIndices
+ }
+ case _ =>
+ Array.empty[Int]
+ }
+
+ }
+
+ private def unwrapHepRelVertexOrRelSubSet(node: RelNode): RelNode = {
Review Comment:
This logic has been removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org