This is an automated email from the ASF dual-hosted git repository.

alexvanboxel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 34d7c85  [BEAM-9394] DynamicMessage handling of empty map violates 
schema nullability
     new 777a401  Merge pull request #10984 from [BEAM-9394] DynamicMessage 
handling of empty map violates nullability
34d7c85 is described below

commit 34d7c85d7c1cb894b620cd7bb903152a5d43dbe7
Author: Alex Van Boxel <[email protected]>
AuthorDate: Thu Feb 27 08:40:18 2020 +0100

    [BEAM-9394] DynamicMessage handling of empty map violates schema nullability
    
    Fixed the handling of empty maps. It runned NULL, but should return and 
emtpy
    map in the Row. Added tests for Maps and Array. Only Map had the incorrect
    behaviour.
---
 .../protobuf/ProtoDynamicMessageSchema.java        |  4 +--
 .../protobuf/ProtoDynamicMessageSchemaTest.java    | 35 ++++++++++++++++++++++
 .../protobuf/ProtoMessageSchemaTest.java           | 32 ++++++++++++++++++++
 .../sdk/extensions/protobuf/TestProtoSchemas.java  | 34 +++++++++++++++++++++
 4 files changed, 103 insertions(+), 2 deletions(-)

diff --git 
a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchema.java
 
b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchema.java
index 93bf002..ec8537c 100644
--- 
a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchema.java
+++ 
b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchema.java
@@ -561,10 +561,10 @@ public class ProtoDynamicMessageSchema<T> implements 
Serializable {
     @Override
     Map getFromProtoMessage(Message message) {
       List<Message> list = (List<Message>) 
message.getField(getFieldDescriptor(message));
+      Map<Object, Object> rowMap = new HashMap<>();
       if (list.size() == 0) {
-        return null;
+        return rowMap;
       }
-      Map<Object, Object> rowMap = new HashMap<>();
       list.forEach(
           entryMessage -> {
             Descriptors.Descriptor entryDescriptor = 
entryMessage.getDescriptorForType();
diff --git 
a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchemaTest.java
 
b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchemaTest.java
index 14bd131..0b7984d 100644
--- 
a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchemaTest.java
+++ 
b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchemaTest.java
@@ -24,6 +24,10 @@ import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.MAP_PRIMI
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_PROTO;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_ROW;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_SCHEMA;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_MAP_PRIMITIVE_PROTO;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_MAP_PRIMITIVE_ROW;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_REPEATED_PROTO;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_REPEATED_ROW;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_PROTO_BOOL;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_PROTO_INT32;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_PROTO_PRIMITIVE;
@@ -126,6 +130,22 @@ public class ProtoDynamicMessageSchemaTest {
     assertEquals(REPEATED_PROTO.toString(), 
fromRow.apply(REPEATED_ROW).toString());
   }
 
+  @Test
+  public void testNullRepeatedProtoToRow() throws 
InvalidProtocolBufferException {
+    ProtoDynamicMessageSchema schemaProvider =
+        schemaFromDescriptor(RepeatPrimitive.getDescriptor());
+    SerializableFunction<DynamicMessage, Row> toRow = 
schemaProvider.getToRowFunction();
+    assertEquals(NULL_REPEATED_ROW, 
toRow.apply(toDynamic(NULL_REPEATED_PROTO)));
+  }
+
+  @Test
+  public void testNullRepeatedRowToProto() {
+    ProtoDynamicMessageSchema schemaProvider =
+        schemaFromDescriptor(RepeatPrimitive.getDescriptor());
+    SerializableFunction<Row, DynamicMessage> fromRow = 
schemaProvider.getFromRowFunction();
+    assertEquals(NULL_REPEATED_PROTO.toString(), 
fromRow.apply(NULL_REPEATED_ROW).toString());
+  }
+
   // Test map type
   @Test
   public void testMapSchema() {
@@ -149,6 +169,21 @@ public class ProtoDynamicMessageSchemaTest {
   }
 
   @Test
+  public void testNullMapProtoToRow() throws InvalidProtocolBufferException {
+    ProtoDynamicMessageSchema schemaProvider = 
schemaFromDescriptor(MapPrimitive.getDescriptor());
+    SerializableFunction<DynamicMessage, Row> toRow = 
schemaProvider.getToRowFunction();
+    assertEquals(NULL_MAP_PRIMITIVE_ROW, 
toRow.apply(toDynamic(NULL_MAP_PRIMITIVE_PROTO)));
+  }
+
+  @Test
+  public void testNullMapRowToProto() {
+    ProtoDynamicMessageSchema schemaProvider = 
schemaFromDescriptor(MapPrimitive.getDescriptor());
+    SerializableFunction<Row, DynamicMessage> fromRow = 
schemaProvider.getFromRowFunction();
+    assertEquals(
+        NULL_MAP_PRIMITIVE_PROTO.toString(), 
fromRow.apply(NULL_MAP_PRIMITIVE_ROW).toString());
+  }
+
+  @Test
   public void testNestedSchema() {
     ProtoDynamicMessageSchema schemaProvider = 
schemaFromDescriptor(Nested.getDescriptor());
     Schema schema = schemaProvider.getSchema();
diff --git 
a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java
 
b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java
index b00333f..c8b182c 100644
--- 
a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java
+++ 
b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java
@@ -24,6 +24,10 @@ import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.MAP_PRIMI
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_PROTO;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_ROW;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_SCHEMA;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_MAP_PRIMITIVE_PROTO;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_MAP_PRIMITIVE_ROW;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_REPEATED_PROTO;
+import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_REPEATED_ROW;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_PROTO_BOOL;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_PROTO_INT32;
 import static 
org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_PROTO_PRIMITIVE;
@@ -158,6 +162,20 @@ public class ProtoMessageSchemaTest {
     assertEquals(REPEATED_PROTO, fromRow.apply(REPEATED_ROW));
   }
 
+  @Test
+  public void testNullRepeatedProtoToRow() {
+    SerializableFunction<RepeatPrimitive, Row> toRow =
+        new 
ProtoMessageSchema().toRowFunction(TypeDescriptor.of(RepeatPrimitive.class));
+    assertEquals(NULL_REPEATED_ROW, toRow.apply(NULL_REPEATED_PROTO));
+  }
+
+  @Test
+  public void testNullRepeatedRowToProto() {
+    SerializableFunction<Row, RepeatPrimitive> fromRow =
+        new 
ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(RepeatPrimitive.class));
+    assertEquals(NULL_REPEATED_PROTO, fromRow.apply(NULL_REPEATED_ROW));
+  }
+
   // Test map type
   @Test
   public void testMapSchema() {
@@ -180,6 +198,20 @@ public class ProtoMessageSchemaTest {
   }
 
   @Test
+  public void testNullMapProtoToRow() {
+    SerializableFunction<MapPrimitive, Row> toRow =
+        new 
ProtoMessageSchema().toRowFunction(TypeDescriptor.of(MapPrimitive.class));
+    assertEquals(NULL_MAP_PRIMITIVE_ROW, 
toRow.apply(NULL_MAP_PRIMITIVE_PROTO));
+  }
+
+  @Test
+  public void testNullMapRowToProto() {
+    SerializableFunction<Row, MapPrimitive> fromRow =
+        new 
ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(MapPrimitive.class));
+    assertEquals(NULL_MAP_PRIMITIVE_PROTO, 
fromRow.apply(NULL_MAP_PRIMITIVE_ROW));
+  }
+
+  @Test
   public void testNestedSchema() {
     Schema schema = new 
ProtoMessageSchema().schemaFor(TypeDescriptor.of(Nested.class));
     assertEquals(NESTED_SCHEMA, schema);
diff --git 
a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java
 
b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java
index 862d637..6367659 100644
--- 
a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java
+++ 
b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/TestProtoSchemas.java
@@ -237,6 +237,28 @@ class TestProtoSchemas {
               ImmutableList.of(ByteString.copyFrom(BYTE_ARRAY), 
ByteString.copyFrom(BYTE_ARRAY)))
           .build();
 
+  static final Row NULL_REPEATED_ROW =
+      Row.withSchema(REPEATED_SCHEMA)
+          .addArray()
+          .addArray()
+          .addArray()
+          .addArray()
+          .addArray()
+          .addArray()
+          .addArray()
+          .addArray()
+          .addArray()
+          .addArray()
+          .addArray()
+          .addArray()
+          .addArray()
+          .addArray()
+          .addArray()
+          .build();
+
+  // A sample instance of the proto.
+  static final RepeatPrimitive NULL_REPEATED_PROTO = 
RepeatPrimitive.newBuilder().build();
+
   // The schema for the MapPrimitive proto.
   static final Schema MAP_PRIMITIVE_SCHEMA =
       Schema.builder()
@@ -274,6 +296,18 @@ class TestProtoSchemas {
                   "k1", ByteString.copyFrom(BYTE_ARRAY), "k2", 
ByteString.copyFrom(BYTE_ARRAY)))
           .build();
 
+  // A sample instance of the row.
+  static final Row NULL_MAP_PRIMITIVE_ROW =
+      Row.withSchema(MAP_PRIMITIVE_SCHEMA)
+          .addValue(ImmutableMap.of())
+          .addValue(ImmutableMap.of())
+          .addValue(ImmutableMap.of())
+          .addValue(ImmutableMap.of())
+          .build();
+
+  // A sample instance of the proto.
+  static final MapPrimitive NULL_MAP_PRIMITIVE_PROTO = 
MapPrimitive.newBuilder().build();
+
   // The schema for the Nested proto.
   static final Schema NESTED_SCHEMA =
       Schema.builder()

Reply via email to