HuangXingBo commented on a change in pull request #13462:
URL: https://github.com/apache/flink/pull/13462#discussion_r494179309



##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/PythonAggregateTest.scala
##########
@@ -20,9 +20,8 @@ package org.apache.flink.table.planner.plan.batch.table
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
-import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.PandasAggregateFunction
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.{PandasAggregateFunction, TestPythonAggregateFunction}
 import org.apache.flink.table.planner.utils.TableTestBase
-

Review comment:
       Unnecessary change
   

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedAggFunctions.java
##########
@@ -118,6 +118,27 @@ public void accumulate(WeightedAvgAccum accumulator, int iValue, int iWeight) {
                }
        }
 
+       /**
+        * Test for Python Python Aggregate Function.

Review comment:
       ```suggestion
         * Test for Python Aggregate Function.
   ```

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/PythonAggregateTest.scala
##########
@@ -20,9 +20,8 @@ package org.apache.flink.table.planner.plan.batch.table
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
-import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.PandasAggregateFunction
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.{PandasAggregateFunction, TestPythonAggregateFunction}

Review comment:
       Remove `TestPythonAggregateFunction`, which is not used?

##########
File path: flink-python/pyflink/fn_execution/coders.py
##########
@@ -80,6 +82,64 @@ def __hash__(self):
         return hash(self._flatten_row_coder)
 
 
+class AggregateFunctionInputRowCoder(BaseCoder):
+    """
+    Coder for Aggregate Function Input Row.
+    """
+
+    def __init__(self, flatten_row_coder):
+        self._flatten_row_coder = flatten_row_coder
+
+    def get_impl(self):
+        return coder_impl.AggregateFunctionRowCoderImpl(self._flatten_row_coder.get_impl())
+
+    @staticmethod
+    def from_schema_proto(schema_proto):
+        return AggregateFunctionInputRowCoder(FlattenRowCoder.from_schema_proto(schema_proto))
+
+    def __repr__(self):
+        return 'AggregateFunctionInputRowCoder[%s]' % repr(self._flatten_row_coder)
+
+    def __eq__(self, other):
+        return (self.__class__ == other.__class__
+                and self._flatten_row_coder == other._flatten_row_coder)
+
+    def __ne__(self, other):
+        return not self == other
+
+    def __hash__(self):
+        return hash(self._flatten_row_coder)
+
+
+class AggregateFunctionOutputRowCoder(BaseCoder):

Review comment:
       I think we only need to introduce a single coder named `AggregateFunctionRowCoder`, since the implementations of `AggregateFunctionInputRowCoder` and `AggregateFunctionOutputRowCoder` are identical.
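   A rough sketch of what that merged coder could look like, reusing what is already in this diff (only the class name is new; the rest mirrors `AggregateFunctionInputRowCoder` above and relies on `BaseCoder`, `coder_impl` and `FlattenRowCoder` from the surrounding coders.py module):

```python
# Sketch only: merges AggregateFunctionInputRowCoder and
# AggregateFunctionOutputRowCoder from this diff into one class.
class AggregateFunctionRowCoder(BaseCoder):
    """
    Coder for Aggregate Function rows (used for both input and output sides).
    """

    def __init__(self, flatten_row_coder):
        self._flatten_row_coder = flatten_row_coder

    def get_impl(self):
        # same implementation class as in the current PR
        return coder_impl.AggregateFunctionRowCoderImpl(self._flatten_row_coder.get_impl())

    @staticmethod
    def from_schema_proto(schema_proto):
        return AggregateFunctionRowCoder(FlattenRowCoder.from_schema_proto(schema_proto))

    def __repr__(self):
        return 'AggregateFunctionRowCoder[%s]' % repr(self._flatten_row_coder)

    def __eq__(self, other):
        return (self.__class__ == other.__class__
                and self._flatten_row_coder == other._flatten_row_coder)

    def __ne__(self, other):
        return not self == other

    def __hash__(self):
        return hash(self._flatten_row_coder)
```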

##########
File path: flink-python/pyflink/fn_execution/beam/beam_coders.py
##########
@@ -97,6 +97,80 @@ def __hash__(self):
         return hash(self._table_function_row_coder)
 
 
+class BeamAggregateFunctionInputRowCoder(FastCoder):
+    """
+    Coder for Table Function Row.
+    """
+
+    def __init__(self, aggregate_function_row_coder):
+        self._aggregate_function_row_coder = aggregate_function_row_coder
+
+    def _create_impl(self):
+        return self._aggregate_function_row_coder.get_impl()
+
+    def get_impl(self):
+        return BeamCoderImpl(self._create_impl())
+
+    def to_type_hint(self):
+        return typehints.List
+
+    @Coder.register_urn(coders.FLINK_AGGREGATE_FUNCTION_INPUT_SCHEMA_CODER_URN,
+                        flink_fn_execution_pb2.Schema)
+    def _pickle_from_runner_api_parameter(schema_proto, unused_components, unused_context):
+        return BeamAggregateFunctionInputRowCoder(
+            coders.AggregateFunctionInputRowCoder.from_schema_proto(schema_proto))
+
+    def __repr__(self):
+        return 'BeamAggregateFunctionInputRowCoder[%s]' % repr(self._aggregate_function_row_coder)
+
+    def __eq__(self, other):
+        return (self.__class__ == other.__class__
+                and self._aggregate_function_row_coder == other._table_function_row_coder)
+
+    def __ne__(self, other):
+        return not self == other
+
+    def __hash__(self):
+        return hash(self._aggregate_function_row_coder)
+
+
+class BeamAggregateFunctionOutputRowCoder(FastCoder):

Review comment:
       Same as with `AggregateFunctionOutputRowCoder`: we don't need `BeamAggregateFunctionOutputRowCoder` either.
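   Roughly, assuming the merged `AggregateFunctionRowCoder` from the other comment and the module's existing imports, the Beam side could collapse to a single wrapper as well:

```python
# Sketch only: one Beam wrapper instead of the input/output pair. Only the
# input URN from this diff is registered here; if a separate output URN is
# kept, it could be registered the same way. __eq__/__ne__/__hash__/__repr__
# would follow the same pattern as the class above.
class BeamAggregateFunctionRowCoder(FastCoder):
    """
    Coder for Aggregate Function rows (input and output sides).
    """

    def __init__(self, aggregate_function_row_coder):
        self._aggregate_function_row_coder = aggregate_function_row_coder

    def _create_impl(self):
        # delegate to the non-Beam coder's implementation
        return self._aggregate_function_row_coder.get_impl()

    def get_impl(self):
        return BeamCoderImpl(self._create_impl())

    def to_type_hint(self):
        return typehints.List

    @Coder.register_urn(coders.FLINK_AGGREGATE_FUNCTION_INPUT_SCHEMA_CODER_URN,
                        flink_fn_execution_pb2.Schema)
    def _pickle_from_runner_api_parameter(schema_proto, unused_components, unused_context):
        return BeamAggregateFunctionRowCoder(
            coders.AggregateFunctionRowCoder.from_schema_proto(schema_proto))
```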

##########
File path: flink-python/pyflink/fn_execution/beam/beam_coders.py
##########
@@ -97,6 +97,80 @@ def __hash__(self):
         return hash(self._table_function_row_coder)
 
 
+class BeamAggregateFunctionInputRowCoder(FastCoder):
+    """
+    Coder for Table Function Row.
+    """
+
+    def __init__(self, aggregate_function_row_coder):
+        self._aggregate_function_row_coder = aggregate_function_row_coder
+
+    def _create_impl(self):
+        return self._aggregate_function_row_coder.get_impl()
+
+    def get_impl(self):
+        return BeamCoderImpl(self._create_impl())
+
+    def to_type_hint(self):
+        return typehints.List
+
+    @Coder.register_urn(coders.FLINK_AGGREGATE_FUNCTION_INPUT_SCHEMA_CODER_URN,
+                        flink_fn_execution_pb2.Schema)
+    def _pickle_from_runner_api_parameter(schema_proto, unused_components, unused_context):
+        return BeamAggregateFunctionInputRowCoder(
+            coders.AggregateFunctionInputRowCoder.from_schema_proto(schema_proto))
+
+    def __repr__(self):
+        return 'BeamAggregateFunctionInputRowCoder[%s]' % repr(self._aggregate_function_row_coder)
+
+    def __eq__(self, other):
+        return (self.__class__ == other.__class__
+                and self._aggregate_function_row_coder == other._table_function_row_coder)

Review comment:
       ```suggestion
                and self._aggregate_function_row_coder == other._aggregate_function_row_coder)
   ```

##########
File path: flink-python/pyflink/fn_execution/coders.py
##########
@@ -112,6 +172,38 @@ def __hash__(self):
         return hash(self._field_coders)
 
 
+class FlattenRowCoderWithRowKind(BaseCoder):

Review comment:
       I couldn't find anywhere this coder is used, so I think it can be deleted.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonGroupAggregateRule.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.functions.python.PythonFunctionKind;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonGroupAggregate;
+import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
+import org.apache.flink.table.planner.plan.utils.PythonUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+
+import java.util.List;
+
+import scala.collection.JavaConverters;
+
+/**
+ * Rule to convert a [[FlinkLogicalAggregate]] into a [[StreamExecPythonGroupAggregate]].

Review comment:
       ```suggestion
    * Rule to convert a {@link FlinkLogicalAggregate} into a {@link StreamExecPythonGroupAggregate}.
   ```

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonGroupAggregate.scala
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.stream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.functions.python.PythonFunctionInfo
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.delegation.StreamPlanner
+import org.apache.flink.table.planner.plan.nodes.common.CommonPythonAggregate
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
+import org.apache.flink.table.planner.plan.utils._
+import org.apache.flink.table.types.logical.RowType
+
+import scala.collection.JavaConversions._
+import java.util
+
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
+
+/**
+  * Stream physical RelNode for Python unbounded group aggregate.
+  *
+  * @see [[StreamExecGroupAggregateBase]] for more info.
+  */
+class StreamExecPythonGroupAggregate(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputRel: RelNode,
+    outputRowType: RelDataType,
+    val grouping: Array[Int],
+    val aggCalls: Seq[AggregateCall])
+  extends StreamExecGroupAggregateBase(cluster, traitSet, inputRel)
+    with StreamExecNode[RowData]
+    with CommonPythonAggregate {

Review comment:
       ```suggestion
     with CommonPythonAggregate {
   ```

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonGroupAggregate.scala
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.stream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.functions.python.PythonFunctionInfo
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.delegation.StreamPlanner
+import org.apache.flink.table.planner.plan.nodes.common.CommonPythonAggregate
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
+import org.apache.flink.table.planner.plan.utils._
+import org.apache.flink.table.types.logical.RowType
+
+import scala.collection.JavaConversions._
+import java.util
+
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
+
+/**
+  * Stream physical RelNode for Python unbounded group aggregate.
+  *
+  * @see [[StreamExecGroupAggregateBase]] for more info.
+  */
+class StreamExecPythonGroupAggregate(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputRel: RelNode,
+    outputRowType: RelDataType,
+    val grouping: Array[Int],
+    val aggCalls: Seq[AggregateCall])
+  extends StreamExecGroupAggregateBase(cluster, traitSet, inputRel)
+    with StreamExecNode[RowData]

Review comment:
       ```suggestion
     with StreamExecNode[RowData]
   ```

##########
File path: flink-python/pyflink/fn_execution/coders.py
##########
@@ -195,6 +287,22 @@ def get_impl(self):
     def __repr__(self):
         return 'RowCoder[%s]' % ', '.join(str(c) for c in self._field_coders)
 
+    @staticmethod
+    def from_schema_proto(schema_proto):

Review comment:
       I can't find where this method is called, so I think `RowCoder` doesn't need to extend `BaseCoder`.



