This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8abc90b Merge pull request #11025: [BEAM-6428] Improve select
performance with codegen
8abc90b is described below
commit 8abc90bb13fd0b8c46a26fc59752161b012a9f03
Author: reuvenlax <[email protected]>
AuthorDate: Wed Mar 4 10:57:01 2020 -0800
Merge pull request #11025: [BEAM-6428] Improve select performance with
codegen
---
.../beam/sdk/schemas/transforms/CoGroup.java | 8 +-
.../apache/beam/sdk/schemas/transforms/Filter.java | 26 +-
.../apache/beam/sdk/schemas/transforms/Group.java | 8 +-
.../sdk/schemas/transforms/SchemaAggregateFn.java | 46 +-
.../apache/beam/sdk/schemas/transforms/Select.java | 13 +-
.../utils/ByteBuddyLocalVariableManager.java | 93 +++
.../apache/beam/sdk/schemas/utils/RowSelector.java | 25 +
.../sdk/schemas/utils/SelectByteBuddyHelpers.java | 843 +++++++++++++++++++++
.../beam/sdk/schemas/utils/SelectHelpers.java | 69 +-
.../beam/sdk/transforms/DoFnSchemaInformation.java | 13 +-
.../main/java/org/apache/beam/sdk/values/Row.java | 4 +
.../beam/sdk/schemas/utils/SelectHelpersTest.java | 87 ++-
12 files changed, 1126 insertions(+), 109 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java
index 8386eac..cde224a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java
@@ -35,7 +35,9 @@ import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.utils.RowSelector;
import org.apache.beam.sdk.schemas.utils.SelectHelpers;
+import org.apache.beam.sdk.schemas.utils.SelectHelpers.RowSelectorContainer;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -399,10 +401,12 @@ public class CoGroup {
"extractKey" + tag,
ParDo.of(
new DoFn<T, KV<Row, Row>>() {
+ private RowSelector rowSelector =
+ new RowSelectorContainer(schema, keyFields, true);
+
@ProcessElement
public void process(@Element Row row,
OutputReceiver<KV<Row, Row>> o) {
- o.output(
- KV.of(SelectHelpers.selectRow(row, keyFields,
schema, keySchema), row));
+ o.output(KV.of(rowSelector.select(row), row));
}
}))
.setCoder(KvCoder.of(SchemaCoder.of(keySchema),
SchemaCoder.of(schema)));
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
index 60f86b1..0015835 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.RowSelector;
import org.apache.beam.sdk.schemas.utils.SelectHelpers;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
@@ -87,6 +88,8 @@ public class Filter {
/** Implementation of the filter. */
public static class Inner<T> extends PTransform<PCollection<T>,
PCollection<T>> {
+ private RowSelector rowSelector;
+
@AutoValue
abstract static class FilterDescription<FieldT> implements Serializable {
abstract FieldAccessDescriptor getFieldAccessDescriptor();
@@ -98,6 +101,9 @@ public class Filter {
abstract boolean getSelectsSingleField();
+ @Nullable
+ abstract Schema getInputSchema();
+
abstract Builder<FieldT> toBuilder();
@AutoValue.Builder
@@ -111,8 +117,20 @@ public class Filter {
abstract Builder<FieldT> setSelectsSingleField(boolean unbox);
+ abstract Builder<FieldT> setInputSchema(@Nullable Schema inputSchema);
+
abstract FilterDescription<FieldT> build();
}
+
+ transient RowSelector rowSelector;
+
+ public RowSelector getRowSelector() {
+ if (rowSelector == null) {
+ rowSelector =
+ SelectHelpers.getRowSelectorOptimized(getInputSchema(),
getFieldAccessDescriptor());
+ }
+ return rowSelector;
+ }
}
private final List<FilterDescription<?>> filters = Lists.newArrayList();
@@ -179,6 +197,7 @@ public class Filter {
.map(
f ->
f.toBuilder()
+ .setInputSchema(inputSchema)
.setSelectedSchema(
SelectHelpers.getOutputSchema(
inputSchema, f.getFieldAccessDescriptor()))
@@ -191,12 +210,7 @@ public class Filter {
@ProcessElement
public void process(@Element Row row, OutputReceiver<Row> o) {
for (FilterDescription filter : resolvedFilters) {
- Row selected =
- SelectHelpers.selectRow(
- row,
- filter.getFieldAccessDescriptor(),
- inputSchema,
- filter.getSelectedSchema());
+ Row selected = filter.getRowSelector().select(row);
if (filter.getSelectsSingleField()) {
SerializableFunction<Object, Boolean> predicate =
(SerializableFunction<Object, Boolean>)
filter.getPredicate();
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
index f0e891c..2f95753 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java
@@ -27,7 +27,9 @@ import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.utils.RowSelector;
import org.apache.beam.sdk.schemas.utils.SelectHelpers;
+import org.apache.beam.sdk.schemas.utils.SelectHelpers.RowSelectorContainer;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;
@@ -495,18 +497,20 @@ public class Group {
}
class ToKv extends PTransform<PCollection<InputT>, PCollection<KV<Row,
Iterable<Row>>>> {
+ private RowSelector rowSelector;
+
@Override
public PCollection<KV<Row, Iterable<Row>>> expand(PCollection<InputT>
input) {
Schema schema = input.getSchema();
FieldAccessDescriptor resolved =
getFieldAccessDescriptor().resolve(schema);
+ rowSelector = new RowSelectorContainer(schema, resolved, true);
Schema keySchema = getKeySchema(schema);
return input
.apply("toRow", Convert.toRows())
.apply(
"selectKeys",
- WithKeys.of((Row e) -> SelectHelpers.selectRow(e, resolved,
schema, keySchema))
- .withKeyType(TypeDescriptors.rows()))
+ WithKeys.of((Row e) ->
rowSelector.select(e)).withKeyType(TypeDescriptors.rows()))
.setCoder(KvCoder.of(SchemaCoder.of(keySchema),
SchemaCoder.of(schema)))
.apply(GroupByKey.create());
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java
index a07afdd..0ee2942 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java
@@ -32,7 +32,9 @@ import org.apache.beam.sdk.schemas.FieldTypeDescriptors;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.utils.RowSelector;
import org.apache.beam.sdk.schemas.utils.SelectHelpers;
+import org.apache.beam.sdk.schemas.utils.SelectHelpers.RowSelectorContainer;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.CombineFns;
import org.apache.beam.sdk.transforms.CombineFns.CoCombineResult;
@@ -158,12 +160,12 @@ class SchemaAggregateFn {
SimpleFunction<Row, ?> extractFunction;
Coder extractOutputCoder;
if (fieldAggregation.fieldsToAggregate.referencesSingleField()) {
- extractFunction = new ExtractSingleFieldFunction(fieldAggregation);
+ extractFunction = new ExtractSingleFieldFunction(inputSchema,
fieldAggregation);
extractOutputCoder =
SchemaCoder.coderForFieldType(
fieldAggregation.flattenedInputSubSchema.getField(0).getType());
} else {
- extractFunction = new ExtractFieldsFunction(fieldAggregation);
+ extractFunction = new ExtractFieldsFunction(inputSchema,
fieldAggregation);
extractOutputCoder = SchemaCoder.of(fieldAggregation.inputSubSchema);
}
if (i == 0) {
@@ -229,27 +231,28 @@ class SchemaAggregateFn {
/** Extract a single field from an input {@link Row}. */
private static class ExtractSingleFieldFunction<OutputT> extends
SimpleFunction<Row, OutputT> {
+ private final RowSelector rowSelector;
+ @Nullable private final RowSelector flatteningSelector;
private final FieldAggregation fieldAggregation;
- private ExtractSingleFieldFunction(FieldAggregation fieldAggregation) {
+ private ExtractSingleFieldFunction(Schema inputSchema, FieldAggregation
fieldAggregation) {
+ rowSelector =
+ new RowSelectorContainer(inputSchema,
fieldAggregation.fieldsToAggregate, true);
+ flatteningSelector =
+ fieldAggregation.needsFlattening
+ ? new RowSelectorContainer(
+ fieldAggregation.inputSubSchema,
+ fieldAggregation.flattenedFieldAccessDescriptor,
+ true)
+ : null;
this.fieldAggregation = fieldAggregation;
}
@Override
public OutputT apply(Row row) {
- Row selected =
- SelectHelpers.selectRow(
- row,
- fieldAggregation.fieldsToAggregate,
- row.getSchema(),
- fieldAggregation.inputSubSchema);
+ Row selected = rowSelector.select(row);
if (fieldAggregation.needsFlattening) {
- selected =
- SelectHelpers.selectRow(
- selected,
- fieldAggregation.flattenedFieldAccessDescriptor,
- row.getSchema(),
- fieldAggregation.flattenedInputSubSchema);
+ selected = flatteningSelector.select(selected);
}
return selected.getValue(0);
}
@@ -257,19 +260,18 @@ class SchemaAggregateFn {
/** Extract multiple fields from an input {@link Row}. */
private static class ExtractFieldsFunction extends SimpleFunction<Row,
Row> {
- private FieldAggregation fieldAggregation;
+ private final RowSelector rowSelector;
+ private final FieldAggregation fieldAggregation;
- private ExtractFieldsFunction(FieldAggregation fieldAggregation) {
+ private ExtractFieldsFunction(Schema inputSchema, FieldAggregation
fieldAggregation) {
+ rowSelector =
+ new RowSelectorContainer(inputSchema,
fieldAggregation.fieldsToAggregate, true);
this.fieldAggregation = fieldAggregation;
}
@Override
public Row apply(Row row) {
- return SelectHelpers.selectRow(
- row,
- fieldAggregation.fieldsToAggregate,
- row.getSchema(),
- fieldAggregation.inputSubSchema);
+ return rowSelector.select(row);
}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
index 0a67b37..ba6dc9b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
@@ -27,9 +27,10 @@ import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.RowSelector;
import org.apache.beam.sdk.schemas.utils.SelectHelpers;
+import org.apache.beam.sdk.schemas.utils.SelectHelpers.RowSelectorContainer;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.FieldAccess;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -114,9 +115,10 @@ public class Select {
}
private static class SelectDoFn<T> extends DoFn<T, Row> {
- private FieldAccessDescriptor fieldAccessDescriptor;
- private Schema inputSchema;
- private Schema outputSchema;
+ private final FieldAccessDescriptor fieldAccessDescriptor;
+ private final Schema inputSchema;
+ private final Schema outputSchema;
+ RowSelector rowSelector;
// TODO: This should be the same as resolved so that Beam knows which fields
// are being accessed. Currently Beam only supports wildcard descriptors.
@@ -129,11 +131,12 @@ public class Select {
this.fieldAccessDescriptor = fieldAccessDescriptor;
this.inputSchema = inputSchema;
this.outputSchema = outputSchema;
+ this.rowSelector = new RowSelectorContainer(inputSchema,
fieldAccessDescriptor, true);
}
@ProcessElement
public void process(@FieldAccess("selectFields") @Element Row row,
OutputReceiver<Row> r) {
- r.output(SelectHelpers.selectRow(row, fieldAccessDescriptor,
inputSchema, outputSchema));
+ r.output(rowSelector.select(row));
}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyLocalVariableManager.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyLocalVariableManager.java
new file mode 100644
index 0000000..d988e76
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyLocalVariableManager.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.utils;
+
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.description.type.TypeDescription.ForLoadedType;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.StackManipulation;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.StackManipulation.Compound;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.assign.TypeCasting;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
+
+/** This class allows managing local variables in a ByteBuddy-generated function. */
+class ByteBuddyLocalVariableManager {
+ private int nextLocalVariableIndex;
+
+  // Initialize with the number of arguments to the function (including the this parameter if
+  // applicable).
+ public ByteBuddyLocalVariableManager(int numFunctionArgs) {
+ nextLocalVariableIndex = numFunctionArgs;
+ }
+
+ // Create a new variable.
+ public int createVariable() {
+ return nextLocalVariableIndex++;
+ }
+
+ // Returns a StackManipulation to read a variable.
+ public StackManipulation readVariable(int variableIndex) {
+ checkArgument(variableIndex < nextLocalVariableIndex);
+ return MethodVariableAccess.REFERENCE.loadFrom(variableIndex);
+ }
+
+  // Returns a StackManipulation to read a variable, casting to the specified type.
+ StackManipulation readVariable(int variableIndex, Class<?> type) {
+ return new Compound(readVariable(variableIndex), TypeCasting.to(new
ForLoadedType(type)));
+ }
+
+ // Returns a StackManipulation to write a variable.
+ StackManipulation writeVariable(int variableIndex) {
+ checkArgument(variableIndex < nextLocalVariableIndex);
+ return MethodVariableAccess.REFERENCE.storeAt(variableIndex);
+ }
+
+ // Returns a StackManipulation to copy a variable.
+ StackManipulation copy(int sourceVariableIndex, int destVariableIndex) {
+ return new Compound(readVariable(sourceVariableIndex),
writeVariable(destVariableIndex));
+ }
+
+  // Returns a class that can be used to backup and restore a variable, using a newly-created temp
+  // variable.
+ BackupLocalVariable backupVariable(int variableToBackup) {
+ return new BackupLocalVariable(variableToBackup);
+ }
+
+  // Gets the total num variables in the function. Should be used when returning the Size parameter.
+ int getTotalNumVariables() {
+ return nextLocalVariableIndex;
+ }
+
+ public class BackupLocalVariable {
+ private final int variableToBackup; // Variable to save.
+ private final int tempVariable; // Temp variable we are saving in.
+
+ public BackupLocalVariable(int variableToBackup) {
+ this.variableToBackup = variableToBackup;
+ this.tempVariable = createVariable();
+ }
+
+ public StackManipulation backup() {
+ return copy(variableToBackup, tempVariable);
+ }
+
+ public StackManipulation restore() {
+ return copy(tempVariable, variableToBackup);
+ }
+ }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/RowSelector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/RowSelector.java
new file mode 100644
index 0000000..0ea251b
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/RowSelector.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.utils;
+
+import org.apache.beam.sdk.values.Row;
+
+/** A selector interface for extracting fields from a row. * */
+public interface RowSelector {
+ Row select(Row input);
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectByteBuddyHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectByteBuddyHelpers.java
new file mode 100644
index 0000000..a482bcb
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectByteBuddyHelpers.java
@@ -0,0 +1,843 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.utils;
+
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Modifier;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor.FieldDescriptor;
+import
org.apache.beam.sdk.schemas.FieldAccessDescriptor.FieldDescriptor.Qualifier;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.ByteBuddy;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.asm.AsmVisitorWrapper;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.description.method.MethodDescription;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.description.modifier.FieldManifestation;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.description.modifier.Visibility;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.description.type.TypeDescription.ForLoadedType;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.description.type.TypeDescription.Generic;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.DynamicType;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.scaffold.InstrumentedType;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.Implementation;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.Implementation.Context;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.ByteCodeAppender;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.ByteCodeAppender.Size;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.Duplication;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.Removal;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.StackManipulation;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.StackSize;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.assign.TypeCasting;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.collection.ArrayAccess;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.constant.IntegerConstant;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.member.FieldAccess;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.member.MethodInvocation;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.member.MethodReturn;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.jar.asm.ClassWriter;
+import org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.jar.asm.Label;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.jar.asm.MethodVisitor;
+import org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.jar.asm.Opcodes;
+import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.matcher.ElementMatchers;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+
+public class SelectByteBuddyHelpers {
+ private static final ByteBuddy BYTE_BUDDY = new ByteBuddy();
+ private static final String SELECT_SCHEMA_FIELD_NAME = "OUTPUTSCHEMA";
+
+ private static final ForLoadedType ROW_LOADED_TYPE = new
ForLoadedType(Row.class);
+ private static final ForLoadedType LIST_LOADED_TYPE = new
ForLoadedType(List.class);
+ private static final ForLoadedType LISTS_LOADED_TYPE = new
ForLoadedType(Lists.class);
+ private static final ForLoadedType MAP_LOADED_TYPE = new
ForLoadedType(Map.class);
+ private static final ForLoadedType MAPS_LOADED_TYPE = new
ForLoadedType(Maps.class);
+ private static final ForLoadedType MAPENTRY_LOADED_TYPE = new
ForLoadedType(Map.Entry.class);
+ private static final ForLoadedType ITERABLE_LOADED_TYPE = new
ForLoadedType(Iterable.class);
+ private static final ForLoadedType ITERATOR_LOADED_TYPE = new
ForLoadedType(Iterator.class);
+
+ private static final MethodDescription LIST_ADD =
+ LIST_LOADED_TYPE
+ .getDeclaredMethods()
+
.filter(ElementMatchers.named("add").and(ElementMatchers.takesArguments(1)))
+ .getOnly();
+
+ private static final MethodDescription LISTS_NEW_ARRAYLIST =
+ LISTS_LOADED_TYPE
+ .getDeclaredMethods()
+
.filter(ElementMatchers.named("newArrayList").and(ElementMatchers.takesArguments(0)))
+ .getOnly();
+
+ private static final MethodDescription MAP_ENTRYSET =
+
MAP_LOADED_TYPE.getDeclaredMethods().filter(ElementMatchers.named("entrySet")).getOnly();
+
+ private static final MethodDescription MAP_PUT =
+ MAP_LOADED_TYPE
+ .getDeclaredMethods()
+
.filter(ElementMatchers.named("put").and(ElementMatchers.takesArguments(2)))
+ .getOnly();
+
+ private static final MethodDescription MAPS_NEW_HASHMAP =
+ MAPS_LOADED_TYPE
+ .getDeclaredMethods()
+
.filter(ElementMatchers.named("newHashMap").and(ElementMatchers.takesArguments(0)))
+ .getOnly();
+
+ private static final MethodDescription ITERABLE_ITERATOR =
+
ITERABLE_LOADED_TYPE.getDeclaredMethods().filter(ElementMatchers.named("iterator")).getOnly();
+
+ private static final MethodDescription ITERATOR_HASNEXT =
+
ITERATOR_LOADED_TYPE.getDeclaredMethods().filter(ElementMatchers.named("hasNext")).getOnly();
+
+ private static final MethodDescription ITERATOR_NEXT =
+
ITERATOR_LOADED_TYPE.getDeclaredMethods().filter(ElementMatchers.named("next")).getOnly();
+
+ private static final MethodDescription MAPENTRY_GETKEY =
+
MAPENTRY_LOADED_TYPE.getDeclaredMethods().filter(ElementMatchers.named("getKey")).getOnly();
+
+ private static final MethodDescription MAPENTRY_GETVALUE =
+
MAPENTRY_LOADED_TYPE.getDeclaredMethods().filter(ElementMatchers.named("getValue")).getOnly();
+
+ @AutoValue
+ abstract static class SchemaAndDescriptor {
+ abstract Schema getSchema();
+
+ abstract FieldAccessDescriptor getFieldAccessDecriptor();
+
+ static SchemaAndDescriptor of(Schema schema, FieldAccessDescriptor
fieldAccessDescriptor) {
+ return new AutoValue_SelectByteBuddyHelpers_SchemaAndDescriptor(
+ schema, fieldAccessDescriptor);
+ }
+ }
+
+ private static final Map<SchemaAndDescriptor, RowSelector> CACHED_SELECTORS =
+ Maps.newConcurrentMap();
+
+ public static RowSelector getRowSelector(
+ Schema inputSchema, FieldAccessDescriptor fieldAccessDescriptor) {
+ return CACHED_SELECTORS.computeIfAbsent(
+ SchemaAndDescriptor.of(inputSchema, fieldAccessDescriptor),
+ SelectByteBuddyHelpers::createRowSelector);
+ }
+
+ static RowSelector createRowSelector(SchemaAndDescriptor
schemaAndDescriptor) {
+ Schema outputSchema =
+ SelectHelpers.getOutputSchema(
+ schemaAndDescriptor.getSchema(),
schemaAndDescriptor.getFieldAccessDecriptor());
+ try {
+ DynamicType.Builder<RowSelector> builder =
+ BYTE_BUDDY
+ .subclass(RowSelector.class)
+ .method(ElementMatchers.named("select"))
+ .intercept(
+ new SelectInstruction(
+ schemaAndDescriptor.getFieldAccessDecriptor(),
+ schemaAndDescriptor.getSchema(),
+ outputSchema))
+ .defineField(
+ SELECT_SCHEMA_FIELD_NAME,
+ Schema.class,
+ Visibility.PRIVATE,
+ FieldManifestation.FINAL)
+ .defineConstructor(Modifier.PUBLIC)
+ .withParameters(Schema.class)
+ .intercept(new SelectInstructionConstructor());
+
+ return builder
+ .visit(new
AsmVisitorWrapper.ForDeclaredMethods().writerFlags(ClassWriter.COMPUTE_FRAMES))
+ .make()
+ .load(Row.class.getClassLoader(),
ClassLoadingStrategy.Default.INJECTION)
+ .getLoaded()
+ .getDeclaredConstructor(Schema.class)
+ .newInstance(outputSchema);
+ } catch (InstantiationException
+ | IllegalAccessException
+ | NoSuchMethodException
+ | InvocationTargetException e) {
+ throw new RuntimeException("Unable to generate");
+ }
+ }
+
+ private static class SelectInstructionConstructor implements Implementation {
+ @Override
+ public InstrumentedType prepare(InstrumentedType instrumentedType) {
+ return instrumentedType;
+ }
+
+ @Override
+ public ByteCodeAppender appender(final Target implementationTarget) {
+ return (methodVisitor, implementationContext, instrumentedMethod) -> {
+ int numLocals = 1 + instrumentedMethod.getParameters().size();
+ StackManipulation stackManipulation =
+ new StackManipulation.Compound(
+ // Call the base constructor for Object.
+ MethodVariableAccess.loadThis(),
+ Duplication.SINGLE,
+ MethodInvocation.invoke(
+ new ForLoadedType(Object.class)
+ .getDeclaredMethods()
+ .filter(
+
ElementMatchers.isConstructor().and(ElementMatchers.takesArguments(0)))
+ .getOnly()),
+ MethodVariableAccess.REFERENCE.loadFrom(1),
+ FieldAccess.forField(
+ implementationTarget
+ .getInstrumentedType()
+ .getDeclaredFields()
+
.filter(ElementMatchers.named(SELECT_SCHEMA_FIELD_NAME))
+ .getOnly())
+ .write(),
+ MethodReturn.VOID);
+ StackManipulation.Size size = stackManipulation.apply(methodVisitor,
implementationContext);
+ return new Size(size.getMaximalSize(), numLocals);
+ };
+ }
+ }
+
+ // Manage array creation and appending.
+ private static class ArrayManager {
+ private final StackManipulation.Size sizeDecreaseArrayStore;
+ private final int arraySize;
+ int currentArrayField = 0;
+
+ ArrayManager(int arraySize) {
+      // Size decreases by index and array reference (2) and array element (1, 2) after each element
+      // storage.
+ this.sizeDecreaseArrayStore =
+ StackSize.DOUBLE
+ .toDecreasingSize()
+ .aggregate(Generic.OBJECT.getStackSize().toDecreasingSize());
+ this.arraySize = arraySize;
+ }
+
+ public StackManipulation createArray() {
+ return new StackManipulation() {
+ @Override
+ public boolean isValid() {
+ return true;
+ }
+
+ @Override
+ public Size apply(MethodVisitor methodVisitor, Context context) {
+ Size size = IntegerConstant.forValue(arraySize).apply(methodVisitor,
context);
+ methodVisitor.visitTypeInsn(
+ Opcodes.ANEWARRAY, Generic.OBJECT.asErasure().getInternalName());
+ size = size.aggregate(StackSize.ZERO.toDecreasingSize());
+ return size;
+ }
+ };
+ }
+
+ public StackManipulation append(StackManipulation valueToWrite) {
+ return store(currentArrayField++, valueToWrite);
+ }
+
+ public StackManipulation store(int arrayIndexToWrite, StackManipulation
valueToWrite) {
+ Preconditions.checkArgument(arrayIndexToWrite < arraySize);
+ return new StackManipulation() {
+ @Override
+ public boolean isValid() {
+ return true;
+ }
+
+ @Override
+ public Size apply(MethodVisitor methodVisitor, Context context) {
+ StackManipulation stackManipulation =
+ new StackManipulation.Compound(
+ Duplication.SINGLE, // Duplicate the array reference
+ IntegerConstant.forValue(arrayIndexToWrite),
+ valueToWrite);
+ StackManipulation.Size size = stackManipulation.apply(methodVisitor,
context);
+ methodVisitor.visitInsn(Opcodes.AASTORE);
+ size = size.aggregate(sizeDecreaseArrayStore);
+ return size;
+ }
+ };
+ }
+ }
+
+ private static class SelectInstruction implements Implementation {
+ private final FieldAccessDescriptor fieldAccessDescriptor;
+ private final Schema inputSchema;
+ private final Schema outputSchema;
+ private ByteBuddyLocalVariableManager localVariables;
+ private static final int INPUT_ROW_ARG = 1;
+ private final int currentSelectRowArg;
+ private final int fieldValueArg;
+
+ private static final int NUM_FUNCTION_ARGS = 2; // Args are this and the
row to select.
+
+ public SelectInstruction(
+ FieldAccessDescriptor fieldAccessDescriptor, Schema inputSchema,
Schema outputSchema) {
+ this.fieldAccessDescriptor = fieldAccessDescriptor;
+ this.inputSchema = inputSchema;
+ this.outputSchema = outputSchema;
+ this.localVariables = new
ByteBuddyLocalVariableManager(NUM_FUNCTION_ARGS);
+ currentSelectRowArg = localVariables.createVariable();
+ fieldValueArg = localVariables.createVariable();
+ }
+
+ @Override
+ public InstrumentedType prepare(InstrumentedType instrumentedType) {
+ return instrumentedType;
+ }
+
+ @Override
+ public ByteCodeAppender appender(final Target implementationTarget) {
+ return (methodVisitor, implementationContext, instrumentedMethod) -> {
+ int numLocals = 1 + instrumentedMethod.getParameters().size();
+ StackManipulation.Size size = new StackManipulation.Size(0, numLocals);
+
+ // Create the Row builder.
+ StackManipulation createRowBuilder =
+ new StackManipulation.Compound(
+ MethodVariableAccess.loadThis(),
+ FieldAccess.forField(
+ implementationTarget
+ .getInstrumentedType()
+ .getDeclaredFields()
+
.filter(ElementMatchers.named(SELECT_SCHEMA_FIELD_NAME))
+ .getOnly())
+ .read(),
+ MethodInvocation.invoke(
+ new ForLoadedType(Row.class)
+ .getDeclaredMethods()
+ .filter(ElementMatchers.named("withSchema"))
+ .getOnly()));
+ size = size.aggregate(createRowBuilder.apply(methodVisitor,
implementationContext));
+
+ // Create a new object array with one entry for each field in the
output schema.
+ ArrayManager arrayManager = new
ArrayManager(outputSchema.getFieldCount());
+ size =
+ size.aggregate(arrayManager.createArray().apply(methodVisitor,
implementationContext));
+
+ // Store the current input row into a local variable.
+ StackManipulation storeRowInLocalVariable =
+ localVariables.copy(INPUT_ROW_ARG, currentSelectRowArg);
+ size = size.aggregate(storeRowInLocalVariable.apply(methodVisitor,
implementationContext));
+
+ // Fill the array values with those selected from the row.
+ size =
+ size.aggregate(
+ selectIntoArray(
+ inputSchema,
+ fieldAccessDescriptor,
+ arrayManager,
+ methodVisitor,
+ implementationContext));
+
+ // Return the actual row.
+ StackManipulation attachToRow =
+ new StackManipulation.Compound(
+ MethodInvocation.invoke(
+ new ForLoadedType(Row.Builder.class)
+ .getDeclaredMethods()
+ .filter(
+ ElementMatchers.named("attachValues")
+
.and(ElementMatchers.takesArguments(Object[].class)))
+ .getOnly()),
+ MethodInvocation.invoke(
+ new ForLoadedType(Row.Builder.class)
+ .getDeclaredMethods()
+ .filter(ElementMatchers.named("build"))
+ .getOnly()),
+ MethodReturn.REFERENCE);
+ size = size.aggregate(attachToRow.apply(methodVisitor,
implementationContext));
+ return new Size(size.getMaximalSize(),
localVariables.getTotalNumVariables());
+ };
+ }
+
+ // Selects a field from the current row being selected (the one stored in
+ // currentSelectRowArg).
+ private StackManipulation getCurrentRowFieldValue(int i) {
+ return new StackManipulation.Compound(
+ localVariables.readVariable(currentSelectRowArg, Row.class),
+ IntegerConstant.forValue(i),
+ MethodInvocation.invoke(
+ ROW_LOADED_TYPE
+ .getDeclaredMethods()
+ .filter(
+ ElementMatchers.named("getValue")
+ .and(ElementMatchers.takesArguments(int.class)))
+ .getOnly()));
+ }
+
+ // Generate bytecode to select all specified fields from the Row. The
current row being selected
+ // is stored
+ // in the local variable at position 2. The current array being written to
is at the top of the
+ // stack.
+ StackManipulation.Size selectIntoArray(
+ Schema inputSchema,
+ FieldAccessDescriptor fieldAccessDescriptor,
+ ArrayManager arrayManager,
+ MethodVisitor methodVisitor,
+ Context implementationContext) {
+ StackManipulation.Size size = new StackManipulation.Size(0, 0);
+ if (fieldAccessDescriptor.getAllFields()) {
+ StackManipulation storeAllValues =
+ new StackManipulation.Compound(
+ IntStream.range(0, inputSchema.getFieldCount())
+ .mapToObj(i ->
arrayManager.append(getCurrentRowFieldValue(i)))
+ .collect(Collectors.toList()));
+ return size.aggregate(storeAllValues.apply(methodVisitor,
implementationContext));
+ }
+
+ StackManipulation storeAllValues =
+ new StackManipulation.Compound(
+ fieldAccessDescriptor.fieldIdsAccessed().stream()
+ .map(i -> arrayManager.append(getCurrentRowFieldValue(i)))
+ .collect(Collectors.toList()));
+ size = size.aggregate(storeAllValues.apply(methodVisitor,
implementationContext));
+
+ for (Map.Entry<FieldDescriptor, FieldAccessDescriptor> nested :
+ fieldAccessDescriptor.getNestedFieldsAccessed().entrySet()) {
+ FieldDescriptor field = nested.getKey();
+ FieldAccessDescriptor nestedAccess = nested.getValue();
+ FieldType nestedInputType =
inputSchema.getField(field.getFieldId()).getType();
+
+ StackManipulation.Size subSelectSize =
+ selectIntoArrayHelper(
+ field.getQualifiers(),
+ 0,
+ field.getFieldId(),
+ nestedAccess,
+ nestedInputType,
+ arrayManager,
+ methodVisitor,
+ implementationContext);
+ size = size.aggregate(subSelectSize);
+ }
+ return size;
+ }
+
+ // Load the current field value to process. If a fieldId is specified,
then we assume that we
+ // need to select
+ // a field from the Row stored in currentSelectRowArg. Otherwise we assume
that the value has
+ // already been stored
+ // in fieldValueArg, and we read from there.
+ private StackManipulation loadFieldValue(int fieldId) {
+ if (fieldId != -1) {
+ // If a field id was specified, then load it.
+ return getCurrentRowFieldValue(fieldId);
+ } else {
+ // If not specified, then assume it was stored in the fieldValueArg local variable.
+ return localVariables.readVariable(fieldValueArg);
+ }
+ }
+
+ private StackManipulation.Size selectIntoArrayHelper(
+ List<Qualifier> qualifiers,
+ int qualifierPosition,
+ int fieldId,
+ FieldAccessDescriptor fieldAccessDescriptor,
+ FieldType inputType,
+ ArrayManager arrayManager,
+ MethodVisitor methodVisitor,
+ Context implementationContext) {
+ StackManipulation.Size size = new StackManipulation.Size(0, 0);
+
+ if (qualifierPosition >= qualifiers.size()) {
+ // We have already constructed all arrays and maps. What remains must
be a Row.
+ ByteBuddyLocalVariableManager.BackupLocalVariable backup =
+ localVariables.backupVariable(currentSelectRowArg);
+ StackManipulation updateLocalVariable =
+ new StackManipulation.Compound(
+ loadFieldValue(fieldId),
+ // Backup the current value of the currentSelectRowArg
variable.
+ backup.backup(),
+ // Update the row local with the current stack value.
+ localVariables.writeVariable(currentSelectRowArg));
+ size = size.aggregate(updateLocalVariable.apply(methodVisitor,
implementationContext));
+
+ size =
+ size.aggregate(
+ selectIntoArray(
+ inputType.getRowSchema(),
+ fieldAccessDescriptor,
+ arrayManager,
+ methodVisitor,
+ implementationContext));
+ // Restore the value of currentSelectRowArg from the temp variable.
+ size = size.aggregate(backup.restore().apply(methodVisitor,
implementationContext));
+ return size;
+ }
+
+ Qualifier qualifier = qualifiers.get(qualifierPosition);
+ switch (qualifier.getKind()) {
+ case LIST:
+ return size.aggregate(
+ processList(
+ inputType,
+ fieldAccessDescriptor,
+ qualifiers,
+ qualifierPosition,
+ fieldId,
+ arrayManager,
+ methodVisitor,
+ implementationContext));
+
+ case MAP:
+ return size.aggregate(
+ processMap(
+ inputType,
+ fieldAccessDescriptor,
+ qualifiers,
+ qualifierPosition,
+ fieldId,
+ arrayManager,
+ methodVisitor,
+ implementationContext));
+
+ default:
+ throw new RuntimeException("Unexpected type " + qualifier.getKind());
+ }
+ }
+
+ private StackManipulation.Size processList(
+ FieldType inputType,
+ FieldAccessDescriptor fieldAccessDescriptor,
+ List<Qualifier> qualifiers,
+ int qualifierPosition,
+ int fieldId,
+ ArrayManager arrayManager,
+ MethodVisitor methodVisitor,
+ Context implementationContext) {
+ StackManipulation.Size size = new StackManipulation.Size(0, 0);
+ FieldType nestedInputType =
checkNotNull(inputType.getCollectionElementType());
+ Schema nestedSchema = getNestedSchema(nestedInputType,
fieldAccessDescriptor);
+
+ // We create temp local variables to store all the arrays we create.
Each field in
+ // nestedSchema corresponds to a separate array in the output.
+ int[] localVariablesForArrays =
+ IntStream.range(0, nestedSchema.getFieldCount())
+ .map(i -> localVariables.createVariable())
+ .toArray();
+
+ // Each field returned in nestedSchema will become its own list in the
output. So let's
+ // iterate and create lists and store each one in the output.
+ StackManipulation createAllArrayLists =
+ new StackManipulation.Compound(
+ IntStream.range(0, nestedSchema.getFieldCount())
+ .mapToObj(
+ i -> {
+ StackManipulation createArrayList =
+ new StackManipulation.Compound(
+ MethodInvocation.invoke(LISTS_NEW_ARRAYLIST),
+ // Store the ArrayList in a local variable.
+ Duplication.SINGLE,
+
localVariables.writeVariable(localVariablesForArrays[i]));
+ return arrayManager.append(createArrayList);
+ })
+ .collect(Collectors.toList()));
+ size = size.aggregate(createAllArrayLists.apply(methodVisitor,
implementationContext));
+
+ // Now iterate over the value, selecting from each element.
+ StackManipulation readListIterator =
+ new StackManipulation.Compound(
+ loadFieldValue(fieldId),
+ TypeCasting.to(new ForLoadedType(Iterable.class)),
+ MethodInvocation.invoke(ITERABLE_ITERATOR));
+ size = size.aggregate(readListIterator.apply(methodVisitor,
implementationContext));
+
+ // Loop over the entire iterable.
+ Label startLoopLabel = new Label();
+ Label exitLoopLabel = new Label();
+ methodVisitor.visitLabel(startLoopLabel);
+
+ StackManipulation checkTerminationCondition =
+ new StackManipulation.Compound(
+ Duplication.SINGLE, MethodInvocation.invoke(ITERATOR_HASNEXT));
+ size = size.aggregate(checkTerminationCondition.apply(methodVisitor,
implementationContext));
+ methodVisitor.visitJumpInsn(Opcodes.IFEQ, exitLoopLabel); // Exit the
loop if !hasNext().
+ size = size.aggregate(StackSize.SINGLE.toDecreasingSize());
+
+ // Read the next value in the iterator.
+ StackManipulation getNext =
+ new StackManipulation.Compound(
+ Duplication.SINGLE, MethodInvocation.invoke(ITERATOR_NEXT));
+ size = size.aggregate(getNext.apply(methodVisitor,
implementationContext));
+
+ ByteBuddyLocalVariableManager.BackupLocalVariable backupFieldValue =
null;
+ if (fieldId == -1) {
+ // Save the field value arg before overwriting it, as we need it in
subsequent
+ // iterations of the
+ // loop.
+ backupFieldValue = localVariables.backupVariable(fieldValueArg);
+ size =
+ size.aggregate(backupFieldValue.backup().apply(methodVisitor,
implementationContext));
+ }
+
+ // Recursively generate select with one qualifier consumed. Since we
pass in -1 as the
+ // field id, the iterator.next() value will be consumed as the value to
select instead
+ // of accessing the fieldId.
+ size =
+ size.aggregate(
+ localVariables
+ .writeVariable(fieldValueArg)
+ .apply(methodVisitor, implementationContext));
+
+ // Select the fields of interest from this row.
+ // Create a new object array with one entry for each field selected from
this row.
+ ArrayManager nestedArrayManager = new
ArrayManager(nestedSchema.getFieldCount());
+ size =
+ size.aggregate(
+ nestedArrayManager.createArray().apply(methodVisitor,
implementationContext));
+
+ size =
+ size.aggregate(
+ selectIntoArrayHelper(
+ qualifiers,
+ qualifierPosition + 1,
+ -1,
+ fieldAccessDescriptor,
+ nestedInputType,
+ nestedArrayManager,
+ methodVisitor,
+ implementationContext));
+
+ if (backupFieldValue != null) {
+ // Restore the field value.
+ size =
+ size.aggregate(backupFieldValue.restore().apply(methodVisitor,
implementationContext));
+ }
+
+ // Now the top of the stack holds an array containing all the fields
selected from the
+ // row.
+ // Now we need to distribute these fields into the separate arrays we
created in the
+ // result.
+ // That is: if this select returned {a, b}, our final resulting schema
will contain two
+ // lists,
+ // so we must add a to the first and b to the second list.
+
+ int tempVariableForField = localVariables.createVariable();
+ for (int i = 0; i < nestedSchema.getFieldCount(); ++i) {
+ // Extract the field and store it in a temp variable.
+ StackManipulation extractField =
+ new StackManipulation.Compound(
+ Duplication.SINGLE,
+ IntegerConstant.forValue(i),
+ ArrayAccess.REFERENCE.load(),
+ localVariables.writeVariable(tempVariableForField));
+ StackManipulation addItemToList =
+ new StackManipulation.Compound(
+ localVariables.readVariable(localVariablesForArrays[i]),
+ localVariables.readVariable(tempVariableForField),
+ MethodInvocation.invoke(LIST_ADD),
+ // Ignore return value from add().
+ Removal.SINGLE);
+ size =
+ size.aggregate(
+ new StackManipulation.Compound(extractField, addItemToList)
+ .apply(methodVisitor, implementationContext));
+ }
+
+ // Pop the created array from the top of the stack.
+ size = size.aggregate(Removal.SINGLE.apply(methodVisitor,
implementationContext));
+
+ // Go back to the beginning of the loop.
+ methodVisitor.visitJumpInsn(Opcodes.GOTO, startLoopLabel);
+ methodVisitor.visitLabel(exitLoopLabel);
+ // Remove the iterator from the top of the stack.
+ size = size.aggregate(Removal.SINGLE.apply(methodVisitor,
implementationContext));
+ return size;
+ }
+
+ private StackManipulation.Size processMap(
+ FieldType inputType,
+ FieldAccessDescriptor fieldAccessDescriptor,
+ List<Qualifier> qualifiers,
+ int qualifierPosition,
+ int fieldId,
+ ArrayManager arrayManager,
+ MethodVisitor methodVisitor,
+ Context implementationContext) {
+ StackManipulation.Size size = new StackManipulation.Size(0, 0);
+ FieldType nestedInputType = checkNotNull(inputType.getMapValueType());
+ Schema nestedSchema = getNestedSchema(nestedInputType,
fieldAccessDescriptor);
+
+ // We create temp local variables to store all the maps we create. Each
field in
+ // nestedSchema corresponds to a separate map in the output.
+ int[] localVariablesForMaps =
+ IntStream.range(0, nestedSchema.getFieldCount())
+ .map(i -> localVariables.createVariable())
+ .toArray();
+
+ // Each field returned in nestedSchema will become its own map in the
output. So let's
+ // iterate and create maps and store each one in the output.
+ StackManipulation createAllHashMaps =
+ new StackManipulation.Compound(
+ IntStream.range(0, nestedSchema.getFieldCount())
+ .mapToObj(
+ i -> {
+ StackManipulation createHashMap =
+ new StackManipulation.Compound(
+ MethodInvocation.invoke(MAPS_NEW_HASHMAP),
+ // Store the HashMap in a local variable.
+ Duplication.SINGLE,
+
localVariables.writeVariable(localVariablesForMaps[i]));
+ return arrayManager.append(createHashMap);
+ })
+ .collect(Collectors.toList()));
+ size = size.aggregate(createAllHashMaps.apply(methodVisitor,
implementationContext));
+
+ // Now iterate over the value, selecting from each element.
+ StackManipulation readMapEntriesIterator =
+ new StackManipulation.Compound(
+ loadFieldValue(fieldId),
+ TypeCasting.to(new ForLoadedType(Map.class)),
+ MethodInvocation.invoke(MAP_ENTRYSET),
+ MethodInvocation.invoke(ITERABLE_ITERATOR));
+ size = size.aggregate(readMapEntriesIterator.apply(methodVisitor,
implementationContext));
+
+ // Loop over the entire entrySet iterable.
+ Label startLoopLabel = new Label();
+ Label exitLoopLabel = new Label();
+ methodVisitor.visitLabel(startLoopLabel);
+
+ StackManipulation checkTerminationCondition =
+ new StackManipulation.Compound(
+ Duplication.SINGLE, MethodInvocation.invoke(ITERATOR_HASNEXT));
+ size = size.aggregate(checkTerminationCondition.apply(methodVisitor,
implementationContext));
+ methodVisitor.visitJumpInsn(Opcodes.IFEQ, exitLoopLabel); // Exit the
loop if !hasNext().
+ size = size.aggregate(StackSize.SINGLE.toDecreasingSize());
+
+ int keyVariable = localVariables.createVariable();
+ // Read the next value in the iterator.
+ StackManipulation getNext =
+ new StackManipulation.Compound(
+ Duplication.SINGLE,
+ MethodInvocation.invoke(ITERATOR_NEXT),
+ // Get the key and store it in the keyVariable.
+ Duplication.SINGLE,
+ MethodInvocation.invoke(MAPENTRY_GETKEY),
+ localVariables.writeVariable(keyVariable),
+ // Get the value and leave it on the stack.
+ MethodInvocation.invoke(MAPENTRY_GETVALUE));
+ size = size.aggregate(getNext.apply(methodVisitor,
implementationContext));
+
+ ByteBuddyLocalVariableManager.BackupLocalVariable backupFieldValue =
null;
+ if (fieldId == -1) {
+ // Save the field value arg before overwriting it, as we need it in
subsequent
+ // iterations of the
+ // loop.
+ backupFieldValue = localVariables.backupVariable(fieldValueArg);
+ size =
+ size.aggregate(backupFieldValue.backup().apply(methodVisitor,
implementationContext));
+ }
+
+ // Recursively generate select with one qualifier consumed. Since we
pass in -1 as the
+ // field id, the map entry's value will be consumed as the value to
select instead
+ // of accessing the fieldId.
+ size =
+ size.aggregate(
+ localVariables
+ .writeVariable(fieldValueArg)
+ .apply(methodVisitor, implementationContext));
+
+ // Select the fields of interest from this row.
+ // Create a new object array with one entry for each field selected from
this row.
+ ArrayManager nestedArrayManager = new
ArrayManager(nestedSchema.getFieldCount());
+ size =
+ size.aggregate(
+ nestedArrayManager.createArray().apply(methodVisitor,
implementationContext));
+
+ size =
+ size.aggregate(
+ selectIntoArrayHelper(
+ qualifiers,
+ qualifierPosition + 1,
+ -1,
+ fieldAccessDescriptor,
+ nestedInputType,
+ nestedArrayManager,
+ methodVisitor,
+ implementationContext));
+
+ if (backupFieldValue != null) {
+ // Restore the field value.
+ size =
+ size.aggregate(backupFieldValue.restore().apply(methodVisitor,
implementationContext));
+ }
+
+ // Now the top of the stack holds an array containing all the fields
selected from the
+ // row.
+ // Now we need to distribute these fields into the separate maps we
created in the
+ // result.
+ // That is: if this select returned {a, b}, our final resulting schema
will contain two
+ // maps,
+ // so we must add a to the first and b to the second map.
+
+ int tempVariableForField = localVariables.createVariable();
+ for (int i = 0; i < nestedSchema.getFieldCount(); ++i) {
+ // Extract the field and store it in a temp variable.
+ StackManipulation extractField =
+ new StackManipulation.Compound(
+ Duplication.SINGLE,
+ IntegerConstant.forValue(i),
+ ArrayAccess.REFERENCE.load(),
+ localVariables.writeVariable(tempVariableForField));
+ StackManipulation addItemToMap =
+ new StackManipulation.Compound(
+ localVariables.readVariable(localVariablesForMaps[i]),
+ localVariables.readVariable(keyVariable),
+ localVariables.readVariable(tempVariableForField),
+ MethodInvocation.invoke(MAP_PUT),
+ // Ignore return value from put().
+ Removal.SINGLE);
+ size =
+ size.aggregate(
+ new StackManipulation.Compound(extractField, addItemToMap)
+ .apply(methodVisitor, implementationContext));
+ }
+
+ // Pop the created array from the top of the stack.
+ size = size.aggregate(Removal.SINGLE.apply(methodVisitor,
implementationContext));
+
+ // Go back to the beginning of the loop.
+ methodVisitor.visitJumpInsn(Opcodes.GOTO, startLoopLabel);
+ methodVisitor.visitLabel(exitLoopLabel);
+ // Remove the iterator from the top of the stack.
+ size = size.aggregate(Removal.SINGLE.apply(methodVisitor,
implementationContext));
+ return size;
+ }
+
+ private Schema getNestedSchema(
+ FieldType nestedInputType, FieldAccessDescriptor
fieldAccessDescriptor) {
+ // When selecting multiple subelements under a list, we distribute the
select
+ // resulting in multiple lists. For example, if there is a field "list"
with type
+ // {a: string, b: int}[], selecting list.a, list.b results in a schema
of type
+ // {a: string[], b: int[]}. This preserves the invariant that the name
selected always
+ // appears in the top-level schema.
+ Schema tempSchema = Schema.builder().addField("a",
nestedInputType).build();
+ FieldAccessDescriptor tempAccessDescriptor =
+ FieldAccessDescriptor.create()
+ .withNestedField("a", fieldAccessDescriptor)
+ .resolve(tempSchema);
+ return SelectHelpers.getOutputSchema(tempSchema, tempAccessDescriptor);
+ }
+ }
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java
index 7fc3ec5..8526ba8 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.schemas.utils;
import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import java.io.Serializable;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.annotations.Experimental;
@@ -179,12 +180,45 @@ public class SelectHelpers {
}
}
+ public static RowSelector getRowSelectorOptimized(
+ Schema inputSchema, FieldAccessDescriptor fieldAccessDescriptor) {
+ return SelectByteBuddyHelpers.getRowSelector(inputSchema,
fieldAccessDescriptor);
+ }
+
+ public static RowSelector getRowSelector(
+ Schema inputSchema, FieldAccessDescriptor fieldAccessDescriptor) {
+ Schema outputSchema = getOutputSchema(inputSchema, fieldAccessDescriptor);
+ return input -> selectRow(input, fieldAccessDescriptor, outputSchema);
+ }
+
+ public static class RowSelectorContainer implements RowSelector,
Serializable {
+ private transient RowSelector rowSelector;
+ private final Schema inputSchema;
+ private final FieldAccessDescriptor fieldAccessDescriptor;
+ private final boolean optimized;
+
+ public RowSelectorContainer(
+ Schema inputSchema, FieldAccessDescriptor fieldAccessDescriptor,
boolean optimized) {
+ this.inputSchema = inputSchema;
+ this.fieldAccessDescriptor = fieldAccessDescriptor;
+ this.optimized = optimized;
+ }
+
+ @Override
+ public Row select(Row input) {
+ if (this.rowSelector == null) {
+ rowSelector =
+ optimized
+ ? getRowSelectorOptimized(inputSchema, fieldAccessDescriptor)
+ : getRowSelector(inputSchema, fieldAccessDescriptor);
+ }
+ return rowSelector.select(input);
+ }
+ }
+
/** Select a sub Row from an input Row. */
- public static Row selectRow(
- Row input,
- FieldAccessDescriptor fieldAccessDescriptor,
- Schema inputSchema,
- Schema outputSchema) {
+ private static Row selectRow(
+ Row input, FieldAccessDescriptor fieldAccessDescriptor, Schema
outputSchema) {
if (fieldAccessDescriptor.getAllFields()) {
return input;
}
@@ -195,7 +229,7 @@ public class SelectHelpers {
}
/** Select out of a given {@link Row} object. */
- public static void selectIntoRow(
+ private static void selectIntoRow(
Row input, Row.Builder output, FieldAccessDescriptor
fieldAccessDescriptor) {
if (fieldAccessDescriptor.getAllFields()) {
output.addValues(input.getValues());
@@ -214,8 +248,9 @@ public class SelectHelpers {
FieldAccessDescriptor nestedAccess = nested.getValue();
FieldType nestedInputType =
input.getSchema().getField(field.getFieldId()).getType();
FieldType nestedOutputType =
outputSchema.getField(output.nextFieldId()).getType();
- selectIntoRowHelper(
+ selectIntoRowWithQualifiers(
field.getQualifiers(),
+ 0,
input.getValue(field.getFieldId()),
output,
nestedAccess,
@@ -224,26 +259,6 @@ public class SelectHelpers {
}
}
- @SuppressWarnings("unchecked")
- private static void selectIntoRowHelper(
- List<Qualifier> qualifiers,
- Object value,
- Row.Builder output,
- FieldAccessDescriptor fieldAccessDescriptor,
- FieldType inputType,
- FieldType outputType) {
- if (qualifiers.isEmpty()) {
- Row row = (Row) value;
- selectIntoRow(row, output, fieldAccessDescriptor);
- return;
- }
-
- // There are qualifiers. That means that the result will be either a list
or a map, so
- // construct the result and add that to our Row.
- selectIntoRowWithQualifiers(
- qualifiers, 0, value, output, fieldAccessDescriptor, inputType,
outputType);
- }
-
private static void selectIntoRowWithQualifiers(
List<Qualifier> qualifiers,
int qualifierPosition,
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnSchemaInformation.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnSchemaInformation.java
index b131365..1c54e73 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnSchemaInformation.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnSchemaInformation.java
@@ -30,7 +30,8 @@ import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaCoder;
import
org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.DefaultTypeConversionsFactory;
import org.apache.beam.sdk.schemas.utils.ConvertHelpers;
-import org.apache.beam.sdk.schemas.utils.SelectHelpers;
+import org.apache.beam.sdk.schemas.utils.RowSelector;
+import org.apache.beam.sdk.schemas.utils.SelectHelpers.RowSelectorContainer;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -147,6 +148,7 @@ public abstract class DoFnSchemaInformation implements
Serializable {
private final FieldAccessDescriptor selectDescriptor;
private final Schema selectOutputSchema;
private final boolean unbox;
+ private final RowSelector rowSelector;
private ConversionFunction(
Schema inputSchema,
@@ -161,6 +163,7 @@ public abstract class DoFnSchemaInformation implements
Serializable {
this.selectDescriptor = selectDescriptor;
this.selectOutputSchema = selectOutputSchema;
this.unbox = unbox;
+ this.rowSelector = new RowSelectorContainer(inputSchema,
selectDescriptor, true);
}
public static <InputT, OutputT> ConversionFunction of(
@@ -177,8 +180,7 @@ public abstract class DoFnSchemaInformation implements
Serializable {
@Override
public OutputT apply(InputT input) {
Row row = toRowFunction.apply(input);
- Row selected =
- SelectHelpers.selectRow(row, selectDescriptor, inputSchema,
selectOutputSchema);
+ Row selected = rowSelector.select(row);
if (unbox) {
selected = selected.getRow(0);
}
@@ -199,6 +201,7 @@ public abstract class DoFnSchemaInformation implements
Serializable {
private final FieldType primitiveType;
private final TypeDescriptor<?> primitiveOutputType;
private transient SerializableFunction<InputT, OutputT> conversionFunction;
+ private final RowSelector rowSelector;
private UnboxingConversionFunction(
Schema inputSchema,
@@ -212,6 +215,7 @@ public abstract class DoFnSchemaInformation implements
Serializable {
this.selectOutputSchema = selectOutputSchema;
this.primitiveType = selectOutputSchema.getField(0).getType();
this.primitiveOutputType = primitiveOutputType;
+ this.rowSelector = new RowSelectorContainer(inputSchema,
selectDescriptor, true);
}
public static <InputT, OutputT> UnboxingConversionFunction of(
@@ -227,8 +231,7 @@ public abstract class DoFnSchemaInformation implements
Serializable {
@Override
public OutputT apply(InputT input) {
Row row = toRowFunction.apply(input);
- Row selected =
- SelectHelpers.selectRow(row, selectDescriptor, inputSchema,
selectOutputSchema);
+ Row selected = rowSelector.select(row);
return getConversionFunction().apply(selected.getValue(0));
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
index 9c35657..3f881c0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
@@ -607,6 +607,10 @@ public abstract class Row implements Serializable {
return this;
}
+ public Builder attachValues(Object... values) {
+ return attachValues(Arrays.asList(values));
+ }
+
public Builder withFieldValueGetters(
Factory<List<FieldValueGetter>> fieldValueGetterFactory, Object
getterTarget) {
this.fieldValueGetterFactory = fieldValueGetterFactory;
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java
index 783d282..afa8186 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.schemas.utils;
import static org.junit.Assert.assertEquals;
+import java.util.Arrays;
+import java.util.Collection;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
@@ -27,9 +29,20 @@ import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
/** Tests for {@link SelectHelpers}. */
+@RunWith(Parameterized.class)
public class SelectHelpersTest {
+ @Parameterized.Parameter public boolean useOptimizedSelect;
+
+ @Parameters
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {{false}, {true}});
+ }
+
static final Schema FLAT_SCHEMA =
Schema.builder()
.addStringField("field1")
@@ -101,6 +114,14 @@ public class SelectHelpersTest {
.addValue(ImmutableMap.of(1, ImmutableList.of(FLAT_ROW)))
.build();
+ Row selectRow(Schema inputScema, FieldAccessDescriptor
fieldAccessDescriptor, Row row) {
+ RowSelector rowSelector =
+ useOptimizedSelect
+ ? SelectHelpers.getRowSelectorOptimized(inputScema,
fieldAccessDescriptor)
+ : SelectHelpers.getRowSelector(inputScema, fieldAccessDescriptor);
+ return rowSelector.select(row);
+ }
+
@Test
public void testSelectAll() {
FieldAccessDescriptor fieldAccessDescriptor =
@@ -108,7 +129,7 @@ public class SelectHelpersTest {
Schema outputSchema = SelectHelpers.getOutputSchema(FLAT_SCHEMA,
fieldAccessDescriptor);
assertEquals(FLAT_SCHEMA, outputSchema);
- Row row = SelectHelpers.selectRow(FLAT_ROW, fieldAccessDescriptor,
FLAT_SCHEMA, outputSchema);
+ Row row = selectRow(FLAT_SCHEMA, fieldAccessDescriptor, FLAT_ROW);
assertEquals(FLAT_ROW, row);
}
@@ -120,7 +141,7 @@ public class SelectHelpersTest {
Schema expectedSchema = Schema.builder().addStringField("field1").build();
assertEquals(expectedSchema, outputSchema);
- Row row = SelectHelpers.selectRow(FLAT_ROW, fieldAccessDescriptor,
FLAT_SCHEMA, outputSchema);
+ Row row = selectRow(FLAT_SCHEMA, fieldAccessDescriptor, FLAT_ROW);
Row expectedRow = Row.withSchema(expectedSchema).addValue("first").build();
assertEquals(expectedRow, row);
}
@@ -133,7 +154,7 @@ public class SelectHelpersTest {
Schema expectedSchema =
Schema.builder().addStringField("field_extra").build();
assertEquals(expectedSchema, outputSchema);
- Row row = SelectHelpers.selectRow(FLAT_ROW, fieldAccessDescriptor,
FLAT_SCHEMA, outputSchema);
+ Row row = selectRow(FLAT_SCHEMA, fieldAccessDescriptor, FLAT_ROW);
Row expectedRow = Row.withSchema(expectedSchema).addValue("extra").build();
assertEquals(expectedRow, row);
}
@@ -147,7 +168,7 @@ public class SelectHelpersTest {
Schema.builder().addStringField("field1").addDoubleField("field3").build();
assertEquals(expectedSchema, outputSchema);
- Row row = SelectHelpers.selectRow(FLAT_ROW, fieldAccessDescriptor,
FLAT_SCHEMA, outputSchema);
+ Row row = selectRow(FLAT_SCHEMA, fieldAccessDescriptor, FLAT_ROW);
Row expectedRow = Row.withSchema(expectedSchema).addValues("first",
3.14).build();
assertEquals(expectedRow, row);
}
@@ -160,8 +181,7 @@ public class SelectHelpersTest {
Schema expectedSchema = Schema.builder().addRowField("nested",
FLAT_SCHEMA).build();
assertEquals(expectedSchema, outputSchema);
- Row row =
- SelectHelpers.selectRow(NESTED_ROW, fieldAccessDescriptor,
NESTED_SCHEMA, outputSchema);
+ Row row = selectRow(NESTED_SCHEMA, fieldAccessDescriptor, NESTED_ROW);
Row expectedRow =
Row.withSchema(expectedSchema).addValue(FLAT_ROW).build();
assertEquals(expectedRow, row);
}
@@ -174,8 +194,7 @@ public class SelectHelpersTest {
Schema expectedSchema = Schema.builder().addStringField("field1").build();
assertEquals(expectedSchema, outputSchema);
- Row row =
- SelectHelpers.selectRow(NESTED_ROW, fieldAccessDescriptor,
NESTED_SCHEMA, outputSchema);
+ Row row = selectRow(NESTED_SCHEMA, fieldAccessDescriptor, NESTED_ROW);
Row expectedRow = Row.withSchema(expectedSchema).addValue("first").build();
assertEquals(expectedRow, row);
}
@@ -187,8 +206,7 @@ public class SelectHelpersTest {
Schema outputSchema = SelectHelpers.getOutputSchema(NESTED_SCHEMA,
fieldAccessDescriptor);
assertEquals(FLAT_SCHEMA, outputSchema);
- Row row =
- SelectHelpers.selectRow(NESTED_ROW, fieldAccessDescriptor,
NESTED_SCHEMA, outputSchema);
+ Row row = selectRow(NESTED_SCHEMA, fieldAccessDescriptor, NESTED_ROW);
assertEquals(FLAT_ROW, row);
}
@@ -201,9 +219,7 @@ public class SelectHelpersTest {
Schema expectedSchema = Schema.builder().addStringField("field1").build();
assertEquals(expectedSchema, outputSchema);
- Row row =
- SelectHelpers.selectRow(
- DOUBLE_NESTED_ROW, fieldAccessDescriptor, DOUBLE_NESTED_SCHEMA,
outputSchema);
+ Row row = selectRow(DOUBLE_NESTED_SCHEMA, fieldAccessDescriptor,
DOUBLE_NESTED_ROW);
Row expectedRow = Row.withSchema(expectedSchema).addValue("first").build();
assertEquals(expectedRow, row);
}
@@ -217,7 +233,7 @@ public class SelectHelpersTest {
Schema.builder().addArrayField("primitiveArray",
FieldType.INT32).build();
assertEquals(expectedSchema, outputSchema);
- Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor,
ARRAY_SCHEMA, outputSchema);
+ Row row = selectRow(ARRAY_SCHEMA, fieldAccessDescriptor, ARRAY_ROW);
Row expectedRow = Row.withSchema(expectedSchema).addArray(1, 2).build();
assertEquals(expectedRow, row);
}
@@ -231,8 +247,7 @@ public class SelectHelpersTest {
Schema.builder().addIterableField("primitiveIter",
FieldType.INT32).build();
assertEquals(expectedSchema, outputSchema);
- Row row =
- SelectHelpers.selectRow(ITERABLE_ROW, fieldAccessDescriptor,
ITERABLE_SCHEMA, outputSchema);
+ Row row = selectRow(ITERABLE_SCHEMA, fieldAccessDescriptor, ITERABLE_ROW);
Row expectedRow =
Row.withSchema(expectedSchema).addIterable(ImmutableList.of(1, 2)).build();
assertEquals(expectedRow, row);
}
@@ -246,7 +261,7 @@ public class SelectHelpersTest {
Schema.builder().addArrayField("rowArray",
FieldType.row(FLAT_SCHEMA)).build();
assertEquals(expectedSchema, outputSchema);
- Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor,
ARRAY_SCHEMA, outputSchema);
+ Row row = selectRow(ARRAY_SCHEMA, fieldAccessDescriptor, ARRAY_ROW);
Row expectedRow = Row.withSchema(expectedSchema).addArray(FLAT_ROW,
FLAT_ROW).build();
assertEquals(expectedRow, row);
}
@@ -260,8 +275,7 @@ public class SelectHelpersTest {
Schema.builder().addIterableField("rowIter",
FieldType.row(FLAT_SCHEMA)).build();
assertEquals(expectedSchema, outputSchema);
- Row row =
- SelectHelpers.selectRow(ITERABLE_ROW, fieldAccessDescriptor,
ITERABLE_SCHEMA, outputSchema);
+ Row row = selectRow(ITERABLE_SCHEMA, fieldAccessDescriptor, ITERABLE_ROW);
Row expectedRow =
Row.withSchema(expectedSchema).addIterable(ImmutableList.of(FLAT_ROW,
FLAT_ROW)).build();
assertEquals(expectedRow, row);
@@ -276,7 +290,7 @@ public class SelectHelpersTest {
Schema expectedSchema = Schema.builder().addArrayField("field1",
FieldType.STRING).build();
assertEquals(expectedSchema, outputSchema);
- Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor,
ARRAY_SCHEMA, outputSchema);
+ Row row = selectRow(ARRAY_SCHEMA, fieldAccessDescriptor, ARRAY_ROW);
Row expectedRow = Row.withSchema(expectedSchema).addArray("first",
"first").build();
assertEquals(expectedRow, row);
}
@@ -290,8 +304,7 @@ public class SelectHelpersTest {
Schema expectedSchema = Schema.builder().addIterableField("field1",
FieldType.STRING).build();
assertEquals(expectedSchema, outputSchema);
- Row row =
- SelectHelpers.selectRow(ITERABLE_ROW, fieldAccessDescriptor,
ITERABLE_SCHEMA, outputSchema);
+ Row row = selectRow(ITERABLE_SCHEMA, fieldAccessDescriptor, ITERABLE_ROW);
Row expectedRow =
Row.withSchema(expectedSchema).addIterable(ImmutableList.of("first",
"first")).build();
assertEquals(expectedRow, row);
@@ -307,7 +320,7 @@ public class SelectHelpersTest {
Schema.builder().addArrayField("field1",
FieldType.array(FieldType.STRING)).build();
assertEquals(expectedSchema, outputSchema);
- Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor,
ARRAY_SCHEMA, outputSchema);
+ Row row = selectRow(ARRAY_SCHEMA, fieldAccessDescriptor, ARRAY_ROW);
Row expectedRow =
Row.withSchema(expectedSchema)
@@ -326,8 +339,7 @@ public class SelectHelpersTest {
Schema.builder().addIterableField("field1",
FieldType.iterable(FieldType.STRING)).build();
assertEquals(expectedSchema, outputSchema);
- Row row =
- SelectHelpers.selectRow(ITERABLE_ROW, fieldAccessDescriptor,
ITERABLE_SCHEMA, outputSchema);
+ Row row = selectRow(ITERABLE_SCHEMA, fieldAccessDescriptor, ITERABLE_ROW);
Row expectedRow =
Row.withSchema(expectedSchema)
@@ -347,7 +359,7 @@ public class SelectHelpersTest {
Schema expectedSchema = Schema.builder().addArrayField("field1",
FieldType.STRING).build();
assertEquals(expectedSchema, outputSchema);
- Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor,
ARRAY_SCHEMA, outputSchema);
+ Row row = selectRow(ARRAY_SCHEMA, fieldAccessDescriptor, ARRAY_ROW);
Row expectedRow = Row.withSchema(expectedSchema).addArray("first",
"first").build();
assertEquals(expectedRow, row);
}
@@ -363,8 +375,7 @@ public class SelectHelpersTest {
Schema expectedSchema = Schema.builder().addIterableField("field1",
FieldType.STRING).build();
assertEquals(expectedSchema, outputSchema);
- Row row =
- SelectHelpers.selectRow(ITERABLE_ROW, fieldAccessDescriptor,
ITERABLE_SCHEMA, outputSchema);
+ Row row = selectRow(ITERABLE_SCHEMA, fieldAccessDescriptor, ITERABLE_ROW);
Row expectedRow =
Row.withSchema(expectedSchema).addIterable(ImmutableList.of("first",
"first")).build();
assertEquals(expectedRow, row);
@@ -381,7 +392,7 @@ public class SelectHelpersTest {
Schema.builder().addMapField("field1", FieldType.INT32,
FieldType.STRING).build();
assertEquals(expectedSchema, outputSchema);
- Row row = SelectHelpers.selectRow(MAP_ROW, fieldAccessDescriptor,
MAP_SCHEMA, outputSchema);
+ Row row = selectRow(MAP_SCHEMA, fieldAccessDescriptor, MAP_ROW);
Row expectedRow =
Row.withSchema(expectedSchema).addValue(ImmutableMap.of(1, "first")).build();
assertEquals(expectedRow, row);
}
@@ -400,7 +411,7 @@ public class SelectHelpersTest {
.build();
assertEquals(expectedSchema, outputSchema);
- Row row = SelectHelpers.selectRow(MAP_ROW, fieldAccessDescriptor,
MAP_SCHEMA, outputSchema);
+ Row row = selectRow(MAP_SCHEMA, fieldAccessDescriptor, MAP_ROW);
Row expectedRow =
Row.withSchema(expectedSchema)
.addValue(ImmutableMap.of(1, FLAT_ROW.getValue(0)))
@@ -423,9 +434,7 @@ public class SelectHelpersTest {
.build();
assertEquals(expectedSchema, outputSchema);
- Row row =
- SelectHelpers.selectRow(
- MAP_ARRAY_ROW, fieldAccessDescriptor, MAP_ARRAY_SCHEMA,
outputSchema);
+ Row row = selectRow(MAP_ARRAY_SCHEMA, fieldAccessDescriptor,
MAP_ARRAY_ROW);
Row expectedRow =
Row.withSchema(expectedSchema)
@@ -446,9 +455,7 @@ public class SelectHelpersTest {
.build();
assertEquals(expectedSchema, outputSchema);
- Row row =
- SelectHelpers.selectRow(
- MAP_ITERABLE_ROW, fieldAccessDescriptor, MAP_ITERABLE_SCHEMA,
outputSchema);
+ Row row = selectRow(MAP_ITERABLE_SCHEMA, fieldAccessDescriptor,
MAP_ITERABLE_ROW);
Row expectedRow =
Row.withSchema(expectedSchema)
@@ -472,7 +479,7 @@ public class SelectHelpersTest {
Schema outputSchema = SelectHelpers.getOutputSchema(f3,
fieldAccessDescriptor);
- Row out = SelectHelpers.selectRow(r3, fieldAccessDescriptor,
r3.getSchema(), outputSchema);
+ Row out = selectRow(f3, fieldAccessDescriptor, r3);
assertEquals(f2, outputSchema);
assertEquals(r2, out);
@@ -495,7 +502,7 @@ public class SelectHelpersTest {
Schema outputSchema = SelectHelpers.getOutputSchema(f4,
fieldAccessDescriptor);
- Row out = SelectHelpers.selectRow(r4, fieldAccessDescriptor,
r4.getSchema(), outputSchema);
+ Row out = selectRow(f4, fieldAccessDescriptor, r4);
assertEquals(f3, outputSchema);
assertEquals(r3, out);
@@ -519,7 +526,7 @@ public class SelectHelpersTest {
Schema expectedSchema =
Schema.builder().addArrayField("f0",
FieldType.array(FieldType.STRING)).build();
assertEquals(expectedSchema, outputSchema);
- Row out = SelectHelpers.selectRow(r4, fieldAccessDescriptor,
r4.getSchema(), outputSchema);
+ Row out = selectRow(f4, fieldAccessDescriptor, r4);
Row expected =
Row.withSchema(outputSchema)
.addArray(Lists.newArrayList("first", "first"),
Lists.newArrayList("first", "first"))
@@ -545,7 +552,7 @@ public class SelectHelpersTest {
Schema expectedSchema =
Schema.builder().addIterableField("f0",
FieldType.iterable(FieldType.STRING)).build();
assertEquals(expectedSchema, outputSchema);
- Row out = SelectHelpers.selectRow(r4, fieldAccessDescriptor,
r4.getSchema(), outputSchema);
+ Row out = selectRow(f4, fieldAccessDescriptor, r4);
Row expected =
Row.withSchema(outputSchema)
.addIterable(