[ 
https://issues.apache.org/jira/browse/FLINK-24290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415348#comment-17415348
 ] 

Ingo Bürk commented on FLINK-24290:
-----------------------------------

{code:java}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
index e9d62db0da..83c446e370 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
@@ -73,6 +73,7 @@ public final class SpecificInputTypeStrategies {
                                             
logical(LogicalTypeFamily.BINARY_STRING),
                                             
logical(LogicalTypeFamily.TIMESTAMP),
                                             
logical(LogicalTypeFamily.CONSTRUCTED),
+                                            
logical(LogicalTypeRoot.STRUCTURED_TYPE),
                                             logical(LogicalTypeRoot.BOOLEAN),
                                             
logical(LogicalTypeFamily.NUMERIC))));
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala
index fde4b8e302..e29876a430 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala
@@ -24,7 +24,8 @@ import 
org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable.JSON_O
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
 import 
org.apache.flink.table.runtime.typeutils.TypeCheckUtils.isCharacterString
 import org.apache.flink.table.types.logical.LogicalTypeRoot._
-import org.apache.flink.table.types.logical.{ArrayType, LogicalType, MapType, 
MultisetType, RowType}
+import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute
+import org.apache.flink.table.types.logical.{ArrayType, LogicalType, MapType, 
MultisetType, RowType, StructuredType}
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{ArrayNode,
 ContainerNode, ObjectNode}
@@ -93,6 +94,11 @@ object JsonGenerateUtils {
         val converterName = generateMapConverter(ctx, containerNodeTerm,
           logicalType.asInstanceOf[MultisetType].getElementType, 
DataTypes.INT().getLogicalType)
 
+        s"$converterName($term)"
+      case STRUCTURED_TYPE =>
+        val converterName = generateRowConverter(ctx, containerNodeTerm,
+          logicalType.asInstanceOf[StructuredType])
+
         s"$converterName($term)"
       case _ => throw new CodeGenException(
         s"Type '$logicalType' is not scalar or cannot be converted into JSON.")
@@ -158,10 +164,27 @@ object JsonGenerateUtils {
       ctx: CodeGeneratorContext,
       containerNodeTerm: String,
       rowType: RowType): String = {
+    val fieldTypes = (0 until rowType.getFieldNames.size()).map(rowType.getTypeAt)
+    generateRowConverter(ctx, containerNodeTerm, 
toScala(rowType.getFieldNames), fieldTypes)
+  }
+
+  /** Generates a method to convert structured types into [[ObjectNode]]. */
+  private def generateRowConverter(
+      ctx: CodeGeneratorContext,
+      containerNodeTerm: String,
+      structuredType: StructuredType): String = {
+    val attrs = toScala(structuredType.getAttributes)
+    generateRowConverter(ctx, containerNodeTerm, attrs.map(_.getName), 
attrs.map(_.getType))
+  }
 
-    val populateObjectCode = toScala(rowType.getFieldNames).zipWithIndex.map {
+  private def generateRowConverter(
+      ctx: CodeGeneratorContext,
+      containerNodeTerm: String,
+      fieldNames: Seq[String],
+      fieldTypes: Seq[LogicalType]): String = {
+    val populateObjectCode = fieldNames.zipWithIndex.map {
       case (fieldName, idx) =>
-        val fieldType = rowType.getTypeAt(idx)
+        val fieldType = fieldTypes(idx)
         val fieldAccessCode = toExternalTypeTerm(
           rowFieldReadAccess(ctx, idx.toString, "rowData", fieldType), 
fieldType)
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
index 742f994e0c..901983cd7e 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
@@ -47,10 +47,12 @@ import static org.apache.flink.table.api.DataTypes.BINARY;
 import static org.apache.flink.table.api.DataTypes.BOOLEAN;
 import static org.apache.flink.table.api.DataTypes.DECIMAL;
 import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
 import static org.apache.flink.table.api.DataTypes.INT;
 import static org.apache.flink.table.api.DataTypes.MAP;
 import static org.apache.flink.table.api.DataTypes.ROW;
 import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.STRUCTURED;
 import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
 import static 
org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
 import static org.apache.flink.table.api.DataTypes.VARBINARY;
@@ -477,7 +479,8 @@ public class JsonFunctionsITCase extends 
BuiltInFunctionTestBase {
                                 multisetData,
                                 "Test".getBytes(StandardCharsets.UTF_8),
                                 "Test".getBytes(StandardCharsets.UTF_8),
-                                Row.of(Collections.singletonList(Row.of(1, 
2))))
+                                Row.of(Collections.singletonList(Row.of(1, 
2))),
+                                new TestPojo("V", 5))
                         .andDataTypes(
                                 STRING(),
                                 BOOLEAN(),
@@ -492,7 +495,9 @@ public class JsonFunctionsITCase extends 
BuiltInFunctionTestBase {
                                 MAP(STRING(), INT()),
                                 BINARY(4),
                                 VARBINARY(4),
-                                ROW(ARRAY(ROW(INT(), INT()))))
+                                ROW(ARRAY(ROW(INT(), INT()))),
+                                STRUCTURED(
+                                        TestPojo.class, FIELD("f0", STRING()), 
FIELD("f1", INT())))
                         .withFunction(CreateMultiset.class)
                         .testResult(
                                 jsonObject(
@@ -526,6 +531,8 @@ public class JsonFunctionsITCase extends 
BuiltInFunctionTestBase {
                                         "K13",
                                         $("f13"),
                                         "K14",
+                                        $("f14"),
+                                        "K15",
                                         jsonObject(JsonOnNull.NULL, "A", "B")),
                                 "JSON_OBJECT("
                                         + "'K0' VALUE f0, "
@@ -542,7 +549,8 @@ public class JsonFunctionsITCase extends 
BuiltInFunctionTestBase {
                                         + "'K11' VALUE f11, "
                                         + "'K12' VALUE f12, "
                                         + "'K13' VALUE f13, "
-                                        + "'K14' VALUE JSON_OBJECT(KEY 'A' 
VALUE 'B')"
+                                        + "'K14' VALUE f14, "
+                                        + "'K15' VALUE JSON_OBJECT(KEY 'A' 
VALUE 'B')"
                                         + ")",
                                 "{"
                                         + "\"K0\":\"V\","
@@ -559,7 +567,8 @@ public class JsonFunctionsITCase extends 
BuiltInFunctionTestBase {
                                         + "\"K11\":\"VGVzdA==\","
                                         + "\"K12\":\"VGVzdA==\","
                                         + 
"\"K13\":{\"f0\":[{\"f0\":1,\"f1\":2}]},"
-                                        + "\"K14\":{\"A\":\"B\"}"
+                                        + "\"K14\":{\"a\":\"V\",\"b\":5},"
+                                        + "\"K15\":{\"A\":\"B\"}"
                                         + "}",
                                 STRING().notNull(),
                                 VARCHAR(2000).notNull()));
@@ -578,6 +587,25 @@ public class JsonFunctionsITCase extends 
BuiltInFunctionTestBase {
         }
     }
 
+    /** Example POJO. */
+    public static class TestPojo {
+        private final String a;
+        private final int b;
+
+        public TestPojo(String a, int b) {
+            this.a = a;
+            this.b = b;
+        }
+
+        public String getA() {
+            return a;
+        }
+
+        public int getB() {
+            return b;
+        }
+    }
+
     private static String getJsonFromResource(String fileName) throws 
Exception {
         final InputStream jsonResource = 
JsonFunctionsITCase.class.getResourceAsStream(fileName);
         if (jsonResource == null) {

{code}

> Support STRUCTURED_TYPE for JSON_OBJECT / JSON_ARRAY
> ----------------------------------------------------
>
>                 Key: FLINK-24290
>                 URL: https://issues.apache.org/jira/browse/FLINK-24290
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / API
>            Reporter: Ingo Bürk
>            Assignee: Ingo Bürk
>            Priority: Minor
>
> In FLINK-16203 we excluded this because we run into a limitation with 
> fromValues when testing it. I will apply the patch that should implement 
> this, but we need to find a way to make the test work.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to