[
https://issues.apache.org/jira/browse/FLINK-24290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415348#comment-17415348
]
Ingo Bürk commented on FLINK-24290:
-----------------------------------
{code:java}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
index e9d62db0da..83c446e370 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
@@ -73,6 +73,7 @@ public final class SpecificInputTypeStrategies {
logical(LogicalTypeFamily.BINARY_STRING),
logical(LogicalTypeFamily.TIMESTAMP),
logical(LogicalTypeFamily.CONSTRUCTED),
+
logical(LogicalTypeRoot.STRUCTURED_TYPE),
logical(LogicalTypeRoot.BOOLEAN),
logical(LogicalTypeFamily.NUMERIC))));
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala
index fde4b8e302..e29876a430 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala
@@ -24,7 +24,8 @@ import
org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable.JSON_O
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
import
org.apache.flink.table.runtime.typeutils.TypeCheckUtils.isCharacterString
import org.apache.flink.table.types.logical.LogicalTypeRoot._
-import org.apache.flink.table.types.logical.{ArrayType, LogicalType, MapType,
MultisetType, RowType}
+import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute
+import org.apache.flink.table.types.logical.{ArrayType, LogicalType, MapType,
MultisetType, RowType, StructuredType}
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{ArrayNode,
ContainerNode, ObjectNode}
@@ -93,6 +94,11 @@ object JsonGenerateUtils {
val converterName = generateMapConverter(ctx, containerNodeTerm,
logicalType.asInstanceOf[MultisetType].getElementType,
DataTypes.INT().getLogicalType)
+ s"$converterName($term)"
+ case STRUCTURED_TYPE =>
+ val converterName = generateRowConverter(ctx, containerNodeTerm,
+ logicalType.asInstanceOf[StructuredType])
+
s"$converterName($term)"
case _ => throw new CodeGenException(
s"Type '$logicalType' is not scalar or cannot be converted into JSON.")
@@ -158,10 +164,27 @@ object JsonGenerateUtils {
ctx: CodeGeneratorContext,
containerNodeTerm: String,
rowType: RowType): String = {
+ val fieldTypes = (0 to rowType.getFieldNames.size()).map(rowType.getTypeAt)
+ generateRowConverter(ctx, containerNodeTerm,
toScala(rowType.getFieldNames), fieldTypes)
+ }
+
+ /** Generates a method to convert structured types into [[ObjectNode]]. */
+ private def generateRowConverter(
+ ctx: CodeGeneratorContext,
+ containerNodeTerm: String,
+ structuredType: StructuredType): String = {
+ val attrs = toScala(structuredType.getAttributes)
+ generateRowConverter(ctx, containerNodeTerm, attrs.map(_.getName),
attrs.map(_.getType))
+ }
- val populateObjectCode = toScala(rowType.getFieldNames).zipWithIndex.map {
+ private def generateRowConverter(
+ ctx: CodeGeneratorContext,
+ containerNodeTerm: String,
+ fieldNames: Seq[String],
+ fieldTypes: Seq[LogicalType]): String = {
+ val populateObjectCode = fieldNames.zipWithIndex.map {
case (fieldName, idx) =>
- val fieldType = rowType.getTypeAt(idx)
+ val fieldType = fieldTypes(idx)
val fieldAccessCode = toExternalTypeTerm(
rowFieldReadAccess(ctx, idx.toString, "rowData", fieldType),
fieldType)
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
index 742f994e0c..901983cd7e 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java
@@ -47,10 +47,12 @@ import static org.apache.flink.table.api.DataTypes.BINARY;
import static org.apache.flink.table.api.DataTypes.BOOLEAN;
import static org.apache.flink.table.api.DataTypes.DECIMAL;
import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.api.DataTypes.MAP;
import static org.apache.flink.table.api.DataTypes.ROW;
import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.STRUCTURED;
import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
import static
org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
import static org.apache.flink.table.api.DataTypes.VARBINARY;
@@ -477,7 +479,8 @@ public class JsonFunctionsITCase extends
BuiltInFunctionTestBase {
multisetData,
"Test".getBytes(StandardCharsets.UTF_8),
"Test".getBytes(StandardCharsets.UTF_8),
- Row.of(Collections.singletonList(Row.of(1,
2))))
+ Row.of(Collections.singletonList(Row.of(1,
2))),
+ new TestPojo("V", 5))
.andDataTypes(
STRING(),
BOOLEAN(),
@@ -492,7 +495,9 @@ public class JsonFunctionsITCase extends
BuiltInFunctionTestBase {
MAP(STRING(), INT()),
BINARY(4),
VARBINARY(4),
- ROW(ARRAY(ROW(INT(), INT()))))
+ ROW(ARRAY(ROW(INT(), INT()))),
+ STRUCTURED(
+ TestPojo.class, FIELD("f0", STRING()),
FIELD("f1", INT())))
.withFunction(CreateMultiset.class)
.testResult(
jsonObject(
@@ -526,6 +531,8 @@ public class JsonFunctionsITCase extends
BuiltInFunctionTestBase {
"K13",
$("f13"),
"K14",
+ $("f14"),
+ "K15",
jsonObject(JsonOnNull.NULL, "A", "B")),
"JSON_OBJECT("
+ "'K0' VALUE f0, "
@@ -542,7 +549,8 @@ public class JsonFunctionsITCase extends
BuiltInFunctionTestBase {
+ "'K11' VALUE f11, "
+ "'K12' VALUE f12, "
+ "'K13' VALUE f13, "
- + "'K14' VALUE JSON_OBJECT(KEY 'A'
VALUE 'B')"
+ + "'K14' VALUE f14, "
+ + "'K15' VALUE JSON_OBJECT(KEY 'A'
VALUE 'B')"
+ ")",
"{"
+ "\"K0\":\"V\","
@@ -559,7 +567,8 @@ public class JsonFunctionsITCase extends
BuiltInFunctionTestBase {
+ "\"K11\":\"VGVzdA==\","
+ "\"K12\":\"VGVzdA==\","
+
"\"K13\":{\"f0\":[{\"f0\":1,\"f1\":2}]},"
- + "\"K14\":{\"A\":\"B\"}"
+ + "\"K14\":{\"a\":\"V\",\"b\":5},"
+ + "\"K15\":{\"A\":\"B\"}"
+ "}",
STRING().notNull(),
VARCHAR(2000).notNull()));
@@ -578,6 +587,25 @@ public class JsonFunctionsITCase extends
BuiltInFunctionTestBase {
}
}
+ /** Example POJO. */
+ public static class TestPojo {
+ private final String a;
+ private final int b;
+
+ public TestPojo(String a, int b) {
+ this.a = a;
+ this.b = b;
+ }
+
+ public String getA() {
+ return a;
+ }
+
+ public int getB() {
+ return b;
+ }
+ }
+
private static String getJsonFromResource(String fileName) throws
Exception {
final InputStream jsonResource =
JsonFunctionsITCase.class.getResourceAsStream(fileName);
if (jsonResource == null) {
{code}
> Support STRUCTURED_TYPE for JSON_OBJECT / JSON_ARRAY
> ----------------------------------------------------
>
> Key: FLINK-24290
> URL: https://issues.apache.org/jira/browse/FLINK-24290
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Reporter: Ingo Bürk
> Assignee: Ingo Bürk
> Priority: Minor
>
> In FLINK-16203 we excluded this because we ran into a limitation with
> fromValues when testing it. I will apply the patch that should implement
> this, but we need to find a way to make the test work.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)