wuchong commented on code in PR #19727:
URL: https://github.com/apache/flink/pull/19727#discussion_r919976481
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/DeclarativeExpressionResolver.java:
##########
@@ -58,6 +59,10 @@ protected ResolvedExpression defaultMethod(Expression expression) {
if (expression instanceof UnresolvedReferenceExpression) {
UnresolvedReferenceExpression expr =
(UnresolvedReferenceExpression) expression;
String name = expr.getName();
+ if (function instanceof SizeBasedWindowFunction
Review Comment:
TODO
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SizeBasedWindowFunction.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.aggfunctions;
+
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+
+/**
+ * Some functions like CUME_DIST/PERCENT_RANK/NTILE need the size of current window for calculation.
+ * Such function need to implement the interface to provide accessing to the window size.
+ *
+ * <p>NOTE: Now, it can only be used by {@link DeclarativeAggregateFunction}.
+ */
+public interface SizeBasedWindowFunction {
+
+ /** The field for the window size. */
+ UnresolvedReferenceExpression windowSize();
Review Comment:
A window function should only have a window size expression that is provided
by the context, so we don't need to let users define the expression; the
interface can pre-define one. In addition, the expression can be a resolved
one, which makes it easy to use during code generation (no need for
`toWindowSizeExpr`). For example:
```java
default ResolvedExpression windowSizeAttribute() {
return localRef("window_size", DataTypes.INT());
}
```
Then developers don't need to implement this method, and it can be used
directly, just like the `operand(i)` method.
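To illustrate the suggestion, here is a minimal, Flink-independent sketch. `LocalRef`, `SizeBasedWindowFunctionSketch`, and `WindowRatioSketch` are hypothetical stand-ins, not Flink's `LocalReferenceExpression` or `DeclarativeAggregateFunction`; the only point shown is that a default method lets every implementing function share one pre-defined, already-resolved reference without overriding anything.

```java
import java.util.Objects;

/** Hypothetical stand-in for a resolved local reference (name + type name). */
final class LocalRef {
    final String name;
    final String typeName;

    LocalRef(String name, String typeName) {
        this.name = name;
        this.typeName = typeName;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof LocalRef)) {
            return false;
        }
        LocalRef other = (LocalRef) o;
        return name.equals(other.name) && typeName.equals(other.typeName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, typeName);
    }
}

/** Hypothetical sketch: the interface pre-defines the window size attribute. */
interface SizeBasedWindowFunctionSketch {
    /** Shared, context-provided reference; implementations need not override it. */
    default LocalRef windowSizeAttribute() {
        return new LocalRef("window_size", "INT");
    }
}

/** Hypothetical function that uses the attribute directly, like operand(i). */
final class WindowRatioSketch implements SizeBasedWindowFunctionSketch {
    /** Builds a textual "expression" dividing a row count by the window size. */
    String getValueExpression(String countRef) {
        return countRef + " / " + windowSizeAttribute().name;
    }
}
```

Because the reference is fixed by the interface, code generation can map it to a single generated variable instead of asking each function to name its own field.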
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala:
##########
@@ -280,6 +281,11 @@ object AggCodeGenHelper {
val variableName = s"agg${aggIndex}_$name"
newLocalReference(variableName, aggBufferTypes(aggIndex)(localIndex))
}
+
+ override def toWindowSizeExpr(name: String): ResolvedExpression = {
+ val variableName = s"agg${aggIndex}_$name"
Review Comment:
We may not need this when the windowSize expression is resolved.
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/AggsFunctionWithWindowSize.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.generated;
+
+/**
+ * The interface to provide the ability to set window size for the aggregate functions that requires
+ * the size of current window to do calculation.
+ *
+ * <p>It is code generated to set the size of current window
+ */
+public interface AggsFunctionWithWindowSize {
+ void setWindowSize(int windowSize);
Review Comment:
I think it's fine to just put this method in `AggsHandleFunction` or
`AggsHandleFunctionBase`. When generating the handle for a function that
doesn't need the window size, the method can just be a no-op, like the
`retract`, `merge`, and `reset` methods.
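A minimal sketch of the suggested design, using hypothetical names (`HandleBaseSketch`, `SumHandleSketch`, `RowFractionHandleSketch`, `OverOperatorSketch`) rather than Flink's actual `AggsHandleFunctionBase`: the base interface declares `setWindowSize` with a no-op default, only window-size-based handles override it, and the operator calls it unconditionally before evaluating a partition, so neither a separate interface nor an `instanceof` check is needed. Here the no-op is a Java default method; in generated code it would presumably be an empty method body.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for AggsHandleFunctionBase with the suggested method. */
interface HandleBaseSketch {
    void accumulate(int value);

    double getValue();

    /** No-op by default; only window-size-based handles override it. */
    default void setWindowSize(int windowSize) {}
}

/** A handle that ignores the window size (behaves like SUM). */
final class SumHandleSketch implements HandleBaseSketch {
    private double sum;

    @Override
    public void accumulate(int value) {
        sum += value;
    }

    @Override
    public double getValue() {
        return sum;
    }
}

/** A handle that needs the window size: fraction of rows accumulated so far. */
final class RowFractionHandleSketch implements HandleBaseSketch {
    private int windowSize;
    private int count;

    @Override
    public void setWindowSize(int windowSize) {
        this.windowSize = windowSize;
    }

    @Override
    public void accumulate(int value) {
        count++;
    }

    @Override
    public double getValue() {
        return (double) count / windowSize;
    }
}

/** Hypothetical operator: sets the size on every handle, no instanceof check. */
final class OverOperatorSketch {
    static List<Double> evaluate(HandleBaseSketch handle, int[] partition) {
        handle.setWindowSize(partition.length);
        List<Double> out = new ArrayList<>();
        for (int v : partition) {
            handle.accumulate(v);
            out.add(handle.getValue());
        }
        return out;
    }
}
```

The trade-off is one extra method on every handle, in exchange for a single code path in the operator.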
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggCodeGen.scala:
##########
@@ -46,5 +48,6 @@ trait AggCodeGen {
needRetract: Boolean = false,
needMerge: Boolean = false,
needReset: Boolean = false,
- needEmitValue: Boolean = false): Unit
+ needEmitValue: Boolean = false,
+ needWindowSize: Boolean = false): Unit
Review Comment:
We don't need to add a `needWindowSize` check here (and you didn't use it at
all), because `checkNeededMethods` only applies to imperative aggregate
functions, while the window size method is generated only for declarative
aggregate functions.
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala:
##########
@@ -2759,6 +2758,53 @@ class OverAggregateITCase extends BatchTestBase {
"select dep,name,rank() over (partition by dep order by salary desc) as rnk from emp",
Seq(row("1", "A", 2), row("1", "B", 1), row("2", "C", 1)))
}
+
+ @Test
+ def testCumeDist(): Unit = {
Review Comment:
Please add tests that cover order keys containing duplicate values.
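For the requested duplicate-key tests, expected values can be cross-checked against the standard SQL definitions: `CUME_DIST` is the number of rows whose order key is less than or equal to the current row's (peers included), divided by the partition size; `PERCENT_RANK` is `(RANK - 1) / (N - 1)`, and 0 for a single-row partition. The helper below is a hypothetical reference implementation for deriving expected results, not part of the PR or of Flink, and it assumes the keys are already sorted ascending.

```java
/** Hypothetical helper for deriving expected CUME_DIST / PERCENT_RANK values. */
final class RankDistReference {
    /** CUME_DIST: rows with key <= current key (peers included) / partition size. */
    static double[] cumeDist(int[] sortedKeys) {
        int n = sortedKeys.length;
        double[] result = new double[n];
        int i = 0;
        while (i < n) {
            // Find the end of the peer group sharing sortedKeys[i].
            int j = i;
            while (j < n && sortedKeys[j] == sortedKeys[i]) {
                j++;
            }
            // All peers get the same value: j rows have a key <= this key.
            for (int k = i; k < j; k++) {
                result[k] = (double) j / n;
            }
            i = j;
        }
        return result;
    }

    /** PERCENT_RANK: (RANK - 1) / (n - 1), or 0 for a single-row partition. */
    static double[] percentRank(int[] sortedKeys) {
        int n = sortedKeys.length;
        double[] result = new double[n];
        int rank = 1;
        for (int k = 0; k < n; k++) {
            // RANK jumps to position + 1 at the start of each new peer group.
            if (k > 0 && sortedKeys[k] != sortedKeys[k - 1]) {
                rank = k + 1;
            }
            result[k] = n == 1 ? 0.0 : (double) (rank - 1) / (n - 1);
        }
        return result;
    }
}
```

For example, a sorted partition with keys `1, 2, 2, 3` yields `CUME_DIST` values `0.25, 0.75, 0.75, 1.0` and `PERCENT_RANK` values `0, 1/3, 1/3, 1`; the two rows with key `2` must receive identical results.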
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala:
##########
@@ -955,4 +956,9 @@ class DistinctAggCodeGen(
""".stripMargin
}
}
+
+ override def setWindowSize(generator: ExprCodeGenerator): String = {
+ throw new TableException(
+ "Distinct shouldn't set window size, this is a bug, please file a issue.")
Review Comment:
Is it possible for `COUNT(DISTINCT) OVER ...` to hit this?
--
This is an automated message from the Apache Git Service.