[ https://issues.apache.org/jira/browse/BEAM-3785?focusedWorklogId=80706&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80706 ]
ASF GitHub Bot logged work on BEAM-3785: ---------------------------------------- Author: ASF GitHub Bot Created on: 15/Mar/18 04:20 Start Date: 15/Mar/18 04:20 Worklog Time Spent: 10m Work Description: XuMingmin closed pull request #4857: [BEAM-3785][SQL] Add support for arrays of rows URL: https://github.com/apache/beam/pull/4857 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlType.java index 77eda6a84de..fc16c848db2 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlType.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlType.java @@ -102,9 +102,18 @@ public Builder withTimestampField(String fieldName) { return withField(fieldName, SqlTypeCoders.TIMESTAMP); } + /** + * Adds an ARRAY field with elements of {@code elementCoder}. + */ public Builder withArrayField(String fieldName, SqlTypeCoder elementCoder) { return withField(fieldName, SqlTypeCoders.arrayOf(elementCoder)); + } + /** + * Adds an ARRAY field with elements of {@code rowType}. 
+ */ + public Builder withArrayField(String fieldName, RowType rowType) { + return withField(fieldName, SqlTypeCoders.arrayOf(rowType)); } public Builder withRowField(String fieldName, RowType rowType) { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoder.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoder.java index 5b6e104a3fd..c2a6a3ac081 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoder.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoder.java @@ -65,10 +65,6 @@ public int hashCode() { return this.getClass().hashCode(); } - public static boolean isArray(SqlTypeCoder sqlTypeCoder) { - return sqlTypeCoder instanceof SqlArrayCoder; - } - static class SqlTinyIntCoder extends SqlTypeCoder { @Override diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoders.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoders.java index 9eea2df1a99..d9a132ca623 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoders.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoders.java @@ -58,10 +58,6 @@ public static final SqlTypeCoder DATE = new SqlDateCoder(); public static final SqlTypeCoder TIMESTAMP = new SqlTimestampCoder(); - public static SqlTypeCoder arrayOf(SqlTypeCoder elementCoder) { - return SqlArrayCoder.of(elementCoder); - } - public static final Set<SqlTypeCoder> NUMERIC_TYPES = ImmutableSet.of( SqlTypeCoders.TINYINT, @@ -72,6 +68,18 @@ public static SqlTypeCoder arrayOf(SqlTypeCoder elementCoder) { SqlTypeCoders.DOUBLE, SqlTypeCoders.DECIMAL); + public static SqlTypeCoder arrayOf(SqlTypeCoder elementCoder) { + return SqlArrayCoder.of(elementCoder); + } + + public static SqlTypeCoder arrayOf(RowType rowType) { + return 
SqlArrayCoder.of(rowOf(rowType)); + } + + public static boolean isArray(SqlTypeCoder sqlTypeCoder) { + return sqlTypeCoder instanceof SqlArrayCoder; + } + public static boolean isRow(SqlTypeCoder sqlTypeCoder) { return sqlTypeCoder instanceof SqlRowCoder; } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java index e81b9278842..7bd04f2b1f5 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java @@ -169,8 +169,7 @@ public BeamCalciteTable(RowType beamRowType) { @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) { - return CalciteUtils.toCalciteRowType(this.beamRowType) - .apply(BeamQueryPlanner.TYPE_FACTORY); + return CalciteUtils.toCalciteRowType(this.beamRowType, BeamQueryPlanner.TYPE_FACTORY); } @Override diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java index 5a47aa460ea..3e7089a71fd 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java @@ -202,10 +202,14 @@ static BeamSqlExpression buildExpression(RexNode rexNode) { ret = new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex()); } else if (rexNode instanceof RexFieldAccess) { RexFieldAccess fieldAccessNode = (RexFieldAccess) rexNode; - int rowFieldIndex = ((RexInputRef) fieldAccessNode.getReferenceExpr()).getIndex(); + BeamSqlExpression referenceExpression = 
buildExpression(fieldAccessNode.getReferenceExpr()); int nestedFieldIndex = fieldAccessNode.getField().getIndex(); SqlTypeName nestedFieldType = fieldAccessNode.getField().getType().getSqlTypeName(); - ret = new BeamSqlFieldAccessExpression(rowFieldIndex, nestedFieldIndex, nestedFieldType); + + ret = new BeamSqlFieldAccessExpression( + referenceExpression, + nestedFieldIndex, + nestedFieldType); } else if (rexNode instanceof RexCall) { RexCall node = (RexCall) rexNode; String opName = node.op.getName(); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java index 544734a91b0..2efbc7fc71d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java @@ -145,8 +145,11 @@ public boolean accept() { return true; case ARRAY: return value instanceof List; + case ROW: + return value instanceof Row; default: - throw new UnsupportedOperationException(outputType.name()); + throw new UnsupportedOperationException( + "Unsupported Beam SQL type in expression: " + outputType.name()); } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpression.java index 478b4e374e6..50270864145 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpression.java +++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpression.java @@ -15,9 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.row; import java.util.Collections; +import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -29,16 +31,16 @@ */ public class BeamSqlFieldAccessExpression extends BeamSqlExpression { - private int rowFieldIndex; + private BeamSqlExpression referenceExpression; private int nestedFieldIndex; public BeamSqlFieldAccessExpression( - int rowFieldIndex, + BeamSqlExpression referenceExpression, int nestedFieldIndex, SqlTypeName nestedFieldType) { super(Collections.emptyList(), nestedFieldType); - this.rowFieldIndex = rowFieldIndex; + this.referenceExpression = referenceExpression; this.nestedFieldIndex = nestedFieldIndex; } @@ -49,7 +51,22 @@ public boolean accept() { @Override public BeamSqlPrimitive evaluate(Row inputRow, BoundedWindow window) { - Row nestedRow = inputRow.getValue(rowFieldIndex); - return BeamSqlPrimitive.of(outputType, nestedRow.getValue(nestedFieldIndex)); + BeamSqlPrimitive targetObject = referenceExpression.evaluate(inputRow, window); + SqlTypeName targetFieldType = targetObject.getOutputType(); + + Object targetFieldValue; + + if (SqlTypeName.ARRAY.equals(targetFieldType)) { + targetFieldValue = ((List) targetObject.getValue()).get(nestedFieldIndex); + } else if (SqlTypeName.ROW.equals(targetFieldType)) { + targetFieldValue = ((Row) targetObject.getValue()).getValue(nestedFieldIndex); + } else { + throw new IllegalArgumentException( + "Attempt to access field of unsupported type " + + 
targetFieldType.getClass().getSimpleName() + + ". Field access operator is only supported for arrays or rows"); + } + + return BeamSqlPrimitive.of(outputType, targetFieldValue); } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java index eccbed83956..d8e93f9c66f 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.beam.sdk.extensions.sql.impl.utils; import static org.apache.beam.sdk.values.RowType.toRowType; @@ -30,7 +31,6 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.rel.type.RelProtoDataType; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -68,7 +68,7 @@ * for supported Beam SQL type coder, see {@link SqlTypeCoder}. */ public static SqlTypeName toCalciteType(SqlTypeCoder coder) { - if (SqlTypeCoder.isArray(coder)) { + if (SqlTypeCoders.isArray(coder)) { return SqlTypeName.ARRAY; } @@ -135,19 +135,16 @@ public static RowType toBeamRowType(RelDataType tableInfo) { /** * Create an instance of {@code RelDataType} so it can be used to create a table. 
*/ - public static RelProtoDataType toCalciteRowType(final RowType rowType) { - return dataTypeFactory -> { - RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(dataTypeFactory); - - IntStream - .range(0, rowType.getFieldCount()) - .forEach(idx -> - builder.add( - rowType.getFieldName(idx), - toRelDataType(dataTypeFactory, rowType, idx))); - - return builder.build(); - }; + public static RelDataType toCalciteRowType(RowType rowType, RelDataTypeFactory dataTypeFactory) { + RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(dataTypeFactory); + + IntStream + .range(0, rowType.getFieldCount()) + .forEach(idx -> + builder.add( + rowType.getFieldName(idx), + toRelDataType(dataTypeFactory, rowType, idx))); + return builder.build(); } private static RelDataType toRelDataType( @@ -163,7 +160,7 @@ private static RelDataType toRelDataType( } if (SqlTypeName.ROW.equals(typeName)) { - return createRowRelType(dataTypeFactory, (SqlRowCoder) fieldCoder); + return toCalciteRowType(((SqlRowCoder) fieldCoder).getRowType(), dataTypeFactory); } return dataTypeFactory.createSqlType(typeName); @@ -172,18 +169,18 @@ private static RelDataType toRelDataType( private static RelDataType createArrayRelType( RelDataTypeFactory dataTypeFactory, SqlArrayCoder arrayFieldCoder) { - SqlTypeName elementType = toCalciteType(arrayFieldCoder.getElementCoder()); - return - dataTypeFactory - .createArrayType( - dataTypeFactory.createSqlType(elementType), UNLIMITED_ARRAY_SIZE); - } - private static RelDataType createRowRelType( - RelDataTypeFactory dataTypeFactory, - SqlRowCoder rowFieldCoder) { + SqlTypeName elementTypeName = toCalciteType(arrayFieldCoder.getElementCoder()); + + RelDataType elementType; + + if (SqlTypeName.ROW.equals(elementTypeName)) { + RowType rowType = ((SqlRowCoder) arrayFieldCoder.getElementCoder()).getRowType(); + elementType = toCalciteRowType(rowType, dataTypeFactory); + } else { + elementType = dataTypeFactory.createSqlType(elementTypeName); + 
} - RelProtoDataType relProtoDataType = toCalciteRowType(rowFieldCoder.getRowType()); - return relProtoDataType.apply(dataTypeFactory); + return dataTypeFactory.createArrayType(elementType, UNLIMITED_ARRAY_SIZE); } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java index 295ec5f83e1..8553c7ce6b7 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java @@ -206,6 +206,127 @@ public void testCardinality() { pipeline.run(); } + @Test + public void testSelectRowsFromArrayOfRows() { + RowType elementRowType = + RowSqlType + .builder() + .withVarcharField("f_rowString") + .withIntegerField("f_rowInt") + .build(); + + RowType resultRowType = + RowSqlType + .builder() + .withArrayField("f_resultArray", elementRowType) + .build(); + + RowType inputType = + RowSqlType + .builder() + .withIntegerField("f_int") + .withArrayField("f_arrayOfRows", elementRowType) + .build(); + + PCollection<Row> input = + PBegin.in(pipeline) + .apply( + Create.of( + Row.withRowType(inputType) + .addValues( + 1, + Arrays.asList( + Row.withRowType(elementRowType).addValues("AA", 11).build(), + Row.withRowType(elementRowType).addValues("BB", 22).build())) + .build(), + Row.withRowType(inputType) + .addValues( + 2, + Arrays.asList( + Row.withRowType(elementRowType).addValues("CC", 33).build(), + Row.withRowType(elementRowType).addValues("DD", 44).build())) + .build()) + .withCoder(inputType.getRowCoder())); + + PCollection<Row> result = + input + .apply( + BeamSql.query( + "SELECT f_arrayOfRows FROM PCOLLECTION")) + .setCoder(resultRowType.getRowCoder()); + + PAssert.that(result) + .containsInAnyOrder( + Row.withRowType(resultRowType) + .addArray( + Arrays.asList( + 
Row.withRowType(elementRowType).addValues("AA", 11).build(), + Row.withRowType(elementRowType).addValues("BB", 22).build())) + .build(), + Row.withRowType(resultRowType) + .addArray( + Arrays.asList( + Row.withRowType(elementRowType).addValues("CC", 33).build(), + Row.withRowType(elementRowType).addValues("DD", 44).build())) + .build() + ); + + pipeline.run(); + } + + @Test + public void testSelectSingleRowFromArrayOfRows() { + RowType elementRowType = + RowSqlType + .builder() + .withVarcharField("f_rowString") + .withIntegerField("f_rowInt") + .build(); + + RowType resultRowType = elementRowType; + + RowType inputType = + RowSqlType + .builder() + .withIntegerField("f_int") + .withArrayField("f_arrayOfRows", elementRowType) + .build(); + + PCollection<Row> input = + PBegin.in(pipeline) + .apply( + Create.of( + Row.withRowType(inputType) + .addValues( + 1, + Arrays.asList( + Row.withRowType(elementRowType).addValues("AA", 11).build(), + Row.withRowType(elementRowType).addValues("BB", 22).build())) + .build(), + Row.withRowType(inputType) + .addValues( + 2, + Arrays.asList( + Row.withRowType(elementRowType).addValues("CC", 33).build(), + Row.withRowType(elementRowType).addValues("DD", 44).build())) + .build()) + .withCoder(inputType.getRowCoder())); + + PCollection<Row> result = + input + .apply( + BeamSql.query( + "SELECT f_arrayOfRows[1] FROM PCOLLECTION")) + .setCoder(resultRowType.getRowCoder()); + + PAssert.that(result) + .containsInAnyOrder( + Row.withRowType(elementRowType).addValues("BB", 22).build(), + Row.withRowType(elementRowType).addValues("DD", 44).build()); + + pipeline.run(); + } + private PCollection<Row> pCollectionOf2Elements() { return PBegin diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpressionTest.java new file 
mode 100644 index 00000000000..ce971c337e7 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpressionTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.row; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.RowSqlType; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.RowType; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/** + * Unit tests for {@link BeamSqlFieldAccessExpression}. 
+ */ +public class BeamSqlFieldAccessExpressionTest { + + private static final Row NULL_ROW = null; + private static final BoundedWindow NULL_WINDOW = null; + + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testAccessesFieldOfArray() { + BeamSqlPrimitive<List<String>> targetArray = + BeamSqlPrimitive.of( + SqlTypeName.ARRAY, + Arrays.asList("aaa", "bbb", "ccc")); + + BeamSqlFieldAccessExpression arrayExpression = + new BeamSqlFieldAccessExpression(targetArray, 1, SqlTypeName.VARCHAR); + + assertEquals("bbb", arrayExpression.evaluate(NULL_ROW, NULL_WINDOW).getValue()); + } + + @Test + public void testAccessesFieldOfRow() { + RowType rowType = + RowSqlType + .builder() + .withVarcharField("f_string1") + .withVarcharField("f_string2") + .withVarcharField("f_string3") + .build(); + + BeamSqlPrimitive<Row> targetRow = + BeamSqlPrimitive.of( + SqlTypeName.ROW, + Row + .withRowType(rowType) + .addValues("aa", "bb", "cc") + .build()); + + BeamSqlFieldAccessExpression arrayExpression = + new BeamSqlFieldAccessExpression(targetRow, 1, SqlTypeName.VARCHAR); + + assertEquals("bb", arrayExpression.evaluate(NULL_ROW, NULL_WINDOW).getValue()); + } + + @Test + public void testThrowsForUnsupportedType() { + BeamSqlPrimitive<Integer> targetRow = BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("unsupported type"); + + new BeamSqlFieldAccessExpression(targetRow, 1, SqlTypeName.VARCHAR) + .evaluate(NULL_ROW, NULL_WINDOW).getValue(); + } +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 80706) Time Spent: 3h 10m (was: 3h) > [SQL] Add support for arrays > ---------------------------- > > Key: BEAM-3785 > URL: https://issues.apache.org/jira/browse/BEAM-3785 > Project: Beam > Issue Type: Improvement > Components: dsl-sql > Reporter: Anton Kedin > Assignee: Anton Kedin > Priority: Major > Time Spent: 3h 10m > Remaining Estimate: 0h > > Support fields of Array type -- This message was sent by Atlassian JIRA (v7.6.3#76005)