This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8abc90b  Merge pull request #11025: [BEAM-6428] Improve select 
performance with codegen
8abc90b is described below

commit 8abc90bb13fd0b8c46a26fc59752161b012a9f03
Author: reuvenlax <[email protected]>
AuthorDate: Wed Mar 4 10:57:01 2020 -0800

    Merge pull request #11025: [BEAM-6428] Improve select performance with 
codegen
---
 .../beam/sdk/schemas/transforms/CoGroup.java       |   8 +-
 .../apache/beam/sdk/schemas/transforms/Filter.java |  26 +-
 .../apache/beam/sdk/schemas/transforms/Group.java  |   8 +-
 .../sdk/schemas/transforms/SchemaAggregateFn.java  |  46 +-
 .../apache/beam/sdk/schemas/transforms/Select.java |  13 +-
 .../utils/ByteBuddyLocalVariableManager.java       |  93 +++
 .../apache/beam/sdk/schemas/utils/RowSelector.java |  25 +
 .../sdk/schemas/utils/SelectByteBuddyHelpers.java  | 843 +++++++++++++++++++++
 .../beam/sdk/schemas/utils/SelectHelpers.java      |  69 +-
 .../beam/sdk/transforms/DoFnSchemaInformation.java |  13 +-
 .../main/java/org/apache/beam/sdk/values/Row.java  |   4 +
 .../beam/sdk/schemas/utils/SelectHelpersTest.java  |  87 ++-
 12 files changed, 1126 insertions(+), 109 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java
index 8386eac..cde224a 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java
@@ -35,7 +35,9 @@ import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.utils.RowSelector;
 import org.apache.beam.sdk.schemas.utils.SelectHelpers;
+import org.apache.beam.sdk.schemas.utils.SelectHelpers.RowSelectorContainer;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -399,10 +401,12 @@ public class CoGroup {
               "extractKey" + tag,
               ParDo.of(
                   new DoFn<T, KV<Row, Row>>() {
+                    private RowSelector rowSelector =
+                        new RowSelectorContainer(schema, keyFields, true);
+
                     @ProcessElement
                     public void process(@Element Row row, 
OutputReceiver<KV<Row, Row>> o) {
-                      o.output(
-                          KV.of(SelectHelpers.selectRow(row, keyFields, 
schema, keySchema), row));
+                      o.output(KV.of(rowSelector.select(row), row));
                     }
                   }))
           .setCoder(KvCoder.of(SchemaCoder.of(keySchema), 
SchemaCoder.of(schema)));
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
index 60f86b1..0015835 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.RowSelector;
 import org.apache.beam.sdk.schemas.utils.SelectHelpers;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -87,6 +88,8 @@ public class Filter {
 
   /** Implementation of the filter. */
   public static class Inner<T> extends PTransform<PCollection<T>, 
PCollection<T>> {
+    private RowSelector rowSelector;
+
     @AutoValue
     abstract static class FilterDescription<FieldT> implements Serializable {
       abstract FieldAccessDescriptor getFieldAccessDescriptor();
@@ -98,6 +101,9 @@ public class Filter {
 
       abstract boolean getSelectsSingleField();
 
+      @Nullable
+      abstract Schema getInputSchema();
+
       abstract Builder<FieldT> toBuilder();
 
       @AutoValue.Builder
@@ -111,8 +117,20 @@ public class Filter {
 
         abstract Builder<FieldT> setSelectsSingleField(boolean unbox);
 
+        abstract Builder<FieldT> setInputSchema(@Nullable Schema inputSchema);
+
         abstract FilterDescription<FieldT> build();
       }
+
+      transient RowSelector rowSelector;
+
+      public RowSelector getRowSelector() {
+        if (rowSelector == null) {
+          rowSelector =
+              SelectHelpers.getRowSelectorOptimized(getInputSchema(), 
getFieldAccessDescriptor());
+        }
+        return rowSelector;
+      }
     }
 
     private final List<FilterDescription<?>> filters = Lists.newArrayList();
@@ -179,6 +197,7 @@ public class Filter {
               .map(
                   f ->
                       f.toBuilder()
+                          .setInputSchema(inputSchema)
                           .setSelectedSchema(
                               SelectHelpers.getOutputSchema(
                                   inputSchema, f.getFieldAccessDescriptor()))
@@ -191,12 +210,7 @@ public class Filter {
                 @ProcessElement
                 public void process(@Element Row row, OutputReceiver<Row> o) {
                   for (FilterDescription filter : resolvedFilters) {
-                    Row selected =
-                        SelectHelpers.selectRow(
-                            row,
-                            filter.getFieldAccessDescriptor(),
-                            inputSchema,
-                            filter.getSelectedSchema());
+                    Row selected = filter.getRowSelector().select(row);
                     if (filter.getSelectsSingleField()) {
                       SerializableFunction<Object, Boolean> predicate =
                           (SerializableFunction<Object, Boolean>) 
filter.getPredicate();
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
index f0e891c..2f95753 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
@@ -27,7 +27,9 @@ import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.utils.RowSelector;
 import org.apache.beam.sdk.schemas.utils.SelectHelpers;
+import org.apache.beam.sdk.schemas.utils.SelectHelpers.RowSelectorContainer;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -495,18 +497,20 @@ public class Group {
     }
 
     class ToKv extends PTransform<PCollection<InputT>, PCollection<KV<Row, 
Iterable<Row>>>> {
+      private RowSelector rowSelector;
+
       @Override
       public PCollection<KV<Row, Iterable<Row>>> expand(PCollection<InputT> 
input) {
         Schema schema = input.getSchema();
         FieldAccessDescriptor resolved = 
getFieldAccessDescriptor().resolve(schema);
+        rowSelector = new RowSelectorContainer(schema, resolved, true);
         Schema keySchema = getKeySchema(schema);
 
         return input
             .apply("toRow", Convert.toRows())
             .apply(
                 "selectKeys",
-                WithKeys.of((Row e) -> SelectHelpers.selectRow(e, resolved, 
schema, keySchema))
-                    .withKeyType(TypeDescriptors.rows()))
+                WithKeys.of((Row e) -> 
rowSelector.select(e)).withKeyType(TypeDescriptors.rows()))
             .setCoder(KvCoder.of(SchemaCoder.of(keySchema), 
SchemaCoder.of(schema)))
             .apply(GroupByKey.create());
       }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java
index a07afdd..0ee2942 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java
@@ -32,7 +32,9 @@ import org.apache.beam.sdk.schemas.FieldTypeDescriptors;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.utils.RowSelector;
 import org.apache.beam.sdk.schemas.utils.SelectHelpers;
+import org.apache.beam.sdk.schemas.utils.SelectHelpers.RowSelectorContainer;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.CombineFns;
 import org.apache.beam.sdk.transforms.CombineFns.CoCombineResult;
@@ -158,12 +160,12 @@ class SchemaAggregateFn {
         SimpleFunction<Row, ?> extractFunction;
         Coder extractOutputCoder;
         if (fieldAggregation.fieldsToAggregate.referencesSingleField()) {
-          extractFunction = new ExtractSingleFieldFunction(fieldAggregation);
+          extractFunction = new ExtractSingleFieldFunction(inputSchema, 
fieldAggregation);
           extractOutputCoder =
               SchemaCoder.coderForFieldType(
                   
fieldAggregation.flattenedInputSubSchema.getField(0).getType());
         } else {
-          extractFunction = new ExtractFieldsFunction(fieldAggregation);
+          extractFunction = new ExtractFieldsFunction(inputSchema, 
fieldAggregation);
           extractOutputCoder = SchemaCoder.of(fieldAggregation.inputSubSchema);
         }
         if (i == 0) {
@@ -229,27 +231,28 @@ class SchemaAggregateFn {
 
     /** Extract a single field from an input {@link Row}. */
     private static class ExtractSingleFieldFunction<OutputT> extends 
SimpleFunction<Row, OutputT> {
+      private final RowSelector rowSelector;
+      @Nullable private final RowSelector flatteningSelector;
       private final FieldAggregation fieldAggregation;
 
-      private ExtractSingleFieldFunction(FieldAggregation fieldAggregation) {
+      private ExtractSingleFieldFunction(Schema inputSchema, FieldAggregation 
fieldAggregation) {
+        rowSelector =
+            new RowSelectorContainer(inputSchema, 
fieldAggregation.fieldsToAggregate, true);
+        flatteningSelector =
+            fieldAggregation.needsFlattening
+                ? new RowSelectorContainer(
+                    fieldAggregation.inputSubSchema,
+                    fieldAggregation.flattenedFieldAccessDescriptor,
+                    true)
+                : null;
         this.fieldAggregation = fieldAggregation;
       }
 
       @Override
       public OutputT apply(Row row) {
-        Row selected =
-            SelectHelpers.selectRow(
-                row,
-                fieldAggregation.fieldsToAggregate,
-                row.getSchema(),
-                fieldAggregation.inputSubSchema);
+        Row selected = rowSelector.select(row);
         if (fieldAggregation.needsFlattening) {
-          selected =
-              SelectHelpers.selectRow(
-                  selected,
-                  fieldAggregation.flattenedFieldAccessDescriptor,
-                  row.getSchema(),
-                  fieldAggregation.flattenedInputSubSchema);
+          selected = flatteningSelector.select(selected);
         }
         return selected.getValue(0);
       }
@@ -257,19 +260,18 @@ class SchemaAggregateFn {
 
     /** Extract multiple fields from an input {@link Row}. */
     private static class ExtractFieldsFunction extends SimpleFunction<Row, 
Row> {
-      private FieldAggregation fieldAggregation;
+      private final RowSelector rowSelector;
+      private final FieldAggregation fieldAggregation;
 
-      private ExtractFieldsFunction(FieldAggregation fieldAggregation) {
+      private ExtractFieldsFunction(Schema inputSchema, FieldAggregation 
fieldAggregation) {
+        rowSelector =
+            new RowSelectorContainer(inputSchema, 
fieldAggregation.fieldsToAggregate, true);
         this.fieldAggregation = fieldAggregation;
       }
 
       @Override
       public Row apply(Row row) {
-        return SelectHelpers.selectRow(
-            row,
-            fieldAggregation.fieldsToAggregate,
-            row.getSchema(),
-            fieldAggregation.inputSubSchema);
+        return rowSelector.select(row);
       }
     }
 
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
index 0a67b37..ba6dc9b 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
@@ -27,9 +27,10 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.RowSelector;
 import org.apache.beam.sdk.schemas.utils.SelectHelpers;
+import org.apache.beam.sdk.schemas.utils.SelectHelpers.RowSelectorContainer;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.FieldAccess;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -114,9 +115,10 @@ public class Select {
   }
 
   private static class SelectDoFn<T> extends DoFn<T, Row> {
-    private FieldAccessDescriptor fieldAccessDescriptor;
-    private Schema inputSchema;
-    private Schema outputSchema;
+    private final FieldAccessDescriptor fieldAccessDescriptor;
+    private final Schema inputSchema;
+    private final Schema outputSchema;
+    RowSelector rowSelector;
 
     // TODO: This should be the same as resolved so that Beam knows which 
fields
     // are being accessed. Currently Beam only supports wildcard descriptors.
@@ -129,11 +131,12 @@ public class Select {
       this.fieldAccessDescriptor = fieldAccessDescriptor;
       this.inputSchema = inputSchema;
       this.outputSchema = outputSchema;
+      this.rowSelector = new RowSelectorContainer(inputSchema, 
fieldAccessDescriptor, true);
     }
 
     @ProcessElement
     public void process(@FieldAccess("selectFields") @Element Row row, 
OutputReceiver<Row> r) {
-      r.output(SelectHelpers.selectRow(row, fieldAccessDescriptor, 
inputSchema, outputSchema));
+      r.output(rowSelector.select(row));
     }
   }
 
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyLocalVariableManager.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyLocalVariableManager.java
new file mode 100644
index 0000000..d988e76
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyLocalVariableManager.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.utils;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.description.type.TypeDescription.ForLoadedType;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.StackManipulation;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.StackManipulation.Compound;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.assign.TypeCasting;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
+
+/** This class allows managing local variables in a ByteBuddy-generated 
function. */
+class ByteBuddyLocalVariableManager {
+  private int nextLocalVariableIndex;
+
+  // Initialize with the number of arguments to the function (including the 
this parameter if
+  // applicable).
+  public ByteBuddyLocalVariableManager(int numFunctionArgs) {
+    nextLocalVariableIndex = numFunctionArgs;
+  }
+
+  // Create a new variable.
+  public int createVariable() {
+    return nextLocalVariableIndex++;
+  }
+
+  // Returns a StackManipulation to read a variable.
+  public StackManipulation readVariable(int variableIndex) {
+    checkArgument(variableIndex < nextLocalVariableIndex);
+    return MethodVariableAccess.REFERENCE.loadFrom(variableIndex);
+  }
+
+  // Returns a StackManipulation to read a variable, casting to the specified 
type.
+  StackManipulation readVariable(int variableIndex, Class<?> type) {
+    return new Compound(readVariable(variableIndex), TypeCasting.to(new 
ForLoadedType(type)));
+  }
+
+  // Returns a StackManipulation to write a variable.
+  StackManipulation writeVariable(int variableIndex) {
+    checkArgument(variableIndex < nextLocalVariableIndex);
+    return MethodVariableAccess.REFERENCE.storeAt(variableIndex);
+  }
+
+  // Returns a StackManipulation to copy a variable.
+  StackManipulation copy(int sourceVariableIndex, int destVariableIndex) {
+    return new Compound(readVariable(sourceVariableIndex), 
writeVariable(destVariableIndex));
+  }
+
+  // Returns a class that can be used to backup and restore a variable, using 
a newly-created temp
+  // variable.
+  BackupLocalVariable backupVariable(int variableToBackup) {
+    return new BackupLocalVariable(variableToBackup);
+  }
+
+  // Gets the total num variables in the function. Should be used when 
returning the Size parameter.
+  int getTotalNumVariables() {
+    return nextLocalVariableIndex;
+  }
+
+  public class BackupLocalVariable {
+    private final int variableToBackup; // Variable to save.
+    private final int tempVariable; // Temp variable we are saving in.
+
+    public BackupLocalVariable(int variableToBackup) {
+      this.variableToBackup = variableToBackup;
+      this.tempVariable = createVariable();
+    }
+
+    public StackManipulation backup() {
+      return copy(variableToBackup, tempVariable);
+    }
+
+    public StackManipulation restore() {
+      return copy(tempVariable, variableToBackup);
+    }
+  }
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/RowSelector.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/RowSelector.java
new file mode 100644
index 0000000..0ea251b
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/RowSelector.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.utils;
+
+import org.apache.beam.sdk.values.Row;
+
+/** A selector interface for extracting fields from a row. * */
+public interface RowSelector {
+  Row select(Row input);
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectByteBuddyHelpers.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectByteBuddyHelpers.java
new file mode 100644
index 0000000..a482bcb
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectByteBuddyHelpers.java
@@ -0,0 +1,843 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.utils;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Modifier;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor.FieldDescriptor;
+import 
org.apache.beam.sdk.schemas.FieldAccessDescriptor.FieldDescriptor.Qualifier;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.ByteBuddy;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.asm.AsmVisitorWrapper;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.description.method.MethodDescription;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.description.modifier.FieldManifestation;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.description.modifier.Visibility;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.description.type.TypeDescription.ForLoadedType;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.description.type.TypeDescription.Generic;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.DynamicType;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.scaffold.InstrumentedType;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.Implementation;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.Implementation.Context;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.ByteCodeAppender;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.ByteCodeAppender.Size;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.Duplication;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.Removal;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.StackManipulation;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.StackSize;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.assign.TypeCasting;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.collection.ArrayAccess;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.constant.IntegerConstant;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.member.FieldAccess;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.member.MethodInvocation;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.member.MethodReturn;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.jar.asm.ClassWriter;
+import org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.jar.asm.Label;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.jar.asm.MethodVisitor;
+import org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.jar.asm.Opcodes;
+import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.matcher.ElementMatchers;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+
+public class SelectByteBuddyHelpers {
+  private static final ByteBuddy BYTE_BUDDY = new ByteBuddy();
+  private static final String SELECT_SCHEMA_FIELD_NAME = "OUTPUTSCHEMA";
+
+  private static final ForLoadedType ROW_LOADED_TYPE = new 
ForLoadedType(Row.class);
+  private static final ForLoadedType LIST_LOADED_TYPE = new 
ForLoadedType(List.class);
+  private static final ForLoadedType LISTS_LOADED_TYPE = new 
ForLoadedType(Lists.class);
+  private static final ForLoadedType MAP_LOADED_TYPE = new 
ForLoadedType(Map.class);
+  private static final ForLoadedType MAPS_LOADED_TYPE = new 
ForLoadedType(Maps.class);
+  private static final ForLoadedType MAPENTRY_LOADED_TYPE = new 
ForLoadedType(Map.Entry.class);
+  private static final ForLoadedType ITERABLE_LOADED_TYPE = new 
ForLoadedType(Iterable.class);
+  private static final ForLoadedType ITERATOR_LOADED_TYPE = new 
ForLoadedType(Iterator.class);
+
+  private static final MethodDescription LIST_ADD =
+      LIST_LOADED_TYPE
+          .getDeclaredMethods()
+          
.filter(ElementMatchers.named("add").and(ElementMatchers.takesArguments(1)))
+          .getOnly();
+
+  private static final MethodDescription LISTS_NEW_ARRAYLIST =
+      LISTS_LOADED_TYPE
+          .getDeclaredMethods()
+          
.filter(ElementMatchers.named("newArrayList").and(ElementMatchers.takesArguments(0)))
+          .getOnly();
+
+  private static final MethodDescription MAP_ENTRYSET =
+      
MAP_LOADED_TYPE.getDeclaredMethods().filter(ElementMatchers.named("entrySet")).getOnly();
+
+  private static final MethodDescription MAP_PUT =
+      MAP_LOADED_TYPE
+          .getDeclaredMethods()
+          
.filter(ElementMatchers.named("put").and(ElementMatchers.takesArguments(2)))
+          .getOnly();
+
+  private static final MethodDescription MAPS_NEW_HASHMAP =
+      MAPS_LOADED_TYPE
+          .getDeclaredMethods()
+          
.filter(ElementMatchers.named("newHashMap").and(ElementMatchers.takesArguments(0)))
+          .getOnly();
+
+  private static final MethodDescription ITERABLE_ITERATOR =
+      
ITERABLE_LOADED_TYPE.getDeclaredMethods().filter(ElementMatchers.named("iterator")).getOnly();
+
+  private static final MethodDescription ITERATOR_HASNEXT =
+      
ITERATOR_LOADED_TYPE.getDeclaredMethods().filter(ElementMatchers.named("hasNext")).getOnly();
+
+  private static final MethodDescription ITERATOR_NEXT =
+      
ITERATOR_LOADED_TYPE.getDeclaredMethods().filter(ElementMatchers.named("next")).getOnly();
+
+  private static final MethodDescription MAPENTRY_GETKEY =
+      
MAPENTRY_LOADED_TYPE.getDeclaredMethods().filter(ElementMatchers.named("getKey")).getOnly();
+
+  private static final MethodDescription MAPENTRY_GETVALUE =
+      
MAPENTRY_LOADED_TYPE.getDeclaredMethods().filter(ElementMatchers.named("getValue")).getOnly();
+
+  @AutoValue
+  abstract static class SchemaAndDescriptor {
+    abstract Schema getSchema();
+
+    abstract FieldAccessDescriptor getFieldAccessDecriptor();
+
+    static SchemaAndDescriptor of(Schema schema, FieldAccessDescriptor 
fieldAccessDescriptor) {
+      return new AutoValue_SelectByteBuddyHelpers_SchemaAndDescriptor(
+          schema, fieldAccessDescriptor);
+    }
+  }
+
+  private static final Map<SchemaAndDescriptor, RowSelector> CACHED_SELECTORS =
+      Maps.newConcurrentMap();
+
+  public static RowSelector getRowSelector(
+      Schema inputSchema, FieldAccessDescriptor fieldAccessDescriptor) {
+    return CACHED_SELECTORS.computeIfAbsent(
+        SchemaAndDescriptor.of(inputSchema, fieldAccessDescriptor),
+        SelectByteBuddyHelpers::createRowSelector);
+  }
+
+  static RowSelector createRowSelector(SchemaAndDescriptor 
schemaAndDescriptor) {
+    Schema outputSchema =
+        SelectHelpers.getOutputSchema(
+            schemaAndDescriptor.getSchema(), 
schemaAndDescriptor.getFieldAccessDecriptor());
+    try {
+      DynamicType.Builder<RowSelector> builder =
+          BYTE_BUDDY
+              .subclass(RowSelector.class)
+              .method(ElementMatchers.named("select"))
+              .intercept(
+                  new SelectInstruction(
+                      schemaAndDescriptor.getFieldAccessDecriptor(),
+                      schemaAndDescriptor.getSchema(),
+                      outputSchema))
+              .defineField(
+                  SELECT_SCHEMA_FIELD_NAME,
+                  Schema.class,
+                  Visibility.PRIVATE,
+                  FieldManifestation.FINAL)
+              .defineConstructor(Modifier.PUBLIC)
+              .withParameters(Schema.class)
+              .intercept(new SelectInstructionConstructor());
+
+      return builder
+          .visit(new 
AsmVisitorWrapper.ForDeclaredMethods().writerFlags(ClassWriter.COMPUTE_FRAMES))
+          .make()
+          .load(Row.class.getClassLoader(), 
ClassLoadingStrategy.Default.INJECTION)
+          .getLoaded()
+          .getDeclaredConstructor(Schema.class)
+          .newInstance(outputSchema);
+    } catch (InstantiationException
+        | IllegalAccessException
+        | NoSuchMethodException
+        | InvocationTargetException e) {
+      throw new RuntimeException("Unable to generate");
+    }
+  }
+
+  private static class SelectInstructionConstructor implements Implementation {
+    @Override
+    public InstrumentedType prepare(InstrumentedType instrumentedType) {
+      return instrumentedType;
+    }
+
+    @Override
+    public ByteCodeAppender appender(final Target implementationTarget) {
+      return (methodVisitor, implementationContext, instrumentedMethod) -> {
+        int numLocals = 1 + instrumentedMethod.getParameters().size();
+        StackManipulation stackManipulation =
+            new StackManipulation.Compound(
+                // Call the base constructor for Object.
+                MethodVariableAccess.loadThis(),
+                Duplication.SINGLE,
+                MethodInvocation.invoke(
+                    new ForLoadedType(Object.class)
+                        .getDeclaredMethods()
+                        .filter(
+                            
ElementMatchers.isConstructor().and(ElementMatchers.takesArguments(0)))
+                        .getOnly()),
+                MethodVariableAccess.REFERENCE.loadFrom(1),
+                FieldAccess.forField(
+                        implementationTarget
+                            .getInstrumentedType()
+                            .getDeclaredFields()
+                            
.filter(ElementMatchers.named(SELECT_SCHEMA_FIELD_NAME))
+                            .getOnly())
+                    .write(),
+                MethodReturn.VOID);
+        StackManipulation.Size size = stackManipulation.apply(methodVisitor, 
implementationContext);
+        return new Size(size.getMaximalSize(), numLocals);
+      };
+    }
+  }
+
+  // Manage array creation and appending.
+  private static class ArrayManager {
+    private final StackManipulation.Size sizeDecreaseArrayStore;
+    private final int arraySize;
+    int currentArrayField = 0;
+
+    ArrayManager(int arraySize) {
+      // Size decreases by index and array reference (2) and array element (1, 
2) after each element
+      // storage.
+      this.sizeDecreaseArrayStore =
+          StackSize.DOUBLE
+              .toDecreasingSize()
+              .aggregate(Generic.OBJECT.getStackSize().toDecreasingSize());
+      this.arraySize = arraySize;
+    }
+
+    public StackManipulation createArray() {
+      return new StackManipulation() {
+        @Override
+        public boolean isValid() {
+          return true;
+        }
+
+        @Override
+        public Size apply(MethodVisitor methodVisitor, Context context) {
+          Size size = IntegerConstant.forValue(arraySize).apply(methodVisitor, 
context);
+          methodVisitor.visitTypeInsn(
+              Opcodes.ANEWARRAY, Generic.OBJECT.asErasure().getInternalName());
+          size = size.aggregate(StackSize.ZERO.toDecreasingSize());
+          return size;
+        }
+      };
+    }
+
+    public StackManipulation append(StackManipulation valueToWrite) {
+      return store(currentArrayField++, valueToWrite);
+    }
+
+    public StackManipulation store(int arrayIndexToWrite, StackManipulation 
valueToWrite) {
+      Preconditions.checkArgument(arrayIndexToWrite < arraySize);
+      return new StackManipulation() {
+        @Override
+        public boolean isValid() {
+          return true;
+        }
+
+        @Override
+        public Size apply(MethodVisitor methodVisitor, Context context) {
+          StackManipulation stackManipulation =
+              new StackManipulation.Compound(
+                  Duplication.SINGLE, // Duplicate the array reference
+                  IntegerConstant.forValue(arrayIndexToWrite),
+                  valueToWrite);
+          StackManipulation.Size size = stackManipulation.apply(methodVisitor, 
context);
+          methodVisitor.visitInsn(Opcodes.AASTORE);
+          size = size.aggregate(sizeDecreaseArrayStore);
+          return size;
+        }
+      };
+    }
+  }
+
+  private static class SelectInstruction implements Implementation {
+    private final FieldAccessDescriptor fieldAccessDescriptor;
+    private final Schema inputSchema;
+    private final Schema outputSchema;
+    private ByteBuddyLocalVariableManager localVariables;
+    private static final int INPUT_ROW_ARG = 1;
+    private final int currentSelectRowArg;
+    private final int fieldValueArg;
+
+    private static final int NUM_FUNCTION_ARGS = 2; // Args are this and the 
row to select.
+
+    public SelectInstruction(
+        FieldAccessDescriptor fieldAccessDescriptor, Schema inputSchema, 
Schema outputSchema) {
+      this.fieldAccessDescriptor = fieldAccessDescriptor;
+      this.inputSchema = inputSchema;
+      this.outputSchema = outputSchema;
+      this.localVariables = new 
ByteBuddyLocalVariableManager(NUM_FUNCTION_ARGS);
+      currentSelectRowArg = localVariables.createVariable();
+      fieldValueArg = localVariables.createVariable();
+    }
+
+    @Override
+    public InstrumentedType prepare(InstrumentedType instrumentedType) {
+      return instrumentedType;
+    }
+
+    @Override
+    public ByteCodeAppender appender(final Target implementationTarget) {
+      return (methodVisitor, implementationContext, instrumentedMethod) -> {
+        int numLocals = 1 + instrumentedMethod.getParameters().size();
+        StackManipulation.Size size = new StackManipulation.Size(0, numLocals);
+
+        // Create the Row builder.
+        StackManipulation createRowBuilder =
+            new StackManipulation.Compound(
+                MethodVariableAccess.loadThis(),
+                FieldAccess.forField(
+                        implementationTarget
+                            .getInstrumentedType()
+                            .getDeclaredFields()
+                            
.filter(ElementMatchers.named(SELECT_SCHEMA_FIELD_NAME))
+                            .getOnly())
+                    .read(),
+                MethodInvocation.invoke(
+                    new ForLoadedType(Row.class)
+                        .getDeclaredMethods()
+                        .filter(ElementMatchers.named("withSchema"))
+                        .getOnly()));
+        size = size.aggregate(createRowBuilder.apply(methodVisitor, 
implementationContext));
+
+        // Create a new object array with one entry for each field in the 
output schema.
+        ArrayManager arrayManager = new 
ArrayManager(outputSchema.getFieldCount());
+        size =
+            size.aggregate(arrayManager.createArray().apply(methodVisitor, 
implementationContext));
+
+        // Store the current input row into a local variable.
+        StackManipulation storeRowInLocalVariable =
+            localVariables.copy(INPUT_ROW_ARG, currentSelectRowArg);
+        size = size.aggregate(storeRowInLocalVariable.apply(methodVisitor, 
implementationContext));
+
+        // Fill the array values with those selected from the row.
+        size =
+            size.aggregate(
+                selectIntoArray(
+                    inputSchema,
+                    fieldAccessDescriptor,
+                    arrayManager,
+                    methodVisitor,
+                    implementationContext));
+
+        // Return the actual row.
+        StackManipulation attachToRow =
+            new StackManipulation.Compound(
+                MethodInvocation.invoke(
+                    new ForLoadedType(Row.Builder.class)
+                        .getDeclaredMethods()
+                        .filter(
+                            ElementMatchers.named("attachValues")
+                                
.and(ElementMatchers.takesArguments(Object[].class)))
+                        .getOnly()),
+                MethodInvocation.invoke(
+                    new ForLoadedType(Row.Builder.class)
+                        .getDeclaredMethods()
+                        .filter(ElementMatchers.named("build"))
+                        .getOnly()),
+                MethodReturn.REFERENCE);
+        size = size.aggregate(attachToRow.apply(methodVisitor, 
implementationContext));
+        return new Size(size.getMaximalSize(), 
localVariables.getTotalNumVariables());
+      };
+    }
+
+    // Selects a field from the current row being selected (the one stored in
+    // currentSelectRowArg).
+    private StackManipulation getCurrentRowFieldValue(int i) {
+      return new StackManipulation.Compound(
+          localVariables.readVariable(currentSelectRowArg, Row.class),
+          IntegerConstant.forValue(i),
+          MethodInvocation.invoke(
+              ROW_LOADED_TYPE
+                  .getDeclaredMethods()
+                  .filter(
+                      ElementMatchers.named("getValue")
+                          .and(ElementMatchers.takesArguments(int.class)))
+                  .getOnly()));
+    }
+
+    // Generate bytecode to select all specified fields from the Row. The 
current row being selected
+    // is stored
+    // in the local variable at position 2. The current array being written to 
is at the top of the
+    // stack.
+    StackManipulation.Size selectIntoArray(
+        Schema inputSchema,
+        FieldAccessDescriptor fieldAccessDescriptor,
+        ArrayManager arrayManager,
+        MethodVisitor methodVisitor,
+        Context implementationContext) {
+      StackManipulation.Size size = new StackManipulation.Size(0, 0);
+      if (fieldAccessDescriptor.getAllFields()) {
+        StackManipulation storeAllValues =
+            new StackManipulation.Compound(
+                IntStream.range(0, inputSchema.getFieldCount())
+                    .mapToObj(i -> 
arrayManager.append(getCurrentRowFieldValue(i)))
+                    .collect(Collectors.toList()));
+        return size.aggregate(storeAllValues.apply(methodVisitor, 
implementationContext));
+      }
+
+      StackManipulation storeAllValues =
+          new StackManipulation.Compound(
+              fieldAccessDescriptor.fieldIdsAccessed().stream()
+                  .map(i -> arrayManager.append(getCurrentRowFieldValue(i)))
+                  .collect(Collectors.toList()));
+      size = size.aggregate(storeAllValues.apply(methodVisitor, 
implementationContext));
+
+      for (Map.Entry<FieldDescriptor, FieldAccessDescriptor> nested :
+          fieldAccessDescriptor.getNestedFieldsAccessed().entrySet()) {
+        FieldDescriptor field = nested.getKey();
+        FieldAccessDescriptor nestedAccess = nested.getValue();
+        FieldType nestedInputType = 
inputSchema.getField(field.getFieldId()).getType();
+
+        StackManipulation.Size subSelectSize =
+            selectIntoArrayHelper(
+                field.getQualifiers(),
+                0,
+                field.getFieldId(),
+                nestedAccess,
+                nestedInputType,
+                arrayManager,
+                methodVisitor,
+                implementationContext);
+        size = size.aggregate(subSelectSize);
+      }
+      return size;
+    }
+
+    // Load the current field value to process. If a fieldId is specified, 
then we assume that we
+    // need to select
+    // a field from the Row stored in currentSelectRowArg. Otherwise we assume 
that the value has
+    // already been stored
+    // in fieldValueArg, and we read from there.
+    private StackManipulation loadFieldValue(int fieldId) {
+      if (fieldId != -1) {
+        // If a field id was specified, then load it.
+        return getCurrentRowFieldValue(fieldId);
+      } else {
+        // If not specified, then assume it was stored in this member variable.
+        return localVariables.readVariable(fieldValueArg);
+      }
+    }
+
+    private StackManipulation.Size selectIntoArrayHelper(
+        List<Qualifier> qualifiers,
+        int qualifierPosition,
+        int fieldId,
+        FieldAccessDescriptor fieldAccessDescriptor,
+        FieldType inputType,
+        ArrayManager arrayManager,
+        MethodVisitor methodVisitor,
+        Context implementationContext) {
+      StackManipulation.Size size = new StackManipulation.Size(0, 0);
+
+      if (qualifierPosition >= qualifiers.size()) {
+        // We have already constructed all arrays and maps. What remains must 
be a Row.
+        ByteBuddyLocalVariableManager.BackupLocalVariable backup =
+            localVariables.backupVariable(currentSelectRowArg);
+        StackManipulation updateLocalVariable =
+            new StackManipulation.Compound(
+                loadFieldValue(fieldId),
+                // Backup the current value of the currentSelectRowArg 
variable.
+                backup.backup(),
+                // Update the row local with the current stack value.
+                localVariables.writeVariable(currentSelectRowArg));
+        size = size.aggregate(updateLocalVariable.apply(methodVisitor, 
implementationContext));
+
+        size =
+            size.aggregate(
+                selectIntoArray(
+                    inputType.getRowSchema(),
+                    fieldAccessDescriptor,
+                    arrayManager,
+                    methodVisitor,
+                    implementationContext));
+        // Restore the value of currentSelectRowArg from the temp variable.
+        size = size.aggregate(backup.restore().apply(methodVisitor, 
implementationContext));
+        return size;
+      }
+
+      Qualifier qualifier = qualifiers.get(qualifierPosition);
+      switch (qualifier.getKind()) {
+        case LIST:
+          return size.aggregate(
+              processList(
+                  inputType,
+                  fieldAccessDescriptor,
+                  qualifiers,
+                  qualifierPosition,
+                  fieldId,
+                  arrayManager,
+                  methodVisitor,
+                  implementationContext));
+
+        case MAP:
+          return size.aggregate(
+              processMap(
+                  inputType,
+                  fieldAccessDescriptor,
+                  qualifiers,
+                  qualifierPosition,
+                  fieldId,
+                  arrayManager,
+                  methodVisitor,
+                  implementationContext));
+
+        default:
+          throw new RuntimeException("Unexpected type " + qualifier.getKind());
+      }
+    }
+
+    private StackManipulation.Size processList(
+        FieldType inputType,
+        FieldAccessDescriptor fieldAccessDescriptor,
+        List<Qualifier> qualifiers,
+        int qualifierPosition,
+        int fieldId,
+        ArrayManager arrayManager,
+        MethodVisitor methodVisitor,
+        Context implementationContext) {
+      StackManipulation.Size size = new StackManipulation.Size(0, 0);
+      FieldType nestedInputType = 
checkNotNull(inputType.getCollectionElementType());
+      Schema nestedSchema = getNestedSchema(nestedInputType, 
fieldAccessDescriptor);
+
+      // We create temp local variables to store all the arrays we create. 
Each field in
+      // nestedSchema corresponds to a separate array in the output.
+      int[] localVariablesForArrays =
+          IntStream.range(0, nestedSchema.getFieldCount())
+              .map(i -> localVariables.createVariable())
+              .toArray();
+
+      // Each field returned in nestedSchema will become it's own list in the 
output. So let's
+      // iterate and create arrays and store each one in the output.
+      StackManipulation createAllArrayLists =
+          new StackManipulation.Compound(
+              IntStream.range(0, nestedSchema.getFieldCount())
+                  .mapToObj(
+                      i -> {
+                        StackManipulation createArrayList =
+                            new StackManipulation.Compound(
+                                MethodInvocation.invoke(LISTS_NEW_ARRAYLIST),
+                                // Store the ArrayList in a local variable.
+                                Duplication.SINGLE,
+                                
localVariables.writeVariable(localVariablesForArrays[i]));
+                        return arrayManager.append(createArrayList);
+                      })
+                  .collect(Collectors.toList()));
+      size = size.aggregate(createAllArrayLists.apply(methodVisitor, 
implementationContext));
+
+      // Now iterate over the value, selecting from each element.
+      StackManipulation readListIterator =
+          new StackManipulation.Compound(
+              loadFieldValue(fieldId),
+              TypeCasting.to(new ForLoadedType(Iterable.class)),
+              MethodInvocation.invoke(ITERABLE_ITERATOR));
+      size = size.aggregate(readListIterator.apply(methodVisitor, 
implementationContext));
+
+      // Loop over the entire iterable.
+      Label startLoopLabel = new Label();
+      Label exitLoopLabel = new Label();
+      methodVisitor.visitLabel(startLoopLabel);
+
+      StackManipulation checkTerminationCondition =
+          new StackManipulation.Compound(
+              Duplication.SINGLE, MethodInvocation.invoke(ITERATOR_HASNEXT));
+      size = size.aggregate(checkTerminationCondition.apply(methodVisitor, 
implementationContext));
+      methodVisitor.visitJumpInsn(Opcodes.IFEQ, exitLoopLabel); // Exit the 
loop if !hasNext().
+      size = size.aggregate(StackSize.SINGLE.toDecreasingSize());
+
+      // Read the next value in the iterator.
+      StackManipulation getNext =
+          new StackManipulation.Compound(
+              Duplication.SINGLE, MethodInvocation.invoke(ITERATOR_NEXT));
+      size = size.aggregate(getNext.apply(methodVisitor, 
implementationContext));
+
+      ByteBuddyLocalVariableManager.BackupLocalVariable backupFieldValue = 
null;
+      if (fieldId == -1) {
+        // Save the field value arg before overwriting it, as we need it in 
subsequent
+        // iterations of the
+        // loop.
+        backupFieldValue = localVariables.backupVariable(fieldValueArg);
+        size =
+            size.aggregate(backupFieldValue.backup().apply(methodVisitor, 
implementationContext));
+      }
+
+      // Recursively generate select with one qualifier consumed. Since we 
pass in -1 as the
+      // field id, the iterator.next() value will be consumed as the value to 
select instead
+      // of accessing the fieldId.
+      size =
+          size.aggregate(
+              localVariables
+                  .writeVariable(fieldValueArg)
+                  .apply(methodVisitor, implementationContext));
+
+      // Select the fields of interest from this row.
+      // Create a new object array with one entry for each field selected from 
this row.
+      ArrayManager nestedArrayManager = new 
ArrayManager(nestedSchema.getFieldCount());
+      size =
+          size.aggregate(
+              nestedArrayManager.createArray().apply(methodVisitor, 
implementationContext));
+
+      size =
+          size.aggregate(
+              selectIntoArrayHelper(
+                  qualifiers,
+                  qualifierPosition + 1,
+                  -1,
+                  fieldAccessDescriptor,
+                  nestedInputType,
+                  nestedArrayManager,
+                  methodVisitor,
+                  implementationContext));
+
+      if (backupFieldValue != null) {
+        // Restore the field value.
+        size =
+            size.aggregate(backupFieldValue.restore().apply(methodVisitor, 
implementationContext));
+      }
+
+      // Now the top of the stack holds an array containing all the fields 
selected from the
+      // row.
+      // Now we need to distribute these fields into the separate arrays we 
created in the
+      // result.
+      // That is: if this select returned {a, b}, our final resulting schema 
will contain two
+      // lists,
+      // so we must add a to the first and b to the second list.
+
+      int tempVariableForField = localVariables.createVariable();
+      for (int i = 0; i < nestedSchema.getFieldCount(); ++i) {
+        // Extract the field and store it in a temp variable.
+        StackManipulation extractField =
+            new StackManipulation.Compound(
+                Duplication.SINGLE,
+                IntegerConstant.forValue(i),
+                ArrayAccess.REFERENCE.load(),
+                localVariables.writeVariable(tempVariableForField));
+        StackManipulation addItemToList =
+            new StackManipulation.Compound(
+                localVariables.readVariable(localVariablesForArrays[i]),
+                localVariables.readVariable(tempVariableForField),
+                MethodInvocation.invoke(LIST_ADD),
+                // Ignore return value from add().
+                Removal.SINGLE);
+        size =
+            size.aggregate(
+                new StackManipulation.Compound(extractField, addItemToList)
+                    .apply(methodVisitor, implementationContext));
+      }
+
+      // Pop the created array from the top of the stack.
+      size = size.aggregate(Removal.SINGLE.apply(methodVisitor, 
implementationContext));
+
+      // Go back to the beginning of the loop.
+      methodVisitor.visitJumpInsn(Opcodes.GOTO, startLoopLabel);
+      methodVisitor.visitLabel(exitLoopLabel);
+      // Remove the iterator from the top of the stack.
+      size = size.aggregate(Removal.SINGLE.apply(methodVisitor, 
implementationContext));
+      return size;
+    }
+
+    private StackManipulation.Size processMap(
+        FieldType inputType,
+        FieldAccessDescriptor fieldAccessDescriptor,
+        List<Qualifier> qualifiers,
+        int qualifierPosition,
+        int fieldId,
+        ArrayManager arrayManager,
+        MethodVisitor methodVisitor,
+        Context implementationContext) {
+      StackManipulation.Size size = new StackManipulation.Size(0, 0);
+      FieldType nestedInputType = checkNotNull(inputType.getMapValueType());
+      Schema nestedSchema = getNestedSchema(nestedInputType, 
fieldAccessDescriptor);
+
+      // We create temp local variables to store all the maps we create. Each 
field in
+      // nestedSchema corresponds to a separate array in the output.
+      int[] localVariablesForMaps =
+          IntStream.range(0, nestedSchema.getFieldCount())
+              .map(i -> localVariables.createVariable())
+              .toArray();
+
+      // Each field returned in nestedSchema will become it's own map in the 
output. So let's
+      // iterate and create arrays and store each one in the output.
+      StackManipulation createAllHashMaps =
+          new StackManipulation.Compound(
+              IntStream.range(0, nestedSchema.getFieldCount())
+                  .mapToObj(
+                      i -> {
+                        StackManipulation createHashMap =
+                            new StackManipulation.Compound(
+                                MethodInvocation.invoke(MAPS_NEW_HASHMAP),
+                                // Store the HashMap in a local variable.
+                                Duplication.SINGLE,
+                                
localVariables.writeVariable(localVariablesForMaps[i]));
+                        return arrayManager.append(createHashMap);
+                      })
+                  .collect(Collectors.toList()));
+      size = size.aggregate(createAllHashMaps.apply(methodVisitor, 
implementationContext));
+
+      // Now iterate over the value, selecting from each element.
+      StackManipulation readMapEntriesIterator =
+          new StackManipulation.Compound(
+              loadFieldValue(fieldId),
+              TypeCasting.to(new ForLoadedType(Map.class)),
+              MethodInvocation.invoke(MAP_ENTRYSET),
+              MethodInvocation.invoke(ITERABLE_ITERATOR));
+      size = size.aggregate(readMapEntriesIterator.apply(methodVisitor, 
implementationContext));
+
+      // Loop over the entire entrySet iterable.
+      Label startLoopLabel = new Label();
+      Label exitLoopLabel = new Label();
+      methodVisitor.visitLabel(startLoopLabel);
+
+      StackManipulation checkTerminationCondition =
+          new StackManipulation.Compound(
+              Duplication.SINGLE, MethodInvocation.invoke(ITERATOR_HASNEXT));
+      size = size.aggregate(checkTerminationCondition.apply(methodVisitor, 
implementationContext));
+      methodVisitor.visitJumpInsn(Opcodes.IFEQ, exitLoopLabel); // Exit the 
loop if !hasNext().
+      size = size.aggregate(StackSize.SINGLE.toDecreasingSize());
+
+      int keyVariable = localVariables.createVariable();
+      // Read the next value in the iterator.
+      StackManipulation getNext =
+          new StackManipulation.Compound(
+              Duplication.SINGLE,
+              MethodInvocation.invoke(ITERATOR_NEXT),
+              // Get the key and store it in the keyVariable.
+              Duplication.SINGLE,
+              MethodInvocation.invoke(MAPENTRY_GETKEY),
+              localVariables.writeVariable(keyVariable),
+              // Get the value and leave it on the stack.
+              MethodInvocation.invoke(MAPENTRY_GETVALUE));
+      size = size.aggregate(getNext.apply(methodVisitor, 
implementationContext));
+
+      ByteBuddyLocalVariableManager.BackupLocalVariable backupFieldValue = 
null;
+      if (fieldId == -1) {
+        // Save the field value arg before overwriting it, as we need it in 
subsequent
+        // iterations of the
+        // loop.
+        backupFieldValue = localVariables.backupVariable(fieldValueArg);
+        size =
+            size.aggregate(backupFieldValue.backup().apply(methodVisitor, 
implementationContext));
+      }
+
+      // Recursively generate select with one qualifier consumed. Since we 
pass in -1 as the
+      // field id, the iterator.next() value will be consumed as the value to 
select instead
+      // of accessing the fieldId.
+      size =
+          size.aggregate(
+              localVariables
+                  .writeVariable(fieldValueArg)
+                  .apply(methodVisitor, implementationContext));
+
+      // Select the fields of interest from this row.
+      // Create a new object array with one entry for each field selected from 
this row.
+      ArrayManager nestedArrayManager = new 
ArrayManager(nestedSchema.getFieldCount());
+      size =
+          size.aggregate(
+              nestedArrayManager.createArray().apply(methodVisitor, 
implementationContext));
+
+      size =
+          size.aggregate(
+              selectIntoArrayHelper(
+                  qualifiers,
+                  qualifierPosition + 1,
+                  -1,
+                  fieldAccessDescriptor,
+                  nestedInputType,
+                  nestedArrayManager,
+                  methodVisitor,
+                  implementationContext));
+
+      if (backupFieldValue != null) {
+        // Restore the field value.
+        size =
+            size.aggregate(backupFieldValue.restore().apply(methodVisitor, 
implementationContext));
+      }
+
+      // Now the top of the stack holds an array containing all the fields 
selected from the
+      // row.
+      // Now we need to distribute these fields into the separate arrays we 
created in the
+      // result.
+      // That is: if this select returned {a, b}, our final resulting schema 
will contain two
+      // maps,
+      // so we must add a to the first and b to the second map.
+
+      int tempVariableForField = localVariables.createVariable();
+      for (int i = 0; i < nestedSchema.getFieldCount(); ++i) {
+        // Extract the field and store it in a temp variable.
+        StackManipulation extractField =
+            new StackManipulation.Compound(
+                Duplication.SINGLE,
+                IntegerConstant.forValue(i),
+                ArrayAccess.REFERENCE.load(),
+                localVariables.writeVariable(tempVariableForField));
+        StackManipulation addItemToMap =
+            new StackManipulation.Compound(
+                localVariables.readVariable(localVariablesForMaps[i]),
+                localVariables.readVariable(keyVariable),
+                localVariables.readVariable(tempVariableForField),
+                MethodInvocation.invoke(MAP_PUT),
+                // Ignore return value from add().
+                Removal.SINGLE);
+        size =
+            size.aggregate(
+                new StackManipulation.Compound(extractField, addItemToMap)
+                    .apply(methodVisitor, implementationContext));
+      }
+
+      // Pop the created array from the top of the stack.
+      size = size.aggregate(Removal.SINGLE.apply(methodVisitor, 
implementationContext));
+
+      // Go back to the beginning of the loop.
+      methodVisitor.visitJumpInsn(Opcodes.GOTO, startLoopLabel);
+      methodVisitor.visitLabel(exitLoopLabel);
+      // Remove the iterator from the top of the stack.
+      size = size.aggregate(Removal.SINGLE.apply(methodVisitor, 
implementationContext));
+      return size;
+    }
+
+    private Schema getNestedSchema(
+        FieldType nestedInputType, FieldAccessDescriptor 
fieldAccessDescriptor) {
+      // When selecting multiple subelements under a list, we distribute the 
select
+      // resulting in multiple lists. For example, if there is a field "list" 
with type
+      // {a: string, b: int}[], selecting list.a, list.b results in a schema 
of type
+      // {a: string[], b: int[]}. This preserves the invariant that the name 
selected always
+      // appears in the top-level schema.
+      Schema tempSchema = Schema.builder().addField("a", 
nestedInputType).build();
+      FieldAccessDescriptor tempAccessDescriptor =
+          FieldAccessDescriptor.create()
+              .withNestedField("a", fieldAccessDescriptor)
+              .resolve(tempSchema);
+      return SelectHelpers.getOutputSchema(tempSchema, tempAccessDescriptor);
+    }
+  }
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java
index 7fc3ec5..8526ba8 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.schemas.utils;
 import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
 
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -179,12 +180,45 @@ public class SelectHelpers {
     }
   }
 
+  public static RowSelector getRowSelectorOptimized(
+      Schema inputSchema, FieldAccessDescriptor fieldAccessDescriptor) {
+    return SelectByteBuddyHelpers.getRowSelector(inputSchema, 
fieldAccessDescriptor);
+  }
+
+  public static RowSelector getRowSelector(
+      Schema inputSchema, FieldAccessDescriptor fieldAccessDescriptor) {
+    Schema outputSchema = getOutputSchema(inputSchema, fieldAccessDescriptor);
+    return input -> selectRow(input, fieldAccessDescriptor, outputSchema);
+  }
+
+  public static class RowSelectorContainer implements RowSelector, 
Serializable {
+    private transient RowSelector rowSelector;
+    private final Schema inputSchema;
+    private final FieldAccessDescriptor fieldAccessDescriptor;
+    private final boolean optimized;
+
+    public RowSelectorContainer(
+        Schema inputSchema, FieldAccessDescriptor fieldAccessDescriptor, 
boolean optimized) {
+      this.inputSchema = inputSchema;
+      this.fieldAccessDescriptor = fieldAccessDescriptor;
+      this.optimized = optimized;
+    }
+
+    @Override
+    public Row select(Row input) {
+      if (this.rowSelector == null) {
+        rowSelector =
+            optimized
+                ? getRowSelectorOptimized(inputSchema, fieldAccessDescriptor)
+                : getRowSelector(inputSchema, fieldAccessDescriptor);
+      }
+      return rowSelector.select(input);
+    }
+  }
+
   /** Select a sub Row from an input Row. */
-  public static Row selectRow(
-      Row input,
-      FieldAccessDescriptor fieldAccessDescriptor,
-      Schema inputSchema,
-      Schema outputSchema) {
+  private static Row selectRow(
+      Row input, FieldAccessDescriptor fieldAccessDescriptor, Schema 
outputSchema) {
     if (fieldAccessDescriptor.getAllFields()) {
       return input;
     }
@@ -195,7 +229,7 @@ public class SelectHelpers {
   }
 
   /** Select out of a given {@link Row} object. */
-  public static void selectIntoRow(
+  private static void selectIntoRow(
       Row input, Row.Builder output, FieldAccessDescriptor 
fieldAccessDescriptor) {
     if (fieldAccessDescriptor.getAllFields()) {
       output.addValues(input.getValues());
@@ -214,8 +248,9 @@ public class SelectHelpers {
       FieldAccessDescriptor nestedAccess = nested.getValue();
       FieldType nestedInputType = 
input.getSchema().getField(field.getFieldId()).getType();
       FieldType nestedOutputType = 
outputSchema.getField(output.nextFieldId()).getType();
-      selectIntoRowHelper(
+      selectIntoRowWithQualifiers(
           field.getQualifiers(),
+          0,
           input.getValue(field.getFieldId()),
           output,
           nestedAccess,
@@ -224,26 +259,6 @@ public class SelectHelpers {
     }
   }
 
-  @SuppressWarnings("unchecked")
-  private static void selectIntoRowHelper(
-      List<Qualifier> qualifiers,
-      Object value,
-      Row.Builder output,
-      FieldAccessDescriptor fieldAccessDescriptor,
-      FieldType inputType,
-      FieldType outputType) {
-    if (qualifiers.isEmpty()) {
-      Row row = (Row) value;
-      selectIntoRow(row, output, fieldAccessDescriptor);
-      return;
-    }
-
-    // There are qualifiers. That means that the result will be either a list 
or a map, so
-    // construct the result and add that to our Row.
-    selectIntoRowWithQualifiers(
-        qualifiers, 0, value, output, fieldAccessDescriptor, inputType, 
outputType);
-  }
-
   private static void selectIntoRowWithQualifiers(
       List<Qualifier> qualifiers,
       int qualifierPosition,
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnSchemaInformation.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnSchemaInformation.java
index b131365..1c54e73 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnSchemaInformation.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnSchemaInformation.java
@@ -30,7 +30,8 @@ import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.SchemaCoder;
 import 
org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.DefaultTypeConversionsFactory;
 import org.apache.beam.sdk.schemas.utils.ConvertHelpers;
-import org.apache.beam.sdk.schemas.utils.SelectHelpers;
+import org.apache.beam.sdk.schemas.utils.RowSelector;
+import org.apache.beam.sdk.schemas.utils.SelectHelpers.RowSelectorContainer;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -147,6 +148,7 @@ public abstract class DoFnSchemaInformation implements 
Serializable {
     private final FieldAccessDescriptor selectDescriptor;
     private final Schema selectOutputSchema;
     private final boolean unbox;
+    private final RowSelector rowSelector;
 
     private ConversionFunction(
         Schema inputSchema,
@@ -161,6 +163,7 @@ public abstract class DoFnSchemaInformation implements 
Serializable {
       this.selectDescriptor = selectDescriptor;
       this.selectOutputSchema = selectOutputSchema;
       this.unbox = unbox;
+      this.rowSelector = new RowSelectorContainer(inputSchema, 
selectDescriptor, true);
     }
 
     public static <InputT, OutputT> ConversionFunction of(
@@ -177,8 +180,7 @@ public abstract class DoFnSchemaInformation implements 
Serializable {
     @Override
     public OutputT apply(InputT input) {
       Row row = toRowFunction.apply(input);
-      Row selected =
-          SelectHelpers.selectRow(row, selectDescriptor, inputSchema, 
selectOutputSchema);
+      Row selected = rowSelector.select(row);
       if (unbox) {
         selected = selected.getRow(0);
       }
@@ -199,6 +201,7 @@ public abstract class DoFnSchemaInformation implements 
Serializable {
     private final FieldType primitiveType;
     private final TypeDescriptor<?> primitiveOutputType;
     private transient SerializableFunction<InputT, OutputT> conversionFunction;
+    private final RowSelector rowSelector;
 
     private UnboxingConversionFunction(
         Schema inputSchema,
@@ -212,6 +215,7 @@ public abstract class DoFnSchemaInformation implements 
Serializable {
       this.selectOutputSchema = selectOutputSchema;
       this.primitiveType = selectOutputSchema.getField(0).getType();
       this.primitiveOutputType = primitiveOutputType;
+      this.rowSelector = new RowSelectorContainer(inputSchema, 
selectDescriptor, true);
     }
 
     public static <InputT, OutputT> UnboxingConversionFunction of(
@@ -227,8 +231,7 @@ public abstract class DoFnSchemaInformation implements 
Serializable {
     @Override
     public OutputT apply(InputT input) {
       Row row = toRowFunction.apply(input);
-      Row selected =
-          SelectHelpers.selectRow(row, selectDescriptor, inputSchema, 
selectOutputSchema);
+      Row selected = rowSelector.select(row);
       return getConversionFunction().apply(selected.getValue(0));
     }
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
index 9c35657..3f881c0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
@@ -607,6 +607,10 @@ public abstract class Row implements Serializable {
       return this;
     }
 
+    public Builder attachValues(Object... values) {
+      return attachValues(Arrays.asList(values));
+    }
+
     public Builder withFieldValueGetters(
         Factory<List<FieldValueGetter>> fieldValueGetterFactory, Object 
getterTarget) {
       this.fieldValueGetterFactory = fieldValueGetterFactory;
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java
index 783d282..afa8186 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.schemas.utils;
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.Arrays;
+import java.util.Collection;
 import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
@@ -27,9 +29,20 @@ import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 /** Tests for {@link SelectHelpers}. */
+@RunWith(Parameterized.class)
 public class SelectHelpersTest {
+  @Parameterized.Parameter public boolean useOptimizedSelect;
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] {{false}, {true}});
+  }
+
   static final Schema FLAT_SCHEMA =
       Schema.builder()
           .addStringField("field1")
@@ -101,6 +114,14 @@ public class SelectHelpersTest {
           .addValue(ImmutableMap.of(1, ImmutableList.of(FLAT_ROW)))
           .build();
 
+  Row selectRow(Schema inputScema, FieldAccessDescriptor 
fieldAccessDescriptor, Row row) {
+    RowSelector rowSelector =
+        useOptimizedSelect
+            ? SelectHelpers.getRowSelectorOptimized(inputScema, 
fieldAccessDescriptor)
+            : SelectHelpers.getRowSelector(inputScema, fieldAccessDescriptor);
+    return rowSelector.select(row);
+  }
+
   @Test
   public void testSelectAll() {
     FieldAccessDescriptor fieldAccessDescriptor =
@@ -108,7 +129,7 @@ public class SelectHelpersTest {
     Schema outputSchema = SelectHelpers.getOutputSchema(FLAT_SCHEMA, 
fieldAccessDescriptor);
     assertEquals(FLAT_SCHEMA, outputSchema);
 
-    Row row = SelectHelpers.selectRow(FLAT_ROW, fieldAccessDescriptor, 
FLAT_SCHEMA, outputSchema);
+    Row row = selectRow(FLAT_SCHEMA, fieldAccessDescriptor, FLAT_ROW);
     assertEquals(FLAT_ROW, row);
   }
 
@@ -120,7 +141,7 @@ public class SelectHelpersTest {
     Schema expectedSchema = Schema.builder().addStringField("field1").build();
     assertEquals(expectedSchema, outputSchema);
 
-    Row row = SelectHelpers.selectRow(FLAT_ROW, fieldAccessDescriptor, 
FLAT_SCHEMA, outputSchema);
+    Row row = selectRow(FLAT_SCHEMA, fieldAccessDescriptor, FLAT_ROW);
     Row expectedRow = Row.withSchema(expectedSchema).addValue("first").build();
     assertEquals(expectedRow, row);
   }
@@ -133,7 +154,7 @@ public class SelectHelpersTest {
     Schema expectedSchema = 
Schema.builder().addStringField("field_extra").build();
     assertEquals(expectedSchema, outputSchema);
 
-    Row row = SelectHelpers.selectRow(FLAT_ROW, fieldAccessDescriptor, 
FLAT_SCHEMA, outputSchema);
+    Row row = selectRow(FLAT_SCHEMA, fieldAccessDescriptor, FLAT_ROW);
     Row expectedRow = Row.withSchema(expectedSchema).addValue("extra").build();
     assertEquals(expectedRow, row);
   }
@@ -147,7 +168,7 @@ public class SelectHelpersTest {
         
Schema.builder().addStringField("field1").addDoubleField("field3").build();
     assertEquals(expectedSchema, outputSchema);
 
-    Row row = SelectHelpers.selectRow(FLAT_ROW, fieldAccessDescriptor, 
FLAT_SCHEMA, outputSchema);
+    Row row = selectRow(FLAT_SCHEMA, fieldAccessDescriptor, FLAT_ROW);
     Row expectedRow = Row.withSchema(expectedSchema).addValues("first", 
3.14).build();
     assertEquals(expectedRow, row);
   }
@@ -160,8 +181,7 @@ public class SelectHelpersTest {
     Schema expectedSchema = Schema.builder().addRowField("nested", 
FLAT_SCHEMA).build();
     assertEquals(expectedSchema, outputSchema);
 
-    Row row =
-        SelectHelpers.selectRow(NESTED_ROW, fieldAccessDescriptor, 
NESTED_SCHEMA, outputSchema);
+    Row row = selectRow(NESTED_SCHEMA, fieldAccessDescriptor, NESTED_ROW);
     Row expectedRow = 
Row.withSchema(expectedSchema).addValue(FLAT_ROW).build();
     assertEquals(expectedRow, row);
   }
@@ -174,8 +194,7 @@ public class SelectHelpersTest {
     Schema expectedSchema = Schema.builder().addStringField("field1").build();
     assertEquals(expectedSchema, outputSchema);
 
-    Row row =
-        SelectHelpers.selectRow(NESTED_ROW, fieldAccessDescriptor, 
NESTED_SCHEMA, outputSchema);
+    Row row = selectRow(NESTED_SCHEMA, fieldAccessDescriptor, NESTED_ROW);
     Row expectedRow = Row.withSchema(expectedSchema).addValue("first").build();
     assertEquals(expectedRow, row);
   }
@@ -187,8 +206,7 @@ public class SelectHelpersTest {
     Schema outputSchema = SelectHelpers.getOutputSchema(NESTED_SCHEMA, 
fieldAccessDescriptor);
     assertEquals(FLAT_SCHEMA, outputSchema);
 
-    Row row =
-        SelectHelpers.selectRow(NESTED_ROW, fieldAccessDescriptor, 
NESTED_SCHEMA, outputSchema);
+    Row row = selectRow(NESTED_SCHEMA, fieldAccessDescriptor, NESTED_ROW);
     assertEquals(FLAT_ROW, row);
   }
 
@@ -201,9 +219,7 @@ public class SelectHelpersTest {
     Schema expectedSchema = Schema.builder().addStringField("field1").build();
     assertEquals(expectedSchema, outputSchema);
 
-    Row row =
-        SelectHelpers.selectRow(
-            DOUBLE_NESTED_ROW, fieldAccessDescriptor, DOUBLE_NESTED_SCHEMA, 
outputSchema);
+    Row row = selectRow(DOUBLE_NESTED_SCHEMA, fieldAccessDescriptor, 
DOUBLE_NESTED_ROW);
     Row expectedRow = Row.withSchema(expectedSchema).addValue("first").build();
     assertEquals(expectedRow, row);
   }
@@ -217,7 +233,7 @@ public class SelectHelpersTest {
         Schema.builder().addArrayField("primitiveArray", 
FieldType.INT32).build();
     assertEquals(expectedSchema, outputSchema);
 
-    Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, 
ARRAY_SCHEMA, outputSchema);
+    Row row = selectRow(ARRAY_SCHEMA, fieldAccessDescriptor, ARRAY_ROW);
     Row expectedRow = Row.withSchema(expectedSchema).addArray(1, 2).build();
     assertEquals(expectedRow, row);
   }
@@ -231,8 +247,7 @@ public class SelectHelpersTest {
         Schema.builder().addIterableField("primitiveIter", 
FieldType.INT32).build();
     assertEquals(expectedSchema, outputSchema);
 
-    Row row =
-        SelectHelpers.selectRow(ITERABLE_ROW, fieldAccessDescriptor, 
ITERABLE_SCHEMA, outputSchema);
+    Row row = selectRow(ITERABLE_SCHEMA, fieldAccessDescriptor, ITERABLE_ROW);
     Row expectedRow = 
Row.withSchema(expectedSchema).addIterable(ImmutableList.of(1, 2)).build();
     assertEquals(expectedRow, row);
   }
@@ -246,7 +261,7 @@ public class SelectHelpersTest {
         Schema.builder().addArrayField("rowArray", 
FieldType.row(FLAT_SCHEMA)).build();
     assertEquals(expectedSchema, outputSchema);
 
-    Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, 
ARRAY_SCHEMA, outputSchema);
+    Row row = selectRow(ARRAY_SCHEMA, fieldAccessDescriptor, ARRAY_ROW);
     Row expectedRow = Row.withSchema(expectedSchema).addArray(FLAT_ROW, 
FLAT_ROW).build();
     assertEquals(expectedRow, row);
   }
@@ -260,8 +275,7 @@ public class SelectHelpersTest {
         Schema.builder().addIterableField("rowIter", 
FieldType.row(FLAT_SCHEMA)).build();
     assertEquals(expectedSchema, outputSchema);
 
-    Row row =
-        SelectHelpers.selectRow(ITERABLE_ROW, fieldAccessDescriptor, 
ITERABLE_SCHEMA, outputSchema);
+    Row row = selectRow(ITERABLE_SCHEMA, fieldAccessDescriptor, ITERABLE_ROW);
     Row expectedRow =
         Row.withSchema(expectedSchema).addIterable(ImmutableList.of(FLAT_ROW, 
FLAT_ROW)).build();
     assertEquals(expectedRow, row);
@@ -276,7 +290,7 @@ public class SelectHelpersTest {
     Schema expectedSchema = Schema.builder().addArrayField("field1", 
FieldType.STRING).build();
     assertEquals(expectedSchema, outputSchema);
 
-    Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, 
ARRAY_SCHEMA, outputSchema);
+    Row row = selectRow(ARRAY_SCHEMA, fieldAccessDescriptor, ARRAY_ROW);
     Row expectedRow = Row.withSchema(expectedSchema).addArray("first", 
"first").build();
     assertEquals(expectedRow, row);
   }
@@ -290,8 +304,7 @@ public class SelectHelpersTest {
     Schema expectedSchema = Schema.builder().addIterableField("field1", 
FieldType.STRING).build();
     assertEquals(expectedSchema, outputSchema);
 
-    Row row =
-        SelectHelpers.selectRow(ITERABLE_ROW, fieldAccessDescriptor, 
ITERABLE_SCHEMA, outputSchema);
+    Row row = selectRow(ITERABLE_SCHEMA, fieldAccessDescriptor, ITERABLE_ROW);
     Row expectedRow =
         Row.withSchema(expectedSchema).addIterable(ImmutableList.of("first", 
"first")).build();
     assertEquals(expectedRow, row);
@@ -307,7 +320,7 @@ public class SelectHelpersTest {
         Schema.builder().addArrayField("field1", 
FieldType.array(FieldType.STRING)).build();
     assertEquals(expectedSchema, outputSchema);
 
-    Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, 
ARRAY_SCHEMA, outputSchema);
+    Row row = selectRow(ARRAY_SCHEMA, fieldAccessDescriptor, ARRAY_ROW);
 
     Row expectedRow =
         Row.withSchema(expectedSchema)
@@ -326,8 +339,7 @@ public class SelectHelpersTest {
         Schema.builder().addIterableField("field1", 
FieldType.iterable(FieldType.STRING)).build();
     assertEquals(expectedSchema, outputSchema);
 
-    Row row =
-        SelectHelpers.selectRow(ITERABLE_ROW, fieldAccessDescriptor, 
ITERABLE_SCHEMA, outputSchema);
+    Row row = selectRow(ITERABLE_SCHEMA, fieldAccessDescriptor, ITERABLE_ROW);
 
     Row expectedRow =
         Row.withSchema(expectedSchema)
@@ -347,7 +359,7 @@ public class SelectHelpersTest {
     Schema expectedSchema = Schema.builder().addArrayField("field1", 
FieldType.STRING).build();
     assertEquals(expectedSchema, outputSchema);
 
-    Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor, 
ARRAY_SCHEMA, outputSchema);
+    Row row = selectRow(ARRAY_SCHEMA, fieldAccessDescriptor, ARRAY_ROW);
     Row expectedRow = Row.withSchema(expectedSchema).addArray("first", 
"first").build();
     assertEquals(expectedRow, row);
   }
@@ -363,8 +375,7 @@ public class SelectHelpersTest {
     Schema expectedSchema = Schema.builder().addIterableField("field1", 
FieldType.STRING).build();
     assertEquals(expectedSchema, outputSchema);
 
-    Row row =
-        SelectHelpers.selectRow(ITERABLE_ROW, fieldAccessDescriptor, 
ITERABLE_SCHEMA, outputSchema);
+    Row row = selectRow(ITERABLE_SCHEMA, fieldAccessDescriptor, ITERABLE_ROW);
     Row expectedRow =
         Row.withSchema(expectedSchema).addIterable(ImmutableList.of("first", 
"first")).build();
     assertEquals(expectedRow, row);
@@ -381,7 +392,7 @@ public class SelectHelpersTest {
         Schema.builder().addMapField("field1", FieldType.INT32, 
FieldType.STRING).build();
     assertEquals(expectedSchema, outputSchema);
 
-    Row row = SelectHelpers.selectRow(MAP_ROW, fieldAccessDescriptor, 
MAP_SCHEMA, outputSchema);
+    Row row = selectRow(MAP_SCHEMA, fieldAccessDescriptor, MAP_ROW);
     Row expectedRow = 
Row.withSchema(expectedSchema).addValue(ImmutableMap.of(1, "first")).build();
     assertEquals(expectedRow, row);
   }
@@ -400,7 +411,7 @@ public class SelectHelpersTest {
             .build();
     assertEquals(expectedSchema, outputSchema);
 
-    Row row = SelectHelpers.selectRow(MAP_ROW, fieldAccessDescriptor, 
MAP_SCHEMA, outputSchema);
+    Row row = selectRow(MAP_SCHEMA, fieldAccessDescriptor, MAP_ROW);
     Row expectedRow =
         Row.withSchema(expectedSchema)
             .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(0)))
@@ -423,9 +434,7 @@ public class SelectHelpersTest {
             .build();
     assertEquals(expectedSchema, outputSchema);
 
-    Row row =
-        SelectHelpers.selectRow(
-            MAP_ARRAY_ROW, fieldAccessDescriptor, MAP_ARRAY_SCHEMA, 
outputSchema);
+    Row row = selectRow(MAP_ARRAY_SCHEMA, fieldAccessDescriptor, 
MAP_ARRAY_ROW);
 
     Row expectedRow =
         Row.withSchema(expectedSchema)
@@ -446,9 +455,7 @@ public class SelectHelpersTest {
             .build();
     assertEquals(expectedSchema, outputSchema);
 
-    Row row =
-        SelectHelpers.selectRow(
-            MAP_ITERABLE_ROW, fieldAccessDescriptor, MAP_ITERABLE_SCHEMA, 
outputSchema);
+    Row row = selectRow(MAP_ITERABLE_SCHEMA, fieldAccessDescriptor, 
MAP_ITERABLE_ROW);
 
     Row expectedRow =
         Row.withSchema(expectedSchema)
@@ -472,7 +479,7 @@ public class SelectHelpersTest {
 
     Schema outputSchema = SelectHelpers.getOutputSchema(f3, 
fieldAccessDescriptor);
 
-    Row out = SelectHelpers.selectRow(r3, fieldAccessDescriptor, 
r3.getSchema(), outputSchema);
+    Row out = selectRow(f3, fieldAccessDescriptor, r3);
 
     assertEquals(f2, outputSchema);
     assertEquals(r2, out);
@@ -495,7 +502,7 @@ public class SelectHelpersTest {
 
     Schema outputSchema = SelectHelpers.getOutputSchema(f4, 
fieldAccessDescriptor);
 
-    Row out = SelectHelpers.selectRow(r4, fieldAccessDescriptor, 
r4.getSchema(), outputSchema);
+    Row out = selectRow(f4, fieldAccessDescriptor, r4);
 
     assertEquals(f3, outputSchema);
     assertEquals(r3, out);
@@ -519,7 +526,7 @@ public class SelectHelpersTest {
     Schema expectedSchema =
         Schema.builder().addArrayField("f0", 
FieldType.array(FieldType.STRING)).build();
     assertEquals(expectedSchema, outputSchema);
-    Row out = SelectHelpers.selectRow(r4, fieldAccessDescriptor, 
r4.getSchema(), outputSchema);
+    Row out = selectRow(f4, fieldAccessDescriptor, r4);
     Row expected =
         Row.withSchema(outputSchema)
             .addArray(Lists.newArrayList("first", "first"), 
Lists.newArrayList("first", "first"))
@@ -545,7 +552,7 @@ public class SelectHelpersTest {
     Schema expectedSchema =
         Schema.builder().addIterableField("f0", 
FieldType.iterable(FieldType.STRING)).build();
     assertEquals(expectedSchema, outputSchema);
-    Row out = SelectHelpers.selectRow(r4, fieldAccessDescriptor, 
r4.getSchema(), outputSchema);
+    Row out = selectRow(f4, fieldAccessDescriptor, r4);
     Row expected =
         Row.withSchema(outputSchema)
             .addIterable(

Reply via email to