[ 
https://issues.apache.org/jira/browse/BEAM-4196?focusedWorklogId=100207&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100207
 ]

ASF GitHub Bot logged work on BEAM-4196:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/May/18 18:56
            Start Date: 09/May/18 18:56
    Worklog Time Spent: 10m 
      Work Description: kennknowles closed pull request #5276: [BEAM-4196][SQL] 
Support complex types in DDL
URL: https://github.com/apache/beam/pull/5276
 
 
   

This is a PR merged from a forked repository.
Because GitHub does not show the original diff of a pull request from a
fork once it has been merged, the diff is reproduced below for the sake
of provenance:

diff --git a/build_rules.gradle b/build_rules.gradle
index 1463326b4b0..8174326d504 100644
--- a/build_rules.gradle
+++ b/build_rules.gradle
@@ -188,6 +188,7 @@ def postgres_version = "9.4.1212.jre7"
 def jaxb_api_version = "2.2.12"
 def kafka_version = "1.0.0"
 def jackson_datatype_joda_version = "2.4.0"
+def quickcheck_version = "0.8"
 
 // A map of maps containing common libraries used per language. To use:
 // dependencies {
@@ -295,6 +296,7 @@ ext.library = [
     spark_streaming: "org.apache.spark:spark-streaming_2.11:$spark_version",
     stax2_api: "org.codehaus.woodstox:stax2-api:3.1.4",
     woodstox_core_asl: "org.codehaus.woodstox:woodstox-core-asl:4.4.1",
+    quickcheck_core: "com.pholser:junit-quickcheck-core:$quickcheck_version",
   ],
   // For generating pom.xml from archetypes.
   maven: [
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
index 5caa6464556..2e757d7f1eb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
@@ -103,9 +103,10 @@ private static long estimatedSizeBytes(FieldType 
typeDescriptor, Object value) {
         Map<Object, Object> map = (Map<Object, Object>) value;
         long mapSizeBytes = 0;
         for (Map.Entry<Object, Object> elem : map.entrySet()) {
-          mapSizeBytes += 
typeDescriptor.getMapKeyType().equals(TypeName.STRING)
-                ? ((String) elem.getKey()).length()
-                  : ESTIMATED_FIELD_SIZES.get(typeDescriptor.getMapKeyType());
+          mapSizeBytes +=
+              
typeDescriptor.getMapKeyType().getTypeName().equals(TypeName.STRING)
+                  ? ((String) elem.getKey()).length()
+                  : 
ESTIMATED_FIELD_SIZES.get(typeDescriptor.getMapKeyType().getTypeName());
           mapSizeBytes += estimatedSizeBytes(typeDescriptor.getMapValueType(), 
elem.getValue());
         }
         return 4 + mapSizeBytes;
@@ -133,7 +134,8 @@ Coder getCoder(FieldType fieldType) {
     if (TypeName.ARRAY.equals(fieldType.getTypeName())) {
       return ListCoder.of(getCoder(fieldType.getCollectionElementType()));
     } else if (TypeName.MAP.equals(fieldType.getTypeName())) {
-      return MapCoder.of(coderForPrimitiveType(fieldType.getMapKeyType()),
+      return MapCoder.of(
+          coderForPrimitiveType(fieldType.getMapKeyType().getTypeName()),
           getCoder(fieldType.getMapValueType()));
     } else if (TypeName.ROW.equals((fieldType.getTypeName()))) {
       return RowCoder.of(fieldType.getRowSchema());
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index 3ea4d5fa42e..231b0c301c0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -252,7 +252,7 @@ public FieldType type() {
     // For container types (e.g. ARRAY), returns the type of the contained 
element.
     @Nullable public abstract FieldType getCollectionElementType();
     // For MAP type, returns the type of the key element, it must be a 
primitive type;
-    @Nullable public abstract TypeName getMapKeyType();
+    @Nullable public abstract FieldType getMapKeyType();
     // For MAP type, returns the type of the value element, it can be a nested 
type;
     @Nullable public abstract FieldType getMapValueType();
     // For ROW types, returns the schema for the row.
@@ -267,7 +267,7 @@ public FieldType type() {
     abstract static class Builder {
       abstract Builder setTypeName(TypeName typeName);
       abstract Builder setCollectionElementType(@Nullable FieldType 
collectionElementType);
-      abstract Builder setMapKeyType(@Nullable TypeName mapKeyType);
+      abstract Builder setMapKeyType(@Nullable FieldType mapKeyType);
       abstract Builder setMapValueType(@Nullable FieldType mapValueType);
       abstract Builder setRowSchema(@Nullable Schema rowSchema);
       abstract Builder setMetadata(@Nullable byte[] metadata);
@@ -294,13 +294,15 @@ public FieldType withCollectionElementType(@Nullable 
FieldType collectionElement
     /**
      * For MAP type, adds the type of the component key/value element.
      */
-    public FieldType withMapType(@Nullable TypeName mapKeyType,
+    public FieldType withMapType(
+        @Nullable FieldType mapKeyType,
         @Nullable FieldType mapValueType) {
       if (mapKeyType != null && mapValueType != null) {
         checkArgument(getTypeName().isMapType());
-        checkArgument(mapKeyType.isPrimitiveType());
+        checkArgument(mapKeyType.getTypeName().isPrimitiveType());
       }
-      return toBuilder().setMapKeyType(mapKeyType)
+      return toBuilder()
+          .setMapKeyType(mapKeyType)
           .setMapValueType(mapValueType).build();
     }
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
index cac1dac35f8..9ec983f163d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
@@ -413,8 +413,12 @@ private Object verify(Object value, FieldType type, String 
fieldName) {
         List<Object> arrayElements = verifyArray(value, 
type.getCollectionElementType(), fieldName);
         return arrayElements;
       } else if (TypeName.MAP.equals(type.getTypeName())) {
-        Map<Object, Object> mapElements = verifyMap(value, 
type.getMapKeyType(),
-            type.getMapValueType(), fieldName);
+        Map<Object, Object> mapElements =
+            verifyMap(
+              value,
+              type.getMapKeyType().getTypeName(),
+              type.getMapValueType(),
+              fieldName);
         return mapElements;
       } else if (TypeName.ROW.equals(type.getTypeName())) {
         return verifyRow(value, fieldName);
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java
index 7389aae67ca..95d6c1377f0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java
@@ -192,7 +192,7 @@ public void testCreatesArrayOfMap() {
     Schema type = Stream
         .of(Schema.Field.of("array",
             TypeName.ARRAY.type().withCollectionElementType(
-                TypeName.MAP.type().withMapType(TypeName.INT32, 
TypeName.STRING.type()))))
+                TypeName.MAP.type().withMapType(TypeName.INT32.type(), 
TypeName.STRING.type()))))
         .collect(toSchema());
     Row row = Row.withSchema(type).addArray(data).build();
     assertEquals(data, row.getArray("array"));
@@ -210,7 +210,7 @@ public void testCreateMapWithPrimitiveValue() {
     };
     Schema type = Stream
         .of(Schema.Field.of("map",
-            TypeName.MAP.type().withMapType(TypeName.INT32, 
TypeName.STRING.type())))
+            TypeName.MAP.type().withMapType(TypeName.INT32.type(), 
TypeName.STRING.type())))
         .collect(toSchema());
     Row row = Row.withSchema(type).addValue(data).build();
     assertEquals(data, row.getMap("map"));
@@ -226,7 +226,7 @@ public void testCreateMapWithArrayValue() {
     };
     Schema type = Stream
         .of(Schema.Field.of("map",
-            TypeName.MAP.type().withMapType(TypeName.INT32,
+            TypeName.MAP.type().withMapType(TypeName.INT32.type(),
                 
TypeName.ARRAY.type().withCollectionElementType(TypeName.STRING.type()))))
         .collect(toSchema());
     Row row = Row.withSchema(type).addValue(data).build();
@@ -251,8 +251,8 @@ public void testCreateMapWithMapValue() {
     };
     Schema type = Stream
         .of(Schema.Field.of("map",
-            TypeName.MAP.type().withMapType(TypeName.INT32,
-                TypeName.MAP.type().withMapType(TypeName.INT32, 
TypeName.STRING.type()))))
+            TypeName.MAP.type().withMapType(TypeName.INT32.type(),
+                TypeName.MAP.type().withMapType(TypeName.INT32.type(), 
TypeName.STRING.type()))))
         .collect(toSchema());
     Row row = Row.withSchema(type).addValue(data).build();
     assertEquals(data, row.getMap("map"));
@@ -268,7 +268,8 @@ public void testCreateMapWithRowValue() {
         put(2, Row.withSchema(nestedType).addValues("two").build());
       }
     };
-    Schema type = Stream.of(Schema.Field.of("map", 
TypeName.MAP.type().withMapType(TypeName.INT32,
+    Schema type = Stream.of(Schema.Field.of("map", 
TypeName.MAP.type().withMapType(
+        TypeName.INT32.type(),
         TypeName.ROW.type().withRowSchema(nestedType)))).collect(toSchema());
     Row row = Row.withSchema(type).addValue(data).build();
     assertEquals(data, row.getMap("map"));
diff --git a/sdks/java/extensions/sql/build.gradle 
b/sdks/java/extensions/sql/build.gradle
index 02955a0d2ff..0fe01bf967e 100644
--- a/sdks/java/extensions/sql/build.gradle
+++ b/sdks/java/extensions/sql/build.gradle
@@ -71,6 +71,7 @@ dependencies {
   testCompile library.java.junit
   testCompile library.java.hamcrest_core
   testCompile library.java.mockito_core
+  testCompile library.java.quickcheck_core
 }
 
 // Copy Caclcite templates and our own template into the build directory
diff --git a/sdks/java/extensions/sql/pom.xml b/sdks/java/extensions/sql/pom.xml
index 61fcb319f68..d3139834e95 100644
--- a/sdks/java/extensions/sql/pom.xml
+++ b/sdks/java/extensions/sql/pom.xml
@@ -38,6 +38,7 @@
     <calcite.version>1.16.0</calcite.version>
     <avatica.version>1.11.0</avatica.version>
     <mockito.version>1.9.5</mockito.version>
+    <quickcheck.version>0.8</quickcheck.version>
     
     <!-- charset that calcite will use for the tables in the tests -->
     <!-- need to setup as system property prior to running any tests
@@ -419,6 +420,12 @@
       <version>${mockito.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.pholser</groupId>
+      <artifactId>junit-quickcheck-core</artifactId>
+      <version>${quickcheck.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>com.alibaba</groupId>
       <artifactId>fastjson</artifactId>
diff --git a/sdks/java/extensions/sql/src/main/codegen/config.fmpp 
b/sdks/java/extensions/sql/src/main/codegen/config.fmpp
index 5ecb3d53fe4..264c78cb569 100644
--- a/sdks/java/extensions/sql/src/main/codegen/config.fmpp
+++ b/sdks/java/extensions/sql/src/main/codegen/config.fmpp
@@ -24,23 +24,27 @@ data: {
         "org.apache.calcite.schema.ColumnStrategy"
         "org.apache.calcite.sql.SqlCreate"
         "org.apache.calcite.sql.SqlDrop"
+        "org.apache.beam.sdk.extensions.sql.impl.parser.SqlCreateTable"
         "org.apache.beam.sdk.extensions.sql.impl.parser.SqlDdlNodes"
+        "org.apache.beam.sdk.schemas.Schema"
+        "org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils"
+        "org.apache.calcite.sql.type.SqlTypeName"
       ]
 
       # List of keywords.
       keywords: [
         "COMMENT"
         "IF"
-       "LOCATION"
-       "TBLPROPERTIES"
+        "LOCATION"
+        "TBLPROPERTIES"
       ]
 
       # List of keywords from "keywords" section that are not reserved.
       nonReservedKeywords: [
         "COMMENT"
         "IF"
-       "LOCATION"
-       "TBLPROPERTIES"
+        "LOCATION"
+        "TBLPROPERTIES"
       ]
 
       # List of methods for parsing custom SQL statements.
diff --git a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl 
b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
index 30705443f85..be6f5864fc6 100644
--- a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
+++ b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
@@ -59,53 +59,80 @@ void Option(List<SqlNode> list) :
     }
 }
 
-SqlNodeList TableElementList() :
+List<Schema.Field> FieldListParens() :
 {
-    final Span s;
-    final List<SqlNode> list = Lists.newArrayList();
+    final List<Schema.Field> fields;
+}
+{
+    <LPAREN>
+        fields = FieldListBody()
+    <RPAREN>
+    {
+        return fields;
+    }
+}
+
+List<Schema.Field> FieldListAngular() :
+{
+    final List<Schema.Field> fields;
+}
+{
+    <LT>
+        fields = FieldListBody()
+    <GT>
+    {
+        return fields;
+    }
+}
+
+List<Schema.Field> FieldListBody() :
+{
+    final List<Schema.Field> fields = Lists.newArrayList();
+    Schema.Field field = null;
 }
 {
-    <LPAREN> { s = span(); }
-    TableElement(list)
+    field = Field() { fields.add(field); }
     (
-        <COMMA> TableElement(list)
+        <COMMA> field = Field() { fields.add(field); }
     )*
-    <RPAREN> {
-        return new SqlNodeList(list, s.end(this));
+    {
+        return fields;
     }
 }
 
-void TableElement(List<SqlNode> list) :
+Schema.Field Field() :
 {
-    final SqlIdentifier id;
-    final SqlDataTypeSpec type;
+    final String name;
+    final Schema.FieldType type;
     final boolean nullable;
+    Schema.Field field = null;
     SqlNode comment = null;
-    final Span s = Span.of();
 }
 {
-    id = SimpleIdentifier()
+    name = Identifier()
+    type = FieldType()
+    {
+        field = Schema.Field.of(name.toLowerCase(), type);
+    }
     (
-        type = DataType()
-        (
-            <NULL> { nullable = true; }
-        |
-            <NOT> <NULL> { nullable = false; }
-        |
-            { nullable = true; }
-        )
-        [ <COMMENT> comment = StringLiteral() ]
-        {
-            list.add(
-                SqlDdlNodes.column(s.add(id).end(this), id,
-                    type.withNullable(nullable), comment));
-        }
+        <NULL> { field = field.withNullable(true); }
+    |
+        <NOT> <NULL> { field = field.withNullable(false); }
     |
-        { list.add(id); }
+        { field = field.withNullable(true); }
     )
-|
-    id = SimpleIdentifier() {
-        list.add(id);
+    [
+        <COMMENT> comment = StringLiteral()
+        {
+            if (comment != null) {
+                String commentString =
+                    ((NlsString) SqlLiteral.value(comment)).getValue();
+                field = field.withDescription(commentString);
+            }
+        }
+    ]
+    {
+        return field;
     }
 }
 
@@ -123,22 +150,32 @@ SqlCreate SqlCreateTable(Span s, boolean replace) :
 {
     final boolean ifNotExists;
     final SqlIdentifier id;
-    SqlNodeList tableElementList = null;
+    List<Schema.Field> fieldList = null;
     SqlNode type = null;
     SqlNode comment = null;
     SqlNode location = null;
     SqlNode tblProperties = null;
 }
 {
-    <TABLE> ifNotExists = IfNotExistsOpt() id = CompoundIdentifier()
-    tableElementList = TableElementList()
+    <TABLE> ifNotExists = IfNotExistsOpt()
+    id = CompoundIdentifier()
+    fieldList = FieldListParens()
     <TYPE> type = StringLiteral()
     [ <COMMENT> comment = StringLiteral() ]
     [ <LOCATION> location = StringLiteral() ]
     [ <TBLPROPERTIES> tblProperties = StringLiteral() ]
     {
-        return SqlDdlNodes.createTable(s.end(this), replace, ifNotExists, id,
-            tableElementList, type, comment, location, tblProperties);
+        return
+            new SqlCreateTable(
+                s.end(this),
+                replace,
+                ifNotExists,
+                id,
+                fieldList,
+                type,
+                comment,
+                location,
+                tblProperties);
     }
 }
 
@@ -153,4 +190,97 @@ SqlDrop SqlDropTable(Span s, boolean replace) :
     }
 }
 
+Schema.FieldType FieldType() :
+{
+    final SqlTypeName collectionTypeName;
+    Schema.FieldType fieldType;
+    final Span s = Span.of();
+}
+{
+    (
+        fieldType = Map()
+    |
+        fieldType = Array()
+    |
+        fieldType = Row()
+    |
+        fieldType = SimpleType()
+    )
+    {
+        return fieldType;
+    }
+}
+
+Schema.FieldType Array() :
+{
+    final Schema.FieldType arrayElementType;
+}
+{
+    <ARRAY> <LT> arrayElementType = FieldType() <GT>
+    {
+        return Schema.TypeName.ARRAY.type()
+            .withCollectionElementType(arrayElementType);
+    }
+
+}
+
+Schema.FieldType Map() :
+{
+    final Schema.FieldType mapKeyType;
+    final Schema.FieldType mapValueType;
+}
+{
+    <MAP>
+        <LT>
+            mapKeyType = SimpleType()
+        <COMMA>
+            mapValueType = FieldType()
+        <GT>
+    {
+        return Schema.TypeName.MAP.type()
+            .withMapType(mapKeyType, mapValueType);
+    }
+}
+
+Schema.FieldType Row() :
+{
+    final List<Schema.Field> fields;
+}
+{
+    <ROW> fields = RowFields()
+    {
+        Schema rowSchema = Schema.builder().addFields(fields).build();
+        return Schema.TypeName.ROW.type()
+            .withRowSchema(rowSchema);
+    }
+}
+
+List<Schema.Field> RowFields() :
+{
+    final List<Schema.Field> fields;
+}
+{
+    (
+        fields = FieldListParens()
+    |
+        fields = FieldListAngular()
+    )
+    {
+        return fields;
+    }
+}
+
+Schema.FieldType SimpleType() :
+{
+    final Span s = Span.of();
+    final SqlTypeName simpleTypeName;
+}
+{
+    simpleTypeName = SqlTypeName(s)
+    {
+        s.end(this);
+        return CalciteUtils.toFieldType(simpleTypeName);
+    }
+}
+
 // End parserImpls.ftl
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
index c8984e1f684..799224b48c7 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
@@ -18,32 +18,28 @@
 
 import static com.alibaba.fastjson.JSON.parseObject;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.sdk.schemas.Schema.toSchema;
 import static org.apache.calcite.util.Static.RESOURCE;
 
 import com.alibaba.fastjson.JSONObject;
-import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema;
-import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.meta.Column;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.calcite.jdbc.CalcitePrepare;
 import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.sql.SqlCreate;
 import org.apache.calcite.sql.SqlExecutableStatement;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlSpecialOperator;
 import org.apache.calcite.sql.SqlUtil;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.util.ImmutableNullableList;
 import org.apache.calcite.util.NlsString;
 import org.apache.calcite.util.Pair;
 
@@ -53,7 +49,7 @@
 public class SqlCreateTable extends SqlCreate
     implements SqlExecutableStatement {
   private final SqlIdentifier name;
-  private final SqlNodeList columnList;
+  private final List<Schema.Field> columnList;
   private final SqlNode type;
   private final SqlNode comment;
   private final SqlNode location;
@@ -62,10 +58,19 @@
   private static final SqlOperator OPERATOR =
       new SqlSpecialOperator("CREATE TABLE", SqlKind.CREATE_TABLE);
 
-  /** Creates a SqlCreateTable. */
-  SqlCreateTable(SqlParserPos pos, boolean replace, boolean ifNotExists,
-      SqlIdentifier name, SqlNodeList columnList, SqlNode type,
-      SqlNode comment, SqlNode location, SqlNode tblProperties) {
+  /**
+   * Creates a SqlCreateTable.
+   */
+  public SqlCreateTable(
+      SqlParserPos pos,
+      boolean replace,
+      boolean ifNotExists,
+      SqlIdentifier name,
+      List<Schema.Field> columnList,
+      SqlNode type,
+      SqlNode comment,
+      SqlNode location,
+      SqlNode tblProperties) {
     super(OPERATOR, pos, replace, ifNotExists);
     this.name = checkNotNull(name);
     this.columnList = columnList; // may be null
@@ -75,23 +80,24 @@
     this.tblProperties = tblProperties; // may be null
   }
 
+  @Override
   public List<SqlNode> getOperandList() {
-    return ImmutableNullableList.of(name, columnList, type, comment, location, 
tblProperties);
+    throw new UnsupportedOperationException(
+        "Getting operands CREATE TABLE is unsupported at the moment");
   }
 
-  @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) 
{
+  @Override
+  public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
     writer.keyword("CREATE");
     writer.keyword("TABLE");
     if (ifNotExists) {
       writer.keyword("IF NOT EXISTS");
     }
     name.unparse(writer, leftPrec, rightPrec);
+
     if (columnList != null) {
       SqlWriter.Frame frame = writer.startList("(", ")");
-      for (SqlNode c : columnList) {
-        writer.sep(",");
-        c.unparse(writer, 0, 0);
-      }
+      columnList.forEach(column -> unparseColumn(writer, column));
       writer.endList(frame);
     }
     writer.keyword("TYPE");
@@ -119,57 +125,52 @@ public void execute(CalcitePrepare.Context context) {
       if (!ifNotExists) {
         // They did not specify IF NOT EXISTS, so give error.
         throw SqlUtil.newContextException(name.getParserPosition(),
-            RESOURCE.tableExists(pair.right));
+                                          RESOURCE.tableExists(pair.right));
       }
       return;
     }
     // Table does not exist. Create it.
     if (!(pair.left.schema instanceof BeamCalciteSchema)) {
-      throw SqlUtil.newContextException(name.getParserPosition(),
+      throw SqlUtil.newContextException(
+          name.getParserPosition(),
           RESOURCE.internal("Schema is not instanceof BeamCalciteSchema"));
     }
     BeamCalciteSchema schema = (BeamCalciteSchema) pair.left.schema;
     schema.getTableProvider().createTable(toTable());
   }
 
+  private void unparseColumn(SqlWriter writer, Schema.Field column) {
+    writer.sep(",");
+    writer.identifier(column.getName());
+    writer.identifier(CalciteUtils.toSqlTypeName(column.getType()).name());
+
+    if (column.getNullable() != null && !column.getNullable()) {
+      writer.keyword("NOT NULL");
+    }
+
+    if (column.getDescription() != null) {
+      writer.keyword("COMMENT");
+      writer.literal(column.getDescription());
+    }
+  }
+
   private String getString(SqlNode n) {
     return n == null ? null : ((NlsString) SqlLiteral.value(n)).getValue();
   }
 
-  public Table toTable() {
-    List<Column> columns = new ArrayList<>(columnList.size());
-    for (Ord<SqlNode> c : Ord.zip(columnList)) {
-      if (c.e instanceof SqlColumnDeclaration) {
-        final SqlColumnDeclaration d = (SqlColumnDeclaration) c.e;
-        Column column = Column.builder()
-            .name(d.name.getSimple().toLowerCase())
-            .fieldType(CalciteUtils.toFieldType(
-                
d.dataType.deriveType(BeamQueryPlanner.TYPE_FACTORY).getSqlTypeName()))
-            .nullable(d.dataType.getNullable())
-            .comment(getString(d.comment))
+  Table toTable() {
+    return
+        Table
+            .builder()
+            .type(getString(type).toLowerCase())
+            .name(name.getSimple().toLowerCase())
+            .schema(columnList.stream().collect(toSchema()))
+            .comment(getString(comment))
+            .location(getString(location))
+            .properties((tblProperties == null)
+                            ? new JSONObject()
+                            : parseObject(getString(tblProperties)))
             .build();
-        columns.add(column);
-      } else {
-        throw new AssertionError(c.e.getClass());
-      }
-    }
-
-    Table.Builder tb = Table.builder()
-        .type(getString(type).toLowerCase())
-        .name(name.getSimple().toLowerCase())
-        .columns(columns);
-    if (comment != null) {
-      tb.comment(getString(comment));
-    }
-    if (location != null) {
-      tb.location(getString(location));
-    }
-    if (tblProperties != null) {
-      tb.properties(parseObject(getString(tblProperties)));
-    } else {
-      tb.properties(new JSONObject());
-    }
-    return tb.build();
   }
 }
 
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
index 9a779b8b4d6..5a54fa5302c 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
@@ -22,7 +22,6 @@
 import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
@@ -33,14 +32,6 @@
 public class SqlDdlNodes {
   private SqlDdlNodes() {}
 
-  /** Creates a CREATE TABLE. */
-  public static SqlCreateTable createTable(SqlParserPos pos, boolean replace,
-      boolean ifNotExists, SqlIdentifier name, SqlNodeList columnList,
-      SqlNode type, SqlNode comment, SqlNode location, SqlNode tblProperties) {
-    return new SqlCreateTable(pos, replace, ifNotExists, name, columnList,
-        type, comment, location, tblProperties);
-  }
-
   /** Creates a DROP TABLE. */
   public static SqlDropTable dropTable(SqlParserPos pos, boolean ifExists,
       SqlIdentifier name) {
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index 140b1c10531..3d76773fe2f 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -117,7 +117,8 @@ public static FieldType toFieldType(RelDataType 
calciteType) {
       type = type.withRowSchema(toBeamSchema(calciteType));
     }
     if (calciteType.getKeyType() != null && calciteType.getValueType() != 
null) {
-      type = 
type.withMapType(toFieldType(calciteType.getKeyType()).getTypeName(),
+      type = type.withMapType(
+          toFieldType(calciteType.getKeyType()),
           toFieldType(calciteType.getValueType()));
     }
     return type;
@@ -132,12 +133,14 @@ public static FieldType toArrayType(RelDataType 
collectionElementType) {
   }
 
   public static FieldType toMapType(SqlTypeName componentKeyType, SqlTypeName 
componentValueType) {
-    return 
TypeName.MAP.type().withMapType(toFieldType(componentKeyType).getTypeName(),
+    return TypeName.MAP.type().withMapType(
+        toFieldType(componentKeyType),
         toFieldType(componentValueType));
   }
 
   public static FieldType toMapType(RelDataType componentKeyType, RelDataType 
componentValueType) {
-    return 
TypeName.MAP.type().withMapType(toFieldType(componentKeyType).getTypeName(),
+    return TypeName.MAP.type().withMapType(
+        toFieldType(componentKeyType),
         toFieldType(componentValueType));
   }
 
@@ -172,7 +175,7 @@ private static RelDataType toRelDataType(
       return dataTypeFactory.createArrayType(collectionElementType, 
UNLIMITED_ARRAY_SIZE);
     } else if (SqlTypeName.MAP.equals(typeName)) {
       RelDataType componentKeyType = toRelDataType(
-          dataTypeFactory, fieldType.getMapKeyType().type());
+          dataTypeFactory, fieldType.getMapKeyType());
       RelDataType componentValueType = toRelDataType(
           dataTypeFactory, fieldType.getMapValueType());
       return dataTypeFactory.createMapType(componentKeyType, 
componentValueType);
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Column.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Column.java
deleted file mode 100644
index 04f61a47699..00000000000
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Column.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.meta;
-
-import com.google.auto.value.AutoValue;
-import java.io.Serializable;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.schemas.Schema.FieldType;
-
-/**
- * Metadata class for a {@code BeamSqlTable} column.
- */
-@AutoValue
-public abstract class Column implements Serializable {
-  public abstract String getName();
-  public abstract FieldType getFieldType();
-  public abstract Boolean getNullable();
-
-  @Nullable
-  public abstract String getComment();
-
-  public static Builder builder() {
-    return new 
org.apache.beam.sdk.extensions.sql.meta.AutoValue_Column.Builder();
-  }
-
-  /**
-   * Builder class for {@link Column}.
-   */
-  @AutoValue.Builder
-  public abstract static class Builder {
-    public abstract Builder name(String name);
-    public abstract Builder fieldType(FieldType fieldType);
-    public abstract Builder nullable(Boolean nullable);
-    public abstract Builder comment(String comment);
-    public abstract Column build();
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java
index a3339afc37d..cf95ddc473f 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java
@@ -21,8 +21,8 @@
 import com.alibaba.fastjson.JSONObject;
 import com.google.auto.value.AutoValue;
 import java.io.Serializable;
-import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Schema;
 
 /**
  * Represents the metadata of a {@code BeamSqlTable}.
@@ -32,7 +32,7 @@
   /** type of the table. */
   public abstract String getType();
   public abstract String getName();
-  public abstract List<Column> getColumns();
+  public abstract Schema getSchema();
   @Nullable
   public abstract String getComment();
   @Nullable
@@ -51,7 +51,7 @@ public static Builder builder() {
   public abstract static class Builder {
     public abstract Builder type(String type);
     public abstract Builder name(String name);
-    public abstract Builder columns(List<Column> columns);
+    public abstract Builder schema(Schema getSchema);
     public abstract Builder comment(String name);
     public abstract Builder location(String location);
     public abstract Builder properties(JSONObject properties);
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/BeamSqlTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/BeamSqlTableProvider.java
index 928069f9db3..bb661986369 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/BeamSqlTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/BeamSqlTableProvider.java
@@ -19,10 +19,10 @@
 package org.apache.beam.sdk.extensions.sql.meta.provider;
 
 import com.google.common.collect.ImmutableMap;
-import java.util.Collections;
 import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.Schema;
 
 /**
  * A {@code BeamSqlTableProvider} provides read only set of {@code BeamSqlTable}.
@@ -58,7 +58,7 @@ public void dropTable(String tableName) {
           Table.builder()
             .type(getTableType())
             .name(table.getKey())
-            .columns(Collections.emptyList())
+            .schema(Schema.builder().build())
             .build());
     }
     return map.build();
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/MetaUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/MetaUtils.java
deleted file mode 100644
index 6d61726bd55..00000000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/MetaUtils.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.meta.provider;
-
-import static org.apache.beam.sdk.schemas.Schema.toSchema;
-
-import org.apache.beam.sdk.extensions.sql.meta.Column;
-import org.apache.beam.sdk.extensions.sql.meta.Table;
-import org.apache.beam.sdk.schemas.Schema;
-
-/**
- * Utility methods for metadata.
- */
-public class MetaUtils {
-  public static Schema getRowTypeFromTable(Table table) {
-    return
-        table
-            .getColumns()
-            .stream()
-            .map(MetaUtils::toRecordField)
-            .collect(toSchema());
-  }
-
-  private static Schema.Field toRecordField(Column column) {
-    String description = column.getComment() != null ? column.getComment() : "";
-    return Schema.Field.of(column.getName(), column.getFieldType())
-        .withDescription(description)
-        .withNullable(column.getNullable());
-  }
-}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
index 4550d02744a..99246f3837d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
@@ -18,8 +18,6 @@
 
 package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
 
-import static org.apache.beam.sdk.extensions.sql.meta.provider.MetaUtils.getRowTypeFromTable;
-
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import java.util.ArrayList;
@@ -46,7 +44,7 @@
  */
 public class KafkaTableProvider extends InMemoryMetaTableProvider {
   @Override public BeamSqlTable buildBeamSqlTable(Table table) {
-    Schema schema = getRowTypeFromTable(table);
+    Schema schema = table.getSchema();
 
     JSONObject properties = table.getProperties();
     String bootstrapServers = properties.getString("bootstrap.servers");
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
index 55805a2b9a6..c936573ca5a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
@@ -18,8 +18,6 @@
 
 package org.apache.beam.sdk.extensions.sql.meta.provider.text;
 
-import static org.apache.beam.sdk.extensions.sql.meta.provider.MetaUtils.getRowTypeFromTable;
-
 import com.alibaba.fastjson.JSONObject;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
@@ -49,7 +47,7 @@
   }
 
   @Override public BeamSqlTable buildBeamSqlTable(Table table) {
-    Schema schema = getRowTypeFromTable(table);
+    Schema schema = table.getSchema();
 
     String filePattern = table.getLocation();
     CSVFormat format = CSVFormat.DEFAULT;
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
index ebbffddf3cb..05e30479abb 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
@@ -17,13 +17,22 @@
  */
 package org.apache.beam.sdk.extensions.sql;
 
+import static org.apache.beam.sdk.extensions.sql.RowSqlTypes.BOOLEAN;
+import static org.apache.beam.sdk.extensions.sql.RowSqlTypes.INTEGER;
+import static org.apache.beam.sdk.extensions.sql.RowSqlTypes.VARCHAR;
+import static org.apache.beam.sdk.schemas.Schema.TypeName.ARRAY;
+import static org.apache.beam.sdk.schemas.Schema.TypeName.MAP;
+import static org.apache.beam.sdk.schemas.Schema.TypeName.ROW;
+import static org.apache.beam.sdk.schemas.Schema.toSchema;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
+import java.util.stream.Stream;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
+import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.calcite.tools.ValidationException;
 import org.junit.Test;
 
@@ -41,13 +50,144 @@ public void testExecute_createTextTable() throws Exception {
     cli.execute(
         "create table person (\n"
         + "id int COMMENT 'id', \n"
-        + "name varchar(31) COMMENT 'name', \n"
+        + "name varchar COMMENT 'name', \n"
         + "age int COMMENT 'age') \n"
         + "TYPE 'text' \n"
         + "COMMENT '' LOCATION '/home/admin/orders'"
     );
     Table table = metaStore.getTables().get("person");
     assertNotNull(table);
+    assertEquals(
+        Stream
+            .of(
+                Field.of("id", INTEGER).withDescription("id").withNullable(true),
+                Field.of("name", VARCHAR).withDescription("name").withNullable(true),
+                Field.of("age", INTEGER).withDescription("age").withNullable(true))
+            .collect(toSchema()),
+        table.getSchema());
+  }
+
+  @Test
+  public void testExecute_createTableWithPrefixArrayField() throws Exception {
+    InMemoryMetaStore metaStore = new InMemoryMetaStore();
+    metaStore.registerProvider(new TextTableProvider());
+
+    BeamSqlCli cli = new BeamSqlCli()
+        .metaStore(metaStore);
+    cli.execute(
+        "create table person (\n"
+        + "id int COMMENT 'id', \n"
+        + "name varchar COMMENT 'name', \n"
+        + "age int COMMENT 'age', \n"
+        + "tags ARRAY<VARCHAR>, \n"
+        + "matrix ARRAY<ARRAY<INTEGER>> \n"
+        + ") \n"
+        + "TYPE 'text' \n"
+        + "COMMENT '' LOCATION '/home/admin/orders'"
+    );
+    Table table = metaStore.getTables().get("person");
+    assertNotNull(table);
+    assertEquals(
+        Stream
+            .of(
+                Field.of("id", INTEGER).withDescription("id").withNullable(true),
+                Field.of("name", VARCHAR).withDescription("name").withNullable(true),
+                Field.of("age", INTEGER).withDescription("age").withNullable(true),
+                Field.of("tags",
+                         ARRAY.type().withCollectionElementType(VARCHAR)).withNullable(true),
+                Field.of("matrix",
+                         ARRAY.type().withCollectionElementType(
+                             ARRAY.type().withCollectionElementType(INTEGER))).withNullable(true))
+            .collect(toSchema()),
+        table.getSchema());
+  }
+
+  @Test
+  public void testExecute_createTableWithPrefixMapField() throws Exception {
+    InMemoryMetaStore metaStore = new InMemoryMetaStore();
+    metaStore.registerProvider(new TextTableProvider());
+
+    BeamSqlCli cli = new BeamSqlCli()
+        .metaStore(metaStore);
+    cli.execute(
+        "create table person (\n"
+        + "id int COMMENT 'id', \n"
+        + "name varchar COMMENT 'name', \n"
+        + "age int COMMENT 'age', \n"
+        + "tags MAP<VARCHAR, VARCHAR>, \n"
+        + "nestedMap MAP<INTEGER, MAP<VARCHAR, INTEGER>> \n"
+        + ") \n"
+        + "TYPE 'text' \n"
+        + "COMMENT '' LOCATION '/home/admin/orders'"
+    );
+    Table table = metaStore.getTables().get("person");
+    assertNotNull(table);
+    assertEquals(
+        Stream
+            .of(
+                Field.of("id", INTEGER).withDescription("id").withNullable(true),
+                Field.of("name", VARCHAR).withDescription("name").withNullable(true),
+                Field.of("age", INTEGER).withDescription("age").withNullable(true),
+                Field.of("tags",
+                         MAP.type().withMapType(VARCHAR, VARCHAR)).withNullable(true),
+                Field.of("nestedmap",
+                         MAP.type().withMapType(
+                             INTEGER,
+                             MAP.type().withMapType(VARCHAR, INTEGER))).withNullable(true))
+            .collect(toSchema()),
+        table.getSchema());
+  }
+
+  @Test
+  public void testExecute_createTableWithRowField() throws Exception {
+    InMemoryMetaStore metaStore = new InMemoryMetaStore();
+    metaStore.registerProvider(new TextTableProvider());
+
+    BeamSqlCli cli = new BeamSqlCli()
+        .metaStore(metaStore);
+    cli.execute(
+        "create table person (\n"
+        + "id int COMMENT 'id', \n"
+        + "name varchar COMMENT 'name', \n"
+        + "age int COMMENT 'age', \n"
+        + "address ROW ( \n"
+        + "  street VARCHAR, \n"
+        + "  country VARCHAR \n"
+        + "  ), \n"
+        + "addressAngular ROW< \n"
+        + "  street VARCHAR, \n"
+        + "  country VARCHAR \n"
+        + "  >, \n"
+        + "isRobot BOOLEAN"
+        + ") \n"
+        + "TYPE 'text' \n"
+        + "COMMENT '' LOCATION '/home/admin/orders'"
+    );
+    Table table = metaStore.getTables().get("person");
+    assertNotNull(table);
+    assertEquals(
+        Stream
+            .of(
+                Field.of("id", INTEGER).withDescription("id").withNullable(true),
+                Field.of("name", VARCHAR).withDescription("name").withNullable(true),
+                Field.of("age", INTEGER).withDescription("age").withNullable(true),
+                Field.of("address",
+                         ROW.type().withRowSchema(
+                             RowSqlTypes
+                                 .builder()
+                                 .withVarcharField("street")
+                                 .withVarcharField("country")
+                                 .build())).withNullable(true),
+                Field.of("addressangular",
+                         ROW.type().withRowSchema(
+                             RowSqlTypes
+                                 .builder()
+                                 .withVarcharField("street")
+                                 .withVarcharField("country")
+                                 .build())).withNullable(true),
+                Field.of("isrobot", BOOLEAN).withNullable(true))
+            .collect(toSchema()),
+        table.getSchema());
   }
 
   @Test
@@ -60,7 +200,7 @@ public void testExecute_dropTable() throws Exception {
     cli.execute(
         "create table person (\n"
             + "id int COMMENT 'id', \n"
-            + "name varchar(31) COMMENT 'name', \n"
+            + "name varchar COMMENT 'name', \n"
             + "age int COMMENT 'age') \n"
             + "TYPE 'text' \n"
             + "COMMENT '' LOCATION '/home/admin/orders'"
@@ -83,7 +223,7 @@ public void testExecute_dropTable_assertTableRemovedFromPlanner() throws Excepti
     cli.execute(
         "create table person (\n"
             + "id int COMMENT 'id', \n"
-            + "name varchar(31) COMMENT 'name', \n"
+            + "name varchar COMMENT 'name', \n"
             + "age int COMMENT 'age') \n"
             + "TYPE 'text' \n"
             + "COMMENT '' LOCATION '/home/admin/orders'"
@@ -92,7 +232,6 @@ public void testExecute_dropTable_assertTableRemovedFromPlanner() throws Excepti
     cli.explainQuery("select * from person");
   }
 
-
   @Test
   public void testExplainQuery() throws Exception {
     InMemoryMetaStore metaStore = new InMemoryMetaStore();
@@ -104,7 +243,7 @@ public void testExplainQuery() throws Exception {
     cli.execute(
         "create table person (\n"
             + "id int COMMENT 'id', \n"
-            + "name varchar(31) COMMENT 'name', \n"
+            + "name varchar COMMENT 'name', \n"
             + "age int COMMENT 'age') \n"
             + "TYPE 'text' \n"
             + "COMMENT '' LOCATION '/home/admin/orders'"
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLNestedTypesTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLNestedTypesTest.java
new file mode 100644
index 00000000000..5a9ca6c2ce3
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLNestedTypesTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.parser;
+
+import static java.util.stream.Collectors.joining;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import com.pholser.junit.quickcheck.From;
+import com.pholser.junit.quickcheck.Property;
+import com.pholser.junit.quickcheck.runner.JUnitQuickcheck;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.utils.QuickCheckGenerators;
+import org.apache.beam.sdk.extensions.sql.utils.QuickCheckGenerators.AnyFieldType;
+import org.apache.beam.sdk.extensions.sql.utils.QuickCheckGenerators.PrimitiveTypes;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.calcite.sql.SqlNode;
+import org.junit.runner.RunWith;
+
+/**
+ * Tests nested types using {@link JUnitQuickcheck}.
+ *
+ * <p>Types are randomly generated by {@link QuickCheckGenerators generators}.
+ *
+ * <p>By default quick check runs this test 100 times.
+ */
+@RunWith(JUnitQuickcheck.class)
+public class BeamDDLNestedTypesTest {
+
+  @Property
+  public void supportsNestedTypes(@From(AnyFieldType.class) FieldType generatedFieldType) {
+    String fieldTypeDeclaration = unparse(generatedFieldType);
+
+    Table table = executeCreateTableWith(fieldTypeDeclaration);
+
+    Schema expectedSchema = newSimpleSchemaWith(generatedFieldType);
+
+    assertEquals(expectedSchema, table.getSchema());
+  }
+
+  @Property
+  public void supportsPrimitiveTypes(@From(PrimitiveTypes.class) FieldType fieldType) {
+    String fieldTypeDeclaration = unparse(fieldType);
+
+    Table table = executeCreateTableWith(fieldTypeDeclaration);
+
+    Schema expectedSchema = newSimpleSchemaWith(fieldType);
+
+    assertEquals(expectedSchema, table.getSchema());
+  }
+
+  private Table executeCreateTableWith(String fieldType) {
+    String createTable =
+        "create table tablename ( "
+        + "fieldName " + fieldType + " ) "
+        + "TYPE 'text' "
+        + "LOCATION '/home/admin/person'\n";
+    System.out.println(createTable);
+
+    BeamSqlParser parser = new BeamSqlParser(createTable);
+    SqlNode sqlNode;
+    try {
+      sqlNode = parser.impl().parseSqlStmtEof();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    assertNotNull(sqlNode);
+    assertTrue(sqlNode instanceof SqlCreateTable);
+    SqlCreateTable stmt = (SqlCreateTable) sqlNode;
+    return stmt.toTable();
+  }
+
+  private Schema newSimpleSchemaWith(FieldType fieldType) {
+    return Schema
+        .builder()
+        .addField(Field.of("fieldname", fieldType).withNullable(true))
+        .build();
+  }
+
+  private String unparse(FieldType fieldType) {
+    if (fieldType.getTypeName().isMapType()) {
+      return unparseMap(fieldType);
+    } else if (fieldType.getTypeName().isCollectionType()){
+      return unparseArray(fieldType);
+    } else if (fieldType.getTypeName().isCompositeType()) {
+      return unparseRow(fieldType);
+    } else {
+      return unparsePrimitive(fieldType);
+    }
+  }
+
+  private String unparsePrimitive(FieldType fieldType) {
+    return CalciteUtils.toSqlTypeName(fieldType).getName();
+  }
+
+  private String unparseArray(FieldType fieldType) {
+    return "ARRAY<" + unparse(fieldType.getCollectionElementType()) + ">";
+  }
+
+  private String unparseMap(FieldType fieldType) {
+    return "MAP<"
+           + unparse(fieldType.getMapKeyType())
+           + ", "
+           + unparse(fieldType.getMapValueType()) + ">";
+  }
+
+  private String unparseRow(FieldType fieldType) {
+    return
+        "ROW<"
+        + fieldType
+            .getRowSchema()
+            .getFields()
+            .stream()
+            .map(field -> field.getName() + " " + unparse(field.getType()))
+            .collect(joining(","))
+        + ">";
+  }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
index 5605da717bc..8bfe6c53237 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
@@ -17,16 +17,17 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.parser;
 
+import static org.apache.beam.sdk.schemas.Schema.toSchema;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
-import com.google.common.collect.ImmutableList;
+import java.util.stream.Stream;
 import org.apache.beam.sdk.extensions.sql.RowSqlTypes;
-import org.apache.beam.sdk.extensions.sql.meta.Column;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.calcite.sql.SqlNode;
 import org.junit.Test;
@@ -46,7 +47,7 @@ public void testParseCreateTable_full() throws Exception {
     Table table = parseTable(
         "create table person (\n"
             + "id int COMMENT 'id', \n"
-            + "name varchar(31) COMMENT 'name') \n"
+            + "name varchar COMMENT 'name') \n"
             + "TYPE 'text' \n"
             + "COMMENT 'person table' \n"
             + "LOCATION '/home/admin/person'\n"
@@ -63,7 +64,7 @@ public void testParseCreateTable_withoutType() throws Exception {
     parseTable(
         "create table person (\n"
             + "id int COMMENT 'id', \n"
-            + "name varchar(31) COMMENT 'name') \n"
+            + "name varchar COMMENT 'name') \n"
             + "COMMENT 'person table' \n"
             + "LOCATION '/home/admin/person'\n"
             + "TBLPROPERTIES '{\"hello\": [\"james\", \"bond\"]}'"
@@ -81,7 +82,7 @@ public void testParseCreateTable_withoutTableComment() throws Exception {
     Table table = parseTable(
         "create table person (\n"
             + "id int COMMENT 'id', \n"
-            + "name varchar(31) COMMENT 'name') \n"
+            + "name varchar COMMENT 'name') \n"
             + "TYPE 'text' \n"
             + "LOCATION '/home/admin/person'\n"
             + "TBLPROPERTIES '{\"hello\": [\"james\", \"bond\"]}'"
@@ -94,7 +95,7 @@ public void testParseCreateTable_withoutTblProperties() throws Exception {
     Table table = parseTable(
         "create table person (\n"
             + "id int COMMENT 'id', \n"
-            + "name varchar(31) COMMENT 'name') \n"
+            + "name varchar COMMENT 'name') \n"
             + "TYPE 'text' \n"
             + "COMMENT 'person table' \n"
             + "LOCATION '/home/admin/person'\n"
@@ -110,7 +111,7 @@ public void testParseCreateTable_withoutLocation() throws Exception {
     Table table = parseTable(
         "create table person (\n"
             + "id int COMMENT 'id', \n"
-            + "name varchar(31) COMMENT 'name') \n"
+            + "name varchar COMMENT 'name') \n"
             + "TYPE 'text' \n"
             + "COMMENT 'person table' \n"
     );
@@ -150,25 +151,23 @@ private static Table mockTable(String name, String type, String comment, JSONObj
   private static Table mockTable(String name, String type, String comment, JSONObject properties,
       String location) {
 
-    return Table.builder()
+    return Table
+        .builder()
         .name(name)
         .type(type)
         .comment(comment)
         .location(location)
-        .columns(ImmutableList.of(
-            Column.builder()
-                .name("id")
-                .fieldType(TypeName.INT32.type())
-                .nullable(true)
-                .comment("id")
-                .build(),
-            Column.builder()
-                .name("name")
-                .fieldType(RowSqlTypes.VARCHAR)
-                .nullable(true)
-                .comment("name")
-                .build()
-        ))
+        .schema(
+            Stream.of(
+                Schema.Field
+                    .of("id", TypeName.INT32.type())
+                    .withNullable(true)
+                    .withDescription("id"),
+                Schema.Field
+                    .of("name", RowSqlTypes.VARCHAR)
+                    .withNullable(true)
+                    .withDescription("name"))
+                  .collect(toSchema()))
         .properties(properties)
         .build();
   }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
index ea50bc70c9f..1a58de37ba7 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
 
+import static org.apache.beam.sdk.schemas.Schema.toSchema;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -24,10 +25,11 @@
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.google.common.collect.ImmutableList;
+import java.util.stream.Stream;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.RowSqlTypes;
-import org.apache.beam.sdk.extensions.sql.meta.Column;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.junit.Test;
 
@@ -61,21 +63,16 @@ private static Table mockTable(String name) {
     topics.add("topic2");
     properties.put("topics", topics);
 
-    return Table.builder()
+    return Table
+        .builder()
         .name(name)
         .comment(name + " table")
         .location("kafka://localhost:2181/brokers?topic=test")
-        .columns(ImmutableList.of(
-            Column.builder()
-                .name("id")
-                .fieldType(TypeName.INT32.type())
-                .nullable(true)
-            .build(),
-            Column.builder()
-                .name("name")
-                .fieldType(RowSqlTypes.VARCHAR)
-                .nullable(true)
-                .build()))
+        .schema(
+            Stream.of(
+                Schema.Field.of("id", TypeName.INT32.type()).withNullable(true),
+                Schema.Field.of("name", RowSqlTypes.VARCHAR).withNullable(true))
+                  .collect(toSchema()))
         .type("kafka")
         .properties(properties)
         .build();
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
index 2e6c59fb865..8e0bb016064 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
@@ -17,16 +17,17 @@
  */
 package org.apache.beam.sdk.extensions.sql.meta.provider.text;
 
+import static org.apache.beam.sdk.schemas.Schema.toSchema;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import com.alibaba.fastjson.JSONObject;
-import com.google.common.collect.ImmutableList;
+import java.util.stream.Stream;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.RowSqlTypes;
-import org.apache.beam.sdk.extensions.sql.meta.Column;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.commons.csv.CSVFormat;
 import org.junit.Test;
@@ -72,21 +73,16 @@ private static Table mockTable(String name, String format) {
     if (format != null) {
       properties.put("format", format);
     }
-    return Table.builder()
+    return Table
+        .builder()
         .name(name)
         .comment(name + " table")
         .location("/home/admin/" + name)
-        .columns(ImmutableList.of(
-            Column.builder()
-                .name("id")
-                .fieldType(TypeName.INT32.type())
-                .nullable(true)
-                .build(),
-            Column.builder()
-                .name("name")
-                .fieldType(RowSqlTypes.VARCHAR)
-                .nullable(true)
-                .build()))
+        .schema(
+            Stream.of(
+                Schema.Field.of("id", TypeName.INT32.type()).withNullable(true),
+                Schema.Field.of("name", RowSqlTypes.VARCHAR).withNullable(true))
+                  .collect(toSchema()))
         .type("text")
         .properties(properties)
         .build();
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
index 5a42b31b8d1..2574bff9669 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStoreTest.java
@@ -18,20 +18,21 @@
 package org.apache.beam.sdk.extensions.sql.meta.store;
 
 
+import static org.apache.beam.sdk.schemas.Schema.toSchema;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 
 import com.alibaba.fastjson.JSONObject;
-import com.google.common.collect.ImmutableList;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Stream;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.RowSqlTypes;
-import org.apache.beam.sdk.extensions.sql.meta.Column;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.hamcrest.Matchers;
 import org.junit.Before;
@@ -117,21 +118,16 @@ public void testRegisterProvider_duplicatedTableName() 
throws Exception {
   }
 
   private static Table mockTable(String name, String type) {
-    return Table.builder()
+    return Table
+        .builder()
         .name(name)
         .comment(name + " table")
         .location("/home/admin/" + name)
-        .columns(ImmutableList.of(
-            Column.builder()
-                .name("id")
-                .fieldType(TypeName.INT32.type())
-                .nullable(true)
-                .build(),
-            Column.builder()
-                .name("name")
-                .fieldType(RowSqlTypes.VARCHAR)
-                .nullable(true)
-                .build()))
+        .schema(
+            Stream.of(
+                Schema.Field.of("id", 
TypeName.INT32.type()).withNullable(true),
+                Schema.Field.of("name", 
RowSqlTypes.VARCHAR).withNullable(true))
+                  .collect(toSchema()))
         .type(type)
         .properties(new JSONObject())
         .build();
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/utils/QuickCheckGenerators.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/utils/QuickCheckGenerators.java
new file mode 100644
index 00000000000..4236eaaf660
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/utils/QuickCheckGenerators.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.utils;
+
+import static java.util.Arrays.asList;
+import static java.util.stream.Collectors.toList;
+import static org.apache.beam.sdk.schemas.Schema.toSchema;
+
+import com.pholser.junit.quickcheck.generator.GenerationStatus;
+import com.pholser.junit.quickcheck.generator.Generator;
+import com.pholser.junit.quickcheck.random.SourceOfRandomness;
+import com.pholser.junit.quickcheck.runner.JUnitQuickcheck;
+import java.util.List;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+
+/**
+ * Field type generators invoked by {@link JUnitQuickcheck}.
+ */
+public class QuickCheckGenerators {
+
+  private static final FieldTypeGenerator PRIMITIVE_TYPES = new 
PrimitiveTypes();
+  private static final FieldTypeGenerator ARRAYS_OF_ANY = new Arrays();
+  private static final FieldTypeGenerator MAPS_OF_ANY = new Maps();
+  private static final FieldTypeGenerator ROWS = new Rows();
+  private static final FieldTypeGenerator ANY_TYPE = new AnyFieldType();
+
+  /**
+   * Generates a primitive {@link TypeName SQL type} randomly.
+   */
+  public static class PrimitiveTypes extends FieldTypeGenerator {
+    private static final List<FieldType> PRIMITIVE_TYPES =
+        java.util.Arrays.stream(TypeName.values())
+                        .filter(TypeName::isPrimitiveType)
+                        .map(TypeName::type)
+                        .collect(toList());
+
+    @Override
+    public FieldType generateFieldType(SourceOfRandomness random, 
GenerationStatus status) {
+      return random.choose(PRIMITIVE_TYPES);
+    }
+  }
+
+  /**
+   * Generates {@link TypeName#ARRAY SQL arrays} with random element type.
+   */
+  public static class Arrays extends FieldTypeGenerator {
+    @Override
+    public FieldType generateFieldType(SourceOfRandomness random, 
GenerationStatus status) {
+      return TypeName.ARRAY.type().withCollectionElementType(
+          ANY_TYPE.generate(random, status));
+    }
+  }
+
+  /**
+   * Generates {@link TypeName#MAP SQL maps} with random primitive key type 
and random value type.
+   */
+  public static class Maps extends FieldTypeGenerator {
+    @Override
+    public FieldType generateFieldType(SourceOfRandomness random, 
GenerationStatus status) {
+      return TypeName.MAP.type().withMapType(
+          PRIMITIVE_TYPES.generate(random, status),
+          ANY_TYPE.generate(random, status));
+    }
+  }
+
+  /**
+   * Generates {@link TypeName#ROW SQL rows} with random field types.
+   */
+  public static class Rows extends FieldTypeGenerator {
+    @Override
+    public FieldType generateFieldType(SourceOfRandomness random, 
GenerationStatus status) {
+      // stop at 10 levels of nesting to avoid stack overflows
+      FieldTypeGenerator rowFieldTypesGenerator =
+          (nestingLevel(status) >= 10)
+              ? PRIMITIVE_TYPES
+              : ANY_TYPE;
+
+      return TypeName.ROW.type().withRowSchema(
+          generateSchema(rowFieldTypesGenerator, random, status));
+    }
+
+    private Schema generateSchema(
+        FieldTypeGenerator fieldTypeGenerator,
+        SourceOfRandomness random,
+        GenerationStatus status) {
+
+      return
+          IntStream
+              .range(0, status.size() + 1)
+              .mapToObj(i -> Field.of(
+                  "field_" + i,
+                  fieldTypeGenerator.generate(random, 
status)).withNullable(true))
+              .collect(toSchema());
+    }
+  }
+
+  /**
+   * Generates a {@link FieldType}, randomly delegating to specific type 
generators.
+   */
+  public static class AnyFieldType extends FieldTypeGenerator {
+    @Override
+    public FieldType generateFieldType(SourceOfRandomness random, 
GenerationStatus status) {
+      return random.choose(asList(PRIMITIVE_TYPES, ARRAYS_OF_ANY, MAPS_OF_ANY, 
ROWS))
+                   .generate(random, status);
+    }
+  }
+
+  abstract static class FieldTypeGenerator extends Generator<FieldType> {
+    static final GenerationStatus.Key<Integer> NESTING_KEY =
+        new GenerationStatus.Key<>("nesting", Integer.class);
+
+    FieldTypeGenerator() {
+      super(FieldType.class);
+    }
+
+    @Override
+    public FieldType generate(SourceOfRandomness random, GenerationStatus 
status) {
+      incrementNesting(status);
+      return generateFieldType(random, status);
+    }
+
+    protected abstract FieldType generateFieldType(
+        SourceOfRandomness random,
+        GenerationStatus status);
+
+    int nestingLevel(GenerationStatus status) {
+      return status.valueOf(NESTING_KEY).orElse(-1);
+    }
+
+    void incrementNesting(GenerationStatus status) {
+      status.setValue(NESTING_KEY, nestingLevel(status) + 1);
+    }
+  }
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 100207)
    Time Spent: 5h 50m  (was: 5h 40m)

> [SQL] Support Complex Types in DDL
> ----------------------------------
>
>                 Key: BEAM-4196
>                 URL: https://issues.apache.org/jira/browse/BEAM-4196
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>            Reporter: Anton Kedin
>            Assignee: Anton Kedin
>            Priority: Major
>          Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> Neither the DDL parser we copied from calcite-server nor calcite-server
> itself supports complex types in DDL. If we want to model something like JSON
> objects, we need to support at least Arrays and nested Rows.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to