This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 323fc9e  Merge pull request #8006: [BEAM-6772] Change Select semantics 
to match what a user expects
323fc9e is described below

commit 323fc9e14b9a166fc90e17302701396335b292b5
Author: reuvenlax <[email protected]>
AuthorDate: Fri Apr 12 08:16:23 2019 -0700

    Merge pull request #8006: [BEAM-6772] Change Select semantics to match what 
a user expects
---
 .../beam/sdk/schemas/transforms/Convert.java       |  52 ++-
 .../apache/beam/sdk/schemas/transforms/Select.java |   3 +-
 .../beam/sdk/schemas/utils/SelectHelpers.java      | 262 ++++++++++----
 .../main/java/org/apache/beam/sdk/values/Row.java  |  11 +
 .../beam/sdk/schemas/transforms/GroupTest.java     |  23 +-
 .../beam/sdk/schemas/transforms/SelectTest.java    | 183 +++++-----
 .../beam/sdk/schemas/utils/SelectHelpersTest.java  | 381 +++++++++++++++++++++
 7 files changed, 735 insertions(+), 180 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java
index 1d8116c..9b01b35 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java
@@ -17,9 +17,12 @@
  */
 package org.apache.beam.sdk.schemas.transforms;
 
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.SchemaCoder;
 import org.apache.beam.sdk.schemas.SchemaRegistry;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -70,7 +73,9 @@ public class Convert {
    *
    * <p>This function allows converting between two types as long as the two 
types have
    * <i>compatible</i> schemas. Two schemas are said to be <i>compatible</i> 
if they recursively
-   * have fields with the same names, but possibly different orders.
+   * have fields with the same names, but possibly different orders. If the 
source schema can be
+   * unboxed to match the target schema (i.e. the source schema contains a 
single field that is
+   * compatible with the target schema), then conversion also succeeds.
    */
   public static <InputT, OutputT> PTransform<PCollection<InputT>, 
PCollection<OutputT>> to(
       Class<OutputT> clazz) {
@@ -82,7 +87,9 @@ public class Convert {
    *
    * <p>This function allows converting between two types as long as the two 
types have
    * <i>compatible</i> schemas. Two schemas are said to be <i>compatible</i> 
if they recursively
-   * have fields with the same names, but possibly different orders.
+   * have fields with the same names, but possibly different orders. If the 
source schema can be
+   * unboxed to match the target schema (i.e. the source schema contains a 
single field that is
+   * compatible with the target schema), then conversion also succeeds.
    */
   public static <InputT, OutputT> PTransform<PCollection<InputT>, 
PCollection<OutputT>> to(
       TypeDescriptor<OutputT> typeDescriptor) {
@@ -92,11 +99,24 @@ public class Convert {
   private static class ConvertTransform<InputT, OutputT>
       extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
     TypeDescriptor<OutputT> outputTypeDescriptor;
+    Schema unboxedSchema = null;
 
     ConvertTransform(TypeDescriptor<OutputT> outputTypeDescriptor) {
       this.outputTypeDescriptor = outputTypeDescriptor;
     }
 
+    @Nullable
+    private static Schema getBoxedNestedSchema(Schema schema) {
+      if (schema.getFieldCount() != 1) {
+        return null;
+      }
+      FieldType fieldType = schema.getField(0).getType();
+      if (!fieldType.getTypeName().isCompositeType()) {
+        return null;
+      }
+      return fieldType.getRowSchema();
+    }
+
     @Override
     @SuppressWarnings("unchecked")
     public PCollection<OutputT> expand(PCollection<InputT> input) {
@@ -124,15 +144,21 @@ public class Convert {
                   registry.getSchema(outputTypeDescriptor),
                   registry.getToRowFunction(outputTypeDescriptor),
                   registry.getFromRowFunction(outputTypeDescriptor));
-          // assert matches input schema.
-          // TODO: Properly handle nullable.
-          if 
(!outputSchemaCoder.getSchema().assignableToIgnoreNullable(input.getSchema())) {
-            throw new RuntimeException(
-                "Cannot convert between types that don't have equivalent 
schemas."
-                    + " input schema: "
-                    + input.getSchema()
-                    + " output schema: "
-                    + outputSchemaCoder.getSchema());
+
+          Schema outputSchema = outputSchemaCoder.getSchema();
+          if (!outputSchema.assignableToIgnoreNullable(input.getSchema())) {
+            // We also support unboxing nested Row schemas, so attempt that.
+            // TODO: Support unboxing to primitive types as well.
+            unboxedSchema = getBoxedNestedSchema(input.getSchema());
+            if (unboxedSchema == null || 
!outputSchema.assignableToIgnoreNullable(unboxedSchema)) {
+              Schema checked = (unboxedSchema == null) ? input.getSchema() : 
unboxedSchema;
+              throw new RuntimeException(
+                  "Cannot convert between types that don't have equivalent 
schemas."
+                      + " input schema: "
+                      + checked
+                      + " output schema: "
+                      + outputSchemaCoder.getSchema());
+            }
           }
         } catch (NoSuchSchemaException e) {
           throw new RuntimeException("No schema registered for " + 
outputTypeDescriptor);
@@ -145,7 +171,9 @@ public class Convert {
                   new DoFn<InputT, OutputT>() {
                     @ProcessElement
                     public void processElement(@Element Row row, 
OutputReceiver<OutputT> o) {
-                      
o.output(outputSchemaCoder.getFromRowFunction().apply(row));
+                      // Read the row, potentially unboxing if necessary.
+                      Row input = (unboxedSchema == null) ? row : 
row.getValue(0);
+                      
o.output(outputSchemaCoder.getFromRowFunction().apply(input));
                     }
                   }))
           .setSchema(
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
index 8686a7a..077cc33 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
@@ -64,7 +64,8 @@ import org.apache.beam.sdk.values.Row;
  *
  * <pre>{@code
  * PCollection<UserEvent> events = readUserEvents();
- * PCollection<Row> rows = event.apply(Select.fieldNames("location.*"));
+ * PCollection<Row> rows = event.apply(Select.fieldNames("location")
+ *                              .apply(Convert.to(Location.class));
  * }</pre>
  */
 @Experimental(Kind.SCHEMAS)
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java
index e3baf68..37742a5 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java
@@ -34,32 +34,84 @@ import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
 
-/** Helper methods to select fields from a Schema. */
+/** Helper methods to select subrows out of rows. */
 public class SelectHelpers {
-  // Currently we don't flatten selected nested fields.
+
+  private static Schema union(Iterable<Schema> schemas) {
+    Schema.Builder unioned = Schema.builder();
+    for (Schema schema : schemas) {
+      unioned.addFields(schema.getFields());
+    }
+    return unioned.build();
+  }
+
+  /**
+   * Get the output schema resulting from selecting the given {@link 
FieldAccessDescriptor} from the
+   * given schema.
+   *
+   * <p>Fields are always extracted and then stored in a new Row. For example, 
consider the
+   * following Java POJOs:
+   *
+   * <pre>{@code
+   *  class UserEvent {
+   *    String userId;
+   *    String eventId;
+   *    int eventType;
+   *    Location location;
+   * }
+   * }</pre>
+   *
+   * <pre>{@code
+   * class Location {
+   *   double latitude;
+   *   double longtitude;
+   * }
+   * }</pre>
+   *
+   * <p>If selecting just the location field, then the returned schema will 
wrap that of the
+   * singular field being selected; in this case the returned schema will be a 
Row containing a
+   * single Location field. If location.latitude is selected, then the 
returned Schema will be a Row
+   * containing a double latitude field.
+   *
+   * <p>The same holds true when selecting from lists or maps. For example:
+   *
+   * <pre>{@code
+   * class EventList {
+   *   List<UserEvent> events;
+   * }
+   * }</pre>
+   *
+   * <p>If selecting events.location.latitude, the returned schema will 
contain a single array of
+   * Row, where that Row contains a single double latitude field; it will not 
contain an array of
+   * double.
+   */
   public static Schema getOutputSchema(
       Schema inputSchema, FieldAccessDescriptor fieldAccessDescriptor) {
     if (fieldAccessDescriptor.getAllFields()) {
       return inputSchema;
     }
-    Schema.Builder builder = new Schema.Builder();
+
+    List<Schema> schemas = Lists.newArrayList();
+    Schema.Builder builder = Schema.builder();
     for (int fieldId : fieldAccessDescriptor.fieldIdsAccessed()) {
       builder.addField(inputSchema.getField(fieldId));
     }
+    schemas.add(builder.build());
 
     for (Map.Entry<FieldDescriptor, FieldAccessDescriptor> nested :
         fieldAccessDescriptor.getNestedFieldsAccessed().entrySet()) {
       FieldDescriptor fieldDescriptor = nested.getKey();
+      FieldAccessDescriptor nestedAccess = nested.getValue();
       Field field = 
inputSchema.getField(checkNotNull(fieldDescriptor.getFieldId()));
-      FieldType outputType =
-          getOutputSchemaHelper(
-              field.getType(), nested.getValue(), 
fieldDescriptor.getQualifiers(), 0);
-      builder.addField(field.getName(), outputType);
+      Schema outputSchema =
+          getOutputSchemaHelper(field.getType(), nestedAccess, 
fieldDescriptor.getQualifiers(), 0);
+      schemas.add(outputSchema);
     }
-    return builder.build();
+
+    return union(schemas);
   }
 
-  private static FieldType getOutputSchemaHelper(
+  private static Schema getOutputSchemaHelper(
       FieldType inputFieldType,
       FieldAccessDescriptor fieldAccessDescriptor,
       List<Qualifier> qualifiers,
@@ -68,34 +120,45 @@ public class SelectHelpers {
       // We have walked through any containers, and are at a row type. Extract 
the subschema
       // for the row, preserving nullable attributes.
       checkArgument(inputFieldType.getTypeName().isCompositeType());
-      return FieldType.row(getOutputSchema(inputFieldType.getRowSchema(), 
fieldAccessDescriptor))
-          .withNullable(inputFieldType.getNullable());
+      return getOutputSchema(inputFieldType.getRowSchema(), 
fieldAccessDescriptor);
     }
 
     Qualifier qualifier = qualifiers.get(qualifierPosition);
+    Schema.Builder builder = Schema.builder();
     switch (qualifier.getKind()) {
       case LIST:
         checkArgument(qualifier.getList().equals(ListQualifier.ALL));
         FieldType componentType = 
checkNotNull(inputFieldType.getCollectionElementType());
-        FieldType outputComponent =
+        Schema outputComponent =
             getOutputSchemaHelper(
-                    componentType, fieldAccessDescriptor, qualifiers, 
qualifierPosition + 1)
-                .withNullable(componentType.getNullable());
-        return 
FieldType.array(outputComponent).withNullable(inputFieldType.getNullable());
+                componentType, fieldAccessDescriptor, qualifiers, 
qualifierPosition + 1);
+        for (Field field : outputComponent.getFields()) {
+          Field newField =
+              Field.of(field.getName(), FieldType.array(field.getType()))
+                  .withNullable(inputFieldType.getNullable());
+          builder.addField(newField);
+        }
+        return builder.build();
       case MAP:
         checkArgument(qualifier.getMap().equals(MapQualifier.ALL));
         FieldType keyType = checkNotNull(inputFieldType.getMapKeyType());
         FieldType valueType = checkNotNull(inputFieldType.getMapValueType());
-        FieldType outputValueType =
+        Schema outputValueSchema =
             getOutputSchemaHelper(
-                    valueType, fieldAccessDescriptor, qualifiers, 
qualifierPosition + 1)
-                .withNullable(valueType.getNullable());
-        return FieldType.map(keyType, 
outputValueType).withNullable(inputFieldType.getNullable());
+                valueType, fieldAccessDescriptor, qualifiers, 
qualifierPosition + 1);
+        for (Field field : outputValueSchema.getFields()) {
+          Field newField =
+              Field.of(field.getName(), FieldType.map(keyType, 
field.getType()))
+                  .withNullable(inputFieldType.getNullable());
+          builder.addField(newField);
+        }
+        return builder.build();
       default:
         throw new RuntimeException("unexpected");
     }
   }
 
+  /** Select a sub Row from an input Row. */
   public static Row selectRow(
       Row input,
       FieldAccessDescriptor fieldAccessDescriptor,
@@ -106,47 +169,73 @@ public class SelectHelpers {
     }
 
     Row.Builder output = Row.withSchema(outputSchema);
+    selectIntoRow(input, output, fieldAccessDescriptor);
+    return output.build();
+  }
+
+  /** Select out of a given {@link Row} object. */
+  public static void selectIntoRow(
+      Row input, Row.Builder output, FieldAccessDescriptor 
fieldAccessDescriptor) {
+    if (fieldAccessDescriptor.getAllFields()) {
+      output.addValues(input.getValues());
+      return;
+    }
+
     for (int fieldId : fieldAccessDescriptor.fieldIdsAccessed()) {
       // TODO: Once we support specific qualifiers (like array slices), 
extract them here.
       output.addValue(input.getValue(fieldId));
     }
 
+    Schema outputSchema = output.getSchema();
     for (Map.Entry<FieldDescriptor, FieldAccessDescriptor> nested :
         fieldAccessDescriptor.getNestedFieldsAccessed().entrySet()) {
       FieldDescriptor field = nested.getKey();
-      String fieldName = inputSchema.nameOf(checkNotNull(field.getFieldId()));
-      FieldType nestedInputType = 
inputSchema.getField(field.getFieldId()).getType();
-      FieldType nestedOutputType = outputSchema.getField(fieldName).getType();
-      Object value =
-          selectRowHelper(
-              field.getQualifiers(),
-              0,
-              input.getValue(fieldName),
-              nested.getValue(),
-              nestedInputType,
-              nestedOutputType);
-      output.addValue(value);
+      FieldAccessDescriptor nestedAccess = nested.getValue();
+      FieldType nestedInputType = 
input.getSchema().getField(field.getFieldId()).getType();
+      FieldType nestedOutputType = 
outputSchema.getField(output.nextFieldId()).getType();
+      selectIntoRowHelper(
+          field.getQualifiers(),
+          input.getValue(field.getFieldId()),
+          output,
+          nestedAccess,
+          nestedInputType,
+          nestedOutputType);
     }
-    return output.build();
   }
 
   @SuppressWarnings("unchecked")
-  private static Object selectRowHelper(
+  private static void selectIntoRowHelper(
       List<Qualifier> qualifiers,
-      int qualifierPosition,
       Object value,
+      Row.Builder output,
       FieldAccessDescriptor fieldAccessDescriptor,
       FieldType inputType,
       FieldType outputType) {
-    if (qualifierPosition >= qualifiers.size()) {
+    if (qualifiers.isEmpty()) {
       Row row = (Row) value;
-      return selectRow(
-          row, fieldAccessDescriptor, inputType.getRowSchema(), 
outputType.getRowSchema());
+      selectIntoRow(row, output, fieldAccessDescriptor);
+      return;
     }
 
-    if (fieldAccessDescriptor.getAllFields()) {
-      // Since we are selecting all fields (and we do not yet support array 
slicing), short circuit.
-      return value;
+    // There are qualifiers. That means that the result will be either a list 
or a map, so
+    // construct the result and add that to our Row.
+    selectIntoRowWithQualifiers(
+        qualifiers, 0, value, output, fieldAccessDescriptor, inputType, 
outputType);
+  }
+
+  private static void selectIntoRowWithQualifiers(
+      List<Qualifier> qualifiers,
+      int qualifierPosition,
+      Object value,
+      Row.Builder output,
+      FieldAccessDescriptor fieldAccessDescriptor,
+      FieldType inputType,
+      FieldType outputType) {
+    if (qualifierPosition >= qualifiers.size()) {
+      // We have already constructed all arrays and maps. What remains must be 
a Row.
+      Row row = (Row) value;
+      selectIntoRow(row, output, fieldAccessDescriptor);
+      return;
     }
 
     Qualifier qualifier = qualifiers.get(qualifierPosition);
@@ -156,38 +245,87 @@ public class SelectHelpers {
           FieldType nestedInputType = 
checkNotNull(inputType.getCollectionElementType());
           FieldType nestedOutputType = 
checkNotNull(outputType.getCollectionElementType());
           List<Object> list = (List) value;
-          List selectedList = Lists.newArrayListWithCapacity(list.size());
+
+          // When selecting multiple subelements under a list, we distribute 
the select
+          // resulting in multiple lists. For example, if there is a field 
"list" with type
+          // {a: string, b: int}[], selecting list.a, list.b results in a 
schema of type
+          // {a: string[], b: int[]}. This preserves the invariant that the 
name selected always
+          // appears in the top-level schema.
+          Schema tempSchema = Schema.builder().addField("a", 
nestedInputType).build();
+          FieldAccessDescriptor tempAccessDescriptor =
+              FieldAccessDescriptor.create()
+                  .withNestedField("a", fieldAccessDescriptor)
+                  .resolve(tempSchema);
+          // TODO: doing this on each element might be inefficient. Consider 
caching this, or
+          // using codegen based on the schema.
+          Schema nestedSchema = getOutputSchema(tempSchema, 
tempAccessDescriptor);
+
+          List<List<Object>> selectedLists =
+              Lists.newArrayListWithCapacity(nestedSchema.getFieldCount());
+          for (int i = 0; i < nestedSchema.getFieldCount(); i++) {
+            selectedLists.add(Lists.newArrayListWithCapacity(list.size()));
+          }
           for (Object o : list) {
-            Object selected =
-                selectRowHelper(
-                    qualifiers,
-                    qualifierPosition + 1,
-                    o,
-                    fieldAccessDescriptor,
-                    nestedInputType,
-                    nestedOutputType);
-            selectedList.add(selected);
+            Row.Builder selectElementBuilder = Row.withSchema(nestedSchema);
+            selectIntoRowWithQualifiers(
+                qualifiers,
+                qualifierPosition + 1,
+                o,
+                selectElementBuilder,
+                fieldAccessDescriptor,
+                nestedInputType,
+                nestedOutputType);
+
+            Row elementBeforeDistribution = selectElementBuilder.build();
+            for (int i = 0; i < nestedSchema.getFieldCount(); ++i) {
+              selectedLists.get(i).add(elementBeforeDistribution.getValue(i));
+            }
           }
-          return selectedList;
+          for (List aList : selectedLists) {
+            output.addValue(aList);
+          }
+          break;
         }
       case MAP:
         {
           FieldType nestedInputType = 
checkNotNull(inputType.getMapValueType());
           FieldType nestedOutputType = 
checkNotNull(outputType.getMapValueType());
+
+          // When selecting multiple subelements under a map, we distribute 
the select
+          // resulting in multiple maps. The semantics are the same as for 
lists above (except we
+          // only support subelement select for map values, not for map keys).
+          Schema tempSchema = Schema.builder().addField("a", 
nestedInputType).build();
+          FieldAccessDescriptor tempAccessDescriptor =
+              FieldAccessDescriptor.create()
+                  .withNestedField("a", fieldAccessDescriptor)
+                  .resolve(tempSchema);
+          Schema nestedSchema = getOutputSchema(tempSchema, 
tempAccessDescriptor);
+          List<Map> selectedMaps = 
Lists.newArrayListWithExpectedSize(nestedSchema.getFieldCount());
+          for (int i = 0; i < nestedSchema.getFieldCount(); ++i) {
+            selectedMaps.add(Maps.newHashMap());
+          }
+
           Map<Object, Object> map = (Map) value;
-          Map selectedMap = Maps.newHashMapWithExpectedSize(map.size());
           for (Map.Entry<Object, Object> entry : map.entrySet()) {
-            Object selected =
-                selectRowHelper(
-                    qualifiers,
-                    qualifierPosition + 1,
-                    entry.getValue(),
-                    fieldAccessDescriptor,
-                    nestedInputType,
-                    nestedOutputType);
-            selectedMap.put(entry.getKey(), selected);
+            Row.Builder selectValueBuilder = Row.withSchema(nestedSchema);
+            selectIntoRowWithQualifiers(
+                qualifiers,
+                qualifierPosition + 1,
+                entry.getValue(),
+                selectValueBuilder,
+                fieldAccessDescriptor,
+                nestedInputType,
+                nestedOutputType);
+
+            Row valueBeforeDistribution = selectValueBuilder.build();
+            for (int i = 0; i < nestedSchema.getFieldCount(); ++i) {
+              selectedMaps.get(i).put(entry.getKey(), 
valueBeforeDistribution.getValue(i));
+            }
+          }
+          for (Map aMap : selectedMaps) {
+            output.addValue(aMap);
           }
-          return selectedMap;
+          break;
         }
       default:
         throw new RuntimeException("Unexpected type " + qualifier.getKind());
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
index 47958e2..1f6e386 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
@@ -500,6 +500,17 @@ public abstract class Row implements Serializable {
       this.schema = schema;
     }
 
+    public int nextFieldId() {
+      if (fieldValueGetterFactory != null) {
+        throw new RuntimeException("Not supported");
+      }
+      return values.size();
+    }
+
+    public Schema getSchema() {
+      return schema;
+    }
+
     public Builder addValue(@Nullable Object values) {
       this.values.add(values);
       return this;
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java
index cd8ec7a..0229c55 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java
@@ -216,22 +216,16 @@ public class GroupTest implements Serializable {
                     new OuterPOJO(new POJO("key2", 2L, "value4"))))
             .apply(Group.byFieldNames("inner.field1", "inner.field2"));
 
-    Schema selectedSchema =
-        
Schema.builder().addStringField("field1").addInt64Field("field2").build();
-    Schema keySchema = Schema.builder().addRowField("inner", 
selectedSchema).build();
+    Schema keySchema = 
Schema.builder().addStringField("field1").addInt64Field("field2").build();
     List<KV<Row, Collection<OuterPOJO>>> expected =
         ImmutableList.of(
             KV.of(
-                Row.withSchema(keySchema)
-                    .addValue(Row.withSchema(selectedSchema).addValues("key1", 
1L).build())
-                    .build(),
+                Row.withSchema(keySchema).addValues("key1", 1L).build(),
                 ImmutableList.of(
                     new OuterPOJO(new POJO("key1", 1L, "value1")),
                     new OuterPOJO(new POJO("key1", 1L, "value2")))),
             KV.of(
-                Row.withSchema(keySchema)
-                    .addValue(Row.withSchema(selectedSchema).addValues("key2", 
2L).build())
-                    .build(),
+                Row.withSchema(keySchema).addValues("key2", 2L).build(),
                 ImmutableList.of(
                     new OuterPOJO(new POJO("key2", 2L, "value3")),
                     new OuterPOJO(new POJO("key2", 2L, "value4")))));
@@ -535,8 +529,7 @@ public class GroupTest implements Serializable {
                     .aggregateField("inner.field3", Sum.ofIntegers(), 
"field3_sum")
                     .aggregateField("inner.field1", Top.largestLongsFn(1), 
"field1_top"));
 
-    Schema innerKeySchema = Schema.builder().addInt64Field("field2").build();
-    Schema keySchema = Schema.builder().addRowField("inner", 
innerKeySchema).build();
+    Schema keySchema = Schema.builder().addInt64Field("field2").build();
     Schema valueSchema =
         Schema.builder()
             .addInt64Field("field1_sum")
@@ -547,14 +540,10 @@ public class GroupTest implements Serializable {
     List<KV<Row, Row>> expected =
         ImmutableList.of(
             KV.of(
-                Row.withSchema(keySchema)
-                    
.addValue(Row.withSchema(innerKeySchema).addValue(1L).build())
-                    .build(),
+                Row.withSchema(keySchema).addValue(1L).build(),
                 
Row.withSchema(valueSchema).addValue(3L).addValue(5).addArray(2L).build()),
             KV.of(
-                Row.withSchema(keySchema)
-                    
.addValue(Row.withSchema(innerKeySchema).addValue(2L).build())
-                    .build(),
+                Row.withSchema(keySchema).addValue(2L).build(),
                 
Row.withSchema(valueSchema).addValue(7L).addValue(9).addArray(4L).build()));
     PAssert.that(aggregations).satisfies(actual -> containsKvs(expected, 
actual));
 
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java
index 3238ebd..f2728e0 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java
@@ -127,52 +127,6 @@ public class SelectTest {
     }
   }
 
-  /** A pojo matching the schema results from selection field2.*. */
-  @DefaultSchema(JavaFieldSchema.class)
-  static class POJO2NestedAll {
-    POJO1 field2 = new POJO1();
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      POJO2NestedAll that = (POJO2NestedAll) o;
-      return Objects.equals(field2, that.field2);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(field2);
-    }
-  }
-
-  /** A pojo matching the schema results from selection field2.field1, 
field2.field3. */
-  @DefaultSchema(JavaFieldSchema.class)
-  static class POJO2NestedPartial {
-    POJO1Selected field2 = new POJO1Selected();
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      POJO2NestedPartial that = (POJO2NestedPartial) o;
-      return Objects.equals(field2, that.field2);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(field2);
-    }
-  }
-
   @Test
   @Category(NeedsRunner.class)
   public void testSelectMissingFieldName() {
@@ -216,24 +170,36 @@ public class SelectTest {
   @Test
   @Category(NeedsRunner.class)
   public void testSelectNestedAll() {
-    PCollection<POJO2NestedAll> pojos =
+    PCollection<POJO1> pojos =
+        pipeline
+            .apply(Create.of(new POJO2()))
+            .apply(Select.fieldNames("field2"))
+            .apply(Convert.to(POJO1.class));
+    PAssert.that(pojos).containsInAnyOrder(new POJO1());
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testSelectNestedAllWildcard() {
+    PCollection<POJO1> pojos =
         pipeline
             .apply(Create.of(new POJO2()))
             .apply(Select.fieldNames("field2.*"))
-            .apply(Convert.to(POJO2NestedAll.class));
-    PAssert.that(pojos).containsInAnyOrder(new POJO2NestedAll());
+            .apply(Convert.to(POJO1.class));
+    PAssert.that(pojos).containsInAnyOrder(new POJO1());
     pipeline.run();
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testSelectNestedPartial() {
-    PCollection<POJO2NestedPartial> pojos =
+    PCollection<POJO1Selected> pojos =
         pipeline
             .apply(Create.of(new POJO2()))
             .apply(Select.fieldNames("field2.field1", "field2.field3"))
-            .apply(Convert.to(POJO2NestedPartial.class));
-    PAssert.that(pojos).containsInAnyOrder(new POJO2NestedPartial());
+            .apply(Convert.to(POJO1Selected.class));
+    PAssert.that(pojos).containsInAnyOrder(new POJO1Selected());
     pipeline.run();
   }
 
@@ -295,8 +261,8 @@ public class SelectTest {
 
   @DefaultSchema(JavaFieldSchema.class)
   static class PartialRowSingleArray {
-    List<POJO1Selected> field1 =
-        ImmutableList.of(new POJO1Selected(), new POJO1Selected(), new 
POJO1Selected());
+    List<String> field1 = ImmutableList.of("field1", "field1", "field1");
+    List<Double> field3 = ImmutableList.of(3.14, 3.14, 3.14);
 
     @Override
     public boolean equals(Object o) {
@@ -307,12 +273,12 @@ public class SelectTest {
         return false;
       }
       PartialRowSingleArray that = (PartialRowSingleArray) o;
-      return Objects.equals(field1, that.field1);
+      return Objects.equals(field1, that.field1) && Objects.equals(field3, 
that.field3);
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(field1);
+      return Objects.hash(field1, field3);
     }
   }
 
@@ -356,11 +322,16 @@ public class SelectTest {
 
   @DefaultSchema(JavaFieldSchema.class)
   static class PartialRowSingleMap {
-    Map<String, POJO1Selected> field1 =
+    Map<String, String> field1 =
         ImmutableMap.of(
-            "key1", new POJO1Selected(),
-            "key2", new POJO1Selected(),
-            "key3", new POJO1Selected());
+            "key1", "field1",
+            "key2", "field1",
+            "key3", "field1");
+    Map<String, Double> field3 =
+        ImmutableMap.of(
+            "key1", 3.14,
+            "key2", 3.14,
+            "key3", 3.14);
 
     @Override
     public boolean equals(Object o) {
@@ -371,12 +342,12 @@ public class SelectTest {
         return false;
       }
       PartialRowSingleMap that = (PartialRowSingleMap) o;
-      return Objects.equals(field1, that.field1);
+      return Objects.equals(field1, that.field1) && Objects.equals(field3, that.field3);
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(field1);
+      return Objects.hash(field1, field3);
     }
   }
 
@@ -427,8 +398,17 @@ public class SelectTest {
     private static final List<List<POJO1Selected>> POJO_LIST_LIST =
         ImmutableList.of(POJO_LIST, POJO_LIST, POJO_LIST);
 
-    List<List<List<POJO1Selected>>> field1 =
-        ImmutableList.of(POJO_LIST_LIST, POJO_LIST_LIST, POJO_LIST_LIST);
+    private static final List<String> STRING_LIST = ImmutableList.of("field1", "field1", "field1");
+    private static final List<List<String>> STRING_LISTLIST =
+        ImmutableList.of(STRING_LIST, STRING_LIST, STRING_LIST);
+    List<List<List<String>>> field1 =
+        ImmutableList.of(STRING_LISTLIST, STRING_LISTLIST, STRING_LISTLIST);
+
+    private static final List<Double> DOUBLE_LIST = ImmutableList.of(3.14, 3.14, 3.14);
+    private static final List<List<Double>> DOUBLE_LISTLIST =
+        ImmutableList.of(DOUBLE_LIST, DOUBLE_LIST, DOUBLE_LIST);
+    List<List<List<Double>>> field3 =
+        ImmutableList.of(DOUBLE_LISTLIST, DOUBLE_LISTLIST, DOUBLE_LISTLIST);
 
     @Override
     public boolean equals(Object o) {
@@ -507,21 +487,37 @@ public class SelectTest {
 
   @DefaultSchema(JavaFieldSchema.class)
   static class PartialRowMultipleMaps {
-    static final Map<String, POJO1Selected> POJO_MAP =
+    static final Map<String, String> STRING_MAP =
         ImmutableMap.of(
-            "key1", new POJO1Selected(),
-            "key2", new POJO1Selected(),
-            "key3", new POJO1Selected());
-    static final Map<String, Map<String, POJO1Selected>> POJO_MAP_MAP =
+            "key1", "field1",
+            "key2", "field1",
+            "key3", "field1");
+    static final Map<String, Map<String, String>> STRING_MAPMAP =
         ImmutableMap.of(
-            "key1", POJO_MAP,
-            "key2", POJO_MAP,
-            "key3", POJO_MAP);
-    Map<String, Map<String, Map<String, POJO1Selected>>> field1 =
+            "key1", STRING_MAP,
+            "key2", STRING_MAP,
+            "key3", STRING_MAP);
+    Map<String, Map<String, Map<String, String>>> field1 =
         ImmutableMap.of(
-            "key1", POJO_MAP_MAP,
-            "key2", POJO_MAP_MAP,
-            "key3", POJO_MAP_MAP);
+            "key1", STRING_MAPMAP,
+            "key2", STRING_MAPMAP,
+            "key3", STRING_MAPMAP);
+    static final Map<String, Double> DOUBLE_MAP =
+        ImmutableMap.of(
+            "key1", 3.14,
+            "key2", 3.14,
+            "key3", 3.14);
+    static final Map<String, Map<String, Double>> DOUBLE_MAPMAP =
+        ImmutableMap.of(
+            "key1", DOUBLE_MAP,
+            "key2", DOUBLE_MAP,
+            "key3", DOUBLE_MAP);
+
+    Map<String, Map<String, Map<String, Double>>> field3 =
+        ImmutableMap.of(
+            "key1", DOUBLE_MAPMAP,
+            "key2", DOUBLE_MAPMAP,
+            "key3", DOUBLE_MAPMAP);
 
     @Override
     public boolean equals(Object o) {
@@ -532,12 +528,12 @@ public class SelectTest {
         return false;
       }
       PartialRowMultipleMaps that = (PartialRowMultipleMaps) o;
-      return Objects.equals(field1, that.field1);
+      return Objects.equals(field1, that.field1) && Objects.equals(field3, that.field3);
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(field1);
+      return Objects.hash(field1, field3);
     }
   }
 
@@ -592,15 +588,20 @@ public class SelectTest {
 
   @DefaultSchema(JavaFieldSchema.class)
   static class PartialRowNestedArraysAndMaps {
-    static final List<POJO1Selected> POJO_LIST =
-        ImmutableList.of(new POJO1Selected(), new POJO1Selected(), new POJO1Selected());
-    static final Map<String, List<POJO1Selected>> POJO_MAP_LIST =
+    static final Map<String, List<String>> STRING_MAP =
         ImmutableMap.of(
-            "key1", POJO_LIST,
-            "key2", POJO_LIST,
-            "key3", POJO_LIST);
-    List<Map<String, List<POJO1Selected>>> field1 =
-        ImmutableList.of(POJO_MAP_LIST, POJO_MAP_LIST, POJO_MAP_LIST);
+            "key1", ImmutableList.of("field1", "field1", "field1"),
+            "key2", ImmutableList.of("field1", "field1", "field1"),
+            "key3", ImmutableList.of("field1", "field1", "field1"));
+    List<Map<String, List<String>>> field1 = ImmutableList.of(STRING_MAP, STRING_MAP, STRING_MAP);
+
+    static final Map<String, List<Double>> DOUBLE_MAP =
+        ImmutableMap.of(
+            "key1", ImmutableList.of(3.14, 3.14, 3.14),
+            "key2", ImmutableList.of(3.14, 3.14, 3.14),
+            "key3", ImmutableList.of(3.14, 3.14, 3.14));
+
+    List<Map<String, List<Double>>> field3 = ImmutableList.of(DOUBLE_MAP, DOUBLE_MAP, DOUBLE_MAP);
 
     @Override
     public boolean equals(Object o) {
@@ -611,12 +612,17 @@ public class SelectTest {
         return false;
       }
       PartialRowNestedArraysAndMaps that = (PartialRowNestedArraysAndMaps) o;
-      return Objects.equals(field1, that.field1);
+      return Objects.equals(field1, that.field1) && Objects.equals(field3, that.field3);
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(field1);
+      return Objects.hash(field1, field3);
+    }
+
+    @Override
+    public String toString() {
+      return "PartialRowNestedArraysAndMaps{" + "field1=" + field1 + ", field3=" + field3 + '}';
     }
   }
 
@@ -637,6 +643,7 @@ public class SelectTest {
             .apply("convert2", Convert.to(PartialRowNestedArraysAndMaps.class));
 
     PAssert.that(selected).containsInAnyOrder(new PartialRowNestedArraysAndMaps());
+    PAssert.that(selected2).containsInAnyOrder(new PartialRowNestedArraysAndMaps());
     pipeline.run();
   }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java
new file mode 100644
index 0000000..77183bf
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.utils;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
+import org.junit.Test;
+
+/** Tests for {@link SelectHelpers}. */
+public class SelectHelpersTest {
+  static final Schema FLAT_SCHEMA =
+      Schema.builder()
+          .addStringField("field1")
+          .addInt32Field("field2")
+          .addDoubleField("field3")
+          .build();
+  static final Row FLAT_ROW = Row.withSchema(FLAT_SCHEMA).addValues("first", 42, 3.14).build();
+
+  static final Schema NESTED_SCHEMA =
+      Schema.builder().addRowField("nested", FLAT_SCHEMA).addStringField("foo").build();
+  static final Row NESTED_ROW = Row.withSchema(NESTED_SCHEMA).addValues(FLAT_ROW, "").build();
+
+  static final Schema DOUBLE_NESTED_SCHEMA =
+      Schema.builder().addRowField("nested2", NESTED_SCHEMA).build();
+  static final Row DOUBLE_NESTED_ROW =
+      Row.withSchema(DOUBLE_NESTED_SCHEMA).addValue(NESTED_ROW).build();
+
+  static final Schema ARRAY_SCHEMA =
+      Schema.builder()
+          .addArrayField("primitiveArray", FieldType.INT32)
+          .addArrayField("rowArray", FieldType.row(FLAT_SCHEMA))
+          .addArrayField("arrayOfRowArray", FieldType.array(FieldType.row(FLAT_SCHEMA)))
+          .addArrayField("nestedRowArray", FieldType.row(NESTED_SCHEMA))
+          .build();
+  static final Row ARRAY_ROW =
+      Row.withSchema(ARRAY_SCHEMA)
+          .addArray(1, 2)
+          .addArray(FLAT_ROW, FLAT_ROW)
+          .addArray(ImmutableList.of(FLAT_ROW), ImmutableList.of(FLAT_ROW))
+          .addArray(NESTED_ROW, NESTED_ROW)
+          .build();
+
+  static final Schema MAP_SCHEMA =
+      Schema.builder().addMapField("map", FieldType.INT32, FieldType.row(FLAT_SCHEMA)).build();
+  static final Row MAP_ROW =
+      Row.withSchema(MAP_SCHEMA).addValue(ImmutableMap.of(1, FLAT_ROW)).build();
+
+  static final Schema MAP_ARRAY_SCHEMA =
+      Schema.builder()
+          .addMapField("map", FieldType.INT32, FieldType.array(FieldType.row(FLAT_SCHEMA)))
+          .build();
+  static final Row MAP_ARRAY_ROW =
+      Row.withSchema(MAP_ARRAY_SCHEMA)
+          .addValue(ImmutableMap.of(1, ImmutableList.of(FLAT_ROW)))
+          .build();
+
+  @Test
+  public void testSelectAll() {
+    FieldAccessDescriptor fieldAccessDescriptor =
+        FieldAccessDescriptor.withFieldNames("*").resolve(FLAT_SCHEMA);
+    Schema outputSchema = SelectHelpers.getOutputSchema(FLAT_SCHEMA, fieldAccessDescriptor);
+    assertEquals(FLAT_SCHEMA, outputSchema);
+
+    Row row = SelectHelpers.selectRow(FLAT_ROW, fieldAccessDescriptor, FLAT_SCHEMA, outputSchema);
+    assertEquals(FLAT_ROW, row);
+  }
+
+  @Test
+  public void testsSimpleSelectSingle() {
+    FieldAccessDescriptor fieldAccessDescriptor =
+        FieldAccessDescriptor.withFieldNames("field1").resolve(FLAT_SCHEMA);
+    Schema outputSchema = SelectHelpers.getOutputSchema(FLAT_SCHEMA, 
fieldAccessDescriptor);
+    Schema expectedSchema = Schema.builder().addStringField("field1").build();
+    assertEquals(expectedSchema, outputSchema);
+
+    Row row = SelectHelpers.selectRow(FLAT_ROW, fieldAccessDescriptor, 
FLAT_SCHEMA, outputSchema);
+    Row expectedRow = Row.withSchema(expectedSchema).addValue("first").build();
+    assertEquals(expectedRow, row);
+  }
+
+  @Test
+  public void testsSimpleSelectMultiple() {
+    FieldAccessDescriptor fieldAccessDescriptor =
+        FieldAccessDescriptor.withFieldNames("field1", 
"field3").resolve(FLAT_SCHEMA);
+    Schema outputSchema = SelectHelpers.getOutputSchema(FLAT_SCHEMA, 
fieldAccessDescriptor);
+    Schema expectedSchema =
+        
Schema.builder().addStringField("field1").addDoubleField("field3").build();
+    assertEquals(expectedSchema, outputSchema);
+
+    Row row = SelectHelpers.selectRow(FLAT_ROW, fieldAccessDescriptor, 
FLAT_SCHEMA, outputSchema);
+    Row expectedRow = Row.withSchema(expectedSchema).addValues("first", 
3.14).build();
+    assertEquals(expectedRow, row);
+  }
+
+  @Test
+  public void testSelectedNested() {
+    FieldAccessDescriptor fieldAccessDescriptor =
+        FieldAccessDescriptor.withFieldNames("nested").resolve(NESTED_SCHEMA);
+    Schema outputSchema = SelectHelpers.getOutputSchema(NESTED_SCHEMA, 
fieldAccessDescriptor);
+    Schema expectedSchema = Schema.builder().addRowField("nested", 
FLAT_SCHEMA).build();
+    assertEquals(expectedSchema, outputSchema);
+
+    Row row =
+        SelectHelpers.selectRow(NESTED_ROW, fieldAccessDescriptor, 
NESTED_SCHEMA, outputSchema);
+    Row expectedRow = 
Row.withSchema(expectedSchema).addValue(FLAT_ROW).build();
+    assertEquals(expectedRow, row);
+  }
+
+  @Test
+  public void testSelectedNestedSingle() {
+    FieldAccessDescriptor fieldAccessDescriptor =
+        
FieldAccessDescriptor.withFieldNames("nested.field1").resolve(NESTED_SCHEMA);
+    Schema outputSchema = SelectHelpers.getOutputSchema(NESTED_SCHEMA, 
fieldAccessDescriptor);
+    Schema expectedSchema = Schema.builder().addStringField("field1").build();
+    assertEquals(expectedSchema, outputSchema);
+
+    Row row =
+        SelectHelpers.selectRow(NESTED_ROW, fieldAccessDescriptor, 
NESTED_SCHEMA, outputSchema);
+    Row expectedRow = Row.withSchema(expectedSchema).addValue("first").build();
+    assertEquals(expectedRow, row);
+  }
+
+  @Test
+  public void testSelectedNestedWildcard() {
+    FieldAccessDescriptor fieldAccessDescriptor =
+        
FieldAccessDescriptor.withFieldNames("nested.*").resolve(NESTED_SCHEMA);
+    Schema outputSchema = SelectHelpers.getOutputSchema(NESTED_SCHEMA, 
fieldAccessDescriptor);
+    assertEquals(FLAT_SCHEMA, outputSchema);
+
+    Row row =
+        SelectHelpers.selectRow(NESTED_ROW, fieldAccessDescriptor, 
NESTED_SCHEMA, outputSchema);
+    assertEquals(FLAT_ROW, row);
+  }
+
+  @Test
+  public void testSelectDoubleNested() {
+    FieldAccessDescriptor fieldAccessDescriptor =
+        
FieldAccessDescriptor.withFieldNames("nested2.nested.field1").resolve(DOUBLE_NESTED_SCHEMA);
+    Schema outputSchema =
+        SelectHelpers.getOutputSchema(DOUBLE_NESTED_SCHEMA, 
fieldAccessDescriptor);
+    Schema expectedSchema = Schema.builder().addStringField("field1").build();
+    assertEquals(expectedSchema, outputSchema);
+
+    Row row =
+        SelectHelpers.selectRow(
+            DOUBLE_NESTED_ROW, fieldAccessDescriptor, DOUBLE_NESTED_SCHEMA, 
outputSchema);
+    Row expectedRow = Row.withSchema(expectedSchema).addValue("first").build();
+    assertEquals(expectedRow, row);
+  }
+
+  @Test
+  public void testSelectArrayOfPrimitive() {
+    FieldAccessDescriptor fieldAccessDescriptor =
+        
FieldAccessDescriptor.withFieldNames("primitiveArray").resolve(ARRAY_SCHEMA);
+    Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA, 
fieldAccessDescriptor);
+    Schema expectedSchema =
+        Schema.builder().addArrayField("primitiveArray", 
FieldType.INT32).build();
+    assertEquals(expectedSchema, outputSchema);
+
+    Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, 
ARRAY_SCHEMA, outputSchema);
+    Row expectedRow = Row.withSchema(expectedSchema).addArray(1, 2).build();
+    assertEquals(expectedRow, row);
+  }
+
+  @Test
+  public void testSelectArrayOfRow() {
+    FieldAccessDescriptor fieldAccessDescriptor =
+        FieldAccessDescriptor.withFieldNames("rowArray").resolve(ARRAY_SCHEMA);
+    Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA, 
fieldAccessDescriptor);
+    Schema expectedSchema =
+        Schema.builder().addArrayField("rowArray", 
FieldType.row(FLAT_SCHEMA)).build();
+    assertEquals(expectedSchema, outputSchema);
+
+    Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, 
ARRAY_SCHEMA, outputSchema);
+    Row expectedRow = Row.withSchema(expectedSchema).addArray(FLAT_ROW, 
FLAT_ROW).build();
+    assertEquals(expectedRow, row);
+  }
+
+  @Test
+  public void testSelectArrayOfRowPartial() {
+    FieldAccessDescriptor fieldAccessDescriptor =
+        
FieldAccessDescriptor.withFieldNames("rowArray[].field1").resolve(ARRAY_SCHEMA);
+    Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA, 
fieldAccessDescriptor);
+
+    Schema expectedSchema = Schema.builder().addArrayField("field1", 
FieldType.STRING).build();
+    assertEquals(expectedSchema, outputSchema);
+
+    Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, 
ARRAY_SCHEMA, outputSchema);
+    Row expectedRow = Row.withSchema(expectedSchema).addArray("first", 
"first").build();
+    assertEquals(expectedRow, row);
+  }
+
+  @Test
+  public void testSelectArrayOfRowArray() {
+    FieldAccessDescriptor fieldAccessDescriptor =
+        
FieldAccessDescriptor.withFieldNames("arrayOfRowArray[][].field1").resolve(ARRAY_SCHEMA);
+    Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA, 
fieldAccessDescriptor);
+
+    Schema expectedSchema =
+        Schema.builder().addArrayField("field1", 
FieldType.array(FieldType.STRING)).build();
+    assertEquals(expectedSchema, outputSchema);
+
+    Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, 
ARRAY_SCHEMA, outputSchema);
+
+    Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addArray(ImmutableList.of("first"), ImmutableList.of("first"))
+            .build();
+    assertEquals(expectedRow, row);
+  }
+
+  @Test
+  public void testSelectArrayOfNestedRow() {
+    FieldAccessDescriptor fieldAccessDescriptor =
+        FieldAccessDescriptor.withFieldNames("nestedRowArray[].nested.field1")
+            .resolve(ARRAY_SCHEMA);
+    Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA, 
fieldAccessDescriptor);
+
+    Schema expectedElementSchema = 
Schema.builder().addStringField("field1").build();
+    Schema expectedSchema = Schema.builder().addArrayField("field1", 
FieldType.STRING).build();
+    assertEquals(expectedSchema, outputSchema);
+
+    Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, 
ARRAY_SCHEMA, outputSchema);
+    Row expectedRow = Row.withSchema(expectedSchema).addArray("first", 
"first").build();
+    assertEquals(expectedRow, row);
+  }
+
+  @Test
+  public void testSelectMapOfRowSelectSingle() {
+    FieldAccessDescriptor fieldAccessDescriptor =
+        
FieldAccessDescriptor.withFieldNames("map{}.field1").resolve(MAP_SCHEMA);
+    Schema outputSchema = SelectHelpers.getOutputSchema(MAP_SCHEMA, 
fieldAccessDescriptor);
+
+    Schema expectedValueSchema = 
Schema.builder().addStringField("field1").build();
+    Schema expectedSchema =
+        Schema.builder().addMapField("field1", FieldType.INT32, 
FieldType.STRING).build();
+    assertEquals(expectedSchema, outputSchema);
+
+    Row row = SelectHelpers.selectRow(MAP_ROW, fieldAccessDescriptor, 
MAP_SCHEMA, outputSchema);
+    Row expectedRow = 
Row.withSchema(expectedSchema).addValue(ImmutableMap.of(1, "first")).build();
+    assertEquals(expectedRow, row);
+  }
+
+  @Test
+  public void testSelectMapOfRowSelectAll() {
+    FieldAccessDescriptor fieldAccessDescriptor =
+        FieldAccessDescriptor.withFieldNames("map{}.*").resolve(MAP_SCHEMA);
+    Schema outputSchema = SelectHelpers.getOutputSchema(MAP_SCHEMA, 
fieldAccessDescriptor);
+    Schema expectedSchema =
+        Schema.builder()
+            .addMapField("field1", FieldType.INT32, FieldType.STRING)
+            .addMapField("field2", FieldType.INT32, FieldType.INT32)
+            .addMapField("field3", FieldType.INT32, FieldType.DOUBLE)
+            .build();
+    assertEquals(expectedSchema, outputSchema);
+
+    Row row = SelectHelpers.selectRow(MAP_ROW, fieldAccessDescriptor, 
MAP_SCHEMA, outputSchema);
+    Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(0)))
+            .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(1)))
+            .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(2)))
+            .build();
+    assertEquals(expectedRow, row);
+  }
+
+  @Test
+  public void testSelectMapOfArray() {
+    FieldAccessDescriptor fieldAccessDescriptor =
+        
FieldAccessDescriptor.withFieldNames("map.field1").resolve(MAP_ARRAY_SCHEMA);
+    Schema outputSchema = SelectHelpers.getOutputSchema(MAP_ARRAY_SCHEMA, 
fieldAccessDescriptor);
+
+    Schema expectedSchema =
+        Schema.builder()
+            .addMapField("field1", FieldType.INT32, 
FieldType.array(FieldType.STRING))
+            .build();
+    assertEquals(expectedSchema, outputSchema);
+
+    Row row =
+        SelectHelpers.selectRow(
+            MAP_ARRAY_ROW, fieldAccessDescriptor, MAP_ARRAY_SCHEMA, 
outputSchema);
+
+    Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValue(ImmutableMap.of(1, ImmutableList.of("first")))
+            .build();
+    assertEquals(expectedRow, row);
+  }
+
+  @Test
+  public void testSelectFieldOfRecord() {
+    Schema f1 = Schema.builder().addInt64Field("f0").build();
+    Schema f2 = Schema.builder().addRowField("f1", f1).build();
+    Schema f3 = Schema.builder().addRowField("f2", f2).build();
+
+    Row r1 = Row.withSchema(f1).addValue(42L).build(); // {"f0": 42}
+    Row r2 = Row.withSchema(f2).addValue(r1).build(); // {"f1": {"f0": 42}}
+    Row r3 = Row.withSchema(f3).addValue(r2).build(); // {"f2": {"f1": {"f0": 
42}}}
+
+    FieldAccessDescriptor fieldAccessDescriptor =
+        FieldAccessDescriptor.withFieldNames("f2.f1").resolve(f3);
+
+    Schema outputSchema = SelectHelpers.getOutputSchema(f3, 
fieldAccessDescriptor);
+
+    Row out = SelectHelpers.selectRow(r3, fieldAccessDescriptor, 
r3.getSchema(), outputSchema);
+
+    assertEquals(f2, outputSchema);
+    assertEquals(r2, out);
+  }
+
+  @Test
+  public void testSelectFieldOfRecordOrRecord() {
+    Schema f1 = Schema.builder().addInt64Field("f0").build();
+    Schema f2 = Schema.builder().addRowField("f1", f1).build();
+    Schema f3 = Schema.builder().addRowField("f2", f2).build();
+    Schema f4 = Schema.builder().addRowField("f3", f3).build();
+
+    Row r1 = Row.withSchema(f1).addValue(42L).build(); // {"f0": 42}
+    Row r2 = Row.withSchema(f2).addValue(r1).build(); // {"f1": {"f0": 42}}
+    Row r3 = Row.withSchema(f3).addValue(r2).build(); // {"f2": {"f1": {"f0": 
42}}}
+    Row r4 = Row.withSchema(f4).addValue(r3).build(); // {"f3": {"f2": {"f1": 
{"f0": 42}}}}
+
+    FieldAccessDescriptor fieldAccessDescriptor =
+        FieldAccessDescriptor.withFieldNames("f3.f2").resolve(f4);
+
+    Schema outputSchema = SelectHelpers.getOutputSchema(f4, 
fieldAccessDescriptor);
+
+    Row out = SelectHelpers.selectRow(r4, fieldAccessDescriptor, 
r4.getSchema(), outputSchema);
+
+    assertEquals(f3, outputSchema);
+    assertEquals(r3, out);
+  }
+
+  @Test
+  public void testArrayRowArray() {
+    Schema f1 = Schema.builder().addStringField("f0").build();
+    Schema f2 = Schema.builder().addArrayField("f1", 
FieldType.row(f1)).build();
+    Schema f3 = Schema.builder().addRowField("f2", f2).build();
+    Schema f4 = Schema.builder().addArrayField("f3", 
FieldType.row(f3)).build();
+
+    Row r1 = Row.withSchema(f1).addValue("first").build();
+    Row r2 = Row.withSchema(f2).addArray(r1, r1).build();
+    Row r3 = Row.withSchema(f3).addValue(r2).build();
+    Row r4 = Row.withSchema(f4).addArray(r3, r3).build();
+
+    FieldAccessDescriptor fieldAccessDescriptor =
+        FieldAccessDescriptor.withFieldNames("f3.f2.f1.f0").resolve(f4);
+    Schema outputSchema = SelectHelpers.getOutputSchema(f4, 
fieldAccessDescriptor);
+    Schema expectedSchema =
+        Schema.builder().addArrayField("f0", 
FieldType.array(FieldType.STRING)).build();
+    assertEquals(expectedSchema, outputSchema);
+    Row out = SelectHelpers.selectRow(r4, fieldAccessDescriptor, 
r4.getSchema(), outputSchema);
+    Row expected =
+        Row.withSchema(outputSchema)
+            .addArray(Lists.newArrayList("first", "first"), 
Lists.newArrayList("first", "first"))
+            .build();
+    assertEquals(expected, out);
+  }
+}

Reply via email to