This is an automated email from the ASF dual-hosted git repository.

anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e813673  [BEAM-7623] Add support to select MAP with Row as values in 
Beam SQL
     new c80ca6b  Merge pull request #9181 from bmv126/map_with_row_as_value
e813673 is described below

commit e81367329b103b255931d95fe60a5cb6e089b695
Author: B M VISHWAS <[email protected]>
AuthorDate: Mon Jul 29 07:26:14 2019 -0500

    [BEAM-7623] Add support to select MAP with Row as values in Beam SQL
---
 .../sdk/extensions/sql/impl/rel/BeamCalcRel.java   |  8 +++
 .../sdk/extensions/sql/BeamComplexTypeTest.java    | 79 ++++++++++++++++++++++
 2 files changed, 87 insertions(+)

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
index de54625..78c8939 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.Row;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.adapter.enumerable.JavaRowFormat;
 import org.apache.calcite.adapter.enumerable.PhysType;
@@ -418,6 +419,9 @@ public class BeamCalcRel extends Calc implements 
BeamRelNode {
                 Expressions.equal(field, Expressions.constant(null)),
                 Expressions.constant(null),
                 Expressions.call(WrappedList.class, "of", field));
+      } else if (fromType.getTypeName().isMapType()
+          && fromType.getMapValueType().getTypeName().isCompositeType()) {
+        field = nullOr(field, Expressions.call(WrappedList.class, 
"ofMapValues", field));
       } else if (fromType.getTypeName() == TypeName.BYTES) {
         field =
             Expressions.condition(
@@ -486,6 +490,10 @@ public class BeamCalcRel extends Calc implements 
BeamRelNode {
       return new WrappedList(row.getValues());
     }
 
+    public static Map<Object, List> ofMapValues(Map<Object, Row> map) {
+      return Maps.transformValues(map, val -> (val == null) ? null : 
WrappedList.of(val));
+    }
+
     @Override
     public Object get(int index) {
       Object obj = list.get(index);
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
index f75e92d..40caf5c 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
@@ -18,6 +18,8 @@
 package org.apache.beam.sdk.extensions.sql;
 
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
 import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
@@ -430,4 +432,81 @@ public class BeamComplexTypeTest {
 
     pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
   }
+
+  @Test
+  public void testMapWithRowAsValue() {
+
+    Schema inputSchema =
+        Schema.builder()
+            .addMapField("mapWithValueAsRow", FieldType.STRING, 
FieldType.row(rowWithArraySchema))
+            .build();
+
+    Map<String, Row> mapWithValueAsRow = new HashMap<>();
+    Row complexRow =
+        Row.withSchema(rowWithArraySchema)
+            .addValues("RED", 5L, Arrays.asList(10L, 20L, 30L))
+            .build();
+    mapWithValueAsRow.put("key", complexRow);
+
+    Row rowOfMap = 
Row.withSchema(inputSchema).addValue(mapWithValueAsRow).build();
+
+    PCollection<Row> outputRow =
+        pipeline
+            .apply(Create.of(rowOfMap))
+            .setRowSchema(inputSchema)
+            .apply(
+                SqlTransform.query(
+                    "select  PCOLLECTION.mapWithValueAsRow['key'].field1 as 
color, PCOLLECTION.mapWithValueAsRow['key'].field3[2]  as num   from 
PCOLLECTION"));
+
+    Row expectedRow =
+        
Row.withSchema(Schema.builder().addStringField("color").addInt64Field("num").build())
+            .addValues("RED", 20L)
+            .build();
+
+    PAssert.that(outputRow).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(1));
+  }
+
+  @Test
+  public void testMapWithNullRowFields() {
+
+    Schema nullableInnerSchema =
+        Schema.builder()
+            .addNullableField("strField", FieldType.STRING)
+            .addNullableField("arrField", FieldType.array(FieldType.INT64))
+            .build();
+    Schema inputSchema =
+        Schema.builder()
+            .addMapField("mapField", FieldType.STRING, 
FieldType.row(nullableInnerSchema))
+            .addNullableField(
+                "nullableMapField",
+                FieldType.map(FieldType.STRING, 
FieldType.row(nullableInnerSchema)))
+            .build();
+
+    Row mapValue = Row.withSchema(nullableInnerSchema).addValues("str", 
null).build();
+    Map<String, Row> mapWithValueAsRow = new HashMap<>();
+    mapWithValueAsRow.put("key", mapValue);
+
+    Row inputRow = Row.withSchema(inputSchema).addValues(mapWithValueAsRow, 
null).build();
+
+    PCollection<Row> outputRow =
+        pipeline
+            .apply(Create.of(inputRow))
+            .setRowSchema(inputSchema)
+            .apply(
+                SqlTransform.query(
+                    "select PCOLLECTION.mapField['key'].strField as str, 
PCOLLECTION.mapField['key'].arrField[1] as arr, 
PCOLLECTION.nullableMapField['key'].arrField[1] as nullableField  from 
PCOLLECTION"));
+
+    Row expectedRow =
+        Row.withSchema(
+                Schema.builder()
+                    .addStringField("str")
+                    .addNullableField("arr", FieldType.INT64)
+                    .addNullableField("nullableField", FieldType.INT64)
+                    .build())
+            .addValues("str", null, null)
+            .build();
+    PAssert.that(outputRow).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(1));
+  }
 }

Reply via email to