[ 
https://issues.apache.org/jira/browse/BEAM-5884?focusedWorklogId=162764&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162764
 ]

ASF GitHub Bot logged work on BEAM-5884:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Nov/18 22:24
            Start Date: 05/Nov/18 22:24
    Worklog Time Spent: 10m 
      Work Description: kennknowles closed pull request #6861: [BEAM-5884] 
support null values in Array and Map.
URL: https://github.com/apache/beam/pull/6861
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index 86a0f4653d5..0ca6053dac4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -354,6 +354,10 @@ public boolean isCompositeType() {
     @Nullable
     public abstract FieldType getCollectionElementType();
 
+    // For container types (e.g. ARRAY), returns nullable of the type of the 
contained element.
+    @Nullable
+    public abstract Boolean getCollectionElementTypeNullable();
+
     // For MAP type, returns the type of the key element, it must be a 
primitive type;
     @Nullable
     public abstract FieldType getMapKeyType();
@@ -362,6 +366,10 @@ public boolean isCompositeType() {
     @Nullable
     public abstract FieldType getMapValueType();
 
+    // For MAP type, returns nullable of the type of the value element, it can 
be a nested type;
+    @Nullable
+    public abstract Boolean getMapValueTypeNullable();
+
     // For ROW types, returns the schema for the row.
     @Nullable
     public abstract Schema getRowSchema();
@@ -383,10 +391,14 @@ public boolean isCompositeType() {
 
       abstract Builder setCollectionElementType(@Nullable FieldType 
collectionElementType);
 
+      abstract Builder setCollectionElementTypeNullable(@Nullable Boolean 
nullable);
+
       abstract Builder setMapKeyType(@Nullable FieldType mapKeyType);
 
       abstract Builder setMapValueType(@Nullable FieldType mapValueType);
 
+      abstract Builder setMapValueTypeNullable(@Nullable Boolean nullable);
+
       abstract Builder setRowSchema(@Nullable Schema rowSchema);
 
       abstract Builder setMetadata(@Nullable byte[] metadata);
@@ -434,7 +446,17 @@ public static FieldType of(TypeName typeName) {
 
     /** Create an array type for the given field type. */
     public static final FieldType array(FieldType elementType) {
-      return 
FieldType.forTypeName(TypeName.ARRAY).setCollectionElementType(elementType).build();
+      return FieldType.forTypeName(TypeName.ARRAY)
+          .setCollectionElementType(elementType)
+          .setCollectionElementTypeNullable(false)
+          .build();
+    }
+
+    public static final FieldType array(FieldType elementType, boolean 
nullable) {
+      return FieldType.forTypeName(TypeName.ARRAY)
+          .setCollectionElementType(elementType)
+          .setCollectionElementTypeNullable(nullable)
+          .build();
     }
 
     /** Create a map type for the given key and value types. */
@@ -442,6 +464,16 @@ public static final FieldType map(FieldType keyType, 
FieldType valueType) {
       return FieldType.forTypeName(TypeName.MAP)
           .setMapKeyType(keyType)
           .setMapValueType(valueType)
+          .setMapValueTypeNullable(false)
+          .build();
+    }
+
+    public static final FieldType map(
+        FieldType keyType, FieldType valueType, boolean valueTypeNullable) {
+      return FieldType.forTypeName(TypeName.MAP)
+          .setMapKeyType(keyType)
+          .setMapValueType(valueType)
+          .setMapValueTypeNullable(valueTypeNullable)
           .build();
     }
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
index b078f742fe9..d03f6d14bcb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
@@ -444,11 +444,21 @@ public Builder withFieldValueGetters(
 
     private Object verify(Object value, FieldType type, String fieldName) {
       if (TypeName.ARRAY.equals(type.getTypeName())) {
-        List<Object> arrayElements = verifyArray(value, 
type.getCollectionElementType(), fieldName);
+        List<Object> arrayElements =
+            verifyArray(
+                value,
+                type.getCollectionElementType(),
+                type.getCollectionElementTypeNullable(),
+                fieldName);
         return arrayElements;
       } else if (TypeName.MAP.equals(type.getTypeName())) {
         Map<Object, Object> mapElements =
-            verifyMap(value, type.getMapKeyType().getTypeName(), 
type.getMapValueType(), fieldName);
+            verifyMap(
+                value,
+                type.getMapKeyType().getTypeName(),
+                type.getMapValueType(),
+                type.getMapValueTypeNullable(),
+                fieldName);
         return mapElements;
       } else if (TypeName.ROW.equals(type.getTypeName())) {
         return verifyRow(value, fieldName);
@@ -458,7 +468,10 @@ private Object verify(Object value, FieldType type, String 
fieldName) {
     }
 
     private List<Object> verifyArray(
-        Object value, FieldType collectionElementType, String fieldName) {
+        Object value,
+        FieldType collectionElementType,
+        boolean collectionElementTypeNullable,
+        String fieldName) {
       if (!(value instanceof List)) {
         throw new IllegalArgumentException(
             String.format(
@@ -469,13 +482,26 @@ private Object verify(Object value, FieldType type, 
String fieldName) {
       List<Object> valueList = (List<Object>) value;
       List<Object> verifiedList = 
Lists.newArrayListWithCapacity(valueList.size());
       for (Object listValue : valueList) {
-        verifiedList.add(verify(listValue, collectionElementType, fieldName));
+        if (listValue == null) {
+          if (!collectionElementTypeNullable) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "%s is not nullable in Array field %s", 
collectionElementType, fieldName));
+          }
+          verifiedList.add(null);
+        } else {
+          verifiedList.add(verify(listValue, collectionElementType, 
fieldName));
+        }
       }
       return verifiedList;
     }
 
     private Map<Object, Object> verifyMap(
-        Object value, TypeName keyTypeName, FieldType valueType, String 
fieldName) {
+        Object value,
+        TypeName keyTypeName,
+        FieldType valueType,
+        boolean valueTypeNullable,
+        String fieldName) {
       if (!(value instanceof Map)) {
         throw new IllegalArgumentException(
             String.format(
@@ -486,9 +512,17 @@ private Object verify(Object value, FieldType type, String 
fieldName) {
       Map<Object, Object> valueMap = (Map<Object, Object>) value;
       Map<Object, Object> verifiedMap = 
Maps.newHashMapWithExpectedSize(valueMap.size());
       for (Entry<Object, Object> kv : valueMap.entrySet()) {
-        verifiedMap.put(
-            verifyPrimitiveType(kv.getKey(), keyTypeName, fieldName),
-            verify(kv.getValue(), valueType, fieldName));
+        if (kv.getValue() == null) {
+          if (!valueTypeNullable) {
+            throw new IllegalArgumentException(
+                String.format("%s is not nullable in Map field %s", valueType, 
fieldName));
+          }
+          verifiedMap.put(verifyPrimitiveType(kv.getKey(), keyTypeName, 
fieldName), null);
+        } else {
+          verifiedMap.put(
+              verifyPrimitiveType(kv.getKey(), keyTypeName, fieldName),
+              verify(kv.getValue(), valueType, fieldName));
+        }
       }
       return verifiedMap;
     }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java
index 5d3a50f91f1..7fe55f5fc99 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java
@@ -28,6 +28,7 @@
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
@@ -58,7 +59,10 @@ public void testCreatesNullRecord() {
                 Schema.Field.of("f_double", 
FieldType.DOUBLE).withNullable(true),
                 Schema.Field.of("f_string", 
FieldType.STRING).withNullable(true),
                 Schema.Field.of("f_datetime", 
FieldType.DATETIME).withNullable(true),
-                Schema.Field.of("f_boolean", 
FieldType.BOOLEAN).withNullable(true))
+                Schema.Field.of("f_boolean", 
FieldType.BOOLEAN).withNullable(true),
+                Schema.Field.of("f_array", 
FieldType.array(FieldType.DATETIME)).withNullable(true),
+                Schema.Field.of("f_map", FieldType.map(FieldType.INT32, 
FieldType.DOUBLE))
+                    .withNullable(true))
             .collect(toSchema());
 
     Row row = Row.nullRow(type);
@@ -83,6 +87,10 @@ public void testCreatesNullRecord() {
     assertNull(row.getDateTime(8));
     assertNull(row.getBoolean("f_boolean"));
     assertNull(row.getBoolean(9));
+    assertNull(row.getBoolean("f_array"));
+    assertNull(row.getBoolean(10));
+    assertNull(row.getBoolean("f_map"));
+    assertNull(row.getBoolean(11));
   }
 
   @Test
@@ -164,6 +172,16 @@ public void testCreatesArray() {
     assertEquals(data, row.getArray("array"));
   }
 
+  @Test
+  public void testCreatesArrayWithNullElement() {
+    List<Integer> data = Lists.newArrayList(2, null, 5, null);
+    Schema type =
+        Stream.of(Schema.Field.of("array", 
Schema.FieldType.array(Schema.FieldType.INT32, true)))
+            .collect(toSchema());
+    Row row = Row.withSchema(type).addArray(data).build();
+    assertEquals(data, row.getArray("array"));
+  }
+
   @Test
   public void testCreatesRowArray() {
     Schema nestedType = Stream.of(Schema.Field.of("f1_str", 
FieldType.STRING)).collect(toSchema());
@@ -190,6 +208,19 @@ public void testCreatesArrayArray() {
     assertEquals(data, row.getArray("array"));
   }
 
+  @Test
+  public void testCreatesArrayArrayWithNullElement() {
+    List<List<Integer>> data =
+        Lists.<List<Integer>>newArrayList(Lists.newArrayList(1, null, 3, 
null), null);
+    Schema type =
+        Stream.of(
+                Schema.Field.of(
+                    "array", FieldType.array(FieldType.array(FieldType.INT32, 
true), true)))
+            .collect(toSchema());
+    Row row = Row.withSchema(type).addArray(data).build();
+    assertEquals(data, row.getArray("array"));
+  }
+
   @Test
   public void testCreatesArrayOfMap() {
     List<Map<Integer, String>> data =
@@ -222,6 +253,20 @@ public void testCreateMapWithPrimitiveValue() {
     assertEquals(data, row.getMap("map"));
   }
 
+  @Test
+  public void testCreateMapWithNullValue() {
+    Map<Integer, String> data = new HashMap();
+    data.put(1, "value1");
+    data.put(2, "value2");
+    data.put(3, null);
+    data.put(4, null);
+    Schema type =
+        Stream.of(Schema.Field.of("map", FieldType.map(FieldType.INT32, 
FieldType.STRING, true)))
+            .collect(toSchema());
+    Row row = Row.withSchema(type).addValue(data).build();
+    assertEquals(data, row.getMap("map"));
+  }
+
   @Test
   public void testCreateMapWithArrayValue() {
     Map<Integer, List<String>> data =
@@ -256,6 +301,30 @@ public void testCreateMapWithMapValue() {
     assertEquals(data, row.getMap("map"));
   }
 
+  @Test
+  public void testCreateMapWithMapValueWithNull() {
+    Map<Integer, Map<Integer, String>> data = new HashMap();
+    Map<Integer, String> innerData = new HashMap();
+    innerData.put(11, null);
+    innerData.put(12, "value3");
+    data.put(1, ImmutableMap.of(1, "value1"));
+    data.put(2, ImmutableMap.of(2, "value2"));
+    data.put(3, null);
+    data.put(4, innerData);
+
+    Schema type =
+        Stream.of(
+                Schema.Field.of(
+                    "map",
+                    FieldType.map(
+                        FieldType.INT32,
+                        FieldType.map(FieldType.INT32, FieldType.STRING, true),
+                        true)))
+            .collect(toSchema());
+    Row row = Row.withSchema(type).addValue(data).build();
+    assertEquals(data, row.getMap("map"));
+  }
+
   @Test
   public void testCreateMapWithRowValue() {
     Schema nestedType = Stream.of(Schema.Field.of("f1_str", 
FieldType.STRING)).collect(toSchema());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 162764)
    Time Spent: 4h  (was: 3h 50m)

> Allow nested types to have null values.
> ---------------------------------------
>
>                 Key: BEAM-5884
>                 URL: https://issues.apache.org/jira/browse/BEAM-5884
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql
>            Reporter: Rui Wang
>            Assignee: Rui Wang
>            Priority: Major
>          Time Spent: 4h
>  Remaining Estimate: 0h
>
> We could allow arbitrary combinations of nested types to have null values.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to