This is an automated email from the ASF dual-hosted git repository.
anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e813673 [BEAM-7623] Add support to select MAP with Row as values in Beam SQL
new c80ca6b Merge pull request #9181 from bmv126/map_with_row_as_value
e813673 is described below
commit e81367329b103b255931d95fe60a5cb6e089b695
Author: B M VISHWAS <[email protected]>
AuthorDate: Mon Jul 29 07:26:14 2019 -0500
[BEAM-7623] Add support to select MAP with Row as values in Beam SQL
---
.../sdk/extensions/sql/impl/rel/BeamCalcRel.java | 8 +++
.../sdk/extensions/sql/BeamComplexTypeTest.java | 79 ++++++++++++++++++++++
2 files changed, 87 insertions(+)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
index de54625..78c8939 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.enumerable.JavaRowFormat;
import org.apache.calcite.adapter.enumerable.PhysType;
@@ -418,6 +419,9 @@ public class BeamCalcRel extends Calc implements BeamRelNode {
Expressions.equal(field, Expressions.constant(null)),
Expressions.constant(null),
Expressions.call(WrappedList.class, "of", field));
+ } else if (fromType.getTypeName().isMapType()
+ && fromType.getMapValueType().getTypeName().isCompositeType()) {
+ field = nullOr(field, Expressions.call(WrappedList.class, "ofMapValues", field));
} else if (fromType.getTypeName() == TypeName.BYTES) {
field =
Expressions.condition(
@@ -486,6 +490,10 @@ public class BeamCalcRel extends Calc implements BeamRelNode {
return new WrappedList(row.getValues());
}
+ public static Map<Object, List> ofMapValues(Map<Object, Row> map) {
+ return Maps.transformValues(map, val -> (val == null) ? null : WrappedList.of(val));
+ }
+
@Override
public Object get(int index) {
Object obj = list.get(index);
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
index f75e92d..40caf5c 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
@@ -18,6 +18,8 @@
package org.apache.beam.sdk.extensions.sql;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
@@ -430,4 +432,81 @@ public class BeamComplexTypeTest {
pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
+
+ @Test
+ public void testMapWithRowAsValue() {
+
+ Schema inputSchema =
+ Schema.builder()
+ .addMapField("mapWithValueAsRow", FieldType.STRING, FieldType.row(rowWithArraySchema))
+ .build();
+
+ Map<String, Row> mapWithValueAsRow = new HashMap<>();
+ Row complexRow =
+ Row.withSchema(rowWithArraySchema)
+ .addValues("RED", 5L, Arrays.asList(10L, 20L, 30L))
+ .build();
+ mapWithValueAsRow.put("key", complexRow);
+
+ Row rowOfMap = Row.withSchema(inputSchema).addValue(mapWithValueAsRow).build();
+
+ PCollection<Row> outputRow =
+ pipeline
+ .apply(Create.of(rowOfMap))
+ .setRowSchema(inputSchema)
+ .apply(
+ SqlTransform.query(
+ "select PCOLLECTION.mapWithValueAsRow['key'].field1 as color, PCOLLECTION.mapWithValueAsRow['key'].field3[2] as num from PCOLLECTION"));
+
+ Row expectedRow =
+ Row.withSchema(Schema.builder().addStringField("color").addInt64Field("num").build())
+ .addValues("RED", 20L)
+ .build();
+
+ PAssert.that(outputRow).containsInAnyOrder(expectedRow);
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(1));
+ }
+
+ @Test
+ public void testMapWithNullRowFields() {
+
+ Schema nullableInnerSchema =
+ Schema.builder()
+ .addNullableField("strField", FieldType.STRING)
+ .addNullableField("arrField", FieldType.array(FieldType.INT64))
+ .build();
+ Schema inputSchema =
+ Schema.builder()
+ .addMapField("mapField", FieldType.STRING, FieldType.row(nullableInnerSchema))
+ .addNullableField(
+ "nullableMapField",
+ FieldType.map(FieldType.STRING, FieldType.row(nullableInnerSchema)))
+ .build();
+
+ Row mapValue = Row.withSchema(nullableInnerSchema).addValues("str", null).build();
+ Map<String, Row> mapWithValueAsRow = new HashMap<>();
+ mapWithValueAsRow.put("key", mapValue);
+
+ Row inputRow = Row.withSchema(inputSchema).addValues(mapWithValueAsRow, null).build();
+
+ PCollection<Row> outputRow =
+ pipeline
+ .apply(Create.of(inputRow))
+ .setRowSchema(inputSchema)
+ .apply(
+ SqlTransform.query(
+ "select PCOLLECTION.mapField['key'].strField as str, PCOLLECTION.mapField['key'].arrField[1] as arr, PCOLLECTION.nullableMapField['key'].arrField[1] as nullableField from PCOLLECTION"));
+
+ Row expectedRow =
+ Row.withSchema(
+ Schema.builder()
+ .addStringField("str")
+ .addNullableField("arr", FieldType.INT64)
+ .addNullableField("nullableField", FieldType.INT64)
+ .build())
+ .addValues("str", null, null)
+ .build();
+ PAssert.that(outputRow).containsInAnyOrder(expectedRow);
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(1));
+ }
}