[ 
https://issues.apache.org/jira/browse/BEAM-5884?focusedWorklogId=171282&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-171282
 ]

ASF GitHub Bot logged work on BEAM-5884:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Dec/18 06:21
            Start Date: 01/Dec/18 06:21
    Worklog Time Spent: 10m 
      Work Description: reuvenlax closed pull request #7175: [BEAM-5884] Move 
the nullable attribute onto FieldType.
URL: https://github.com/apache/beam/pull/7175
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
index 50be370adb0f..112ffe3be5a7 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
@@ -51,6 +51,7 @@
 import net.bytebuddy.matcher.ElementMatchers;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.values.Row;
 
@@ -147,7 +148,8 @@
 
   private static DynamicType.Builder<Coder> implementMethods(
       Schema schema, DynamicType.Builder<Coder> builder) {
-    boolean hasNullableFields = 
schema.getFields().stream().anyMatch(Field::getNullable);
+    boolean hasNullableFields =
+        
schema.getFields().stream().map(Field::getType).anyMatch(FieldType::getNullable);
     return builder
         .defineMethod("getSchema", Schema.class, Visibility.PRIVATE, 
Ownership.STATIC)
         .intercept(FixedValue.reference(schema))
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index 90ec371cadb5..a923c0ea16b4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -269,7 +269,7 @@ private boolean equivalent(Schema other, 
EquivalenceNullablePolicy nullablePolic
     for (int i = 0; i < otherFields.size(); ++i) {
       Field otherField = otherFields.get(i);
       Field actualField = actualFields.get(i);
-      if (!otherField.equivalent(actualField, nullablePolicy)) {
+      if (!actualField.equivalent(otherField, nullablePolicy)) {
         return false;
       }
     }
@@ -410,14 +410,13 @@ public boolean isSupertypeOf(TypeName other) {
     // Returns the type of this field.
     public abstract TypeName getTypeName();
 
+    // Whether this type is nullable.
+    public abstract Boolean getNullable();
+
     // For container types (e.g. ARRAY), returns the type of the contained 
element.
     @Nullable
     public abstract FieldType getCollectionElementType();
 
-    // For container types (e.g. ARRAY), returns nullable of the type of the 
contained element.
-    @Nullable
-    public abstract Boolean getCollectionElementTypeNullable();
-
     // For MAP type, returns the type of the key element, it must be a 
primitive type;
     @Nullable
     public abstract FieldType getMapKeyType();
@@ -426,10 +425,6 @@ public boolean isSupertypeOf(TypeName other) {
     @Nullable
     public abstract FieldType getMapValueType();
 
-    // For MAP type, returns nullable of the type of the value element, it can 
be a nested type;
-    @Nullable
-    public abstract Boolean getMapValueTypeNullable();
-
     // For ROW types, returns the schema for the row.
     @Nullable
     public abstract Schema getRowSchema();
@@ -442,7 +437,7 @@ public boolean isSupertypeOf(TypeName other) {
     abstract FieldType.Builder toBuilder();
 
     public static FieldType.Builder forTypeName(TypeName typeName) {
-      return new AutoValue_Schema_FieldType.Builder().setTypeName(typeName);
+      return new 
AutoValue_Schema_FieldType.Builder().setTypeName(typeName).setNullable(false);
     }
 
     @AutoValue.Builder
@@ -451,14 +446,12 @@ public boolean isSupertypeOf(TypeName other) {
 
       abstract Builder setCollectionElementType(@Nullable FieldType 
collectionElementType);
 
-      abstract Builder setCollectionElementTypeNullable(@Nullable Boolean 
nullable);
+      abstract Builder setNullable(Boolean nullable);
 
       abstract Builder setMapKeyType(@Nullable FieldType mapKeyType);
 
       abstract Builder setMapValueType(@Nullable FieldType mapValueType);
 
-      abstract Builder setMapValueTypeNullable(@Nullable Boolean nullable);
-
       abstract Builder setRowSchema(@Nullable Schema rowSchema);
 
       abstract Builder setMetadata(@Nullable byte[] metadata);
@@ -506,16 +499,12 @@ public static FieldType of(TypeName typeName) {
 
     /** Create an array type for the given field type. */
     public static final FieldType array(FieldType elementType) {
-      return FieldType.forTypeName(TypeName.ARRAY)
-          .setCollectionElementType(elementType)
-          .setCollectionElementTypeNullable(false)
-          .build();
+      return 
FieldType.forTypeName(TypeName.ARRAY).setCollectionElementType(elementType).build();
     }
 
     public static final FieldType array(FieldType elementType, boolean 
nullable) {
       return FieldType.forTypeName(TypeName.ARRAY)
-          .setCollectionElementType(elementType)
-          .setCollectionElementTypeNullable(nullable)
+          .setCollectionElementType(elementType.withNullable(nullable))
           .build();
     }
 
@@ -524,7 +513,6 @@ public static final FieldType map(FieldType keyType, 
FieldType valueType) {
       return FieldType.forTypeName(TypeName.MAP)
           .setMapKeyType(keyType)
           .setMapValueType(valueType)
-          .setMapValueTypeNullable(false)
           .build();
     }
 
@@ -532,8 +520,7 @@ public static final FieldType map(
         FieldType keyType, FieldType valueType, boolean valueTypeNullable) {
       return FieldType.forTypeName(TypeName.MAP)
           .setMapKeyType(keyType)
-          .setMapValueType(valueType)
-          .setMapValueTypeNullable(valueTypeNullable)
+          .setMapValueType(valueType.withNullable(valueTypeNullable))
           .build();
     }
 
@@ -552,6 +539,10 @@ public FieldType withMetadata(String metadata) {
       return 
toBuilder().setMetadata(metadata.getBytes(StandardCharsets.UTF_8)).build();
     }
 
+    public FieldType withNullable(boolean nullable) {
+      return toBuilder().setNullable(nullable).build();
+    }
+
     @Override
     public boolean equals(Object o) {
       if (!(o instanceof FieldType)) {
@@ -559,6 +550,7 @@ public boolean equals(Object o) {
       }
       FieldType other = (FieldType) o;
       return Objects.equals(getTypeName(), other.getTypeName())
+          && Objects.equals(getNullable(), other.getNullable())
           && Objects.equals(getCollectionElementType(), 
other.getCollectionElementType())
           && Objects.equals(getMapKeyType(), other.getMapKeyType())
           && Objects.equals(getMapValueType(), other.getMapValueType())
@@ -589,29 +581,43 @@ public boolean typesEqual(FieldType other) {
       return true;
     }
 
-    private boolean equivalent(FieldType other) {
-      if (!other.getTypeName().equals(getTypeName())) {
+    private boolean equivalent(FieldType other, EquivalenceNullablePolicy 
nullablePolicy) {
+      if (nullablePolicy == EquivalenceNullablePolicy.SAME
+          && !other.getNullable().equals(getNullable())) {
         return false;
+      } else if (nullablePolicy == EquivalenceNullablePolicy.WEAKEN) {
+        if (getNullable() && !other.getNullable()) {
+          return false;
+        }
       }
+
+      if (!getTypeName().equals(other.getTypeName())) {
+        return false;
+      }
+      if (!Arrays.equals(getMetadata(), other.getMetadata())) {
+        return false;
+      }
+
       switch (getTypeName()) {
         case ROW:
-          if (!other.getRowSchema().equivalent(getRowSchema())) {
+          if (!getRowSchema().equivalent(other.getRowSchema(), 
nullablePolicy)) {
             return false;
           }
           break;
         case ARRAY:
-          if 
(!other.getCollectionElementType().equivalent(getCollectionElementType())) {
+          if (!getCollectionElementType()
+              .equivalent(other.getCollectionElementType(), nullablePolicy)) {
             return false;
           }
           break;
         case MAP:
-          if (!other.getMapKeyType().equivalent(getMapKeyType())
-              || !other.getMapValueType().equivalent(getMapValueType())) {
+          if (!getMapKeyType().equivalent(other.getMapKeyType(), 
nullablePolicy)
+              || !getMapValueType().equivalent(other.getMapValueType(), 
nullablePolicy)) {
             return false;
           }
           break;
         default:
-          return other.equals(this);
+          return true;
       }
       return true;
     }
@@ -621,6 +627,7 @@ public int hashCode() {
       return Arrays.deepHashCode(
           new Object[] {
             getTypeName(),
+            getNullable(),
             getCollectionElementType(),
             getMapKeyType(),
             getMapValueType(),
@@ -642,9 +649,6 @@ public int hashCode() {
     /** Returns the fields {@link FieldType}. */
     public abstract FieldType getType();
 
-    /** Returns whether the field supports null values. */
-    public abstract Boolean getNullable();
-
     public abstract Builder toBuilder();
 
     /** Builder for {@link Field}. */
@@ -656,8 +660,6 @@ public int hashCode() {
 
       public abstract Builder setType(FieldType fieldType);
 
-      public abstract Builder setNullable(Boolean nullable);
-
       public abstract Field build();
     }
 
@@ -667,7 +669,6 @@ public static Field of(String name, FieldType fieldType) {
           .setName(name)
           .setDescription("")
           .setType(fieldType)
-          .setNullable(false) // By default fields are not nullable.
           .build();
     }
 
@@ -676,8 +677,7 @@ public static Field nullable(String name, FieldType 
fieldType) {
       return new AutoValue_Schema_Field.Builder()
           .setName(name)
           .setDescription("")
-          .setType(fieldType)
-          .setNullable(true)
+          .setType(fieldType.withNullable(true))
           .build();
     }
 
@@ -698,7 +698,7 @@ public Field withType(FieldType fieldType) {
 
     /** Returns a copy of the Field with isNullable set. */
     public Field withNullable(boolean isNullable) {
-      return toBuilder().setNullable(isNullable).build();
+      return toBuilder().setType(getType().withNullable(isNullable)).build();
     }
 
     @Override
@@ -709,31 +709,22 @@ public boolean equals(Object o) {
       Field other = (Field) o;
       return Objects.equals(getName(), other.getName())
           && Objects.equals(getDescription(), other.getDescription())
-          && Objects.equals(getType(), other.getType())
-          && Objects.equals(getNullable(), other.getNullable());
+          && Objects.equals(getType(), other.getType());
     }
 
     /** Returns true if two fields are equal, ignoring name and description. */
     public boolean typesEqual(Field other) {
-      return getType().typesEqual(other.getType())
-          && Objects.equals(getNullable(), other.getNullable());
+      return getType().typesEqual(other.getType());
     }
 
     private boolean equivalent(Field otherField, EquivalenceNullablePolicy 
nullablePolicy) {
-      if (nullablePolicy == EquivalenceNullablePolicy.SAME
-          && !otherField.getNullable().equals(getNullable())) {
-        return false;
-      } else if (nullablePolicy == EquivalenceNullablePolicy.WEAKEN) {
-        if (getNullable() && !otherField.getNullable()) {
-          return false;
-        }
-      }
-      return otherField.getName().equals(getName()) && 
getType().equivalent(otherField.getType());
+      return getName().equals(otherField.getName())
+          && getType().equivalent(otherField.getType(), nullablePolicy);
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(getName(), getDescription(), getType(), 
getNullable());
+      return Objects.hash(getName(), getDescription(), getType());
     }
   }
 
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Cast.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Cast.java
index 3048806edf0a..fc2521427a69 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Cast.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Cast.java
@@ -148,7 +148,7 @@ public String toString() {
           return Collections.singletonList(
               CompatibilityError.create(context.path(), "Field is missing in 
output schema"));
         } else {
-          if (left.get().getNullable() && !right.get().getNullable()) {
+          if (left.get().getType().getNullable() && 
!right.get().getType().getNullable()) {
             return Collections.singletonList(
                 CompatibilityError.create(
                     context.path(), "Can't cast nullable field to non-nullable 
field"));
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
index 651cb163bfe4..fa49edad8cd4 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
@@ -149,11 +149,8 @@ static Schema getOutputSchema(Schema inputSchema, 
FieldAccessDescriptor fieldAcc
       FieldType nestedType =
           FieldType.row(getOutputSchema(field.getType().getRowSchema(), 
nestedDescriptor));
 
-      if (field.getNullable()) {
-        builder.addNullableField(field.getName(), nestedType);
-      } else {
-        builder.addField(field.getName(), nestedType);
-      }
+      nestedType = nestedType.withNullable(field.getType().getNullable());
+      builder.addField(field.getName(), nestedType);
     }
     return builder.build();
   }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
index 23256a373b03..7f349c939f1b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
@@ -543,7 +543,7 @@ public Builder withFieldValueGetters(
         Object value = values.get(i);
         Schema.Field field = schema.getField(i);
         if (value == null) {
-          if (!field.getNullable()) {
+          if (!field.getType().getNullable()) {
             throw new IllegalArgumentException(
                 String.format("Field %s is not nullable", field.getName()));
           }
@@ -557,21 +557,11 @@ public Builder withFieldValueGetters(
 
     private Object verify(Object value, FieldType type, String fieldName) {
       if (TypeName.ARRAY.equals(type.getTypeName())) {
-        List<Object> arrayElements =
-            verifyArray(
-                value,
-                type.getCollectionElementType(),
-                type.getCollectionElementTypeNullable(),
-                fieldName);
+        List<Object> arrayElements = verifyArray(value, 
type.getCollectionElementType(), fieldName);
         return arrayElements;
       } else if (TypeName.MAP.equals(type.getTypeName())) {
         Map<Object, Object> mapElements =
-            verifyMap(
-                value,
-                type.getMapKeyType().getTypeName(),
-                type.getMapValueType(),
-                type.getMapValueTypeNullable(),
-                fieldName);
+            verifyMap(value, type.getMapKeyType().getTypeName(), 
type.getMapValueType(), fieldName);
         return mapElements;
       } else if (TypeName.ROW.equals(type.getTypeName())) {
         return verifyRow(value, fieldName);
@@ -581,10 +571,8 @@ private Object verify(Object value, FieldType type, String 
fieldName) {
     }
 
     private List<Object> verifyArray(
-        Object value,
-        FieldType collectionElementType,
-        boolean collectionElementTypeNullable,
-        String fieldName) {
+        Object value, FieldType collectionElementType, String fieldName) {
+      boolean collectionElementTypeNullable = 
collectionElementType.getNullable();
       if (!(value instanceof List)) {
         throw new IllegalArgumentException(
             String.format(
@@ -610,11 +598,8 @@ private Object verify(Object value, FieldType type, String 
fieldName) {
     }
 
     private Map<Object, Object> verifyMap(
-        Object value,
-        TypeName keyTypeName,
-        FieldType valueType,
-        boolean valueTypeNullable,
-        String fieldName) {
+        Object value, TypeName keyTypeName, FieldType valueType, String 
fieldName) {
+      boolean valueTypeNullable = valueType.getNullable();
       if (!(value instanceof Map)) {
         throw new IllegalArgumentException(
             String.format(
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java
index c0ea2026474b..27599c2ccc63 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java
@@ -60,7 +60,7 @@
     Field field = getSchema().getField(fieldIdx);
     FieldType type = field.getType();
     Object fieldValue = getters.get(fieldIdx).get(getterTarget);
-    if (fieldValue == null && !field.getNullable()) {
+    if (fieldValue == null && !field.getType().getNullable()) {
       throw new RuntimeException("Null value set on non-nullable field" + 
field);
     }
     return fieldValue != null ? getValue(type, fieldValue, fieldIdx) : null;
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
index 0f438b458cfe..125be49ce4cc 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
@@ -61,8 +61,8 @@
   @Test
   public void testNullable() {
     Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NullableBean.class);
-    assertTrue(schema.getField("str").getNullable());
-    assertFalse(schema.getField("anInt").getNullable());
+    assertTrue(schema.getField("str").getType().getNullable());
+    assertFalse(schema.getField("anInt").getType().getNullable());
   }
 
   @Test
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java
index 51ef8ff02130..1a0946d6f609 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java
@@ -63,8 +63,8 @@
   @Test
   public void testNullables() {
     Schema schema = POJOUtils.schemaFromPojoClass(POJOWithNullables.class);
-    assertTrue(schema.getField("str").getNullable());
-    assertFalse(schema.getField("anInt").getNullable());
+    assertTrue(schema.getField("str").getType().getNullable());
+    assertFalse(schema.getField("anInt").getType().getNullable());
   }
 
   @Test
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java
index 3c779ca6313a..e5d3de8efd2b 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java
@@ -138,7 +138,7 @@ private void unparseColumn(SqlWriter writer, Schema.Field 
column) {
     writer.identifier(column.getName());
     writer.identifier(CalciteUtils.toSqlTypeName(column.getType()).name());
 
-    if (column.getNullable() != null && !column.getNullable()) {
+    if (column.getType().getNullable() != null && 
!column.getType().getNullable()) {
       writer.keyword("NOT NULL");
     }
 
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
index 325208278ab5..9ddc642dff3c 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
@@ -92,7 +92,7 @@ public static String beamRow2CsvLine(Row row, CSVFormat 
csvFormat) {
 
   public static Object autoCastField(Schema.Field field, Object rawObj) {
     if (rawObj == null) {
-      if (!field.getNullable()) {
+      if (!field.getType().getNullable()) {
         throw new IllegalArgumentException(String.format("Field %s not 
nullable", field.getName()));
       }
       return null;
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index 5c22314c171f..cbd067561934 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -104,7 +104,7 @@ public static SqlTypeName toSqlTypeName(FieldType type) {
       case MAP:
         return SqlTypeName.MAP;
       default:
-        SqlTypeName typeName = BEAM_TO_CALCITE_TYPE_MAPPING.get(type);
+        SqlTypeName typeName = 
BEAM_TO_CALCITE_TYPE_MAPPING.get(type.withNullable(false));
         if (typeName != null) {
           return typeName;
         } else {
@@ -191,7 +191,7 @@ private static RelDataType toRelDataType(
     Schema.Field field = schema.getField(fieldIndex);
     RelDataType type = toRelDataType(dataTypeFactory, field.getType());
 
-    return dataTypeFactory.createTypeWithNullability(type, 
field.getNullable());
+    return dataTypeFactory.createTypeWithNullability(type, 
field.getType().getNullable());
   }
 
   /**
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
index 547fe44b0de8..3185e9174dab 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
@@ -33,6 +33,7 @@
 import 
org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.transforms.FlatMapElements;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -95,9 +96,9 @@ public BeamSqlTable buildBeamSqlTable(Table table) {
       case "lines":
         checkArgument(
             schema.getFieldCount() == 1
-                && 
schema.getField(0).getType().equals(Schema.FieldType.STRING),
+                && 
schema.getField(0).getType().getTypeName().equals(TypeName.STRING),
             "Table with type 'text' and format 'lines' "
-                + "must have exactly one STRING/VARCHAR/CHAR column");
+                + "must have exactly one STRING/VARCHAR/CHAR column ");
         return new TextTable(
             schema, filePattern, new LinesReadConverter(), new 
LinesWriteConverter());
       default:
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index db86c210f763..649f8c816e12 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -132,7 +132,7 @@ private static StandardSQLTypeName 
toStandardSQLTypeName(FieldType fieldType) {
         field.setDescription(schemaField.getDescription());
       }
 
-      if (!schemaField.getNullable()) {
+      if (!schemaField.getType().getNullable()) {
         field.setMode(Mode.REQUIRED.toString());
       }
       if (TypeName.ARRAY == type.getTypeName()) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 171282)
    Time Spent: 4h 20m  (was: 4h 10m)

> Allow nested types to have null values.
> ---------------------------------------
>
>                 Key: BEAM-5884
>                 URL: https://issues.apache.org/jira/browse/BEAM-5884
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql
>            Reporter: Rui Wang
>            Assignee: Rui Wang
>            Priority: Major
>             Fix For: Not applicable
>
>          Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> We could allow arbitrary combinations of nested types to have null values.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to