twalthr commented on a change in pull request #18154:
URL: https://github.com/apache/flink/pull/18154#discussion_r772931616



##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
##########
@@ -114,6 +116,16 @@
                     
"org.apache.flink.table.runtime.operators.python.aggregate."
                             + "PythonStreamGroupWindowAggregateOperator";
 
+    private static final String

Review comment:
       nit: `STREAM_PYTHON_CREATE_TUMBLING_GROUP_WINDOW_METHOD` a bit shorter?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
##########
@@ -525,46 +537,145 @@ public StreamExecPythonGroupWindowAggregate(
         Class clazz =
                 CommonPythonUtil.loadClass(
                         
GENERAL_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME);
+
+        boolean isRowTime = 
AggregateUtil.isRowtimeAttribute(window.timeAttribute());
+
         try {
-            Constructor<OneInputStreamOperator<RowData, RowData>> ctor =
-                    clazz.getConstructor(
-                            Configuration.class,
-                            RowType.class,
-                            RowType.class,
-                            PythonAggregateFunctionInfo[].class,
-                            DataViewUtils.DataViewSpec[][].class,
-                            int[].class,
-                            int.class,
-                            boolean.class,
-                            boolean.class,
-                            int.class,
-                            WindowAssigner.class,
-                            LogicalWindow.class,
-                            long.class,
-                            PlannerNamedWindowProperty[].class,
-                            ZoneId.class);
-            return ctor.newInstance(
-                    config,
-                    inputType,
-                    outputType,
-                    aggregateFunctions,
-                    dataViewSpecs,
-                    grouping,
-                    indexOfCountStar,
-                    generateUpdateBefore,
-                    countStarInserted,
-                    inputTimeFieldIndex,
-                    windowAssigner,
-                    window,
-                    allowance,
-                    namedWindowProperties,
-                    shiftTimeZone);
-        } catch (NoSuchMethodException
-                | IllegalAccessException
-                | InstantiationException
-                | InvocationTargetException e) {
+            if (window instanceof TumblingGroupWindow) {
+                ValueLiteralExpression size = ((TumblingGroupWindow) 
window).size();
+
+                Method create =
+                        clazz.getMethod(
+                                
GENERAL_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_CREATE_TUMBLING_WINDOW_METHOD_NAME,
+                                Configuration.class,
+                                RowType.class,
+                                RowType.class,
+                                PythonAggregateFunctionInfo[].class,
+                                DataViewUtils.DataViewSpec[][].class,
+                                int[].class,
+                                int.class,
+                                boolean.class,
+                                boolean.class,
+                                int.class,
+                                WindowAssigner.class,
+                                boolean.class,
+                                boolean.class,
+                                long.class,
+                                long.class,
+                                NamedWindowProperty[].class,
+                                ZoneId.class);
+                return (OneInputStreamOperator<RowData, RowData>)
+                        create.invoke(
+                                null,
+                                config,
+                                inputType,
+                                outputType,
+                                aggregateFunctions,
+                                dataViewSpecs,
+                                grouping,
+                                indexOfCountStar,
+                                generateUpdateBefore,
+                                countStarInserted,
+                                inputTimeFieldIndex,
+                                windowAssigner,
+                                isRowTime,
+                                AggregateUtil.hasTimeIntervalType(size),
+                                AggregateUtil.toDuration(size).toMillis(),

Review comment:
       This might fail for counting windows. The logic before was returning 0. 
However, this might not be exposed in the Python API?

##########
File path: 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/groupwindow/WindowStart.java
##########
@@ -16,22 +16,28 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.expressions;
+package org.apache.flink.table.runtime.groupwindow;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.TimestampType;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
 
-/** Window start property. */
+/**
+ * Window start property.
+ *
+ * @deprecated The POJOs in this package are used to represent the deprecated 
Group Window feature

Review comment:
       nit:
   
   ```
   The POJOs in this package are used to represent the deprecated Group Window 
feature. Currently, they also used to configure Python operators.
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to