dianfu commented on a change in pull request #13507:
URL: https://github.com/apache/flink/pull/13507#discussion_r497198985
##########
File path: flink-python/pyflink/common/types.py
##########
@@ -37,7 +37,7 @@ def _create_row(fields, values, row_kind: RowKind = None):
return row
-class Row(tuple):
+class Row(object):
Review comment:
Why was this changed?
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonFunctionInfo.java
##########
@@ -28,7 +28,7 @@
* the actual Python function, the input arguments, etc.
*/
@Internal
-public final class PythonFunctionInfo implements Serializable {
Review comment:
Why was this changed?
##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
##########
@@ -304,9 +306,6 @@ def finish(self):
def reset(self):
super().reset()
- if self.keyed_state_backend:
Review comment:
Why is this being removed?
##########
File path: flink-python/pyflink/proto/flink-fn-execution.proto
##########
@@ -94,6 +94,21 @@ message UserDefinedDataStreamFunctions {
// A list of the user-defined aggregate functions to be executed in a group
aggregate operation.
message UserDefinedAggregateFunctions {
+ message DataViewSpec {
+ enum DataViewType {
+ LIST = 0;
+ MAP = 1;
+ }
+ string name = 1;
+ int32 field_index = 2;
+ DataViewType type = 3;
+ Schema.FieldType element_type = 4;
+ Schema.FieldType key_type = 5;
+ }
+ message DataViewSpecs {
Review comment:
Each DataViewSpec belongs to a specific UserDefinedAggregateFunction.
What about adding UserDefinedAggregateFunction, which extends
UserDefinedFunction, and moving DataViewSpec inside UserDefinedAggregateFunction?
##########
File path: flink-python/pyflink/fn_execution/aggregate.py
##########
@@ -167,18 +247,31 @@ class SimpleAggsHandleFunction(AggsHandleFunction):
def __init__(self,
udfs: List[AggregateFunction],
args_offsets_list: List[List[int]],
- index_of_count_star: int):
+ index_of_count_star: int,
+ udf_data_view_specs: List[List[DataViewSpec]]):
self._udfs = udfs
self._args_offsets_list = args_offsets_list
self._accumulators = None # type: Row
self._get_value_indexes = [i for i in range(len(udfs))]
if index_of_count_star >= 0:
# The record count is used internally, should be ignored by the
get_value method.
self._get_value_indexes.remove(index_of_count_star)
+ self._udf_data_view_specs = udf_data_view_specs
+ self._udf_data_views = [] # type: List[Dict[DataView]]
Review comment:
```suggestion
self._udf_data_views = []
```
##########
File path: flink-python/pyflink/proto/flink-fn-execution.proto
##########
@@ -108,6 +123,8 @@ message UserDefinedAggregateFunctions {
int32 state_cache_size = 7;
// Cleanup the expired state if true.
bool state_cleaning_enabled = 8;
+ // The data view specifications
+ repeated DataViewSpecs udf_data_view_specs = 9;
Review comment:
```suggestion
repeated DataViewSpecs data_view_specs = 9;
```
##########
File path: flink-python/pyflink/common/types.py
##########
@@ -82,21 +82,18 @@ class Row(tuple):
Row(name='Alice', age=11)
"""
- def __new__(cls, *args, **kwargs):
Review comment:
I guess the example in the doc of the class doesn't work any more with
this change. Could you explain why this was changed and investigate whether
it's possible to avoid this change?
##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
##########
@@ -323,6 +322,7 @@ def __init__(self, name, spec, counter_factory, sampler,
consumers, keyed_state_
self.index_of_count_star = spec.serialized_fn.index_of_count_star
self.state_cache_size = spec.serialized_fn.state_cache_size
self.state_cleaning_enabled = spec.serialized_fn.state_cleaning_enabled
+ self.udf_data_view_specs =
extract_data_view_specs(spec.serialized_fn.udf_data_view_specs)
Review comment:
```suggestion
self.data_view_specs =
extract_data_view_specs(spec.serialized_fn.udf_data_view_specs)
```
##########
File path: flink-python/pyflink/table/data_view.py
##########
@@ -0,0 +1,100 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from abc import ABC, abstractmethod
+from typing import TypeVar, Generic, Iterable, List, Any, Iterator
+
+T = TypeVar('T')
+
+__all__ = ['DataView', 'ListView']
Review comment:
What about also adding them in pyflink.table.__init__.py?
##########
File path: flink-python/pyflink/proto/flink-fn-execution.proto
##########
@@ -94,6 +94,21 @@ message UserDefinedDataStreamFunctions {
// A list of the user-defined aggregate functions to be executed in a group
aggregate operation.
message UserDefinedAggregateFunctions {
+ message DataViewSpec {
+ enum DataViewType {
+ LIST = 0;
+ MAP = 1;
+ }
+ string name = 1;
+ int32 field_index = 2;
+ DataViewType type = 3;
+ Schema.FieldType element_type = 4;
Review comment:
Please split the fields for ListView and MapView, e.g.
```
message ListView {
Schema.FieldType element_type = 1;
}
message MapView {
Schema.FieldType key_type = 1;
Schema.FieldType value_type = 2;
}
message DataViewSpec {
string name = 1;
int32 field_index = 2;
oneof data_view {
ListView list_view = 3;
MapView map_view = 4;
}
}
```
##########
File path: flink-python/pyflink/fn_execution/aggregate.py
##########
@@ -258,6 +356,7 @@ def __init__(self,
aggs_handle: AggsHandleFunction,
key_selector: RowKeySelector,
state_backend: RemoteKeyedStateBackend,
+ state_value_coder: FastCoder,
Review comment:
```suggestion
state_value_coder: Coder,
```
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/typeutils/DataViewUtils.java
##########
@@ -159,7 +160,7 @@ public DataType transform(DataType dataType) {
/**
* Information about a {@link DataView} stored in state.
*/
- public abstract static class DataViewSpec {
+ public abstract static class DataViewSpec implements Serializable {
Review comment:
Add serialVersionUID for this class and the child classes?
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/PythonAggregateTest.scala
##########
@@ -60,4 +66,42 @@ class PythonAggregateTest extends TableTestBase {
util.verifyPlan(resultTable)
}
+
+ @Test
+ def testExtractDataViewSpecs(): Unit = {
+ val accTypeInfo =
+ new RowTypeInfo(Types.STRING(), new ListViewTypeInfo(Types.STRING()))
+
+ val specs = TestCommonPythonAggregate.extractDataViewSpecs(accTypeInfo)
+
+ val expected = Array(
+ new ListViewSpec("agg0$f1", 1, DataTypes.ARRAY(DataTypes.STRING())))
+
+ assertEquals(expected(0).getClass, specs(0).getClass)
+ assertEquals(expected(0).getDataType, specs(0).getDataType)
+ assertEquals(expected(0).getStateId, specs(0).getStateId)
+ assertEquals(expected(0).getFieldIndex, specs(0).getFieldIndex)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testExtractSecondLevelDataViewSpecs(): Unit = {
+ val accTypeInfo =
+ new RowTypeInfo(Types.STRING(), new RowTypeInfo(new
ListViewTypeInfo(Types.STRING())))
+
+ val specs = TestCommonPythonAggregate.extractDataViewSpecs(accTypeInfo)
Review comment:
```suggestion
TestCommonPythonAggregate.extractDataViewSpecs(accTypeInfo)
```
##########
File path: flink-python/pyflink/fn_execution/aggregate.py
##########
@@ -267,12 +366,12 @@ def __init__(self,
self.key_selector = key_selector
# Currently we do not support user-defined type accumulator.
Review comment:
This comment doesn't apply any more. Could it be removed?
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/PythonAggregateTest.scala
##########
@@ -60,4 +66,42 @@ class PythonAggregateTest extends TableTestBase {
util.verifyPlan(resultTable)
}
+
+ @Test
+ def testExtractDataViewSpecs(): Unit = {
+ val accTypeInfo =
+ new RowTypeInfo(Types.STRING(), new ListViewTypeInfo(Types.STRING()))
+
+ val specs = TestCommonPythonAggregate.extractDataViewSpecs(accTypeInfo)
+
+ val expected = Array(
+ new ListViewSpec("agg0$f1", 1, DataTypes.ARRAY(DataTypes.STRING())))
+
+ assertEquals(expected(0).getClass, specs(0).getClass)
+ assertEquals(expected(0).getDataType, specs(0).getDataType)
+ assertEquals(expected(0).getStateId, specs(0).getStateId)
+ assertEquals(expected(0).getFieldIndex, specs(0).getFieldIndex)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testExtractSecondLevelDataViewSpecs(): Unit = {
+ val accTypeInfo =
+ new RowTypeInfo(Types.STRING(), new RowTypeInfo(new
ListViewTypeInfo(Types.STRING())))
+
+ val specs = TestCommonPythonAggregate.extractDataViewSpecs(accTypeInfo)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testExtractDataViewSpecsFromTupleType(): Unit = {
+ val accTypeInfo =
+ new TupleTypeInfo(Types.STRING(), new ListViewTypeInfo(Types.STRING()))
+
+ val specs = TestCommonPythonAggregate.extractDataViewSpecs(accTypeInfo)
Review comment:
```suggestion
TestCommonPythonAggregate.extractDataViewSpecs(accTypeInfo)
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org