wuchong commented on code in PR #19727:
URL: https://github.com/apache/flink/pull/19727#discussion_r919976481


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/DeclarativeExpressionResolver.java:
##########
@@ -58,6 +59,10 @@ protected ResolvedExpression defaultMethod(Expression expression) {
         if (expression instanceof UnresolvedReferenceExpression) {
             UnresolvedReferenceExpression expr = (UnresolvedReferenceExpression) expression;
             String name = expr.getName();
+            if (function instanceof SizeBasedWindowFunction

Review Comment:
   TODO



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SizeBasedWindowFunction.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.aggfunctions;
+
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+
+/**
+ * Some functions like CUME_DIST/PERCENT_RANK/NTILE need the size of the current window for calculation.
+ * Such functions need to implement this interface to provide access to the window size.
+ *
+ * <p>NOTE: Now, it can only be used by {@link DeclarativeAggregateFunction}.
+ */
+public interface SizeBasedWindowFunction {
+
+    /** The field for the window size. */
+    UnresolvedReferenceExpression windowSize();

Review Comment:
   A window function should only have a window size expression provided by the context, so we don't need to let users define the expression; the interface can pre-define one. In addition, the expression can be a resolved one, which makes it easy to use during code generation (no need for `toWindowSizeExpr`). For example:
   
   ```java
   default ResolvedExpression windowSizeAttribute() {
       return localRef("window_size", DataTypes.INT());
   }
   ```
   
   Then developers don't need to implement this method and can use it directly, just like the `operand(i)` method.
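To make the suggestion concrete, here is a minimal, self-contained sketch of the idea in plain Java. It deliberately avoids Flink's real classes: `Ref` is a hypothetical stand-in for a resolved local reference, and the `SizeBasedWindowFunction`/`CumeDistSketch` types below are simplified illustrations, not the PR's code. It only shows that a default method can pre-define the shared attribute, so implementers use it directly without overriding anything.

```java
public class WindowSizeAttributeSketch {

    /** Hypothetical stand-in for a resolved local reference (name plus SQL type name). */
    record Ref(String name, String type) {}

    /** Hypothetical version of the interface with a pre-defined, context-provided attribute. */
    interface SizeBasedWindowFunction {
        // Every size-based window function shares the same reference, so no override is needed.
        default Ref windowSizeAttribute() {
            return new Ref("window_size", "INT");
        }
    }

    /** Hypothetical CUME_DIST-like function: it never declares the window size itself. */
    static class CumeDistSketch implements SizeBasedWindowFunction {
        // Builds a textual expression using the inherited attribute, the way operand(i) would be used.
        String getValueExpression() {
            return "CAST(cume_count AS DOUBLE) / " + windowSizeAttribute().name();
        }
    }

    public static void main(String[] args) {
        CumeDistSketch f = new CumeDistSketch();
        System.out.println(f.windowSizeAttribute());
        System.out.println(f.getValueExpression());
    }
}
```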



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala:
##########
@@ -280,6 +281,11 @@ object AggCodeGenHelper {
       val variableName = s"agg${aggIndex}_$name"
       newLocalReference(variableName, aggBufferTypes(aggIndex)(localIndex))
     }
+
+    override def toWindowSizeExpr(name: String): ResolvedExpression = {
+      val variableName = s"agg${aggIndex}_$name"

Review Comment:
   We may not need this when the windowSize expression is resolved. 



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/AggsFunctionWithWindowSize.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.generated;
+
+/**
+ * The interface to provide the ability to set the window size for aggregate functions that require
+ * the size of the current window to do calculation.
+ *
+ * <p>It is code generated to set the size of the current window.
+ */
+public interface AggsFunctionWithWindowSize {
+    void setWindowSize(int windowSize);

Review Comment:
   I think it's fine to just put this method in `AggsHandleFunction` or `AggsHandleFunctionBase`. When generating the handle for a function that doesn't need the window size, the method can just be a no-op, like the `retract`, `merge`, and `reset` methods.
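A minimal sketch of that alternative, assuming a trimmed-down hypothetical base interface (`AggsHandleFunctionBaseSketch`, not Flink's actual `AggsHandleFunctionBase`) and hand-written classes standing in for generated handles. The caller sets the window size on every handle unconditionally, and handles that don't need it simply ignore the call, so no separate `AggsFunctionWithWindowSize` interface or `instanceof` check is needed.

```java
import java.util.List;

public class WindowSizeHandleSketch {

    /** Hypothetical, trimmed-down stand-in for AggsHandleFunctionBase. */
    interface AggsHandleFunctionBaseSketch {
        void accumulate(long value);

        double getValue();

        // Always present on the base interface; handles that don't need it get an empty body.
        void setWindowSize(int windowSize);
    }

    /** What a generated handle for a size-based (CUME_DIST-like) function might look like. */
    static class CumeDistHandle implements AggsHandleFunctionBaseSketch {
        private int windowSize;
        private long seen;

        @Override public void accumulate(long value) { seen++; }

        // Value for the last accumulated row: rows seen so far divided by the window size.
        @Override public double getValue() { return (double) seen / windowSize; }

        @Override public void setWindowSize(int windowSize) { this.windowSize = windowSize; }
    }

    /** What a generated handle for an ordinary (SUM-like) function might look like. */
    static class SumHandle implements AggsHandleFunctionBaseSketch {
        private long sum;

        @Override public void accumulate(long value) { sum += value; }

        @Override public double getValue() { return sum; }

        @Override public void setWindowSize(int windowSize) {
            // no-op: this function does not depend on the window size
        }
    }

    /** The caller sets the size on every handle, with no instanceof check on a second interface. */
    static double[] run(List<Long> partition, AggsHandleFunctionBaseSketch... handles) {
        double[] results = new double[handles.length];
        for (int i = 0; i < handles.length; i++) {
            handles[i].setWindowSize(partition.size());
            for (long v : partition) {
                handles[i].accumulate(v);
            }
            results[i] = handles[i].getValue();
        }
        return results;
    }

    public static void main(String[] args) {
        double[] r = run(List.of(1L, 2L, 3L, 4L), new CumeDistHandle(), new SumHandle());
        System.out.println(r[0] + " " + r[1]);
    }
}
```

The trade-off is one extra, usually empty method on every generated handle in exchange for a single interface and a uniform call site.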



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggCodeGen.scala:
##########
@@ -46,5 +48,6 @@ trait AggCodeGen {
       needRetract: Boolean = false,
       needMerge: Boolean = false,
       needReset: Boolean = false,
-      needEmitValue: Boolean = false): Unit
+      needEmitValue: Boolean = false,
+      needWindowSize: Boolean = false): Unit

Review Comment:
   We don't need to add the `needWindowSize` check here (and you didn't use it at all), because `checkNeededMethods` only applies to imperative aggregate functions, while the window size method is generated only for declarative aggregate functions.



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala:
##########
@@ -2759,6 +2758,53 @@ class OverAggregateITCase extends BatchTestBase {
       "select dep,name,rank() over (partition by dep order by salary desc) as rnk from emp",
       Seq(row("1", "A", 2), row("1", "B", 1), row("2", "C", 1)))
   }
+
+  @Test
+  def testCumeDist(): Unit = {

Review Comment:
   Please add tests covering order keys that contain duplicate values.
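For reference when writing those cases: under the standard SQL definitions, CUME_DIST is the number of rows whose order key is less than or equal to the current row's key, divided by the partition size, and PERCENT_RANK is (RANK - 1) / (partition size - 1), with peers sharing a rank. Tied rows must therefore get identical values, which a row-number-based implementation would get wrong (for example, returning 0.5 for the first `2` below instead of 0.75). The sketch computes expected values for a made-up ascending partition; the class and data are illustrative only, not from the PR.

```java
import java.util.Arrays;

public class CumeDistWithTies {

    /** CUME_DIST: fraction of partition rows whose order key is <= the current row's key (ASC order). */
    static double[] cumeDist(int[] keys) {
        int n = keys.length;
        double[] out = new double[n];
        for (int i = 0; i < n; i++) {
            int peersAndBefore = 0;
            for (int k : keys) {
                if (k <= keys[i]) peersAndBefore++;
            }
            out[i] = (double) peersAndBefore / n;
        }
        return out;
    }

    /** PERCENT_RANK: (rank - 1) / (n - 1), where tied rows share the rank of their first peer. */
    static double[] percentRank(int[] keys) {
        int n = keys.length;
        double[] out = new double[n];
        for (int i = 0; i < n; i++) {
            int rank = 1;
            for (int k : keys) {
                if (k < keys[i]) rank++;
            }
            // A single-row partition has PERCENT_RANK 0 by definition.
            out[i] = n == 1 ? 0.0 : (double) (rank - 1) / (n - 1);
        }
        return out;
    }

    public static void main(String[] args) {
        int[] salaries = {1, 2, 2, 3}; // ascending order key with a duplicate
        System.out.println(Arrays.toString(cumeDist(salaries)));    // [0.25, 0.75, 0.75, 1.0]
        System.out.println(Arrays.toString(percentRank(salaries))); // [0.0, 0.3333333333333333, 0.3333333333333333, 1.0]
    }
}
```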



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala:
##########
@@ -955,4 +956,9 @@ class DistinctAggCodeGen(
        """.stripMargin
     }
   }
+
+  override def setWindowSize(generator: ExprCodeGenerator): String = {
+    throw new TableException(
+      "Distinct shouldn't set window size, this is a bug, please file an issue.")

Review Comment:
   Is it possible that `COUNT(DISTINCT) OVER ...` hits this?
   


