[ 
https://issues.apache.org/jira/browse/BEAM-3789?focusedWorklogId=79319&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79319
 ]

ASF GitHub Bot logged work on BEAM-3789:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Mar/18 22:47
            Start Date: 11/Mar/18 22:47
    Worklog Time Spent: 10m 
      Work Description: kennknowles closed pull request #4822: [BEAM-3789][SQL] 
Support nested Rows
URL: https://github.com/apache/beam/pull/4822
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlType.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlType.java
index 71ff22897a1..77eda6a84de 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlType.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlType.java
@@ -104,6 +104,11 @@ public Builder withTimestampField(String fieldName) {
 
     public Builder withArrayField(String fieldName, SqlTypeCoder elementCoder) 
{
       return withField(fieldName, SqlTypeCoders.arrayOf(elementCoder));
+
+    }
+
+    public Builder withRowField(String fieldName, RowType rowType) {
+      return withField(fieldName, SqlTypeCoders.rowOf(rowType));
     }
 
     private Builder() {
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoder.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoder.java
index eefc8701960..5b6e104a3fd 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoder.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoder.java
@@ -31,6 +31,7 @@
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.values.RowType;
 
 /**
  * Base class for coders for supported SQL types.
@@ -189,8 +190,8 @@ public SqlTypeCoder getElementCoder() {
     @Override
     public boolean equals(Object other) {
       return other != null
-          && this.getClass().equals(other.getClass())
-          && this.elementCoder.equals(((SqlArrayCoder) other).elementCoder);
+             && this.getClass().equals(other.getClass())
+             && this.elementCoder.equals(((SqlArrayCoder) other).elementCoder);
     }
 
     @Override
@@ -198,4 +199,40 @@ public int hashCode() {
       return Objects.hashCode(elementCoder);
     }
   }
+
+  /**
+   * Represents SQL type ROW.
+   */
+  public static class SqlRowCoder extends SqlTypeCoder {
+
+    private final RowType rowType;
+
+    private SqlRowCoder(RowType rowType) {
+      this.rowType = rowType;
+    }
+
+    public static SqlTypeCoder of(RowType rowType) {
+      return new SqlRowCoder(rowType);
+    }
+
+    public RowType getRowType() {
+      return rowType;
+    }
+
+    @Override
+    protected Coder delegateCoder() {
+      return rowType.getRowCoder();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      return other instanceof SqlRowCoder
+             && Objects.equals(this.rowType, ((SqlRowCoder) other).rowType);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(this.rowType);
+    }
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoders.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoders.java
index f9be95926f5..9eea2df1a99 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoders.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoders.java
@@ -35,6 +35,8 @@
 import java.util.Set;
 import org.apache.beam.sdk.extensions.sql.SqlTypeCoder.SqlArrayCoder;
 import org.apache.beam.sdk.extensions.sql.SqlTypeCoder.SqlIntegerCoder;
+import org.apache.beam.sdk.extensions.sql.SqlTypeCoder.SqlRowCoder;
+import org.apache.beam.sdk.values.RowType;
 
 /**
  * Coders for SQL types supported in Beam.
@@ -69,4 +71,12 @@ public static SqlTypeCoder arrayOf(SqlTypeCoder 
elementCoder) {
           SqlTypeCoders.FLOAT,
           SqlTypeCoders.DOUBLE,
           SqlTypeCoders.DECIMAL);
+
+  public static boolean isRow(SqlTypeCoder sqlTypeCoder) {
+    return sqlTypeCoder instanceof SqlRowCoder;
+  }
+
+  public static SqlTypeCoder rowOf(RowType rowType) {
+    return SqlRowCoder.of(rowType);
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index b04cf970b64..5a47aa460ea 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -31,53 +31,32 @@
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowEndExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowStartExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic
-    .BeamSqlDivideExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic
-    .BeamSqlMinusExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlDivideExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlMinusExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlModExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic
-    .BeamSqlMultiplyExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic
-    .BeamSqlPlusExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlPlusExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.array.BeamSqlArrayExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.array
-    .BeamSqlArrayItemExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.collection
-    .BeamSqlCardinalityExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.collection
-    .BeamSqlSingleElementExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison
-    .BeamSqlEqualsExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison
-    .BeamSqlGreaterThanExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison
-    .BeamSqlGreaterThanOrEqualsExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison
-    .BeamSqlIsNotNullExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison
-    .BeamSqlIsNullExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison
-    .BeamSqlLessThanExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison
-    .BeamSqlLessThanOrEqualsExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison
-    .BeamSqlNotEqualsExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date
-    .BeamSqlCurrentDateExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date
-    .BeamSqlCurrentTimeExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date
-    .BeamSqlCurrentTimestampExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.array.BeamSqlArrayItemExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.collection.BeamSqlCardinalityExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.collection.BeamSqlSingleElementExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlEqualsExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlGreaterThanExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlGreaterThanOrEqualsExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlIsNotNullExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlIsNullExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlLessThanExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlNotEqualsExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentDateExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimeExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateCeilExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateFloorExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date
-    .BeamSqlDatetimeMinusExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date
-    .BeamSqlDatetimePlusExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimeMinusExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimePlusExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlExtractExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date
-    .BeamSqlIntervalMultiplyExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlIntervalMultiplyExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlAndExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlNotExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlOrExpression;
@@ -98,25 +77,21 @@
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlPowerExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlRadiansExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlRandExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math
-    .BeamSqlRandIntegerExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlRandIntegerExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlRoundExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlSignExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlSinExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlTanExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlTruncateExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret
-    .BeamSqlReinterpretExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string
-    .BeamSqlCharLengthExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret.BeamSqlReinterpretExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.row.BeamSqlFieldAccessExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlCharLengthExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlConcatExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlInitCapExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlLowerExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlOverlayExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string
-    .BeamSqlPositionExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string
-    .BeamSqlSubstringExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlPositionExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlSubstringExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlTrimExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlUpperExpression;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel;
@@ -125,6 +100,7 @@
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexFieldAccess;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
@@ -224,6 +200,12 @@ static BeamSqlExpression buildExpression(RexNode rexNode) {
     } else if (rexNode instanceof RexInputRef) {
       RexInputRef node = (RexInputRef) rexNode;
       ret = new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), 
node.getIndex());
+    } else if (rexNode instanceof RexFieldAccess) {
+      RexFieldAccess fieldAccessNode = (RexFieldAccess) rexNode;
+      int rowFieldIndex = ((RexInputRef) 
fieldAccessNode.getReferenceExpr()).getIndex();
+      int nestedFieldIndex = fieldAccessNode.getField().getIndex();
+      SqlTypeName nestedFieldType = 
fieldAccessNode.getField().getType().getSqlTypeName();
+      ret = new BeamSqlFieldAccessExpression(rowFieldIndex, nestedFieldIndex, 
nestedFieldType);
     } else if (rexNode instanceof RexCall) {
       RexCall node = (RexCall) rexNode;
       String opName = node.op.getName();
@@ -475,7 +457,7 @@ static BeamSqlExpression buildExpression(RexNode rexNode) {
       }
     } else {
       throw new UnsupportedOperationException(
-          String.format("%s is not supported yet!", 
rexNode.getClass().toString()));
+          String.format("%s is not supported yet", 
rexNode.getClass().toString()));
     }
 
     if (ret != null && !ret.accept()) {
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpression.java
new file mode 100644
index 00000000000..478b4e374e6
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpression.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.row;
+
+import java.util.Collections;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Represents a field access expression.
+ */
+public class BeamSqlFieldAccessExpression extends BeamSqlExpression {
+
+  private int rowFieldIndex;
+  private int nestedFieldIndex;
+
+  public BeamSqlFieldAccessExpression(
+      int rowFieldIndex,
+      int nestedFieldIndex,
+      SqlTypeName nestedFieldType) {
+
+    super(Collections.emptyList(), nestedFieldType);
+    this.rowFieldIndex = rowFieldIndex;
+    this.nestedFieldIndex = nestedFieldIndex;
+  }
+
+  @Override
+  public boolean accept() {
+    return true;
+  }
+
+  @Override
+  public BeamSqlPrimitive evaluate(Row inputRow, BoundedWindow window) {
+    Row nestedRow = inputRow.getValue(rowFieldIndex);
+    return BeamSqlPrimitive.of(outputType, 
nestedRow.getValue(nestedFieldIndex));
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/package-info.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/package-info.java
new file mode 100644
index 00000000000..205f64a0ab9
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Support for fields of type ROW.
+ */
+@DefaultAnnotation(NonNull.class)
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.row;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index b34330d8a25..eccbed83956 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.beam.sdk.extensions.sql.impl.utils;
 
 import static org.apache.beam.sdk.values.RowType.toRowType;
@@ -25,6 +24,7 @@
 import java.util.stream.IntStream;
 import org.apache.beam.sdk.extensions.sql.SqlTypeCoder;
 import org.apache.beam.sdk.extensions.sql.SqlTypeCoder.SqlArrayCoder;
+import org.apache.beam.sdk.extensions.sql.SqlTypeCoder.SqlRowCoder;
 import org.apache.beam.sdk.extensions.sql.SqlTypeCoders;
 import org.apache.beam.sdk.values.RowType;
 import org.apache.calcite.rel.type.RelDataType;
@@ -68,9 +68,15 @@
    * for supported Beam SQL type coder, see {@link SqlTypeCoder}.
    */
   public static SqlTypeName toCalciteType(SqlTypeCoder coder) {
-    return SqlTypeCoder.isArray(coder)
-        ? SqlTypeName.ARRAY
-        : BEAM_TO_CALCITE_TYPE_MAPPING.get(coder);
+    if (SqlTypeCoder.isArray(coder)) {
+        return SqlTypeName.ARRAY;
+    }
+
+    if (SqlTypeCoders.isRow(coder)) {
+      return SqlTypeName.ROW;
+    }
+
+    return BEAM_TO_CALCITE_TYPE_MAPPING.get(coder);
   }
 
   /**
@@ -83,9 +89,14 @@ public static SqlTypeCoder toCoder(RelDataTypeField 
relFieldType) {
       RelDataType elementType = relFieldType.getValue().getComponentType();
       SqlTypeCoder elementCoder = 
CALCITE_TO_BEAM_TYPE_MAPPING.get(elementType.getSqlTypeName());
       return SqlTypeCoders.arrayOf(elementCoder);
-    } else {
-      return toCoder(fieldTypeName);
     }
+
+    if (SqlTypeName.ROW.equals(fieldTypeName)) {
+      RelDataType nestedCalciteRowType = 
relFieldType.getValue().getComponentType();
+      return SqlTypeCoders.rowOf(toBeamRowType(nestedCalciteRowType));
+    }
+
+    return toCoder(fieldTypeName);
   }
 
   /**
@@ -147,9 +158,15 @@ private static RelDataType toRelDataType(
     SqlTypeCoder fieldCoder = (SqlTypeCoder) rowType.getFieldCoder(fieldIndex);
     SqlTypeName typeName = toCalciteType(fieldCoder);
 
-    return (SqlTypeName.ARRAY.equals(typeName))
-        ? createArrayRelType(dataTypeFactory, (SqlArrayCoder) fieldCoder)
-        : dataTypeFactory.createSqlType(typeName);
+    if (SqlTypeName.ARRAY.equals(typeName)) {
+      return createArrayRelType(dataTypeFactory, (SqlArrayCoder) fieldCoder);
+    }
+
+    if (SqlTypeName.ROW.equals(typeName)) {
+      return createRowRelType(dataTypeFactory, (SqlRowCoder) fieldCoder);
+    }
+
+    return dataTypeFactory.createSqlType(typeName);
   }
 
   private static RelDataType createArrayRelType(
@@ -161,4 +178,12 @@ private static RelDataType createArrayRelType(
             .createArrayType(
                 dataTypeFactory.createSqlType(elementType), 
UNLIMITED_ARRAY_SIZE);
   }
+
+  private static RelDataType createRowRelType(
+      RelDataTypeFactory dataTypeFactory,
+      SqlRowCoder rowFieldCoder) {
+
+    RelProtoDataType relProtoDataType = 
toCalciteRowType(rowFieldCoder.getRowType());
+    return relProtoDataType.apply(dataTypeFactory);
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java
new file mode 100644
index 00000000000..53ce670f3f9
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.util.Arrays;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.RowType;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/** Tests for nested rows handling. */
+public class BeamSqlDslNestedRowsTest {
+
+  @Rule public final TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedException exceptions = ExpectedException.none();
+
+  @Test
+  public void testRowConstructorKeyword() {
+    RowType nestedRowType =
+        RowSqlType
+            .builder()
+            .withIntegerField("f_nestedInt")
+            .withVarcharField("f_nestedString")
+            .withIntegerField("f_nestedIntPlusOne")
+            .build();
+
+    RowType resultRowType =
+        RowSqlType
+            .builder()
+            .withIntegerField("f_int")
+            .withIntegerField("f_int2")
+            .withVarcharField("f_varchar")
+            .withIntegerField("f_int3")
+            .build();
+
+    RowType inputType =
+        RowSqlType
+            .builder()
+            .withIntegerField("f_int")
+            .withRowField("f_row", nestedRowType)
+            .build();
+
+    PCollection<Row> input =
+        PBegin.in(pipeline)
+              .apply(
+                  Create.of(
+                      Row.withRowType(inputType)
+                         .addValues(
+                             1,
+                             Row.withRowType(nestedRowType)
+                                .addValues(312, "CC", 313)
+                                .build())
+                         .build())
+                        .withCoder(inputType.getRowCoder()));
+
+    PCollection<Row> result =
+        input
+            .apply(
+                BeamSql.query(
+                    "SELECT 1 as `f_int`, ROW(3, 'BB', f_int + 1) as `f_row1` 
FROM PCOLLECTION"))
+            .setCoder(resultRowType.getRowCoder());
+
+    PAssert
+        .that(result)
+        .containsInAnyOrder(
+            Row
+                .withRowType(resultRowType)
+                .addValues(1, 3, "BB", 2)
+                .build());
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testRowConstructorBraces() {
+
+    RowType nestedRowType =
+        RowSqlType
+            .builder()
+            .withIntegerField("f_nestedInt")
+            .withVarcharField("f_nestedString")
+            .withIntegerField("f_nestedIntPlusOne")
+            .build();
+
+    RowType resultRowType =
+        RowSqlType
+            .builder()
+            .withIntegerField("f_int")
+            .withIntegerField("f_int2")
+            .withVarcharField("f_varchar")
+            .withIntegerField("f_int3")
+            .build();
+
+    RowType inputType =
+        RowSqlType
+            .builder()
+            .withIntegerField("f_int")
+            .withRowField("f_row", nestedRowType)
+            .build();
+
+    PCollection<Row> input =
+        PBegin.in(pipeline)
+              .apply(
+                  Create.of(
+                      Row.withRowType(inputType)
+                         .addValues(
+                             1,
+                             Row.withRowType(nestedRowType)
+                                .addValues(312, "CC", 313)
+                                .build())
+                         .build())
+                        .withCoder(inputType.getRowCoder()));
+
+    PCollection<Row> result =
+        input
+            .apply(
+                BeamSql.query(
+                    "SELECT 1 as `f_int`, (3, 'BB', f_int + 1) as `f_row1` 
FROM PCOLLECTION"))
+            .setCoder(resultRowType.getRowCoder());
+
+    PAssert
+        .that(result)
+        .containsInAnyOrder(
+            Row
+                .withRowType(resultRowType)
+                .addValues(1, 3, "BB", 2)
+                .build());
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testNestedRowFieldAccess() {
+
+    RowType nestedRowType =
+        RowSqlType
+            .builder()
+            .withIntegerField("f_nestedInt")
+            .withVarcharField("f_nestedString")
+            .withIntegerField("f_nestedIntPlusOne")
+            .build();
+
+    RowType resultRowType =
+        RowSqlType
+            .builder()
+            .withVarcharField("f_nestedString")
+            .build();
+
+    RowType inputType =
+        RowSqlType
+            .builder()
+            .withIntegerField("f_int")
+            .withRowField("f_nestedRow", nestedRowType)
+            .build();
+
+    PCollection<Row> input =
+        PBegin.in(pipeline)
+              .apply(
+                  Create.of(
+                      Row.withRowType(inputType)
+                         .addValues(
+                             1,
+                             Row.withRowType(nestedRowType)
+                                .addValues(312, "CC", 313)
+                                .build())
+                         .build(),
+                      Row.withRowType(inputType)
+                         .addValues(
+                             2,
+                             Row.withRowType(nestedRowType)
+                                .addValues(412, "DD", 413)
+                                .build())
+                         .build())
+                        .withCoder(inputType.getRowCoder()));
+
+    PCollection<Row> result =
+        input
+            .apply(
+                BeamSql.query(
+                    "SELECT `PCOLLECTION`.`f_nestedRow`.`f_nestedString` FROM 
PCOLLECTION"))
+            .setCoder(resultRowType.getRowCoder());
+
+    PAssert
+        .that(result)
+        .containsInAnyOrder(
+            Row
+                .withRowType(resultRowType)
+                .addValues("CC")
+                .build(),
+            Row
+                .withRowType(resultRowType)
+                .addValues("DD")
+                .build());
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testNestedRowArrayFieldAccess() {
+
+    RowType resultRowType =
+        RowSqlType
+            .builder()
+            .withArrayField("f_nestedArray", SqlTypeCoders.VARCHAR)
+            .build();
+
+    RowType nestedRowType =
+        RowSqlType
+            .builder()
+            .withIntegerField("f_nestedInt")
+            .withVarcharField("f_nestedString")
+            .withIntegerField("f_nestedIntPlusOne")
+            .withArrayField("f_nestedArray", SqlTypeCoders.VARCHAR)
+            .build();
+
+    RowType inputType =
+        RowSqlType
+            .builder()
+            .withIntegerField("f_int")
+            .withRowField("f_nestedRow", nestedRowType)
+            .build();
+
+    PCollection<Row> input =
+        PBegin.in(pipeline)
+            .apply(
+                Create.of(
+                        Row.withRowType(inputType)
+                            .addValues(
+                                1,
+                                Row.withRowType(nestedRowType)
+                                    .addValues(312, "CC", 313, 
Arrays.asList("one", "two"))
+                                    .build())
+                            .build(),
+                        Row.withRowType(inputType)
+                            .addValues(
+                                2,
+                                Row.withRowType(nestedRowType)
+                                   .addValues(412, "DD", 413, 
Arrays.asList("three", "four"))
+                                   .build())
+                            .build())
+                    .withCoder(inputType.getRowCoder()));
+
+    PCollection<Row> result =
+        input
+            .apply(
+                BeamSql.query(
+                    "SELECT `PCOLLECTION`.`f_nestedRow`.`f_nestedArray` FROM 
PCOLLECTION"))
+            .setCoder(resultRowType.getRowCoder());
+
+    PAssert
+        .that(result)
+        .containsInAnyOrder(
+            Row
+                .withRowType(resultRowType)
+                .addArray(Arrays.asList("one", "two"))
+                .build(),
+            Row
+                .withRowType(resultRowType)
+                .addArray(Arrays.asList("three", "four"))
+                .build());
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testNestedRowArrayElementAccess() {
+
+    RowType resultRowType =
+        RowSqlType
+            .builder()
+            .withVarcharField("f_nestedArrayStringField")
+            .build();
+
+    RowType nestedRowType =
+        RowSqlType
+            .builder()
+            .withIntegerField("f_nestedInt")
+            .withVarcharField("f_nestedString")
+            .withIntegerField("f_nestedIntPlusOne")
+            .withArrayField("f_nestedArray", SqlTypeCoders.VARCHAR)
+            .build();
+
+    RowType inputType =
+        RowSqlType
+            .builder()
+            .withIntegerField("f_int")
+            .withRowField("f_nestedRow", nestedRowType)
+            .build();
+
+    PCollection<Row> input =
+        PBegin.in(pipeline)
+              .apply(
+                  Create.of(
+                      Row.withRowType(inputType)
+                         .addValues(
+                             1,
+                             Row.withRowType(nestedRowType)
+                                .addValues(312, "CC", 313, 
Arrays.asList("one", "two"))
+                                .build())
+                         .build(),
+                      Row.withRowType(inputType)
+                         .addValues(
+                             2,
+                             Row.withRowType(nestedRowType)
+                                .addValues(412, "DD", 413, 
Arrays.asList("three", "four"))
+                                .build())
+                         .build())
+                        .withCoder(inputType.getRowCoder()));
+
+    PCollection<Row> result =
+        input
+            .apply(
+                BeamSql.query(
+                    "SELECT `PCOLLECTION`.`f_nestedRow`.`f_nestedArray`[1] 
FROM PCOLLECTION"))
+            .setCoder(resultRowType.getRowCoder());
+
+    PAssert
+        .that(result)
+        .containsInAnyOrder(
+            Row
+                .withRowType(resultRowType)
+                .addValues("two")
+                .build(),
+            Row
+                .withRowType(resultRowType)
+                .addValues("four")
+                .build());
+
+    pipeline.run();
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 79319)
    Time Spent: 1h 20m  (was: 1h 10m)

> [SQL] Support Nested Rows
> -------------------------
>
>                 Key: BEAM-3789
>                 URL: https://issues.apache.org/jira/browse/BEAM-3789
>             Project: Beam
>          Issue Type: Improvement
>          Components: dsl-sql
>            Reporter: Anton Kedin
>            Assignee: Anton Kedin
>            Priority: Major
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Add support for SqlTypeName.ROW



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to