This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5504f32 Merge pull request #11290: [BEAM-9670] Fix nullability widening in CoGroup key resolution
5504f32 is described below
commit 5504f32a70bd65b5fbc76bb4a270fa1cfe43a4eb
Author: reuvenlax <[email protected]>
AuthorDate: Thu Apr 2 15:12:31 2020 -0700
Merge pull request #11290: [BEAM-9670] Fix nullability widening in CoGroup key resolution
---
.../org/apache/beam/sdk/schemas/SchemaUtils.java | 101 +++++++++++++++++++
.../beam/sdk/schemas/transforms/CoGroup.java | 5 +-
.../apache/beam/sdk/schemas/SchemaUtilsTest.java | 107 +++++++++++++++++++++
.../beam/sdk/schemas/transforms/CoGroupTest.java | 2 +-
.../sql/impl/rel/BeamSetOperatorRelBase.java | 11 ---
.../sql/zetasql/ZetaSQLDialectSpecTest.java | 50 ++++++++++
6 files changed, 261 insertions(+), 15 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUtils.java
new file mode 100644
index 0000000..e2e749a
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUtils.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+
+/** A set of utility functions for working with {@link Schema}s. */
+public class SchemaUtils {
+  /**
+   * Given two schemas that have matching types, return a nullable-widened schema.
+   *
+   * <p>The schemas must have matching types, except for field names which can differ; fields are
+   * matched by position. The returned schema will contain the field names (and descriptions) of
+   * the first schema. All field types will be nullable if the corresponding field type is nullable
+   * in either of the input schemas. Widening is applied recursively to nested row schemas,
+   * collection element types, and map key and value types.
+   *
+   * @param schema1 the schema whose field names and descriptions are kept in the result
+   * @param schema2 the schema whose nullability is merged into {@code schema1}
+   * @return a new schema with the types of {@code schema1}, made nullable wherever either input is
+   *     nullable
+   * @throws IllegalArgumentException if the schemas have different numbers of fields, or if any
+   *     pair of positionally matching field types cannot be merged
+   */
+  public static Schema mergeWideningNullable(Schema schema1, Schema schema2) {
+    if (schema1.getFieldCount() != schema2.getFieldCount()) {
+      throw new IllegalArgumentException(
+          "Cannot merge schemas with different numbers of fields. "
+              + "schema1: "
+              + schema1
+              + " schema2: "
+              + schema2);
+    }
+    Schema.Builder builder = Schema.builder();
+    for (int i = 0; i < schema1.getFieldCount(); ++i) {
+      Schema.Field field1 = schema1.getField(i);
+      FieldType widenedType = widenNullableTypes(field1.getType(), schema2.getField(i).getType());
+      // withType keeps the first schema's field name and description; rebuilding the field from
+      // its name alone would silently drop the description.
+      builder.addField(field1.withType(widenedType));
+    }
+    return builder.build();
+  }
+
+  /**
+   * Merges two structurally identical field types into one that is nullable if either input is
+   * nullable. Row schemas, collection element types, and map key/value types are widened
+   * recursively.
+   *
+   * @throws IllegalArgumentException if the type names differ, or if both are logical types with
+   *     different identifiers
+   */
+  static FieldType widenNullableTypes(FieldType fieldType1, FieldType fieldType2) {
+    if (fieldType1.getTypeName() != fieldType2.getTypeName()) {
+      throw new IllegalArgumentException(
+          "Cannot merge two types: "
+              + fieldType1.getTypeName()
+              + " and "
+              + fieldType2.getTypeName());
+    }
+
+    FieldType result;
+    switch (fieldType1.getTypeName()) {
+      case ROW:
+        result =
+            FieldType.row(
+                mergeWideningNullable(fieldType1.getRowSchema(), fieldType2.getRowSchema()));
+        break;
+      case ARRAY:
+        result =
+            FieldType.array(
+                widenNullableTypes(
+                    fieldType1.getCollectionElementType(), fieldType2.getCollectionElementType()));
+        break;
+      case ITERABLE:
+        result =
+            FieldType.iterable(
+                widenNullableTypes(
+                    fieldType1.getCollectionElementType(), fieldType2.getCollectionElementType()));
+        break;
+      case MAP:
+        result =
+            FieldType.map(
+                widenNullableTypes(fieldType1.getMapKeyType(), fieldType2.getMapKeyType()),
+                widenNullableTypes(fieldType1.getMapValueType(), fieldType2.getMapValueType()));
+        break;
+      case LOGICAL_TYPE:
+        if (!fieldType1
+            .getLogicalType()
+            .getIdentifier()
+            .equals(fieldType2.getLogicalType().getIdentifier())) {
+          throw new IllegalArgumentException(
+              "Logical types don't match and cannot be merged: "
+                  + fieldType1.getLogicalType().getIdentifier()
+                  + " vs. "
+                  + fieldType2.getLogicalType().getIdentifier());
+        }
+        // fall through
+      default:
+        // All other types, and logical types with matching identifiers, are reused from
+        // fieldType1; only the outer nullability is widened below.
+        result = fieldType1;
+    }
+    return result.withNullable(fieldType1.getNullable() || fieldType2.getNullable());
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java
index c4366e2..2f6da8d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaUtils;
import
org.apache.beam.sdk.schemas.transforms.CoGroup.ConvertCoGbkResult.ConvertType;
import org.apache.beam.sdk.schemas.utils.RowSelector;
import org.apache.beam.sdk.schemas.utils.SelectHelpers;
@@ -399,9 +400,7 @@ public class CoGroup {
if (keySchema == null) {
keySchema = currentKeySchema;
} else {
- if (!currentKeySchema.typesEqual(keySchema)) {
- throw new IllegalStateException("All keys must have the same
schema");
- }
+ keySchema = SchemaUtils.mergeWideningNullable(keySchema,
currentKeySchema);
}
// Create a new tag for the output.
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaUtilsTest.java
new file mode 100644
index 0000000..45c06f6
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaUtilsTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.junit.Test;
+
+/** Tests for {@link org.apache.beam.sdk.schemas.SchemaUtils}. */
+public class SchemaUtilsTest {
+ @Test
+ public void testWidenPrimitives() {
+ Schema schema1 =
+ Schema.builder()
+ .addField("field1", FieldType.INT32)
+ .addNullableField("field2", FieldType.STRING)
+ .build();
+ Schema schema2 =
+ Schema.builder()
+ .addNullableField("field3", FieldType.INT32)
+ .addField("field4", FieldType.STRING)
+ .build();
+ Schema expected =
+ Schema.builder()
+ .addNullableField("field1", FieldType.INT32)
+ .addNullableField("field2", FieldType.STRING)
+ .build();
+ assertEquals(expected, SchemaUtils.mergeWideningNullable(schema1,
schema2));
+ }
+
+ @Test
+ public void testWidenNested() {
+ Schema schema1 =
+ Schema.builder()
+ .addField("field1", FieldType.INT32)
+ .addNullableField("field2", FieldType.STRING)
+ .build();
+ Schema schema2 =
+ Schema.builder()
+ .addNullableField("field3", FieldType.INT32)
+ .addField("field4", FieldType.STRING)
+ .build();
+ Schema top1 = Schema.builder().addField("top1",
FieldType.row(schema1)).build();
+ Schema top2 = Schema.builder().addField("top2",
FieldType.row(schema2)).build();
+ Schema expected =
+ Schema.builder()
+ .addNullableField("field1", FieldType.INT32)
+ .addNullableField("field2", FieldType.STRING)
+ .build();
+ Schema expectedTop = Schema.builder().addField("top1",
FieldType.row(expected)).build();
+
+ assertEquals(expectedTop, SchemaUtils.mergeWideningNullable(top1, top2));
+ }
+
+ @Test
+ public void testWidenArray() {
+ Schema schema1 = Schema.builder().addArrayField("field1",
FieldType.INT32).build();
+ Schema schema2 =
+ Schema.builder().addArrayField("field1",
FieldType.INT32.withNullable(true)).build();
+ Schema expected =
+ Schema.builder().addArrayField("field1",
FieldType.INT32.withNullable(true)).build();
+ assertEquals(expected, SchemaUtils.mergeWideningNullable(schema1,
schema2));
+ }
+
+ @Test
+ public void testWidenIterable() {
+ Schema schema1 = Schema.builder().addIterableField("field1",
FieldType.INT32).build();
+ Schema schema2 =
+ Schema.builder().addIterableField("field1",
FieldType.INT32.withNullable(true)).build();
+ Schema expected =
+ Schema.builder().addIterableField("field1",
FieldType.INT32.withNullable(true)).build();
+ assertEquals(expected, SchemaUtils.mergeWideningNullable(schema1,
schema2));
+ }
+
+ @Test
+ public void testWidenMap() {
+ Schema schema1 =
+ Schema.builder().addMapField("field1", FieldType.INT32,
FieldType.INT32).build();
+ Schema schema2 =
+ Schema.builder()
+ .addMapField(
+ "field1", FieldType.INT32.withNullable(true),
FieldType.INT32.withNullable(true))
+ .build();
+ Schema expected =
+ Schema.builder()
+ .addMapField(
+ "field1", FieldType.INT32.withNullable(true),
FieldType.INT32.withNullable(true))
+ .build();
+ assertEquals(expected, SchemaUtils.mergeWideningNullable(schema1,
schema2));
+ }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/CoGroupTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/CoGroupTest.java
index 527a9c6..0008605 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/CoGroupTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/CoGroupTest.java
@@ -475,7 +475,7 @@ public class CoGroupTest {
Create.of(Row.withSchema(CG_SCHEMA_1).addValues("user1", 9,
"us").build()))
.setRowSchema(CG_SCHEMA_1);
- thrown.expect(IllegalStateException.class);
+ thrown.expect(IllegalArgumentException.class);
PCollection<Row> joined =
PCollectionTuple.of("pc1", pc1, "pc2", pc2)
.apply(
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
index a036148..ab76a7b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
@@ -21,7 +21,6 @@ import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Prec
import java.io.Serializable;
import
org.apache.beam.sdk.extensions.sql.impl.transform.BeamSetOperatorsTransforms;
-import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.CoGroup;
import org.apache.beam.sdk.schemas.transforms.CoGroup.By;
import org.apache.beam.sdk.transforms.PTransform;
@@ -63,16 +62,6 @@ public class BeamSetOperatorRelBase extends PTransform<PCollectionList<Row>, PCo
inputs);
PCollection<Row> leftRows = inputs.get(0);
PCollection<Row> rightRows = inputs.get(1);
- Schema leftSchema = leftRows.getSchema();
- Schema rightSchema = rightRows.getSchema();
- if (!leftSchema.typesEqual(rightSchema)) {
- throw new IllegalArgumentException(
- "Can't intersect two tables with different schemas."
- + "lhsSchema: "
- + leftSchema
- + " rhsSchema: "
- + rightSchema);
- }
WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn();
WindowFn rightWindow = rightRows.getWindowingStrategy().getWindowFn();
diff --git
a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
index 8af8d8d..4695846 100644
---
a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
+++
b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
@@ -2271,6 +2271,56 @@ public class ZetaSQLDialectSpecTest {
}
@Test
+  public void testSelectNullIntersectDistinct() {
+    String sql = "SELECT NULL INTERSECT DISTINCT SELECT 2";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    // The single NULL row has no matching row on the right-hand side, so the result is empty.
+    PAssert.that(stream).empty();
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testSelectNullIntersectAll() {
+    String sql = "SELECT NULL INTERSECT ALL SELECT 2";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    // The single NULL row has no matching row on the right-hand side, so the result is empty.
+    PAssert.that(stream).empty();
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testSelectNullExceptDistinct() {
+    String query = "SELECT NULL EXCEPT DISTINCT SELECT 2";
+
+    ZetaSQLQueryPlanner planner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode relNode = planner.convertToBeamRel(query);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    // The left-hand NULL row is not matched by the right-hand side, so it is the sole output row.
+    Row expectedRow = Row.nullRow(result.getSchema());
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testSelectNullExceptAll() {
+    BeamRelNode relNode =
+        new ZetaSQLQueryPlanner(config).convertToBeamRel("SELECT NULL EXCEPT ALL SELECT 2");
+    PCollection<Row> output = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    // Only the left-hand NULL row remains: nothing on the right-hand side matches it.
+    PAssert.that(output).containsInAnyOrder(Row.nullRow(output.getSchema()));
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+ @Test
public void testTimestampLiteralWithNonUTCTimeZone() {
String sql = "SELECT TIMESTAMP '2018-12-10 10:38:59-10:00'";