[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105193#comment-16105193 ]

ASF GitHub Bot commented on FLINK-7206:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4355#discussion_r130104980
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
    @@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
       // 
----------------------------------------------------------------------------------------------
     
       /**
    +    * get data view type information from accumulator constructor.
    +    *
    +    * @param aggFun aggregate function
    +    * @return the data view specification
    +    */
    +  def getDataViewTypeInfoFromConstructor(
    +    aggFun: AggregateFunction[_, _])
    +  : mutable.HashMap[String, TypeInformation[_]] = {
    +
    +    val resultMap = new mutable.HashMap[String, TypeInformation[_]]
    +    val acc = aggFun.createAccumulator()
    +    val fields: util.List[Field] = 
TypeExtractor.getAllDeclaredFields(acc.getClass, true)
    +    for (i <- 0 until fields.size()) {
    +      val field = fields.get(i)
    +      field.setAccessible(true)
    +      if (classOf[DataView].isAssignableFrom(field.getType)) {
    +        if (field.getType == classOf[MapView[_, _]]) {
    +          val mapView = field.get(acc)
    +          val keyTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, 
"keyTypeInfo")
    +          val valueTypeInfo = getFieldValue(classOf[MapView[_, _]], 
mapView, "valueTypeInfo")
    +          if (keyTypeInfo != null && valueTypeInfo != null) {
    +              resultMap.put(field.getName, new MapViewTypeInfo(
    +                keyTypeInfo.asInstanceOf[TypeInformation[_]],
    +                valueTypeInfo.asInstanceOf[TypeInformation[_]]))
    +          }
    +        } else if (field.getType == classOf[ListView[_]]) {
    +          val listView = field.get(acc)
    +          val elementTypeInfo = getFieldValue(classOf[ListView[_]], 
listView, "elementTypeInfo")
    +          if (elementTypeInfo != null) {
    +            resultMap.put(field.getName,
    +              new 
ListViewTypeInfo(elementTypeInfo.asInstanceOf[TypeInformation[_]]))
    +          }
    +        }
    +      }
    +    }
    +
    +    resultMap
    +  }
    +
    +
    +  /**
    +    * Extract data view specification.
    +    *
    +    * @param index aggregate function index
    +    * @param aggFun aggregate function
    +    * @param accType accumulator type information
    +    * @param dataViewTypes data view fields types
    +    * @param isUseState is use state
    +    * @return the data view specification
    +    */
    +  def extractDataViewTypeInfo(
    --- End diff --
    
    This method should get some inline comments.


> Implementation of DataView to support state access for UDAGG
> ------------------------------------------------------------
>
>                 Key: FLINK-7206
>                 URL: https://issues.apache.org/jira/browse/FLINK-7206
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Kaibo Zhou
>            Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to