[
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105188#comment-16105188
]
ASF GitHub Bot commented on FLINK-7206:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4355#discussion_r130092445
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
---
@@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
//
----------------------------------------------------------------------------------------------
/**
+ * get data view type information from accumulator constructor.
+ *
+ * @param aggFun aggregate function
+ * @return the data view specification
+ */
+ def getDataViewTypeInfoFromConstructor(
+ aggFun: AggregateFunction[_, _])
+ : mutable.HashMap[String, TypeInformation[_]] = {
+
+ val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+ val acc = aggFun.createAccumulator()
+ val fields: util.List[Field] =
TypeExtractor.getAllDeclaredFields(acc.getClass, true)
+ for (i <- 0 until fields.size()) {
+ val field = fields.get(i)
+ field.setAccessible(true)
+ if (classOf[DataView].isAssignableFrom(field.getType)) {
+ if (field.getType == classOf[MapView[_, _]]) {
+ val mapView = field.get(acc)
+ val keyTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView,
"keyTypeInfo")
+ val valueTypeInfo = getFieldValue(classOf[MapView[_, _]],
mapView, "valueTypeInfo")
+ if (keyTypeInfo != null && valueTypeInfo != null) {
+ resultMap.put(field.getName, new MapViewTypeInfo(
+ keyTypeInfo.asInstanceOf[TypeInformation[_]],
+ valueTypeInfo.asInstanceOf[TypeInformation[_]]))
+ }
+ } else if (field.getType == classOf[ListView[_]]) {
+ val listView = field.get(acc)
+ val elementTypeInfo = getFieldValue(classOf[ListView[_]],
listView, "elementTypeInfo")
+ if (elementTypeInfo != null) {
+ resultMap.put(field.getName,
+ new
ListViewTypeInfo(elementTypeInfo.asInstanceOf[TypeInformation[_]]))
+ }
+ }
+ }
+ }
+
+ resultMap
+ }
+
+
+ /**
+ * Extract data view specification.
+ *
+ * @param index aggregate function index
+ * @param aggFun aggregate function
+ * @param accType accumulator type information
+ * @param dataViewTypes data view fields types
+ * @param isUseState is use state
--- End diff --
The comment is not helpful. Please add more detailed parameters descriptions
> Implementation of DataView to support state access for UDAGG
> ------------------------------------------------------------
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Kaibo Zhou
> Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)