[ 
https://issues.apache.org/jira/browse/BEAM-4077?focusedWorklogId=107261&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-107261
 ]

ASF GitHub Bot logged work on BEAM-4077:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/May/18 17:17
            Start Date: 30/May/18 17:17
    Worklog Time Spent: 10m 
      Work Description: tgroh closed pull request #5490: [BEAM-4077] Refactor schemas and fields, simplify a bit
URL: https://github.com/apache/beam/pull/5490
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a pull request from a fork once it has been merged, the
diff is reproduced below for the sake of provenance:

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index a9d2f333d12..94af2c7a809 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -73,53 +73,53 @@ public Builder addField(Field field) {
       return this;
     }
 
-    public Builder addByteField(String name, boolean nullable) {
-      fields.add(Field.of(name, TypeName.BYTE.type()).withNullable(nullable));
+    public Builder addByteField(String name) {
+      fields.add(Field.of(name, TypeName.BYTE.type()));
       return this;
     }
 
-    public Builder addInt16Field(String name, boolean nullable) {
-      fields.add(Field.of(name, TypeName.INT16.type()).withNullable(nullable));
+    public Builder addInt16Field(String name) {
+      fields.add(Field.of(name, TypeName.INT16.type()));
       return this;
     }
 
-    public Builder addInt32Field(String name, boolean nullable) {
-      fields.add(Field.of(name, TypeName.INT32.type()).withNullable(nullable));
+    public Builder addInt32Field(String name) {
+      fields.add(Field.of(name, TypeName.INT32.type()));
       return this;
     }
 
-    public Builder addInt64Field(String name, boolean nullable) {
-      fields.add(Field.of(name, TypeName.INT64.type()).withNullable(nullable));
+    public Builder addInt64Field(String name) {
+      fields.add(Field.of(name, TypeName.INT64.type()));
       return this;
     }
 
-    public Builder addDecimalField(String name, boolean nullable) {
-      fields.add(Field.of(name, 
TypeName.DECIMAL.type()).withNullable(nullable));
+    public Builder addDecimalField(String name) {
+      fields.add(Field.of(name, TypeName.DECIMAL.type()));
       return this;
     }
 
-    public Builder addFloatField(String name, boolean nullable) {
-      fields.add(Field.of(name, TypeName.FLOAT.type()).withNullable(nullable));
+    public Builder addFloatField(String name) {
+      fields.add(Field.of(name, TypeName.FLOAT.type()));
       return this;
     }
 
-    public Builder addDoubleField(String name, boolean nullable) {
-      fields.add(Field.of(name, 
TypeName.DOUBLE.type()).withNullable(nullable));
+    public Builder addDoubleField(String name) {
+      fields.add(Field.of(name, TypeName.DOUBLE.type()));
       return this;
     }
 
-    public Builder addStringField(String name, boolean nullable) {
-      fields.add(Field.of(name, 
TypeName.STRING.type()).withNullable(nullable));
+    public Builder addStringField(String name) {
+      fields.add(Field.of(name, TypeName.STRING.type()));
       return this;
     }
 
-    public Builder addDateTimeField(String name, boolean nullable) {
-      fields.add(Field.of(name, 
TypeName.DATETIME.type()).withNullable(nullable));
+    public Builder addDateTimeField(String name) {
+      fields.add(Field.of(name, TypeName.DATETIME.type()));
       return this;
     }
 
-    public Builder addBooleanField(String name, boolean nullable) {
-      fields.add(Field.of(name, 
TypeName.BOOLEAN.type()).withNullable(nullable));
+    public Builder addBooleanField(String name) {
+      fields.add(Field.of(name, TypeName.BOOLEAN.type()));
       return this;
     }
 
@@ -129,9 +129,14 @@ public Builder addArrayField(String name, FieldType 
collectionElementType) {
       return this;
     }
 
-    public Builder addRowField(String name, Schema fieldSchema, boolean 
nullable) {
-      fields.add(Field.of(name, TypeName.ROW.type().withRowSchema(fieldSchema))
-          .withNullable(nullable));
+    public Builder addRowField(String name, Schema fieldSchema) {
+      fields.add(Field.of(name, 
TypeName.ROW.type().withRowSchema(fieldSchema)));
+      return this;
+    }
+
+    public Builder addMapField(
+        String name, FieldType keyType, FieldType valueType) {
+      fields.add(Field.of(name, TypeName.MAP.type().withMapType(keyType, 
valueType)));
       return this;
     }
 
@@ -204,8 +209,6 @@ public int hashCode() {
     MAP,
     ROW;    // The field is itself a nested row.
 
-    private final FieldType fieldType = FieldType.of(this);
-
     public static final Set<TypeName> NUMERIC_TYPES = ImmutableSet.of(
         BYTE, INT16, INT32, INT64, DECIMAL, FLOAT, DOUBLE);
     public static final Set<TypeName> STRING_TYPES = ImmutableSet.of(STRING);
@@ -236,9 +239,14 @@ public boolean isCompositeType() {
       return COMPOSITE_TYPES.contains(this);
     }
 
-    /** Returns a {@link FieldType} representing this primitive type. */
+    /**
+     * Returns a {@link FieldType} representing this primitive type.
+     *
+     * @deprecated a {@link TypeName} is not a type, so this conversion is not 
sound.
+     */
+    @Deprecated
     public FieldType type() {
-      return fieldType;
+      return FieldType.of(this);
     }
   }
 
@@ -265,6 +273,11 @@ public FieldType type() {
     @SuppressWarnings("mutable")
     @Nullable public abstract byte[] getMetadata();
     abstract FieldType.Builder toBuilder();
+
+    public static Builder forTypeName(TypeName typeName) {
+      return new AutoValue_Schema_FieldType.Builder().setTypeName(typeName);
+    }
+
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setTypeName(TypeName typeName);
@@ -280,7 +293,55 @@ public FieldType type() {
      * Create a {@link FieldType} for the given type.
      */
     public static FieldType of(TypeName typeName) {
-      return new 
AutoValue_Schema_FieldType.Builder().setTypeName(typeName).build();
+      return forTypeName(typeName).build();
+    }
+
+    /** The type of string fields. */
+    public static final FieldType STRING = FieldType.of(TypeName.STRING);
+
+    /** The type of byte fields. */
+    public static final FieldType BYTE = FieldType.of(TypeName.BYTE);
+
+    /** The type of int16 fields. */
+    public static final FieldType INT16 = FieldType.of(TypeName.INT16);
+
+    /** The type of int32 fields. */
+    public static final FieldType INT32 = FieldType.of(TypeName.INT32);
+
+    /** The type of int64 fields. */
+    public static final FieldType INT64 = FieldType.of(TypeName.INT64);
+
+    /** The type of float fields. */
+    public static final FieldType FLOAT = FieldType.of(TypeName.FLOAT);
+
+    /** The type of double fields. */
+    public static final FieldType DOUBLE = FieldType.of(TypeName.DOUBLE);
+
+    /** The type of decimal fields. */
+    public static final FieldType DECIMAL = FieldType.of(TypeName.DECIMAL);
+
+    /** The type of boolean fields. */
+    public static final FieldType BOOLEAN = FieldType.of(TypeName.BOOLEAN);
+
+    /** The type of datetime fields. */
+    public static final FieldType DATETIME = FieldType.of(TypeName.DATETIME);
+
+    /** Create an array type for the given field type. */
+    public static final FieldType array(FieldType elementType) {
+      return 
FieldType.forTypeName(TypeName.ARRAY).setCollectionElementType(elementType).build();
+    }
+
+    /** Create a map type for the given key and value types. */
+    public static final FieldType map(FieldType keyType, FieldType valueType) {
+      return FieldType.forTypeName(TypeName.MAP)
+          .setMapKeyType(keyType)
+          .setMapValueType(valueType)
+          .build();
+    }
+
+    /** Create a row type for the given schema. */
+    public static final FieldType row(Schema schema) {
+      return FieldType.forTypeName(TypeName.ROW).setRowSchema(schema).build();
     }
 
     /**
@@ -393,7 +454,7 @@ public int hashCode() {
     }
 
     /**
-     * Return's a field with the give name.
+     * Returns a field with the given name and type.
      */
     public static Field of(String name, FieldType fieldType) {
       return new AutoValue_Schema_Field.Builder()
@@ -404,6 +465,18 @@ public static Field of(String name, FieldType fieldType) {
           .build();
     }
 
+    /**
+     * Returns a nullable field with the given name and type.
+     */
+    public static Field nullable(String name, FieldType fieldType) {
+      return new AutoValue_Schema_Field.Builder()
+          .setName(name)
+          .setDescription("")
+          .setType(fieldType)
+          .setNullable(true)
+          .build();
+    }
+
     /**
      * Returns a copy of the Field with the name set.
      */
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/org/apache/beam/sdk/coders/RowCoderTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/org/apache/beam/sdk/coders/RowCoderTest.java
index cd1d96aebc4..2c62715b29c 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/org/apache/beam/sdk/coders/RowCoderTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/org/apache/beam/sdk/coders/RowCoderTest.java
@@ -35,9 +35,7 @@
 import org.joda.time.DateTimeZone;
 import org.junit.Test;
 
-/**
- * Unit tests for {@link RowCoder}.
- */
+/** Unit tests for {@link RowCoder}. */
 public class RowCoderTest {
 
   void checkEncodeDecode(Row row) throws IOException {
@@ -50,16 +48,16 @@ void checkEncodeDecode(Row row) throws IOException {
   @Test
   public void testPrimitiveTypes() throws Exception {
     Schema schema = Schema.builder()
-        .addByteField("f_byte", false)
-        .addInt16Field("f_int16", false)
-        .addInt32Field("f_int32", false)
-        .addInt64Field("f_int64", false)
-        .addDecimalField("f_decimal", false)
-        .addFloatField("f_float", false)
-        .addDoubleField("f_double", false)
-        .addStringField("f_string", false)
-        .addDateTimeField("f_datetime", false)
-        .addBooleanField("f_boolean", false).build();
+        .addByteField("f_byte")
+        .addInt16Field("f_int16")
+        .addInt32Field("f_int32")
+        .addInt64Field("f_int64")
+        .addDecimalField("f_decimal")
+        .addFloatField("f_float")
+        .addDoubleField("f_double")
+        .addStringField("f_string")
+        .addDateTimeField("f_datetime")
+        .addBooleanField("f_boolean").build();
 
     DateTime dateTime = new DateTime().withDate(1979, 03, 14)
         .withTime(1, 2, 3, 4)
@@ -76,11 +74,11 @@ public void testPrimitiveTypes() throws Exception {
   @Test
   public void testNestedTypes() throws Exception {
     Schema nestedSchema = Schema.builder()
-        .addInt32Field("f1_int", false)
-        .addStringField("f1_str", false).build();
+        .addInt32Field("f1_int")
+        .addStringField("f1_str").build();
     Schema schema = Schema.builder()
-        .addInt32Field("f_int", false)
-        .addRowField("nested", nestedSchema, false).build();
+        .addInt32Field("f_int")
+        .addRowField("nested", nestedSchema).build();
 
     Row nestedRow = Row.withSchema(nestedSchema).addValues(18, 
"foobar").build();
     Row row = Row.withSchema(schema).addValues(42, nestedRow).build();
@@ -99,8 +97,8 @@ public void testArrays() throws Exception {
   @Test
   public void testArrayOfRow() throws Exception {
     Schema nestedSchema = Schema.builder()
-        .addInt32Field("f1_int", false)
-        .addStringField("f1_str", false).build();
+        .addInt32Field("f1_int")
+        .addStringField("f1_str").build();
     FieldType collectionElementType = 
TypeName.ROW.type().withRowSchema(nestedSchema);
     Schema schema = Schema.builder().addArrayField("f_array", 
collectionElementType).build();
     Row row = Row.withSchema(schema).addArray(
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java
index f33edc65d20..fa8470ec89a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java
@@ -39,16 +39,16 @@
   @Test
   public void testCreate() {
     Schema schema = Schema.builder()
-        .addByteField("f_byte", false)
-        .addInt16Field("f_int16", false)
-        .addInt32Field("f_int32", false)
-        .addInt64Field("f_int64", false)
-        .addDecimalField("f_decimal", false)
-        .addFloatField("f_float", false)
-        .addDoubleField("f_double", false)
-        .addStringField("f_string", false)
-        .addDateTimeField("f_datetime", false)
-        .addBooleanField("f_boolean", false).build();
+        .addByteField("f_byte")
+        .addInt16Field("f_int16")
+        .addInt32Field("f_int32")
+        .addInt64Field("f_int64")
+        .addDecimalField("f_decimal")
+        .addFloatField("f_float")
+        .addDoubleField("f_double")
+        .addStringField("f_string")
+        .addDateTimeField("f_datetime")
+        .addBooleanField("f_boolean").build();
     assertEquals(10, schema.getFieldCount());
 
     assertEquals(0, schema.indexOf("f_byte"));
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/JsonToRowTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/JsonToRowTest.java
index 021bf287426..fa4ad756804 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/JsonToRowTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/JsonToRowTest.java
@@ -36,8 +36,6 @@
 @RunWith(JUnit4.class)
 public class JsonToRowTest implements Serializable {
 
-  private static final boolean NOT_NULLABLE = false;
-
   @Rule
   public transient TestPipeline pipeline = TestPipeline.create();
 
@@ -47,9 +45,9 @@ public void testParsesRows() throws Exception {
     Schema personSchema =
         Schema
             .builder()
-            .addStringField("name", NOT_NULLABLE)
-            .addInt32Field("height", NOT_NULLABLE)
-            .addBooleanField("knowsJavascript", NOT_NULLABLE)
+            .addStringField("name")
+            .addInt32Field("height")
+            .addBooleanField("knowsJavascript")
             .build();
 
     PCollection<String> jsonPersons = pipeline
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonDeserializerTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonDeserializerTest.java
index a93b3f81e32..56210d3d963 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonDeserializerTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonDeserializerTest.java
@@ -50,9 +50,6 @@
  * Unit tests for {@link RowJsonDeserializer}.
  */
 public class RowJsonDeserializerTest {
-  private static final boolean NOT_NULLABLE = false;
-  private static final boolean NULLABLE = true;
-
   private static final Boolean BOOLEAN_TRUE_VALUE = true;
   private static final String BOOLEAN_TRUE_STRING = "true";
   private static final Byte BYTE_VALUE = 126;
@@ -76,14 +73,14 @@ public void testParsesFlatRow() throws Exception {
     Schema schema =
         Schema
             .builder()
-            .addByteField("f_byte", NOT_NULLABLE)
-            .addInt16Field("f_int16", NOT_NULLABLE)
-            .addInt32Field("f_int32", NOT_NULLABLE)
-            .addInt64Field("f_int64", NOT_NULLABLE)
-            .addFloatField("f_float", NOT_NULLABLE)
-            .addDoubleField("f_double", NOT_NULLABLE)
-            .addBooleanField("f_boolean", NOT_NULLABLE)
-            .addStringField("f_string", NOT_NULLABLE)
+            .addByteField("f_byte")
+            .addInt16Field("f_int16")
+            .addInt32Field("f_int32")
+            .addInt64Field("f_int64")
+            .addFloatField("f_float")
+            .addDoubleField("f_double")
+            .addBooleanField("f_boolean")
+            .addStringField("f_string")
             .build();
 
     String rowString = "{\n"
@@ -115,7 +112,7 @@ public void testParsesArrayField() throws Exception {
     Schema schema =
         Schema
             .builder()
-            .addInt32Field("f_int32", NOT_NULLABLE)
+            .addInt32Field("f_int32")
             .addArrayField("f_intArray", INT32.type())
             .build();
 
@@ -194,15 +191,15 @@ public void testParsesRowField() throws Exception {
     Schema nestedRowSchema =
         Schema
             .builder()
-            .addInt32Field("f_nestedInt32", NOT_NULLABLE)
-            .addStringField("f_nestedString", NOT_NULLABLE)
+            .addInt32Field("f_nestedInt32")
+            .addStringField("f_nestedString")
             .build();
 
     Schema schema =
         Schema
             .builder()
-            .addInt32Field("f_int32", NOT_NULLABLE)
-            .addRowField("f_row", nestedRowSchema, NOT_NULLABLE)
+            .addInt32Field("f_int32")
+            .addRowField("f_row", nestedRowSchema)
             .build();
 
     String rowString = "{\n"
@@ -231,15 +228,15 @@ public void testThrowsForMismatchedRowField() throws 
Exception {
     Schema nestedRowSchema =
         Schema
             .builder()
-            .addInt32Field("f_nestedInt32", NOT_NULLABLE)
-            .addStringField("f_nestedString", NOT_NULLABLE)
+            .addInt32Field("f_nestedInt32")
+            .addStringField("f_nestedString")
             .build();
 
     Schema schema =
         Schema
             .builder()
-            .addInt32Field("f_int32", NOT_NULLABLE)
-            .addRowField("f_row", nestedRowSchema, NOT_NULLABLE)
+            .addInt32Field("f_int32")
+            .addRowField("f_row", nestedRowSchema)
             .build();
 
     String rowString = "{\n"
@@ -261,19 +258,19 @@ public void testParsesNestedRowField() throws Exception {
     Schema doubleNestedRowSchema =
         Schema
             .builder()
-            .addStringField("f_doubleNestedString", NOT_NULLABLE)
+            .addStringField("f_doubleNestedString")
             .build();
 
     Schema nestedRowSchema =
         Schema
             .builder()
-            .addRowField("f_nestedRow", doubleNestedRowSchema, NOT_NULLABLE)
+            .addRowField("f_nestedRow", doubleNestedRowSchema)
             .build();
 
     Schema schema =
         Schema
             .builder()
-            .addRowField("f_row", nestedRowSchema, NOT_NULLABLE)
+            .addRowField("f_row", nestedRowSchema)
             .build();
 
     String rowString = "{\n"
@@ -310,7 +307,7 @@ public void testThrowsForUnsupportedType() throws Exception 
{
     Schema schema =
         Schema
             .builder()
-            .addDateTimeField("f_dateTime", NOT_NULLABLE)
+            .addDateTimeField("f_dateTime")
             .build();
 
     thrown.expect(UnsupportedRowJsonException.class);
@@ -344,7 +341,7 @@ public void testThrowsForUnsupportedNestedFieldType() 
throws Exception {
     Schema schema =
         Schema
             .builder()
-            .addRowField("f_nestedRow", nestedSchema, NOT_NULLABLE)
+            .addRowField("f_nestedRow", nestedSchema)
             .build();
 
     thrown.expect(UnsupportedRowJsonException.class);
@@ -358,8 +355,8 @@ public void testParsesNulls() throws Exception {
     Schema schema =
         Schema
             .builder()
-            .addByteField("f_byte", NOT_NULLABLE)
-            .addStringField("f_string", NULLABLE)
+            .addByteField("f_byte")
+            .addField(Schema.Field.nullable("f_string", FieldType.STRING))
             .build();
 
     String rowString = "{\n"
@@ -385,8 +382,8 @@ public void testThrowsForMissingNotNullableField() throws 
Exception {
     Schema schema =
         Schema
             .builder()
-            .addByteField("f_byte", NOT_NULLABLE)
-            .addStringField("f_string", NOT_NULLABLE)
+            .addByteField("f_byte")
+            .addStringField("f_string")
             .build();
 
     String rowString = "{\n"
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java
index b29f8c37104..25c84230fa9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java
@@ -78,16 +78,16 @@ public void testRejectsNullRecord() {
   @Test
   public void testCreatesRecord() {
     Schema schema = Schema.builder()
-        .addByteField("f_byte", false)
-        .addInt16Field("f_int16", false)
-        .addInt32Field("f_int32", false)
-        .addInt64Field("f_int64", false)
-        .addDecimalField("f_decimal", false)
-        .addFloatField("f_float", false)
-        .addDoubleField("f_double", false)
-        .addStringField("f_string", false)
-        .addDateTimeField("f_datetime", false)
-        .addBooleanField("f_boolean", false).build();
+        .addByteField("f_byte")
+        .addInt16Field("f_int16")
+        .addInt32Field("f_int32")
+        .addInt64Field("f_int64")
+        .addDecimalField("f_decimal")
+        .addFloatField("f_float")
+        .addDoubleField("f_double")
+        .addStringField("f_string")
+        .addDateTimeField("f_datetime")
+        .addBooleanField("f_boolean").build();
 
     DateTime dateTime = new DateTime().withDate(1979, 03, 14)
         .withTime(1, 2, 3, 4)
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/InferredRowCoderTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/InferredRowCoderTest.java
index e590735689e..d7a93ec5f40 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/InferredRowCoderTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/InferredRowCoderTest.java
@@ -31,13 +31,11 @@
  * Unit tests for {@link InferredRowCoder}.
  */
 public class InferredRowCoderTest {
-  private static final boolean NOT_NULLABLE = false;
-
   private static final Schema PERSON_ROW_TYPE =
       Schema
           .builder()
-          .addInt32Field("ageYears", NOT_NULLABLE)
-          .addStringField("name", NOT_NULLABLE)
+          .addInt32Field("ageYears")
+          .addStringField("name")
           .build();
 
   private static final PersonPojo PERSON_FOO = new PersonPojo("Foo", 13);
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlTypes.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlTypes.java
index 7ae03a6e102..72cf5efa5a8 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlTypes.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlTypes.java
@@ -163,7 +163,7 @@ public Builder withArrayField(String fieldName, Schema 
schema) {
     }
 
     public Builder withRowField(String fieldName, Schema schema) {
-      builder.addRowField(fieldName, schema, true);
+      builder.addField(Field.nullable(fieldName, FieldType.row(schema)));
       return this;
     }
 
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index 4032592d067..9cd3e0c4cbe 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -43,29 +43,27 @@
   // Same story with CHAR and VARCHAR - they both map to STRING.
   private static final BiMap<FieldType, SqlTypeName> 
BEAM_TO_CALCITE_TYPE_MAPPING =
       ImmutableBiMap.<FieldType, SqlTypeName>builder()
-          .put(TypeName.BYTE.type(), SqlTypeName.TINYINT)
-          .put(TypeName.INT16.type(), SqlTypeName.SMALLINT)
-          .put(TypeName.INT32.type(), SqlTypeName.INTEGER)
-          .put(TypeName.INT64.type(), SqlTypeName.BIGINT)
-          .put(TypeName.FLOAT.type(), SqlTypeName.FLOAT)
-          .put(TypeName.DOUBLE.type(), SqlTypeName.DOUBLE)
-          .put(TypeName.DECIMAL.type(), SqlTypeName.DECIMAL)
-          .put(TypeName.BOOLEAN.type(), SqlTypeName.BOOLEAN)
-          .put(TypeName.MAP.type(), SqlTypeName.MAP)
-          .put(TypeName.ARRAY.type(), SqlTypeName.ARRAY)
-          .put(TypeName.ROW.type(), SqlTypeName.ROW)
-          .put(TypeName.DATETIME.type().withMetadata("DATE"), SqlTypeName.DATE)
-          .put(TypeName.DATETIME.type().withMetadata("TIME"), SqlTypeName.TIME)
+          .put(FieldType.BYTE, SqlTypeName.TINYINT)
+          .put(FieldType.INT16, SqlTypeName.SMALLINT)
+          .put(FieldType.INT32, SqlTypeName.INTEGER)
+          .put(FieldType.INT64, SqlTypeName.BIGINT)
+          .put(FieldType.FLOAT, SqlTypeName.FLOAT)
+          .put(FieldType.DOUBLE, SqlTypeName.DOUBLE)
+          .put(FieldType.DECIMAL, SqlTypeName.DECIMAL)
+          .put(FieldType.BOOLEAN, SqlTypeName.BOOLEAN)
+          .put(FieldType.DATETIME.withMetadata("DATE"), SqlTypeName.DATE)
+          .put(FieldType.DATETIME.withMetadata("TIME"), SqlTypeName.TIME)
           .put(
-              TypeName.DATETIME.type().withMetadata("TIME_WITH_LOCAL_TZ"),
+              FieldType.DATETIME.withMetadata("TIME_WITH_LOCAL_TZ"),
               SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE)
-          .put(TypeName.DATETIME.type().withMetadata("TS"), 
SqlTypeName.TIMESTAMP)
+          .put(FieldType.DATETIME.withMetadata("TS"), SqlTypeName.TIMESTAMP)
           .put(
-              TypeName.DATETIME.type().withMetadata("TS_WITH_LOCAL_TZ"),
+              FieldType.DATETIME.withMetadata("TS_WITH_LOCAL_TZ"),
               SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
-          .put(TypeName.STRING.type().withMetadata("CHAR"), SqlTypeName.CHAR)
-          .put(TypeName.STRING.type().withMetadata("VARCHAR"), 
SqlTypeName.VARCHAR)
+          .put(FieldType.STRING.withMetadata("CHAR"), SqlTypeName.CHAR)
+          .put(FieldType.STRING.withMetadata("VARCHAR"), SqlTypeName.VARCHAR)
           .build();
+
   private static final BiMap<SqlTypeName, FieldType> 
CALCITE_TO_BEAM_TYPE_MAPPING =
       BEAM_TO_CALCITE_TYPE_MAPPING.inverse();
 
@@ -73,8 +71,8 @@
   // default mapping.
   private static final Map<FieldType, SqlTypeName> 
BEAM_TO_CALCITE_DEFAULT_MAPPING =
       ImmutableMap.of(
-          TypeName.DATETIME.type(), SqlTypeName.TIMESTAMP,
-          TypeName.STRING.type(), SqlTypeName.VARCHAR);
+          FieldType.DATETIME, SqlTypeName.TIMESTAMP,
+          FieldType.STRING, SqlTypeName.VARCHAR);
 
   /** Generate {@link Schema} from {@code RelDataType} which is used to create 
table. */
   public static Schema toBeamSchema(RelDataType tableInfo) {
@@ -103,19 +101,19 @@ public static FieldType toFieldType(SqlTypeName 
sqlTypeName) {
   }
 
   public static FieldType toFieldType(RelDataType calciteType) {
-    FieldType type = toFieldType((calciteType.getSqlTypeName()));
-    if (calciteType.getComponentType() != null) {
-      type = 
type.withCollectionElementType(toFieldType(calciteType.getComponentType()));
-    }
-    if (calciteType.isStruct()) {
-      type = type.withRowSchema(toBeamSchema(calciteType));
+    switch (calciteType.getSqlTypeName()) {
+      case ARRAY:
+      case MULTISET:
+        return FieldType.array(toFieldType(calciteType.getComponentType()));
+      case MAP:
+        return FieldType.map(
+            toFieldType(calciteType.getKeyType()), 
toFieldType(calciteType.getValueType()));
+      case ROW:
+        return FieldType.row(toBeamSchema(calciteType));
+
+      default:
+        return toFieldType(calciteType.getSqlTypeName());
     }
-    if (calciteType.getKeyType() != null && calciteType.getValueType() != 
null) {
-      type =
-          type.withMapType(
-              toFieldType(calciteType.getKeyType()), 
toFieldType(calciteType.getValueType()));
-    }
-    return type;
   }
 
   public static FieldType toArrayType(SqlTypeName collectionElementType) {
@@ -158,19 +156,20 @@ public static RelDataType toCalciteRowType(Schema schema, 
RelDataTypeFactory dat
 
   private static RelDataType toRelDataType(
       RelDataTypeFactory dataTypeFactory, FieldType fieldType) {
-    SqlTypeName typeName = toSqlTypeName(fieldType);
-    if (SqlTypeName.ARRAY.equals(typeName)) {
-      RelDataType collectionElementType =
-          toRelDataType(dataTypeFactory, fieldType.getCollectionElementType());
-      return dataTypeFactory.createArrayType(collectionElementType, 
UNLIMITED_ARRAY_SIZE);
-    } else if (SqlTypeName.MAP.equals(typeName)) {
-      RelDataType componentKeyType = toRelDataType(dataTypeFactory, 
fieldType.getMapKeyType());
-      RelDataType componentValueType = toRelDataType(dataTypeFactory, 
fieldType.getMapValueType());
-      return dataTypeFactory.createMapType(componentKeyType, 
componentValueType);
-    } else if (SqlTypeName.ROW.equals(typeName)) {
-      return toCalciteRowType(fieldType.getRowSchema(), dataTypeFactory);
-    } else {
-      return dataTypeFactory.createSqlType(typeName);
+    switch (fieldType.getTypeName()) {
+      case ARRAY:
+        return dataTypeFactory.createArrayType(
+            toRelDataType(dataTypeFactory, 
fieldType.getCollectionElementType()),
+            UNLIMITED_ARRAY_SIZE);
+      case MAP:
+        RelDataType componentKeyType = toRelDataType(dataTypeFactory, 
fieldType.getMapKeyType());
+        RelDataType componentValueType =
+            toRelDataType(dataTypeFactory, fieldType.getMapValueType());
+        return dataTypeFactory.createMapType(componentKeyType, 
componentValueType);
+      case ROW:
+        return toCalciteRowType(fieldType.getRowSchema(), dataTypeFactory);
+      default:
+        return dataTypeFactory.createSqlType(toSqlTypeName(fieldType));
     }
   }
 
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/InferredRowCoderSqlTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/InferredRowCoderSqlTest.java
index 39e2fff11f1..92dd5dbf59a 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/InferredRowCoderSqlTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/InferredRowCoderSqlTest.java
@@ -34,7 +34,6 @@
 /** Tests for automatic inferring schema from the input {@link PCollection} of 
pojos. */
 public class InferredRowCoderSqlTest {
 
-  private static final boolean NOT_NULLABLE = false;
   @Rule public final TestPipeline pipeline = TestPipeline.create();
 
   /** Person POJO. */
@@ -91,10 +90,7 @@ public void testSelect() {
     PAssert.that(result)
         .containsInAnyOrder(
             TestUtils.rowsBuilderOf(
-                    Schema.builder()
-                        .addStringField("name", NOT_NULLABLE)
-                        .addInt32Field("ageYears", NOT_NULLABLE)
-                        .build())
+                    
Schema.builder().addStringField("name").addInt32Field("ageYears").build())
                 .addRows(
                     "Foo", 5,
                     "Bar", 53)
@@ -118,7 +114,7 @@ public void testProject() {
 
     PAssert.that(result)
         .containsInAnyOrder(
-            TestUtils.rowsBuilderOf(Schema.builder().addStringField("name", 
NOT_NULLABLE).build())
+            
TestUtils.rowsBuilderOf(Schema.builder().addStringField("name").build())
                 .addRows("Foo", "Bar")
                 .getRows());
 
@@ -162,10 +158,7 @@ public void testJoin() {
     PAssert.that(result)
         .containsInAnyOrder(
             TestUtils.rowsBuilderOf(
-                    Schema.builder()
-                        .addStringField("name", NOT_NULLABLE)
-                        .addInt32Field("amount", NOT_NULLABLE)
-                        .build())
+                    Schema.builder().addStringField("name").addInt32Field("amount").build())
                 .addRows(
                     "Foo", 15,
                     "Foo", 10,
@@ -212,10 +205,7 @@ public void testAggregation() {
     PAssert.that(result)
         .containsInAnyOrder(
             TestUtils.rowsBuilderOf(
-                    Schema.builder()
-                        .addStringField("name", NOT_NULLABLE)
-                        .addInt32Field("total", NOT_NULLABLE)
-                        .build())
+                    Schema.builder().addStringField("name").addInt32Field("total").build())
                 .addRows(
                     "Foo", 30,
                     "Bar", 162)
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/JsonToRowSqlTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/JsonToRowSqlTest.java
index 173df823143..78c502f9fa3 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/JsonToRowSqlTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/JsonToRowSqlTest.java
@@ -29,7 +29,6 @@
 
 /** JsonToRowSqlTest. */
 public class JsonToRowSqlTest {
-  private static final boolean NOT_NULLABLE = false;
 
   @Rule public transient TestPipeline pipeline = TestPipeline.create();
 
@@ -37,9 +36,9 @@
   public void testParsesRows() throws Exception {
     Schema personSchema =
         Schema.builder()
-            .addStringField("name", NOT_NULLABLE)
-            .addInt32Field("height", NOT_NULLABLE)
-            .addBooleanField("knowsJavascript", NOT_NULLABLE)
+            .addStringField("name")
+            .addInt32Field("height")
+            .addBooleanField("knowsJavascript")
             .build();
 
     PCollection<String> jsonPersons =
@@ -51,7 +50,7 @@ public void testParsesRows() throws Exception {
                 jsonPerson("person4", "50", "false"),
                 jsonPerson("person5", "40", "true")));
 
-    Schema resultSchema = Schema.builder().addInt32Field("avg_height", NOT_NULLABLE).build();
+    Schema resultSchema = Schema.builder().addInt32Field("avg_height").build();
 
     PCollection<Row> sqlResult =
         jsonPersons
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
index b92615733d0..8cf4247bbca 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
@@ -31,7 +31,6 @@
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.PBegin;
@@ -245,8 +244,7 @@ public static Schema buildBeamSqlSchema(Object... args) {
   // TODO: support nested.
   // TODO: support nullable.
   private static Schema.Field toRecordField(Object[] args, int i) {
-    return Schema.Field.of((String) args[i + 1], FieldType.of((TypeName) args[i]))
-        .withNullable(true);
+    return Schema.Field.of((String) args[i + 1], (FieldType) args[i]).withNullable(true);
   }
 
   /**
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
index 8df4916f3c1..5d86b71ee15 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
@@ -30,7 +30,7 @@
 import java.sql.Statement;
 import org.apache.beam.sdk.extensions.sql.meta.provider.BeamSqlTableProvider;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.calcite.jdbc.CalciteConnection;
 import org.junit.Before;
 import org.junit.Test;
@@ -99,8 +99,8 @@ public void testInternalConnect_boundedTable() throws Exception {
             ImmutableMap.of(
                 "test",
                 MockedBoundedTable.of(
-                        TypeName.INT32, "id",
-                        TypeName.STRING, "name")
+                        Schema.FieldType.INT32, "id",
+                        Schema.FieldType.STRING, "name")
                     .addRows(1, "first")));
     CalciteConnection connection = JdbcDriver.connect(tableProvider);
     Statement statement = connection.createStatement();
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
index 2b79e777577..07276fa08fa 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
@@ -60,7 +60,7 @@
 
   @Test
   public void testToEnumerable_collectSingle() {
-    Schema schema = Schema.builder().addInt64Field("id", false).build();
+    Schema schema = Schema.builder().addInt64Field("id").build();
     RelDataType type = CalciteUtils.toCalciteRowType(schema, TYPE_FACTORY);
     ImmutableList<ImmutableList<RexLiteral>> tuples =
         
ImmutableList.of(ImmutableList.of(rexBuilder.makeBigintLiteral(BigDecimal.ZERO)));
@@ -77,8 +77,7 @@ public void testToEnumerable_collectSingle() {
 
   @Test
   public void testToEnumerable_collectMultiple() {
-    Schema schema =
-        Schema.builder().addInt64Field("id", false).addInt64Field("otherid", false).build();
+    Schema schema = Schema.builder().addInt64Field("id").addInt64Field("otherid").build();
     RelDataType type = CalciteUtils.toCalciteRowType(schema, TYPE_FACTORY);
     ImmutableList<ImmutableList<RexLiteral>> tuples =
         ImmutableList.of(
@@ -132,7 +131,7 @@ public void processElement(ProcessContext context) {}
 
   @Test
   public void testToEnumerable_count() {
-    Schema schema = Schema.builder().addInt64Field("id", false).build();
+    Schema schema = Schema.builder().addInt64Field("id").build();
     RelDataType type = CalciteUtils.toCalciteRowType(schema, TYPE_FACTORY);
     ImmutableList<ImmutableList<RexLiteral>> tuples =
         ImmutableList.of(
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
index 66fce5954be..dcfa2a1aedc 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRelTest.java
@@ -20,7 +20,7 @@
 
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
@@ -39,17 +39,17 @@ public static void prepare() {
     registerTable(
         "ORDER_DETAILS1",
         MockedBoundedTable.of(
-                TypeName.INT64, "order_id",
-                TypeName.INT32, "site_id",
-                TypeName.DOUBLE, "price")
+                Schema.FieldType.INT64, "order_id",
+                Schema.FieldType.INT32, "site_id",
+                Schema.FieldType.DOUBLE, "price")
             .addRows(1L, 1, 1.0, 1L, 1, 1.0, 2L, 2, 2.0, 4L, 4, 4.0));
 
     registerTable(
         "ORDER_DETAILS2",
         MockedBoundedTable.of(
-                TypeName.INT64, "order_id",
-                TypeName.INT32, "site_id",
-                TypeName.DOUBLE, "price")
+                Schema.FieldType.INT64, "order_id",
+                Schema.FieldType.INT32, "site_id",
+                Schema.FieldType.DOUBLE, "price")
             .addRows(1L, 1, 1.0, 2L, 2, 2.0, 3L, 3, 3.0));
   }
 
@@ -67,9 +67,9 @@ public void testIntersect() throws Exception {
     PAssert.that(rows)
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT64, "order_id",
-                    TypeName.INT32, "site_id",
-                    TypeName.DOUBLE, "price")
+                    Schema.FieldType.INT64, "order_id",
+                    Schema.FieldType.INT32, "site_id",
+                    Schema.FieldType.DOUBLE, "price")
                 .addRows(1L, 1, 1.0, 2L, 2, 2.0)
                 .getRows());
 
@@ -92,9 +92,9 @@ public void testIntersectAll() throws Exception {
     PAssert.that(rows)
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT64, "order_id",
-                    TypeName.INT32, "site_id",
-                    TypeName.DOUBLE, "price")
+                    Schema.FieldType.INT64, "order_id",
+                    Schema.FieldType.INT32, "site_id",
+                    Schema.FieldType.DOUBLE, "price")
                 .addRows(1L, 1, 1.0, 1L, 1, 1.0, 2L, 2, 2.0)
                 .getRows());
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
index faeec691709..ad32a6c0fed 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelBoundedVsBoundedTest.java
@@ -20,7 +20,7 @@
 
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
@@ -35,16 +35,16 @@
 
   public static final MockedBoundedTable ORDER_DETAILS1 =
       MockedBoundedTable.of(
-              TypeName.INT32, "order_id",
-              TypeName.INT32, "site_id",
-              TypeName.INT32, "price")
+              Schema.FieldType.INT32, "order_id",
+              Schema.FieldType.INT32, "site_id",
+              Schema.FieldType.INT32, "price")
           .addRows(1, 2, 3, 2, 3, 3, 3, 4, 5);
 
   public static final MockedBoundedTable ORDER_DETAILS2 =
       MockedBoundedTable.of(
-              TypeName.INT32, "order_id",
-              TypeName.INT32, "site_id",
-              TypeName.INT32, "price")
+              Schema.FieldType.INT32, "order_id",
+              Schema.FieldType.INT32, "site_id",
+              Schema.FieldType.INT32, "price")
           .addRows(1, 2, 3, 2, 3, 3, 3, 4, 5);
 
   @BeforeClass
@@ -66,12 +66,12 @@ public void testInnerJoin() throws Exception {
     PAssert.that(rows)
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT32, "order_id",
-                    TypeName.INT32, "site_id",
-                    TypeName.INT32, "price",
-                    TypeName.INT32, "order_id0",
-                    TypeName.INT32, "site_id0",
-                    TypeName.INT32, "price0")
+                    Schema.FieldType.INT32, "order_id",
+                    Schema.FieldType.INT32, "site_id",
+                    Schema.FieldType.INT32, "price",
+                    Schema.FieldType.INT32, "order_id0",
+                    Schema.FieldType.INT32, "site_id0",
+                    Schema.FieldType.INT32, "price0")
                 .addRows(2, 3, 3, 1, 2, 3)
                 .getRows());
     pipeline.run();
@@ -91,12 +91,12 @@ public void testLeftOuterJoin() throws Exception {
     PAssert.that(rows)
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT32, "order_id",
-                    TypeName.INT32, "site_id",
-                    TypeName.INT32, "price",
-                    TypeName.INT32, "order_id0",
-                    TypeName.INT32, "site_id0",
-                    TypeName.INT32, "price0")
+                    Schema.FieldType.INT32, "order_id",
+                    Schema.FieldType.INT32, "site_id",
+                    Schema.FieldType.INT32, "price",
+                    Schema.FieldType.INT32, "order_id0",
+                    Schema.FieldType.INT32, "site_id0",
+                    Schema.FieldType.INT32, "price0")
                 .addRows(1, 2, 3, null, null, null, 2, 3, 3, 1, 2, 3, 3, 4, 5, null, null, null)
                 .getRows());
     pipeline.run();
@@ -115,12 +115,12 @@ public void testRightOuterJoin() throws Exception {
     PAssert.that(rows)
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT32, "order_id",
-                    TypeName.INT32, "site_id",
-                    TypeName.INT32, "price",
-                    TypeName.INT32, "order_id0",
-                    TypeName.INT32, "site_id0",
-                    TypeName.INT32, "price0")
+                    Schema.FieldType.INT32, "order_id",
+                    Schema.FieldType.INT32, "site_id",
+                    Schema.FieldType.INT32, "price",
+                    Schema.FieldType.INT32, "order_id0",
+                    Schema.FieldType.INT32, "site_id0",
+                    Schema.FieldType.INT32, "price0")
                 .addRows(2, 3, 3, 1, 2, 3, null, null, null, 2, 3, 3, null, null, null, 3, 4, 5)
                 .getRows());
     pipeline.run();
@@ -139,12 +139,12 @@ public void testFullOuterJoin() throws Exception {
     PAssert.that(rows)
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT32, "order_id",
-                    TypeName.INT32, "site_id",
-                    TypeName.INT32, "price",
-                    TypeName.INT32, "order_id0",
-                    TypeName.INT32, "site_id0",
-                    TypeName.INT32, "price0")
+                    Schema.FieldType.INT32, "order_id",
+                    Schema.FieldType.INT32, "site_id",
+                    Schema.FieldType.INT32, "price",
+                    Schema.FieldType.INT32, "order_id0",
+                    Schema.FieldType.INT32, "site_id0",
+                    Schema.FieldType.INT32, "price0")
                 .addRows(
                     2, 3, 3, 1, 2, 3, 1, 2, 3, null, null, null, 3, 4, 5, null, null, null, null,
                     null, null, 2, 3, 3, null, null, null, 3, 4, 5)
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
index bd62b3f8a8c..59d4db82019 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -29,7 +29,6 @@
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
 import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -56,10 +55,10 @@ public static void prepare() {
     registerTable(
         "ORDER_DETAILS",
         MockedUnboundedTable.of(
-                TypeName.INT32, "order_id",
-                TypeName.INT32, "site_id",
-                TypeName.INT32, "price",
-                TypeName.DATETIME, "order_time")
+                Schema.FieldType.INT32, "order_id",
+                Schema.FieldType.INT32, "site_id",
+                Schema.FieldType.INT32, "price",
+                Schema.FieldType.DATETIME, "order_time")
             .timestampColumnIndex(3)
             .addRows(Duration.ZERO, 1, 1, 1, FIRST_DATE, 1, 2, 2, FIRST_DATE)
             .addRows(
@@ -92,8 +91,8 @@ public static void prepare() {
     registerTable(
         "ORDER_DETAILS1",
         MockedBoundedTable.of(
-                TypeName.INT32, "order_id",
-                TypeName.STRING, "buyer")
+                Schema.FieldType.INT32, "order_id",
+                Schema.FieldType.STRING, "buyer")
             .addRows(
                 1, "james",
                 2, "bond"));
@@ -102,8 +101,8 @@ public static void prepare() {
         "SITE_LKP",
         new SiteLookupTable(
             TestUtils.buildBeamSqlSchema(
-                TypeName.INT32, "site_id",
-                TypeName.STRING, "site_name")));
+                Schema.FieldType.INT32, "site_id",
+                Schema.FieldType.STRING, "site_name")));
   }
 
   /** Test table for JOIN-AS-LOOKUP. */
@@ -149,9 +148,9 @@ public void testInnerJoin_unboundedTableOnTheLeftSide() throws Exception {
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT32, "order_id",
-                    TypeName.INT32, "sum_site_id",
-                    TypeName.STRING, "buyer")
+                    Schema.FieldType.INT32, "order_id",
+                    Schema.FieldType.INT32, "sum_site_id",
+                    Schema.FieldType.STRING, "buyer")
                 .addRows(1, 3, "james", 2, 5, "bond")
                 .getStringRows());
     pipeline.run();
@@ -172,9 +171,9 @@ public void testInnerJoin_boundedTableOnTheLeftSide() throws Exception {
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT32, "order_id",
-                    TypeName.INT32, "sum_site_id",
-                    TypeName.STRING, "buyer")
+                    Schema.FieldType.INT32, "order_id",
+                    Schema.FieldType.INT32, "sum_site_id",
+                    Schema.FieldType.STRING, "buyer")
                 .addRows(1, 3, "james", 2, 5, "bond")
                 .getStringRows());
     pipeline.run();
@@ -196,9 +195,9 @@ public void testLeftOuterJoin() throws Exception {
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT32, "order_id",
-                    TypeName.INT32, "sum_site_id",
-                    TypeName.STRING, "buyer")
+                    Schema.FieldType.INT32, "order_id",
+                    Schema.FieldType.INT32, "sum_site_id",
+                    Schema.FieldType.STRING, "buyer")
                 .addRows(1, 3, "james", 2, 5, "bond", 3, 3, null)
                 .getStringRows());
     pipeline.run();
@@ -233,9 +232,9 @@ public void testRightOuterJoin() throws Exception {
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT32, "order_id",
-                    TypeName.INT32, "sum_site_id",
-                    TypeName.STRING, "buyer")
+                    Schema.FieldType.INT32, "order_id",
+                    Schema.FieldType.INT32, "sum_site_id",
+                    Schema.FieldType.STRING, "buyer")
                 .addRows(1, 3, "james", 2, 5, "bond", 3, 3, null)
                 .getStringRows());
     pipeline.run();
@@ -285,8 +284,8 @@ public void testJoinAsLookup() throws Exception {
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT32, "order_id",
-                    TypeName.STRING, "site_name")
+                    Schema.FieldType.INT32, "order_id",
+                    Schema.FieldType.STRING, "site_name")
                 .addRows(1, "SITE1")
                 .getStringRows());
     pipeline.run();
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
index 5aaa2de7b27..a23867415c4 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.java
@@ -21,7 +21,7 @@
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn;
 import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -46,10 +46,10 @@ public static void prepare() {
     registerTable(
         "ORDER_DETAILS",
         MockedUnboundedTable.of(
-                TypeName.INT32, "order_id",
-                TypeName.INT32, "site_id",
-                TypeName.INT32, "price",
-                TypeName.DATETIME, "order_time")
+                Schema.FieldType.INT32, "order_id",
+                Schema.FieldType.INT32, "site_id",
+                Schema.FieldType.INT32, "price",
+                Schema.FieldType.DATETIME, "order_time")
             .timestampColumnIndex(3)
             .addRows(Duration.ZERO, 1, 1, 1, FIRST_DATE, 1, 2, 6, FIRST_DATE)
             .addRows(
@@ -92,10 +92,10 @@ public void testInnerJoin() throws Exception {
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT32, "order_id",
-                    TypeName.INT32, "sum_site_id",
-                    TypeName.INT32, "order_id0",
-                    TypeName.INT32, "sum_site_id0")
+                    Schema.FieldType.INT32, "order_id",
+                    Schema.FieldType.INT32, "sum_site_id",
+                    Schema.FieldType.INT32, "order_id0",
+                    Schema.FieldType.INT32, "sum_site_id0")
                 .addRows(1, 3, 1, 3, 2, 5, 2, 5)
                 .getStringRows());
     pipeline.run();
@@ -123,10 +123,10 @@ public void testLeftOuterJoin() throws Exception {
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT32, "order_id",
-                    TypeName.INT32, "sum_site_id",
-                    TypeName.INT32, "order_id0",
-                    TypeName.INT32, "sum_site_id0")
+                    Schema.FieldType.INT32, "order_id",
+                    Schema.FieldType.INT32, "sum_site_id",
+                    Schema.FieldType.INT32, "order_id0",
+                    Schema.FieldType.INT32, "sum_site_id0")
                 .addRows(1, 1, 1, 3, 2, 2, null, null, 2, 2, 2, 5, 3, 3, null, null)
                 .getStringRows());
     pipeline.run();
@@ -148,10 +148,10 @@ public void testRightOuterJoin() throws Exception {
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT32, "order_id",
-                    TypeName.INT32, "sum_site_id",
-                    TypeName.INT32, "order_id0",
-                    TypeName.INT32, "sum_site_id0")
+                    Schema.FieldType.INT32, "order_id",
+                    Schema.FieldType.INT32, "sum_site_id",
+                    Schema.FieldType.INT32, "order_id0",
+                    Schema.FieldType.INT32, "sum_site_id0")
                 .addRows(1, 3, 1, 1, null, null, 2, 2, 2, 5, 2, 2, null, null, 3, 3)
                 .getStringRows());
     pipeline.run();
@@ -174,10 +174,10 @@ public void testFullOuterJoin() throws Exception {
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT32, "order_id1",
-                    TypeName.INT32, "sum_site_id",
-                    TypeName.INT32, "order_id",
-                    TypeName.INT32, "sum_site_id0")
+                    Schema.FieldType.INT32, "order_id1",
+                    Schema.FieldType.INT32, "sum_site_id",
+                    Schema.FieldType.INT32, "order_id",
+                    Schema.FieldType.INT32, "sum_site_id0")
                 .addRows(
                     1, 1, 1, 3, 6, 2, null, null, 7, 2, null, null, 8, 3, null, null, null, null, 2,
                     5)
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
index a9fa53a01db..3736bdebf3d 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java
@@ -20,7 +20,7 @@
 
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
@@ -38,17 +38,17 @@ public static void prepare() {
     registerTable(
         "ORDER_DETAILS1",
         MockedBoundedTable.of(
-                TypeName.INT64, "order_id",
-                TypeName.INT32, "site_id",
-                TypeName.DOUBLE, "price")
+                Schema.FieldType.INT64, "order_id",
+                Schema.FieldType.INT32, "site_id",
+                Schema.FieldType.DOUBLE, "price")
             .addRows(1L, 1, 1.0, 1L, 1, 1.0, 2L, 2, 2.0, 4L, 4, 4.0, 4L, 4, 4.0));
 
     registerTable(
         "ORDER_DETAILS2",
         MockedBoundedTable.of(
-                TypeName.INT64, "order_id",
-                TypeName.INT32, "site_id",
-                TypeName.DOUBLE, "price")
+                Schema.FieldType.INT64, "order_id",
+                Schema.FieldType.INT32, "site_id",
+                Schema.FieldType.DOUBLE, "price")
             .addRows(1L, 1, 1.0, 2L, 2, 2.0, 3L, 3, 3.0));
   }
 
@@ -66,9 +66,9 @@ public void testExcept() throws Exception {
     PAssert.that(rows)
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT64, "order_id",
-                    TypeName.INT32, "site_id",
-                    TypeName.DOUBLE, "price")
+                    Schema.FieldType.INT64, "order_id",
+                    Schema.FieldType.INT32, "site_id",
+                    Schema.FieldType.DOUBLE, "price")
                 .addRows(4L, 4, 4.0)
                 .getRows());
 
@@ -91,9 +91,9 @@ public void testExceptAll() throws Exception {
     PAssert.that(rows)
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT64, "order_id",
-                    TypeName.INT32, "site_id",
-                    TypeName.DOUBLE, "price")
+                    Schema.FieldType.INT64, "order_id",
+                    Schema.FieldType.INT32, "site_id",
+                    Schema.FieldType.DOUBLE, "price")
                 .addRows(4L, 4, 4.0, 4L, 4, 4.0)
                 .getRows());
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
index 23b935d61b5..3f87fda4e49 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBaseTest.java
@@ -22,7 +22,7 @@
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -43,10 +43,10 @@ public static void prepare() {
     registerTable(
         "ORDER_DETAILS",
         MockedBoundedTable.of(
-                TypeName.INT64, "order_id",
-                TypeName.INT32, "site_id",
-                TypeName.DOUBLE, "price",
-                TypeName.DATETIME, "order_time")
+                Schema.FieldType.INT64, "order_id",
+                Schema.FieldType.INT32, "site_id",
+                Schema.FieldType.DOUBLE, "price",
+                Schema.FieldType.DATETIME, "order_time")
             .addRows(1L, 1, 1.0, THE_DATE, 2L, 2, 2.0, THE_DATE));
   }
 
@@ -67,9 +67,9 @@ public void testSameWindow() throws Exception {
     PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT64, "order_id",
-                    TypeName.INT32, "site_id",
-                    TypeName.INT64, "cnt")
+                    Schema.FieldType.INT64, "order_id",
+                    Schema.FieldType.INT32, "site_id",
+                    Schema.FieldType.INT64, "cnt")
                 .addRows(1L, 1, 1L, 2L, 2, 1L)
                 .getStringRows());
     pipeline.run();
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
index 7be74a87f5a..5965fa39df9 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRelTest.java
@@ -20,7 +20,7 @@
 
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
@@ -40,10 +40,10 @@ public void prepare() {
     registerTable(
         "ORDER_DETAILS",
         MockedBoundedTable.of(
-                TypeName.INT64, "order_id",
-                TypeName.INT32, "site_id",
-                TypeName.DOUBLE, "price",
-                TypeName.DATETIME, "order_time")
+                Schema.FieldType.INT64, "order_id",
+                Schema.FieldType.INT32, "site_id",
+                Schema.FieldType.DOUBLE, "price",
+                Schema.FieldType.DATETIME, "order_time")
             .addRows(
                 1L,
                 2,
@@ -88,9 +88,9 @@ public void prepare() {
     registerTable(
         "SUB_ORDER_RAM",
         MockedBoundedTable.of(
-            TypeName.INT64, "order_id",
-            TypeName.INT32, "site_id",
-            TypeName.DOUBLE, "price"));
+            Schema.FieldType.INT64, "order_id",
+            Schema.FieldType.INT32, "site_id",
+            Schema.FieldType.DOUBLE, "price"));
   }
 
   @Test
@@ -105,9 +105,9 @@ public void testOrderBy_basic() throws Exception {
     PAssert.that(rows)
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT64, "order_id",
-                    TypeName.INT32, "site_id",
-                    TypeName.DOUBLE, "price")
+                    Schema.FieldType.INT64, "order_id",
+                    Schema.FieldType.INT32, "site_id",
+                    Schema.FieldType.DOUBLE, "price")
                 .addRows(1L, 2, 1.0, 1L, 1, 2.0, 2L, 4, 3.0, 2L, 1, 4.0)
                 .getRows());
     pipeline.run().waitUntilFinish();
@@ -124,10 +124,10 @@ public void testOrderBy_timestamp() throws Exception {
     PAssert.that(rows)
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT64, "order_id",
-                    TypeName.INT32, "site_id",
-                    TypeName.DOUBLE, "price",
-                    TypeName.DATETIME, "order_time")
+                    Schema.FieldType.INT64, "order_id",
+                    Schema.FieldType.INT32, "site_id",
+                    Schema.FieldType.DOUBLE, "price",
+                    Schema.FieldType.DATETIME, "order_time")
                 .addRows(
                     7L,
                     7,
@@ -154,16 +154,16 @@ public void testOrderBy_nullsFirst() throws Exception {
     registerTable(
         "ORDER_DETAILS",
         MockedBoundedTable.of(
-                TypeName.INT64, "order_id",
-                TypeName.INT32, "site_id",
-                TypeName.DOUBLE, "price")
+                Schema.FieldType.INT64, "order_id",
+                Schema.FieldType.INT32, "site_id",
+                Schema.FieldType.DOUBLE, "price")
             .addRows(1L, 2, 1.0, 1L, null, 2.0, 2L, 1, 3.0, 2L, null, 4.0, 5L, 5, 5.0));
     registerTable(
         "SUB_ORDER_RAM",
         MockedBoundedTable.of(
-            TypeName.INT64, "order_id",
-            TypeName.INT32, "site_id",
-            TypeName.DOUBLE, "price"));
+            Schema.FieldType.INT64, "order_id",
+            Schema.FieldType.INT32, "site_id",
+            Schema.FieldType.DOUBLE, "price"));
 
     String sql =
         "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
@@ -175,9 +175,9 @@ public void testOrderBy_nullsFirst() throws Exception {
     PAssert.that(rows)
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT64, "order_id",
-                    TypeName.INT32, "site_id",
-                    TypeName.DOUBLE, "price")
+                    Schema.FieldType.INT64, "order_id",
+                    Schema.FieldType.INT32, "site_id",
+                    Schema.FieldType.DOUBLE, "price")
                 .addRows(1L, null, 2.0, 1L, 2, 1.0, 2L, null, 4.0, 2L, 1, 3.0)
                 .getRows());
     pipeline.run().waitUntilFinish();
@@ -188,16 +188,16 @@ public void testOrderBy_nullsLast() throws Exception {
     registerTable(
         "ORDER_DETAILS",
         MockedBoundedTable.of(
-                TypeName.INT64, "order_id",
-                TypeName.INT32, "site_id",
-                TypeName.DOUBLE, "price")
+                Schema.FieldType.INT64, "order_id",
+                Schema.FieldType.INT32, "site_id",
+                Schema.FieldType.DOUBLE, "price")
             .addRows(1L, 2, 1.0, 1L, null, 2.0, 2L, 1, 3.0, 2L, null, 4.0, 5L, 5, 5.0));
     registerTable(
         "SUB_ORDER_RAM",
         MockedBoundedTable.of(
-            TypeName.INT64, "order_id",
-            TypeName.INT32, "site_id",
-            TypeName.DOUBLE, "price"));
+            Schema.FieldType.INT64, "order_id",
+            Schema.FieldType.INT32, "site_id",
+            Schema.FieldType.DOUBLE, "price"));
 
     String sql =
         "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
@@ -209,9 +209,9 @@ public void testOrderBy_nullsLast() throws Exception {
     PAssert.that(rows)
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT64, "order_id",
-                    TypeName.INT32, "site_id",
-                    TypeName.DOUBLE, "price")
+                    Schema.FieldType.INT64, "order_id",
+                    Schema.FieldType.INT32, "site_id",
+                    Schema.FieldType.DOUBLE, "price")
                 .addRows(1L, 2, 1.0, 1L, null, 2.0, 2L, 1, 3.0, 2L, null, 4.0)
                 .getRows());
     pipeline.run().waitUntilFinish();
@@ -229,9 +229,9 @@ public void testOrderBy_with_offset() throws Exception {
     PAssert.that(rows)
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT64, "order_id",
-                    TypeName.INT32, "site_id",
-                    TypeName.DOUBLE, "price")
+                    Schema.FieldType.INT64, "order_id",
+                    Schema.FieldType.INT32, "site_id",
+                    Schema.FieldType.DOUBLE, "price")
                 .addRows(5L, 5, 5.0, 6L, 6, 6.0, 7L, 7, 7.0, 8L, 8888, 8.0)
                 .getRows());
     pipeline.run().waitUntilFinish();
@@ -249,9 +249,9 @@ public void testOrderBy_bigFetch() throws Exception {
     PAssert.that(rows)
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT64, "order_id",
-                    TypeName.INT32, "site_id",
-                    TypeName.DOUBLE, "price")
+                    Schema.FieldType.INT64, "order_id",
+                    Schema.FieldType.INT32, "site_id",
+                    Schema.FieldType.DOUBLE, "price")
                 .addRows(
                     1L, 2, 1.0, 1L, 1, 2.0, 2L, 4, 3.0, 2L, 1, 4.0, 5L, 5, 5.0, 6L, 6, 6.0, 7L, 7,
                     7.0, 8L, 8888, 8.0, 8L, 999, 9.0, 10L, 100, 10.0)
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
index 742f5736000..efbb5a68dc4 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRelTest.java
@@ -20,7 +20,7 @@
 
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
@@ -38,9 +38,9 @@ public static void prepare() {
     registerTable(
         "ORDER_DETAILS",
         MockedBoundedTable.of(
-                TypeName.INT64, "order_id",
-                TypeName.INT32, "site_id",
-                TypeName.DOUBLE, "price")
+                Schema.FieldType.INT64, "order_id",
+                Schema.FieldType.INT32, "site_id",
+                Schema.FieldType.DOUBLE, "price")
             .addRows(1L, 1, 1.0, 2L, 2, 2.0));
   }
 
@@ -58,9 +58,9 @@ public void testUnion() throws Exception {
     PAssert.that(rows)
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT64, "order_id",
-                    TypeName.INT32, "site_id",
-                    TypeName.DOUBLE, "price")
+                    Schema.FieldType.INT64, "order_id",
+                    Schema.FieldType.INT32, "site_id",
+                    Schema.FieldType.DOUBLE, "price")
                 .addRows(1L, 1, 1.0, 2L, 2, 2.0)
                 .getRows());
     pipeline.run();
@@ -80,9 +80,9 @@ public void testUnionAll() throws Exception {
     PAssert.that(rows)
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT64, "order_id",
-                    TypeName.INT32, "site_id",
-                    TypeName.DOUBLE, "price")
+                    Schema.FieldType.INT64, "order_id",
+                    Schema.FieldType.INT32, "site_id",
+                    Schema.FieldType.DOUBLE, "price")
                 .addRows(1L, 1, 1.0, 1L, 1, 1.0, 2L, 2, 2.0, 2L, 2, 2.0)
                 .getRows());
     pipeline.run();
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
index 7fd4e37b468..19cdd62fbc8 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRelTest.java
@@ -20,7 +20,7 @@
 
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
@@ -38,13 +38,13 @@ public static void prepare() {
     registerTable(
         "string_table",
         MockedBoundedTable.of(
-            TypeName.STRING, "name",
-            TypeName.STRING, "description"));
+            Schema.FieldType.STRING, "name",
+            Schema.FieldType.STRING, "description"));
     registerTable(
         "int_table",
         MockedBoundedTable.of(
-            TypeName.INT32, "c0",
-            TypeName.INT32, "c1"));
+            Schema.FieldType.INT32, "c0",
+            Schema.FieldType.INT32, "c1"));
   }
 
   @Test
@@ -56,8 +56,8 @@ public void testValues() throws Exception {
     PAssert.that(rows)
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.STRING, "name",
-                    TypeName.STRING, "description")
+                    Schema.FieldType.STRING, "name",
+                    Schema.FieldType.STRING, "description")
                 .addRows(
                     "hello", "world",
                     "james", "bond")
@@ -72,8 +72,8 @@ public void testValues_castInt() throws Exception {
     PAssert.that(rows)
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT32, "c0",
-                    TypeName.INT32, "c1")
+                    Schema.FieldType.INT32, "c0",
+                    Schema.FieldType.INT32, "c1")
                 .addRows(1, 2)
                 .getRows());
     pipeline.run();
@@ -86,8 +86,8 @@ public void testValues_onlySelect() throws Exception {
     PAssert.that(rows)
         .containsInAnyOrder(
             TestUtils.RowsBuilder.of(
-                    TypeName.INT32, "EXPR$0",
-                    TypeName.STRING, "EXPR$1")
+                    Schema.FieldType.INT32, "EXPR$0",
+                    Schema.FieldType.STRING, "EXPR$1")
                 .addRows(1, "1")
                 .getRows());
     pipeline.run();
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index 8b07fcc0a85..99ccb9d98f0 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -43,11 +43,11 @@
 public class BigQueryUtilsTest {
   private static final Schema FLAT_TYPE = Schema
       .builder()
-      .addInt64Field("id", true)
-      .addDoubleField("value", true)
-      .addStringField("name", true)
-      .addDateTimeField("timestamp", true)
-      .addBooleanField("valid", true)
+      .addField(Schema.Field.nullable("id", Schema.FieldType.INT64))
+      .addField(Schema.Field.nullable("value", Schema.FieldType.DOUBLE))
+      .addField(Schema.Field.nullable("name", Schema.FieldType.STRING))
+      .addField(Schema.Field.nullable("timestamp", Schema.FieldType.DATETIME))
+      .addField(Schema.Field.nullable("valid", Schema.FieldType.BOOLEAN))
       .build();
 
   private static final Schema ARRAY_TYPE = Schema
@@ -57,15 +57,13 @@
 
   private static final Schema ROW_TYPE = Schema
       .builder()
-      .addRowField("row", FLAT_TYPE, true)
+      .addField(Schema.Field.nullable("row", Schema.FieldType.row(FLAT_TYPE)))
       .build();
 
-  private static final Schema ARRAY_ROW_TYPE = Schema
-      .builder()
-      .addArrayField("rows", Schema.FieldType
-          .of(Schema.TypeName.ROW)
-          .withRowSchema(FLAT_TYPE))
-      .build();
+  private static final Schema ARRAY_ROW_TYPE =
+      Schema.builder()
+          .addArrayField("rows", Schema.FieldType.of(Schema.TypeName.ROW).withRowSchema(FLAT_TYPE))
+          .build();
 
   private static final TableFieldSchema ID =
       new TableFieldSchema().setName("id")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 107261)
    Time Spent: 2h 40m  (was: 2.5h)

> Refactor builder field nullability
> ----------------------------------
>
>                 Key: BEAM-4077
>                 URL: https://issues.apache.org/jira/browse/BEAM-4077
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Kenneth Knowles
>            Assignee: Kenneth Knowles
>            Priority: Major
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> Currently the Schema builder methods take a boolean for nullability. It would 
> be more standard to have separate builder methods. At this point the builder 
> might as well just take the Field spec since it does not add concision.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to