asfgit closed pull request #6578: [FLINK-10170] [table] Support string representation for map and array types in descriptor-based Table API
URL: https://github.com/apache/flink/pull/6578
 
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index 1bfff420d62..4def3a485c3 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -417,13 +417,19 @@ DECIMAL
 DATE
 TIME
 TIMESTAMP
-ROW(fieldtype, ...)              # unnamed row; e.g. ROW(VARCHAR, INT) that is mapped to Flink's RowTypeInfo
+MAP<fieldtype, fieldtype>        # generic map; e.g. MAP<VARCHAR, INT> that is mapped to Flink's MapTypeInfo
+MULTISET<fieldtype>              # multiset; e.g. MULTISET<VARCHAR> that is mapped to Flink's MultisetTypeInfo
+PRIMITIVE_ARRAY<fieldtype>       # primitive array; e.g. PRIMITIVE_ARRAY<INT> that is mapped to Flink's PrimitiveArrayTypeInfo
+BASIC_ARRAY<fieldtype>           # basic array; e.g. BASIC_ARRAY<INT> that is mapped to Flink's BasicArrayTypeInfo
+OBJECT_ARRAY<fieldtype>          # object array; e.g. OBJECT_ARRAY<POJO(org.mycompany.MyPojoClass)> that is mapped to
+                                 # Flink's ObjectArrayTypeInfo
+ROW<fieldtype, ...>              # unnamed row; e.g. ROW<VARCHAR, INT> that is mapped to Flink's RowTypeInfo
                                  # with indexed fields names f0, f1, ...
-ROW(fieldname fieldtype, ...)    # named row; e.g., ROW(myField VARCHAR, myOtherField INT) that
+ROW<fieldname fieldtype, ...>    # named row; e.g., ROW<myField VARCHAR, myOtherField INT> that
                                  # is mapped to Flink's RowTypeInfo
-POJO(class)                      # e.g., POJO(org.mycompany.MyPojoClass) that is mapped to Flink's PojoTypeInfo
-ANY(class)                       # e.g., ANY(org.mycompany.MyClass) that is mapped to Flink's GenericTypeInfo
-ANY(class, serialized)           # used for type information that is not supported by Flink's Table & SQL API
+POJO<class>                      # e.g., POJO<org.mycompany.MyPojoClass> that is mapped to Flink's PojoTypeInfo
+ANY<class>                       # e.g., ANY<org.mycompany.MyClass> that is mapped to Flink's GenericTypeInfo
+ANY<class, serialized>           # used for type information that is not supported by Flink's Table & SQL API
 {% endhighlight %}
 
 {% top %}
@@ -1046,4 +1052,4 @@ table.writeToSink(sink)
 </div>
 </div>
 
-{% top %}
\ No newline at end of file
+{% top %}
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java
index 6e370a02c13..ac6ff11c370 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java
@@ -106,7 +106,7 @@ public void testDuplicateSchema() {
                final Map<String, String> props3 = new HashMap<>();
                props3.put("format.type", "json");
                props3.put("format.property-version", "1");
-               props3.put("format.schema", "ROW(test1 VARCHAR, test2 TIMESTAMP)");
+               props3.put("format.schema", "ROW<test1 VARCHAR, test2 TIMESTAMP>");
                props3.put("format.fail-on-missing-field", "true");
 
                final Map<String, String> props4 = new HashMap<>();
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
index afc6506cbef..9e96ea5a535 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.typeutils
 
 import java.io.Serializable
+import java.lang.reflect.Array
 
 import org.apache.commons.codec.binary.Base64
 import org.apache.commons.lang3.StringEscapeUtils
@@ -67,6 +68,11 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
   lazy val ROW: Keyword = Keyword("ROW")
   lazy val ANY: Keyword = Keyword("ANY")
   lazy val POJO: Keyword = Keyword("POJO")
+  lazy val MAP: Keyword = Keyword("MAP")
+  lazy val MULTISET: Keyword = Keyword("MULTISET")
+  lazy val PRIMITIVE_ARRAY: Keyword = Keyword("PRIMITIVE_ARRAY")
+  lazy val BASIC_ARRAY: Keyword = Keyword("BASIC_ARRAY")
+  lazy val OBJECT_ARRAY: Keyword = Keyword("OBJECT_ARRAY")
 
   lazy val qualifiedName: Parser[String] =
     """\p{javaJavaIdentifierStart}[\p{javaJavaIdentifierPart}.]*""".r
@@ -74,6 +80,13 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
   lazy val base64Url: Parser[String] =
     """[A-Za-z0-9_-]*""".r
 
+  // keep parentheses to ensure backward compatibility
+  lazy val leftBracket: PackratParser[(String)] =
+    "(" | "<"
+
+  lazy val rightBracket: PackratParser[(String)] =
+    ")" | ">"
+
   lazy val atomic: PackratParser[TypeInformation[_]] =
     (VARCHAR | STRING) ^^ { e => Types.STRING } |
     BOOLEAN ^^ { e => Types.BOOLEAN } |
@@ -101,34 +114,35 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
     }
 
   lazy val namedRow: PackratParser[TypeInformation[_]] =
-    ROW ~ "(" ~> rep1sep(field, ",") <~ ")" ^^ {
+    ROW ~ leftBracket ~> rep1sep(field, ", ") <~ rightBracket ^^ {
       fields => Types.ROW(fields.map(_._1).toArray, fields.map(_._2).toArray)
     } | failure("Named row type expected.")
 
   lazy val unnamedRow: PackratParser[TypeInformation[_]] =
-    ROW ~ "(" ~> rep1sep(typeInfo, ",") <~ ")" ^^ {
+    ROW ~ leftBracket ~> rep1sep(typeInfo, ", ") <~ rightBracket ^^ {
       types => Types.ROW(types: _*)
     } | failure("Unnamed row type expected.")
 
   lazy val generic: PackratParser[TypeInformation[_]] =
-    ANY ~ "(" ~> qualifiedName <~ ")" ^^ {
+    ANY ~ leftBracket ~> qualifiedName <~ rightBracket ^^ {
       typeClass =>
         val clazz = loadClass(typeClass)
         new GenericTypeInfo[AnyRef](clazz.asInstanceOf[Class[AnyRef]])
     }
 
-  lazy val pojo: PackratParser[TypeInformation[_]] = POJO ~ "(" ~> qualifiedName <~ ")" ^^ {
-    typeClass =>
-      val clazz = loadClass(typeClass)
-      val info = TypeExtractor.createTypeInfo(clazz)
-      if (!info.isInstanceOf[PojoTypeInfo[_]]) {
-        throw new ValidationException(s"Class '$typeClass'is not a POJO type.")
-      }
-      info
-  }
+  lazy val pojo: PackratParser[TypeInformation[_]] =
+    POJO ~ leftBracket ~> qualifiedName <~ rightBracket ^^ {
+      typeClass =>
+        val clazz = loadClass(typeClass)
+        val info = TypeExtractor.createTypeInfo(clazz)
+        if (!info.isInstanceOf[PojoTypeInfo[_]]) {
+          throw new ValidationException(s"Class '$typeClass' is not a POJO type.")
+        }
+        info
+    }
 
   lazy val any: PackratParser[TypeInformation[_]] =
-    ANY ~ "(" ~ qualifiedName ~ "," ~ base64Url ~ ")" ^^ {
+    ANY ~ leftBracket ~ qualifiedName ~ "," ~ base64Url ~ rightBracket ^^ {
       case _ ~ _ ~ typeClass ~ _ ~ serialized ~ _=>
         val clazz = loadClass(typeClass)
         val typeInfo = deserialize(serialized)
@@ -140,8 +154,45 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
         typeInfo
     }
 
+  lazy val genericMap: PackratParser[TypeInformation[_]] =
+    MAP ~ leftBracket ~ typeInfo ~ "," ~ typeInfo ~ rightBracket ^^ {
+      case _ ~ _ ~ keyTypeInfo ~ _ ~ valueTypeInfo ~ _ =>
+        Types.MAP(keyTypeInfo, valueTypeInfo)
+    }
+
+  lazy val multiSet: PackratParser[TypeInformation[_]] =
+    MULTISET ~ leftBracket ~ typeInfo ~ rightBracket ^^ {
+      case _ ~ _ ~ elementTypeInfo ~ _ =>
+        Types.MULTISET(elementTypeInfo)
+    }
+
+  lazy val primitiveArray: PackratParser[TypeInformation[_]] =
+    PRIMITIVE_ARRAY ~ leftBracket ~ typeInfo ~ rightBracket ^^ {
+      case _ ~ _ ~ componentTypeInfo ~ _ =>
+        Types.PRIMITIVE_ARRAY(componentTypeInfo)
+    }
+
+  lazy val basicArray: PackratParser[TypeInformation[_]] =
+    BASIC_ARRAY ~ leftBracket ~ typeInfo ~ rightBracket ^^ {
+      case _ ~ _ ~ componentTypeInfo ~ _ =>
+        BasicArrayTypeInfo.getInfoFor(Array.newInstance(
+          componentTypeInfo.getTypeClass, 0).getClass)
+    }
+
+  lazy val objectArray: PackratParser[TypeInformation[_]] =
+    OBJECT_ARRAY ~ leftBracket ~ typeInfo ~ rightBracket ^^ {
+      case _ ~ _ ~ componentTypeInfo ~ _ =>
+        Types.OBJECT_ARRAY(componentTypeInfo)
+    }
+
+  lazy val map: PackratParser[TypeInformation[_]] =
+    genericMap | multiSet
+
+  lazy val array: PackratParser[TypeInformation[_]] =
+    primitiveArray | basicArray | objectArray
+
   lazy val typeInfo: PackratParser[TypeInformation[_]] =
-    namedRow | unnamedRow | any | generic | pojo | atomic | failure("Invalid type.")
+    namedRow | unnamedRow | any | generic | pojo | atomic | map | array | failure("Invalid type.")
 
   def readTypeInfo(typeString: String): TypeInformation[_] = {
     parseAll(typeInfo, typeString) match {
@@ -182,10 +233,10 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
 
         s"$name ${normalizeTypeInfo(f._2)}"
       }
-      s"${ROW.key}(${normalizedFields.mkString(", ")})"
+      s"${ROW.key}<${normalizedFields.mkString(", ")}>"
 
     case generic: GenericTypeInfo[_] =>
-      s"${ANY.key}(${generic.getTypeClass.getName})"
+      s"${ANY.key}<${generic.getTypeClass.getName}>"
 
     case pojo: PojoTypeInfo[_] =>
       // we only support very simple POJOs that only contain extracted fields
@@ -196,7 +247,7 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
         case _: InvalidTypesException => None
       }
       extractedInfo match {
-        case Some(ei) if ei == pojo => s"${POJO.key}(${pojo.getTypeClass.getName})"
+        case Some(ei) if ei == pojo => s"${POJO.key}<${pojo.getTypeClass.getName}>"
         case _ =>
           throw new TableException(
             "A string representation for custom POJO types is not supported 
yet.")
@@ -205,15 +256,25 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
     case _: CompositeType[_] =>
      throw new TableException("A string representation for composite types is not supported yet.")
 
-    case _: BasicArrayTypeInfo[_, _] | _: ObjectArrayTypeInfo[_, _] |
-         _: PrimitiveArrayTypeInfo[_] =>
-      throw new TableException("A string representation for array types is not supported yet.")
+    case array: PrimitiveArrayTypeInfo[_] =>
+      s"${PRIMITIVE_ARRAY.key}<${normalizeTypeInfo(array.getComponentType)}>"
+
+    case array: BasicArrayTypeInfo[_, _] =>
+      s"${BASIC_ARRAY.key}<${normalizeTypeInfo(array.getComponentInfo)}>"
+
+    case array: ObjectArrayTypeInfo[_, _] =>
+      s"${OBJECT_ARRAY.key}<${normalizeTypeInfo(array.getComponentInfo)}>"
+
+    case set: MultisetTypeInfo[_] =>
+      s"${MULTISET.key}<${normalizeTypeInfo(set.getElementTypeInfo)}>"
 
-    case _: MapTypeInfo[_, _] | _: MultisetTypeInfo[_] =>
-      throw new TableException("A string representation for map types is not supported yet.")
+    case map: MapTypeInfo[_, _] =>
+      val normalizedKey = normalizeTypeInfo(map.getKeyTypeInfo)
+      val normalizedValue = normalizeTypeInfo(map.getValueTypeInfo)
+      s"${MAP.key}<${normalizedKey}, ${normalizedValue}>"
 
     case any: TypeInformation[_] =>
-      s"${ANY.key}(${any.getTypeClass.getName}, ${serialize(any)})"
+      s"${ANY.key}<${any.getTypeClass.getName}, ${serialize(any)}>"
   }
 
  // ----------------------------------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
index 0c2e8069474..8d01b8be3c3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
@@ -78,9 +78,9 @@ class CsvTest extends DescriptorTestBase {
       "format.fields.1.name" -> "field2",
       "format.fields.1.type" -> "TIMESTAMP",
       "format.fields.2.name" -> "field3",
-      "format.fields.2.type" -> "ANY(java.lang.Class)",
+      "format.fields.2.type" -> "ANY<java.lang.Class>",
       "format.fields.3.name" -> "field4",
-      "format.fields.3.type" -> "ROW(test INT, row VARCHAR)",
+      "format.fields.3.type" -> "ROW<test INT, row VARCHAR>",
       "format.line-delimiter" -> "^")
 
     val props2 = Map(
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
index ccac3170cbd..00e3a21c9c2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
@@ -18,7 +18,9 @@
 
 package org.apache.flink.table.descriptors
 
+import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.table.api.Types
+import org.apache.flink.table.runtime.utils.CommonTestData.Person
 import org.apache.flink.table.utils.TableTestBase
 import org.junit.Assert.assertEquals
 import org.junit.Test
@@ -45,6 +47,10 @@ class TableDescriptorTest extends TableTestBase {
     val schema = Schema()
       .field("myfield", Types.STRING)
       .field("myfield2", Types.INT)
+      .field("myfield3", Types.MAP(Types.STRING, Types.INT))
+      .field("myfield4", Types.MULTISET(Types.LONG))
+      .field("myfield5", Types.PRIMITIVE_ARRAY(Types.SHORT))
+      .field("myfield6", 
Types.OBJECT_ARRAY(TypeExtractor.createTypeInfo(classOf[Person])))
     // CSV table source and sink do not support proctime yet
     //if (isStreaming) {
     //  schema.field("proctime", Types.SQL_TIMESTAMP).proctime()
@@ -56,6 +62,10 @@ class TableDescriptorTest extends TableTestBase {
     val format = Csv()
       .field("myfield", Types.STRING)
       .field("myfield2", Types.INT)
+      .field("myfield3", Types.MAP(Types.STRING, Types.INT))
+      .field("myfield4", Types.MULTISET(Types.LONG))
+      .field("myfield5", Types.PRIMITIVE_ARRAY(Types.SHORT))
+      .field("myfield6", 
Types.OBJECT_ARRAY(TypeExtractor.createTypeInfo(classOf[Person])))
       .fieldDelimiter("#")
 
     val descriptor: RegistrableDescriptor = if (isStreaming) {
@@ -84,11 +94,29 @@ class TableDescriptorTest extends TableTestBase {
       "format.fields.0.type" -> "VARCHAR",
       "format.fields.1.name" -> "myfield2",
       "format.fields.1.type" -> "INT",
+      "format.fields.2.name" -> "myfield3",
+      "format.fields.2.type" -> "MAP<VARCHAR, INT>",
+      "format.fields.3.name" -> "myfield4",
+      "format.fields.3.type" -> "MULTISET<BIGINT>",
+      "format.fields.4.name" -> "myfield5",
+      "format.fields.4.type" -> "PRIMITIVE_ARRAY<SMALLINT>",
+      "format.fields.5.name" -> "myfield6",
+      "format.fields.5.type" ->
+        "OBJECT_ARRAY<POJO<org.apache.flink.table.runtime.utils.CommonTestData$Person>>",
       "format.field-delimiter" -> "#",
       "schema.0.name" -> "myfield",
       "schema.0.type" -> "VARCHAR",
       "schema.1.name" -> "myfield2",
-      "schema.1.type" -> "INT"
+      "schema.1.type" -> "INT",
+      "schema.2.name" -> "myfield3",
+      "schema.2.type" -> "MAP<VARCHAR, INT>",
+      "schema.3.name" -> "myfield4",
+      "schema.3.type" -> "MULTISET<BIGINT>",
+      "schema.4.name" -> "myfield5",
+      "schema.4.type" -> "PRIMITIVE_ARRAY<SMALLINT>",
+      "schema.5.name" -> "myfield6",
+      "schema.5.type" ->
+        "OBJECT_ARRAY<POJO<org.apache.flink.table.runtime.utils.CommonTestData$Person>>"
     )
 
     val expectedProperties = if (isStreaming) {
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
index 9ea8be08620..56e971b964a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
@@ -18,14 +18,12 @@
 
 package org.apache.flink.table.typeutils
 
-import java.util
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, BasicArrayTypeInfo, BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.{RowTypeInfo, TypeExtractor}
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.runtime.utils.CommonTestData.{NonPojo, Person}
 import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.{Assert, Test}
+import org.junit.Test
 
 /**
   * Tests for string-based representation of [[TypeInformation]].
@@ -49,7 +47,7 @@ class TypeStringUtilsTest {
 
     // unsupported type information
     testReadAndWrite(
-      "ANY(java.lang.Void, " +
+      "ANY<java.lang.Void, " +
        "rO0ABXNyADJvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZWluZm8uQmFzaWNUeXBlSW5mb_oE8IKl" +
        "ad0GAgAETAAFY2xhenp0ABFMamF2YS9sYW5nL0NsYXNzO0wAD2NvbXBhcmF0b3JDbGFzc3EAfgABWwAXcG9z" +
        "c2libGVDYXN0VGFyZ2V0VHlwZXN0ABJbTGphdmEvbGFuZy9DbGFzcztMAApzZXJpYWxpemVydAA2TG9yZy9h" +
@@ -59,31 +57,62 @@ class TypeStringUtilsTest {
        "cgA5b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlZvaWRTZXJpYWxpemVyAAAA" +
        "AAAAAAECAAB4cgBCb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlR5cGVTZXJp" +
        "YWxpemVyU2luZ2xldG9ueamHqscud0UCAAB4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1" +
-        "dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHA)",
+        "dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHA>",
       BasicTypeInfo.VOID_TYPE_INFO)
   }
 
   @Test
   def testWriteComplexTypes(): Unit = {
     testReadAndWrite(
-      "ROW(f0 DECIMAL, f1 TINYINT)",
+      "ROW<f0 DECIMAL, f1 TINYINT>",
       Types.ROW(Types.DECIMAL, Types.BYTE))
 
     testReadAndWrite(
-      "ROW(hello DECIMAL, world TINYINT)",
+      "ROW<hello DECIMAL, world TINYINT>",
       Types.ROW(
         Array[String]("hello", "world"),
         Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE)))
 
     testReadAndWrite(
-      "POJO(org.apache.flink.table.runtime.utils.CommonTestData$Person)",
+      "POJO<org.apache.flink.table.runtime.utils.CommonTestData$Person>",
       TypeExtractor.createTypeInfo(classOf[Person]))
 
     testReadAndWrite(
-      "ANY(org.apache.flink.table.runtime.utils.CommonTestData$NonPojo)",
+      "ANY<org.apache.flink.table.runtime.utils.CommonTestData$NonPojo>",
       TypeExtractor.createTypeInfo(classOf[NonPojo]))
 
+    testReadAndWrite(
+      "MAP<VARCHAR, ROW<f0 DECIMAL, f1 TINYINT>>",
+      Types.MAP(Types.STRING, Types.ROW(Types.DECIMAL, Types.BYTE))
+    )
+
+    testReadAndWrite(
+      "MULTISET<ROW<f0 DECIMAL, f1 TINYINT>>",
+      Types.MULTISET(Types.ROW(Types.DECIMAL, Types.BYTE))
+    )
+
+    testReadAndWrite(
+      "PRIMITIVE_ARRAY<TINYINT>",
+      PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
+    )
+
+    testReadAndWrite(
+      "BASIC_ARRAY<INT>",
+      BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO
+    )
+
+    testReadAndWrite(
+      "OBJECT_ARRAY<POJO<org.apache.flink.table.runtime.utils.CommonTestData$Person>>",
+      Types.OBJECT_ARRAY(TypeExtractor.createTypeInfo(classOf[Person]))
+    )
+
     // test escaping
+    assertTrue(
+      TypeStringUtils.readTypeInfo("ROW<\"he         \\nllo\" DECIMAL, world TINYINT>")
+        .asInstanceOf[RowTypeInfo].getFieldNames
+        .sameElements(Array[String]("he         \nllo", "world")))
+
+    // test backward compatibility with brackets ()
     assertTrue(
      TypeStringUtils.readTypeInfo("ROW(\"he         \\nllo\" DECIMAL, world TINYINT)")
        .asInstanceOf[RowTypeInfo].getFieldNames
        .sameElements(Array[String]("he         \nllo", "world")))
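
For orientation, here is a minimal, self-contained round-trip sketch of the
string syntax introduced by this patch. It is not part of the diff above; it
assumes that writeTypeInfo is the public serialization counterpart of the
readTypeInfo entry point shown in TypeStringUtils (the tests above exercise
both directions via testReadAndWrite):

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.table.api.Types
    import org.apache.flink.table.typeutils.TypeStringUtils

    object TypeStringRoundTrip {
      def main(args: Array[String]): Unit = {
        // The new angle-bracket syntax parses into Flink TypeInformation.
        val mapInfo: TypeInformation[_] =
          TypeStringUtils.readTypeInfo("MAP<VARCHAR, INT>")
        val arrayInfo: TypeInformation[_] =
          TypeStringUtils.readTypeInfo("PRIMITIVE_ARRAY<INT>")

        // The legacy parenthesis syntax still parses, because the parser
        // accepts "(" or "<" (leftBracket) and ")" or ">" (rightBracket).
        val legacyRow: TypeInformation[_] =
          TypeStringUtils.readTypeInfo("ROW(name VARCHAR, age INT)")
        println(arrayInfo)
        println(legacyRow)

        // Writing now emits the angle-bracket form, e.g. "MAP<VARCHAR, INT>",
        // so a written string parses back to an equal TypeInformation.
        // (writeTypeInfo is assumed here, as noted above.)
        val written: String =
          TypeStringUtils.writeTypeInfo(Types.MAP(Types.STRING, Types.INT))
        println(written)
        println(TypeStringUtils.readTypeInfo(written) == mapInfo) // expected: true
      }
    }

Note that serialization always produces the new angle-bracket form; the
parenthesis form is accepted only on the read path, for backward compatibility
with properties written by earlier versions.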

 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
