snuyanzin commented on code in PR #26396:
URL: https://github.com/apache/flink/pull/26396#discussion_r2027138807


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala:
##########
@@ -238,62 +246,118 @@ object ProcessTableRunnerGenerator {
 
   private def generateStateToFunction(
       ctx: CodeGeneratorContext,
-      stateDataTypes: Seq[DataType],
-      stateToFunctionTerm: String): Seq[GeneratedExpression] = {
-    stateDataTypes.zipWithIndex
-      .map {
-        case (stateDataType, pos) =>
-          val stateEntryTerm = s"$stateToFunctionTerm[$pos]"
-          val externalStateTypeTerm = 
typeTerm(stateDataType.getConversionClass)
-          val externalStateTerm = newName(ctx, "externalState")
-
-          val converterCode = genToExternalConverter(ctx, stateDataType, 
stateEntryTerm)
-
-          val constructorCode = stateDataType.getConversionClass match {
-            case rowType if rowType == classOf[Row] =>
-              // This allows us to retrieve the converter term that has been 
generated
-              // in genToExternalConverter(). The converter is able to created 
named positions
-              // for row fields.
-              val converterTerm = ctx.addReusableConverter(stateDataType)
-              s"((${className[RowRowConverter]}) 
$converterTerm).createEmptyRow()"
-            case rowType if rowType == classOf[RowData] =>
-              val fieldCount = 
LogicalTypeChecks.getFieldCount(stateDataType.getLogicalType)
-              s"new $GENERIC_ROW($fieldCount)"
-            case structuredType @ _ => s"new ${className(structuredType)}()"
-          }
+      stateEntries: Seq[(StateInfo, Int)],
+      stateHandlesTerm: String,
+      valueStateToFunctionTerm: String): Seq[GeneratedExpression] = {
+    stateEntries.map {
+      case (stateInfo, pos) =>
+        val stateDataType = stateInfo.getDataType
+        val stateType = stateDataType.getLogicalType
+        val externalStateTypeTerm = typeTerm(stateDataType.getConversionClass)
+        val externalStateTerm = newName(ctx, "externalState")
+
+        val externalStateCode = if (DataViewUtils.isDataView(stateType, 
classOf[DataView])) {
+          generateDataViewStateToFunction(
+            ctx,
+            stateHandlesTerm,
+            pos,
+            stateType,
+            externalStateTypeTerm,
+            externalStateTerm)
+          NO_CODE
+        } else {
+          DataViewUtils.checkForInvalidDataViews(stateType)
+          generateValueStateToFunction(
+            ctx,
+            valueStateToFunctionTerm,
+            pos,
+            externalStateTypeTerm,
+            externalStateTerm,
+            stateDataType)
+        }
 
-          val externalStateCode =
-            s"""
-               |final $externalStateTypeTerm $externalStateTerm;
-               |if ($stateEntryTerm == null) {
-               |  $externalStateTerm = $constructorCode;
-               |} else {
-               |  $externalStateTerm = $converterCode;
-               |}
-               |""".stripMargin
+        GeneratedExpression(s"$externalStateTerm", NEVER_NULL, 
externalStateCode, stateType)
+    }
+  }
 
-          GeneratedExpression(
-            s"$externalStateTerm",
-            NEVER_NULL,
-            externalStateCode,
-            stateDataType.getLogicalType)
+  private def generateDataViewStateToFunction(
+      ctx: CodeGeneratorContext,
+      stateHandlesTerm: String,
+      pos: Int,
+      stateType: LogicalType,
+      externalStateTypeTerm: String,
+      externalStateTerm: String): Unit = {
+    ctx.addReusableMember(s"private $externalStateTypeTerm 
$externalStateTerm;")
+
+    val (constructor, stateHandleTypeTerm) =
+      if (DataViewUtils.isDataView(stateType, classOf[ListView[_]])) {
+        (className[KeyedStateListView[_, _]], className[ListState[_]])
+      } else if (DataViewUtils.isDataView(stateType, classOf[MapView[_, _]])) {
+        (className[KeyedStateMapViewWithKeysNotNull[_, _, _]], 
className[MapState[_, _]])
       }
+
+    val openCode =
+      s"""
+         |$externalStateTerm = new $constructor(($stateHandleTypeTerm) 
$stateHandlesTerm[$pos]);
+         """.stripMargin
+    ctx.addReusableOpenStatement(openCode)
+  }
+
+  private def generateValueStateToFunction(
+      ctx: CodeGeneratorContext,
+      valueStateToFunctionTerm: String,
+      pos: Int,
+      externalStateTypeTerm: String,
+      externalStateTerm: String,
+      stateDataType: DataType): String = {
+    val stateEntryTerm = s"$valueStateToFunctionTerm[$pos]"
+    val converterCode = genToExternalConverter(ctx, stateDataType, 
stateEntryTerm)
+
+    val constructorCode = stateDataType.getConversionClass match {
+      case rowType if rowType == classOf[Row] =>
+        // This allows us to retrieve the converter term that has been 
generated
+        // in genToExternalConverter(). The converter is able to created named 
positions

Review Comment:
   ```suggestion
           // in genToExternalConverter(). The converter is able to create 
named positions
   ```
   misprint?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to