This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 323fc9e Merge pull request #8006: [BEAM-6772] Change Select semantics
to match what a user expects
323fc9e is described below
commit 323fc9e14b9a166fc90e17302701396335b292b5
Author: reuvenlax <[email protected]>
AuthorDate: Fri Apr 12 08:16:23 2019 -0700
Merge pull request #8006: [BEAM-6772] Change Select semantics to match what
a user expects
---
.../beam/sdk/schemas/transforms/Convert.java | 52 ++-
.../apache/beam/sdk/schemas/transforms/Select.java | 3 +-
.../beam/sdk/schemas/utils/SelectHelpers.java | 262 ++++++++++----
.../main/java/org/apache/beam/sdk/values/Row.java | 11 +
.../beam/sdk/schemas/transforms/GroupTest.java | 23 +-
.../beam/sdk/schemas/transforms/SelectTest.java | 183 +++++-----
.../beam/sdk/schemas/utils/SelectHelpersTest.java | 381 +++++++++++++++++++++
7 files changed, 735 insertions(+), 180 deletions(-)
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java
index 1d8116c..9b01b35 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java
@@ -17,9 +17,12 @@
*/
package org.apache.beam.sdk.schemas.transforms;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.transforms.DoFn;
@@ -70,7 +73,9 @@ public class Convert {
*
* <p>This function allows converting between two types as long as the two
types have
* <i>compatible</i> schemas. Two schemas are said to be <i>compatible</i>
if they recursively
- * have fields with the same names, but possibly different orders.
+ * have fields with the same names, but possibly different orders. If the
source schema can be
+ * unboxed to match the target schema (i.e. the source schema contains a
single field that is
+ * compatible with the target schema), then conversion also succeeds.
*/
public static <InputT, OutputT> PTransform<PCollection<InputT>,
PCollection<OutputT>> to(
Class<OutputT> clazz) {
@@ -82,7 +87,9 @@ public class Convert {
*
* <p>This function allows converting between two types as long as the two
types have
* <i>compatible</i> schemas. Two schemas are said to be <i>compatible</i>
if they recursively
- * have fields with the same names, but possibly different orders.
+ * have fields with the same names, but possibly different orders. If the
source schema can be
+ * unboxed to match the target schema (i.e. the source schema contains a
single field that is
+ * compatible with the target schema), then conversion also succeeds.
*/
public static <InputT, OutputT> PTransform<PCollection<InputT>,
PCollection<OutputT>> to(
TypeDescriptor<OutputT> typeDescriptor) {
@@ -92,11 +99,24 @@ public class Convert {
private static class ConvertTransform<InputT, OutputT>
extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
TypeDescriptor<OutputT> outputTypeDescriptor;
+ Schema unboxedSchema = null;
ConvertTransform(TypeDescriptor<OutputT> outputTypeDescriptor) {
this.outputTypeDescriptor = outputTypeDescriptor;
}
+ @Nullable
+ private static Schema getBoxedNestedSchema(Schema schema) {
+ if (schema.getFieldCount() != 1) {
+ return null;
+ }
+ FieldType fieldType = schema.getField(0).getType();
+ if (!fieldType.getTypeName().isCompositeType()) {
+ return null;
+ }
+ return fieldType.getRowSchema();
+ }
+
@Override
@SuppressWarnings("unchecked")
public PCollection<OutputT> expand(PCollection<InputT> input) {
@@ -124,15 +144,21 @@ public class Convert {
registry.getSchema(outputTypeDescriptor),
registry.getToRowFunction(outputTypeDescriptor),
registry.getFromRowFunction(outputTypeDescriptor));
- // assert matches input schema.
- // TODO: Properly handle nullable.
- if
(!outputSchemaCoder.getSchema().assignableToIgnoreNullable(input.getSchema())) {
- throw new RuntimeException(
- "Cannot convert between types that don't have equivalent
schemas."
- + " input schema: "
- + input.getSchema()
- + " output schema: "
- + outputSchemaCoder.getSchema());
+
+ Schema outputSchema = outputSchemaCoder.getSchema();
+ if (!outputSchema.assignableToIgnoreNullable(input.getSchema())) {
+ // We also support unboxing nested Row schemas, so attempt that.
+ // TODO: Support unboxing to primitive types as well.
+ unboxedSchema = getBoxedNestedSchema(input.getSchema());
+ if (unboxedSchema == null ||
!outputSchema.assignableToIgnoreNullable(unboxedSchema)) {
+ Schema checked = (unboxedSchema == null) ? input.getSchema() :
unboxedSchema;
+ throw new RuntimeException(
+ "Cannot convert between types that don't have equivalent
schemas."
+ + " input schema: "
+ + checked
+ + " output schema: "
+ + outputSchemaCoder.getSchema());
+ }
}
} catch (NoSuchSchemaException e) {
throw new RuntimeException("No schema registered for " +
outputTypeDescriptor);
@@ -145,7 +171,9 @@ public class Convert {
new DoFn<InputT, OutputT>() {
@ProcessElement
public void processElement(@Element Row row,
OutputReceiver<OutputT> o) {
-
o.output(outputSchemaCoder.getFromRowFunction().apply(row));
+ // Read the row, potentially unboxing if necessary.
+ Row input = (unboxedSchema == null) ? row :
row.getValue(0);
+
o.output(outputSchemaCoder.getFromRowFunction().apply(input));
}
}))
.setSchema(
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
index 8686a7a..077cc33 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
@@ -64,7 +64,8 @@ import org.apache.beam.sdk.values.Row;
*
* <pre>{@code
* PCollection<UserEvent> events = readUserEvents();
- * PCollection<Row> rows = event.apply(Select.fieldNames("location.*"));
+ * PCollection<Row> rows = events.apply(Select.fieldNames("location"))
+ *     .apply(Convert.to(Location.class));
* }</pre>
*/
@Experimental(Kind.SCHEMAS)
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java
index e3baf68..37742a5 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SelectHelpers.java
@@ -34,32 +34,84 @@ import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
-/** Helper methods to select fields from a Schema. */
+/** Helper methods to select subrows out of rows. */
public class SelectHelpers {
- // Currently we don't flatten selected nested fields.
+
+ private static Schema union(Iterable<Schema> schemas) {
+ Schema.Builder unioned = Schema.builder();
+ for (Schema schema : schemas) {
+ unioned.addFields(schema.getFields());
+ }
+ return unioned.build();
+ }
+
+ /**
+ * Get the output schema resulting from selecting the given {@link
FieldAccessDescriptor} from the
+ * given schema.
+ *
+ * <p>Fields are always extracted and then stored in a new Row. For example,
consider the
+ * following Java POJOs:
+ *
+ * <pre>{@code
+ * class UserEvent {
+ * String userId;
+ * String eventId;
+ * int eventType;
+ * Location location;
+ * }
+ * }</pre>
+ *
+ * <pre>{@code
+ * class Location {
+ * double latitude;
+ * double longitude;
+ * }
+ * }</pre>
+ *
+ * <p>If selecting just the location field, then the returned schema will
wrap that of the
+ * singular field being selected; in this case the returned schema will be a
Row containing a
+ * single Location field. If location.latitude is selected, then the
returned Schema will be a Row
+ * containing a double latitude field.
+ *
+ * <p>The same holds true when selecting from lists or maps. For example:
+ *
+ * <pre>{@code
+ * class EventList {
+ * List<UserEvent> events;
+ * }
+ * }</pre>
+ *
+ * <p>If selecting events.location.latitude, the returned schema will
contain a single array of
+ * Row, where that Row contains a single double latitude field; it will not
contain an array of
+ * double.
+ */
public static Schema getOutputSchema(
Schema inputSchema, FieldAccessDescriptor fieldAccessDescriptor) {
if (fieldAccessDescriptor.getAllFields()) {
return inputSchema;
}
- Schema.Builder builder = new Schema.Builder();
+
+ List<Schema> schemas = Lists.newArrayList();
+ Schema.Builder builder = Schema.builder();
for (int fieldId : fieldAccessDescriptor.fieldIdsAccessed()) {
builder.addField(inputSchema.getField(fieldId));
}
+ schemas.add(builder.build());
for (Map.Entry<FieldDescriptor, FieldAccessDescriptor> nested :
fieldAccessDescriptor.getNestedFieldsAccessed().entrySet()) {
FieldDescriptor fieldDescriptor = nested.getKey();
+ FieldAccessDescriptor nestedAccess = nested.getValue();
Field field =
inputSchema.getField(checkNotNull(fieldDescriptor.getFieldId()));
- FieldType outputType =
- getOutputSchemaHelper(
- field.getType(), nested.getValue(),
fieldDescriptor.getQualifiers(), 0);
- builder.addField(field.getName(), outputType);
+ Schema outputSchema =
+ getOutputSchemaHelper(field.getType(), nestedAccess,
fieldDescriptor.getQualifiers(), 0);
+ schemas.add(outputSchema);
}
- return builder.build();
+
+ return union(schemas);
}
- private static FieldType getOutputSchemaHelper(
+ private static Schema getOutputSchemaHelper(
FieldType inputFieldType,
FieldAccessDescriptor fieldAccessDescriptor,
List<Qualifier> qualifiers,
@@ -68,34 +120,45 @@ public class SelectHelpers {
// We have walked through any containers, and are at a row type. Extract
the subschema
// for the row, preserving nullable attributes.
checkArgument(inputFieldType.getTypeName().isCompositeType());
- return FieldType.row(getOutputSchema(inputFieldType.getRowSchema(),
fieldAccessDescriptor))
- .withNullable(inputFieldType.getNullable());
+ return getOutputSchema(inputFieldType.getRowSchema(),
fieldAccessDescriptor);
}
Qualifier qualifier = qualifiers.get(qualifierPosition);
+ Schema.Builder builder = Schema.builder();
switch (qualifier.getKind()) {
case LIST:
checkArgument(qualifier.getList().equals(ListQualifier.ALL));
FieldType componentType =
checkNotNull(inputFieldType.getCollectionElementType());
- FieldType outputComponent =
+ Schema outputComponent =
getOutputSchemaHelper(
- componentType, fieldAccessDescriptor, qualifiers,
qualifierPosition + 1)
- .withNullable(componentType.getNullable());
- return
FieldType.array(outputComponent).withNullable(inputFieldType.getNullable());
+ componentType, fieldAccessDescriptor, qualifiers,
qualifierPosition + 1);
+ for (Field field : outputComponent.getFields()) {
+ Field newField =
+ Field.of(field.getName(), FieldType.array(field.getType()))
+ .withNullable(inputFieldType.getNullable());
+ builder.addField(newField);
+ }
+ return builder.build();
case MAP:
checkArgument(qualifier.getMap().equals(MapQualifier.ALL));
FieldType keyType = checkNotNull(inputFieldType.getMapKeyType());
FieldType valueType = checkNotNull(inputFieldType.getMapValueType());
- FieldType outputValueType =
+ Schema outputValueSchema =
getOutputSchemaHelper(
- valueType, fieldAccessDescriptor, qualifiers,
qualifierPosition + 1)
- .withNullable(valueType.getNullable());
- return FieldType.map(keyType,
outputValueType).withNullable(inputFieldType.getNullable());
+ valueType, fieldAccessDescriptor, qualifiers,
qualifierPosition + 1);
+ for (Field field : outputValueSchema.getFields()) {
+ Field newField =
+ Field.of(field.getName(), FieldType.map(keyType,
field.getType()))
+ .withNullable(inputFieldType.getNullable());
+ builder.addField(newField);
+ }
+ return builder.build();
default:
throw new RuntimeException("unexpected");
}
}
+ /** Select a sub Row from an input Row. */
public static Row selectRow(
Row input,
FieldAccessDescriptor fieldAccessDescriptor,
@@ -106,47 +169,73 @@ public class SelectHelpers {
}
Row.Builder output = Row.withSchema(outputSchema);
+ selectIntoRow(input, output, fieldAccessDescriptor);
+ return output.build();
+ }
+
+ /** Select out of a given {@link Row} object. */
+ public static void selectIntoRow(
+ Row input, Row.Builder output, FieldAccessDescriptor
fieldAccessDescriptor) {
+ if (fieldAccessDescriptor.getAllFields()) {
+ output.addValues(input.getValues());
+ return;
+ }
+
for (int fieldId : fieldAccessDescriptor.fieldIdsAccessed()) {
// TODO: Once we support specific qualifiers (like array slices),
extract them here.
output.addValue(input.getValue(fieldId));
}
+ Schema outputSchema = output.getSchema();
for (Map.Entry<FieldDescriptor, FieldAccessDescriptor> nested :
fieldAccessDescriptor.getNestedFieldsAccessed().entrySet()) {
FieldDescriptor field = nested.getKey();
- String fieldName = inputSchema.nameOf(checkNotNull(field.getFieldId()));
- FieldType nestedInputType =
inputSchema.getField(field.getFieldId()).getType();
- FieldType nestedOutputType = outputSchema.getField(fieldName).getType();
- Object value =
- selectRowHelper(
- field.getQualifiers(),
- 0,
- input.getValue(fieldName),
- nested.getValue(),
- nestedInputType,
- nestedOutputType);
- output.addValue(value);
+ FieldAccessDescriptor nestedAccess = nested.getValue();
+ FieldType nestedInputType =
input.getSchema().getField(field.getFieldId()).getType();
+ FieldType nestedOutputType =
outputSchema.getField(output.nextFieldId()).getType();
+ selectIntoRowHelper(
+ field.getQualifiers(),
+ input.getValue(field.getFieldId()),
+ output,
+ nestedAccess,
+ nestedInputType,
+ nestedOutputType);
}
- return output.build();
}
@SuppressWarnings("unchecked")
- private static Object selectRowHelper(
+ private static void selectIntoRowHelper(
List<Qualifier> qualifiers,
- int qualifierPosition,
Object value,
+ Row.Builder output,
FieldAccessDescriptor fieldAccessDescriptor,
FieldType inputType,
FieldType outputType) {
- if (qualifierPosition >= qualifiers.size()) {
+ if (qualifiers.isEmpty()) {
Row row = (Row) value;
- return selectRow(
- row, fieldAccessDescriptor, inputType.getRowSchema(),
outputType.getRowSchema());
+ selectIntoRow(row, output, fieldAccessDescriptor);
+ return;
}
- if (fieldAccessDescriptor.getAllFields()) {
- // Since we are selecting all fields (and we do not yet support array
slicing), short circuit.
- return value;
+ // There are qualifiers. That means that the result will be either a list
or a map, so
+ // construct the result and add that to our Row.
+ selectIntoRowWithQualifiers(
+ qualifiers, 0, value, output, fieldAccessDescriptor, inputType,
outputType);
+ }
+
+ private static void selectIntoRowWithQualifiers(
+ List<Qualifier> qualifiers,
+ int qualifierPosition,
+ Object value,
+ Row.Builder output,
+ FieldAccessDescriptor fieldAccessDescriptor,
+ FieldType inputType,
+ FieldType outputType) {
+ if (qualifierPosition >= qualifiers.size()) {
+ // We have already constructed all arrays and maps. What remains must be
a Row.
+ Row row = (Row) value;
+ selectIntoRow(row, output, fieldAccessDescriptor);
+ return;
}
Qualifier qualifier = qualifiers.get(qualifierPosition);
@@ -156,38 +245,87 @@ public class SelectHelpers {
FieldType nestedInputType =
checkNotNull(inputType.getCollectionElementType());
FieldType nestedOutputType =
checkNotNull(outputType.getCollectionElementType());
List<Object> list = (List) value;
- List selectedList = Lists.newArrayListWithCapacity(list.size());
+
+ // When selecting multiple subelements under a list, we distribute
the select
+ // resulting in multiple lists. For example, if there is a field
"list" with type
+ // {a: string, b: int}[], selecting list.a, list.b results in a
schema of type
+ // {a: string[], b: int[]}. This preserves the invariant that the
name selected always
+ // appears in the top-level schema.
+ Schema tempSchema = Schema.builder().addField("a",
nestedInputType).build();
+ FieldAccessDescriptor tempAccessDescriptor =
+ FieldAccessDescriptor.create()
+ .withNestedField("a", fieldAccessDescriptor)
+ .resolve(tempSchema);
+ // TODO: doing this on each element might be inefficient. Consider
caching this, or
+ // using codegen based on the schema.
+ Schema nestedSchema = getOutputSchema(tempSchema,
tempAccessDescriptor);
+
+ List<List<Object>> selectedLists =
+ Lists.newArrayListWithCapacity(nestedSchema.getFieldCount());
+ for (int i = 0; i < nestedSchema.getFieldCount(); i++) {
+ selectedLists.add(Lists.newArrayListWithCapacity(list.size()));
+ }
for (Object o : list) {
- Object selected =
- selectRowHelper(
- qualifiers,
- qualifierPosition + 1,
- o,
- fieldAccessDescriptor,
- nestedInputType,
- nestedOutputType);
- selectedList.add(selected);
+ Row.Builder selectElementBuilder = Row.withSchema(nestedSchema);
+ selectIntoRowWithQualifiers(
+ qualifiers,
+ qualifierPosition + 1,
+ o,
+ selectElementBuilder,
+ fieldAccessDescriptor,
+ nestedInputType,
+ nestedOutputType);
+
+ Row elementBeforeDistribution = selectElementBuilder.build();
+ for (int i = 0; i < nestedSchema.getFieldCount(); ++i) {
+ selectedLists.get(i).add(elementBeforeDistribution.getValue(i));
+ }
}
- return selectedList;
+ for (List aList : selectedLists) {
+ output.addValue(aList);
+ }
+ break;
}
case MAP:
{
FieldType nestedInputType =
checkNotNull(inputType.getMapValueType());
FieldType nestedOutputType =
checkNotNull(outputType.getMapValueType());
+
+ // When selecting multiple subelements under a map, we distribute
the select
+ // resulting in multiple maps. The semantics are the same as for
lists above (except we
+ // only support subelement select for map values, not for map keys).
+ Schema tempSchema = Schema.builder().addField("a",
nestedInputType).build();
+ FieldAccessDescriptor tempAccessDescriptor =
+ FieldAccessDescriptor.create()
+ .withNestedField("a", fieldAccessDescriptor)
+ .resolve(tempSchema);
+ Schema nestedSchema = getOutputSchema(tempSchema,
tempAccessDescriptor);
+ List<Map> selectedMaps =
Lists.newArrayListWithExpectedSize(nestedSchema.getFieldCount());
+ for (int i = 0; i < nestedSchema.getFieldCount(); ++i) {
+ selectedMaps.add(Maps.newHashMap());
+ }
+
Map<Object, Object> map = (Map) value;
- Map selectedMap = Maps.newHashMapWithExpectedSize(map.size());
for (Map.Entry<Object, Object> entry : map.entrySet()) {
- Object selected =
- selectRowHelper(
- qualifiers,
- qualifierPosition + 1,
- entry.getValue(),
- fieldAccessDescriptor,
- nestedInputType,
- nestedOutputType);
- selectedMap.put(entry.getKey(), selected);
+ Row.Builder selectValueBuilder = Row.withSchema(nestedSchema);
+ selectIntoRowWithQualifiers(
+ qualifiers,
+ qualifierPosition + 1,
+ entry.getValue(),
+ selectValueBuilder,
+ fieldAccessDescriptor,
+ nestedInputType,
+ nestedOutputType);
+
+ Row valueBeforeDistribution = selectValueBuilder.build();
+ for (int i = 0; i < nestedSchema.getFieldCount(); ++i) {
+ selectedMaps.get(i).put(entry.getKey(),
valueBeforeDistribution.getValue(i));
+ }
+ }
+ for (Map aMap : selectedMaps) {
+ output.addValue(aMap);
}
- return selectedMap;
+ break;
}
default:
throw new RuntimeException("Unexpected type " + qualifier.getKind());
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
index 47958e2..1f6e386 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
@@ -500,6 +500,17 @@ public abstract class Row implements Serializable {
this.schema = schema;
}
+ public int nextFieldId() {
+ if (fieldValueGetterFactory != null) {
+ throw new RuntimeException("Not supported");
+ }
+ return values.size();
+ }
+
+ public Schema getSchema() {
+ return schema;
+ }
+
public Builder addValue(@Nullable Object values) {
this.values.add(values);
return this;
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java
index cd8ec7a..0229c55 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java
@@ -216,22 +216,16 @@ public class GroupTest implements Serializable {
new OuterPOJO(new POJO("key2", 2L, "value4"))))
.apply(Group.byFieldNames("inner.field1", "inner.field2"));
- Schema selectedSchema =
-
Schema.builder().addStringField("field1").addInt64Field("field2").build();
- Schema keySchema = Schema.builder().addRowField("inner",
selectedSchema).build();
+ Schema keySchema =
Schema.builder().addStringField("field1").addInt64Field("field2").build();
List<KV<Row, Collection<OuterPOJO>>> expected =
ImmutableList.of(
KV.of(
- Row.withSchema(keySchema)
- .addValue(Row.withSchema(selectedSchema).addValues("key1",
1L).build())
- .build(),
+ Row.withSchema(keySchema).addValues("key1", 1L).build(),
ImmutableList.of(
new OuterPOJO(new POJO("key1", 1L, "value1")),
new OuterPOJO(new POJO("key1", 1L, "value2")))),
KV.of(
- Row.withSchema(keySchema)
- .addValue(Row.withSchema(selectedSchema).addValues("key2",
2L).build())
- .build(),
+ Row.withSchema(keySchema).addValues("key2", 2L).build(),
ImmutableList.of(
new OuterPOJO(new POJO("key2", 2L, "value3")),
new OuterPOJO(new POJO("key2", 2L, "value4")))));
@@ -535,8 +529,7 @@ public class GroupTest implements Serializable {
.aggregateField("inner.field3", Sum.ofIntegers(),
"field3_sum")
.aggregateField("inner.field1", Top.largestLongsFn(1),
"field1_top"));
- Schema innerKeySchema = Schema.builder().addInt64Field("field2").build();
- Schema keySchema = Schema.builder().addRowField("inner",
innerKeySchema).build();
+ Schema keySchema = Schema.builder().addInt64Field("field2").build();
Schema valueSchema =
Schema.builder()
.addInt64Field("field1_sum")
@@ -547,14 +540,10 @@ public class GroupTest implements Serializable {
List<KV<Row, Row>> expected =
ImmutableList.of(
KV.of(
- Row.withSchema(keySchema)
-
.addValue(Row.withSchema(innerKeySchema).addValue(1L).build())
- .build(),
+ Row.withSchema(keySchema).addValue(1L).build(),
Row.withSchema(valueSchema).addValue(3L).addValue(5).addArray(2L).build()),
KV.of(
- Row.withSchema(keySchema)
-
.addValue(Row.withSchema(innerKeySchema).addValue(2L).build())
- .build(),
+ Row.withSchema(keySchema).addValue(2L).build(),
Row.withSchema(valueSchema).addValue(7L).addValue(9).addArray(4L).build()));
PAssert.that(aggregations).satisfies(actual -> containsKvs(expected,
actual));
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java
index 3238ebd..f2728e0 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java
@@ -127,52 +127,6 @@ public class SelectTest {
}
}
- /** A pojo matching the schema results from selection field2.*. */
- @DefaultSchema(JavaFieldSchema.class)
- static class POJO2NestedAll {
- POJO1 field2 = new POJO1();
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- POJO2NestedAll that = (POJO2NestedAll) o;
- return Objects.equals(field2, that.field2);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(field2);
- }
- }
-
- /** A pojo matching the schema results from selection field2.field1,
field2.field3. */
- @DefaultSchema(JavaFieldSchema.class)
- static class POJO2NestedPartial {
- POJO1Selected field2 = new POJO1Selected();
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- POJO2NestedPartial that = (POJO2NestedPartial) o;
- return Objects.equals(field2, that.field2);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(field2);
- }
- }
-
@Test
@Category(NeedsRunner.class)
public void testSelectMissingFieldName() {
@@ -216,24 +170,36 @@ public class SelectTest {
@Test
@Category(NeedsRunner.class)
public void testSelectNestedAll() {
- PCollection<POJO2NestedAll> pojos =
+ PCollection<POJO1> pojos =
+ pipeline
+ .apply(Create.of(new POJO2()))
+ .apply(Select.fieldNames("field2"))
+ .apply(Convert.to(POJO1.class));
+ PAssert.that(pojos).containsInAnyOrder(new POJO1());
+ pipeline.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testSelectNestedAllWildcard() {
+ PCollection<POJO1> pojos =
pipeline
.apply(Create.of(new POJO2()))
.apply(Select.fieldNames("field2.*"))
- .apply(Convert.to(POJO2NestedAll.class));
- PAssert.that(pojos).containsInAnyOrder(new POJO2NestedAll());
+ .apply(Convert.to(POJO1.class));
+ PAssert.that(pojos).containsInAnyOrder(new POJO1());
pipeline.run();
}
@Test
@Category(NeedsRunner.class)
public void testSelectNestedPartial() {
- PCollection<POJO2NestedPartial> pojos =
+ PCollection<POJO1Selected> pojos =
pipeline
.apply(Create.of(new POJO2()))
.apply(Select.fieldNames("field2.field1", "field2.field3"))
- .apply(Convert.to(POJO2NestedPartial.class));
- PAssert.that(pojos).containsInAnyOrder(new POJO2NestedPartial());
+ .apply(Convert.to(POJO1Selected.class));
+ PAssert.that(pojos).containsInAnyOrder(new POJO1Selected());
pipeline.run();
}
@@ -295,8 +261,8 @@ public class SelectTest {
@DefaultSchema(JavaFieldSchema.class)
static class PartialRowSingleArray {
- List<POJO1Selected> field1 =
- ImmutableList.of(new POJO1Selected(), new POJO1Selected(), new
POJO1Selected());
+ List<String> field1 = ImmutableList.of("field1", "field1", "field1");
+ List<Double> field3 = ImmutableList.of(3.14, 3.14, 3.14);
@Override
public boolean equals(Object o) {
@@ -307,12 +273,12 @@ public class SelectTest {
return false;
}
PartialRowSingleArray that = (PartialRowSingleArray) o;
- return Objects.equals(field1, that.field1);
+ return Objects.equals(field1, that.field1) && Objects.equals(field3,
that.field3);
}
@Override
public int hashCode() {
- return Objects.hash(field1);
+ return Objects.hash(field1, field3);
}
}
@@ -356,11 +322,16 @@ public class SelectTest {
@DefaultSchema(JavaFieldSchema.class)
static class PartialRowSingleMap {
- Map<String, POJO1Selected> field1 =
+ Map<String, String> field1 =
ImmutableMap.of(
- "key1", new POJO1Selected(),
- "key2", new POJO1Selected(),
- "key3", new POJO1Selected());
+ "key1", "field1",
+ "key2", "field1",
+ "key3", "field1");
+ Map<String, Double> field3 =
+ ImmutableMap.of(
+ "key1", 3.14,
+ "key2", 3.14,
+ "key3", 3.14);
@Override
public boolean equals(Object o) {
@@ -371,12 +342,12 @@ public class SelectTest {
return false;
}
PartialRowSingleMap that = (PartialRowSingleMap) o;
- return Objects.equals(field1, that.field1);
+ return Objects.equals(field1, that.field1) && Objects.equals(field3,
that.field3);
}
@Override
public int hashCode() {
- return Objects.hash(field1);
+ return Objects.hash(field1, field3);
}
}
@@ -427,8 +398,17 @@ public class SelectTest {
private static final List<List<POJO1Selected>> POJO_LIST_LIST =
ImmutableList.of(POJO_LIST, POJO_LIST, POJO_LIST);
- List<List<List<POJO1Selected>>> field1 =
- ImmutableList.of(POJO_LIST_LIST, POJO_LIST_LIST, POJO_LIST_LIST);
+ private static final List<String> STRING_LIST = ImmutableList.of("field1",
"field1", "field1");
+ private static final List<List<String>> STRING_LISTLIST =
+ ImmutableList.of(STRING_LIST, STRING_LIST, STRING_LIST);
+ List<List<List<String>>> field1 =
+ ImmutableList.of(STRING_LISTLIST, STRING_LISTLIST, STRING_LISTLIST);
+
+ private static final List<Double> DOUBLE_LIST = ImmutableList.of(3.14,
3.14, 3.14);
+ private static final List<List<Double>> DOUBLE_LISTLIST =
+ ImmutableList.of(DOUBLE_LIST, DOUBLE_LIST, DOUBLE_LIST);
+ List<List<List<Double>>> field3 =
+ ImmutableList.of(DOUBLE_LISTLIST, DOUBLE_LISTLIST, DOUBLE_LISTLIST);
@Override
public boolean equals(Object o) {
@@ -507,21 +487,37 @@ public class SelectTest {
@DefaultSchema(JavaFieldSchema.class)
static class PartialRowMultipleMaps {
- static final Map<String, POJO1Selected> POJO_MAP =
+ static final Map<String, String> STRING_MAP =
ImmutableMap.of(
- "key1", new POJO1Selected(),
- "key2", new POJO1Selected(),
- "key3", new POJO1Selected());
- static final Map<String, Map<String, POJO1Selected>> POJO_MAP_MAP =
+ "key1", "field1",
+ "key2", "field1",
+ "key3", "field1");
+ static final Map<String, Map<String, String>> STRING_MAPMAP =
ImmutableMap.of(
- "key1", POJO_MAP,
- "key2", POJO_MAP,
- "key3", POJO_MAP);
- Map<String, Map<String, Map<String, POJO1Selected>>> field1 =
+ "key1", STRING_MAP,
+ "key2", STRING_MAP,
+ "key3", STRING_MAP);
+ Map<String, Map<String, Map<String, String>>> field1 =
ImmutableMap.of(
- "key1", POJO_MAP_MAP,
- "key2", POJO_MAP_MAP,
- "key3", POJO_MAP_MAP);
+ "key1", STRING_MAPMAP,
+ "key2", STRING_MAPMAP,
+ "key3", STRING_MAPMAP);
+ static final Map<String, Double> DOUBLE_MAP =
+ ImmutableMap.of(
+ "key1", 3.14,
+ "key2", 3.14,
+ "key3", 3.14);
+ static final Map<String, Map<String, Double>> DOUBLE_MAPMAP =
+ ImmutableMap.of(
+ "key1", DOUBLE_MAP,
+ "key2", DOUBLE_MAP,
+ "key3", DOUBLE_MAP);
+
+ Map<String, Map<String, Map<String, Double>>> field3 =
+ ImmutableMap.of(
+ "key1", DOUBLE_MAPMAP,
+ "key2", DOUBLE_MAPMAP,
+ "key3", DOUBLE_MAPMAP);
@Override
public boolean equals(Object o) {
@@ -532,12 +528,12 @@ public class SelectTest {
return false;
}
PartialRowMultipleMaps that = (PartialRowMultipleMaps) o;
- return Objects.equals(field1, that.field1);
+ return Objects.equals(field1, that.field1) && Objects.equals(field3,
that.field3);
}
@Override
public int hashCode() {
- return Objects.hash(field1);
+ return Objects.hash(field1, field3);
}
}
@@ -592,15 +588,20 @@ public class SelectTest {
@DefaultSchema(JavaFieldSchema.class)
static class PartialRowNestedArraysAndMaps {
- static final List<POJO1Selected> POJO_LIST =
- ImmutableList.of(new POJO1Selected(), new POJO1Selected(), new
POJO1Selected());
- static final Map<String, List<POJO1Selected>> POJO_MAP_LIST =
+ static final Map<String, List<String>> STRING_MAP =
ImmutableMap.of(
- "key1", POJO_LIST,
- "key2", POJO_LIST,
- "key3", POJO_LIST);
- List<Map<String, List<POJO1Selected>>> field1 =
- ImmutableList.of(POJO_MAP_LIST, POJO_MAP_LIST, POJO_MAP_LIST);
+ "key1", ImmutableList.of("field1", "field1", "field1"),
+ "key2", ImmutableList.of("field1", "field1", "field1"),
+ "key3", ImmutableList.of("field1", "field1", "field1"));
+ List<Map<String, List<String>>> field1 = ImmutableList.of(STRING_MAP,
STRING_MAP, STRING_MAP);
+
+ static final Map<String, List<Double>> DOUBLE_MAP =
+ ImmutableMap.of(
+ "key1", ImmutableList.of(3.14, 3.14, 3.14),
+ "key2", ImmutableList.of(3.14, 3.14, 3.14),
+ "key3", ImmutableList.of(3.14, 3.14, 3.14));
+
+ List<Map<String, List<Double>>> field3 = ImmutableList.of(DOUBLE_MAP,
DOUBLE_MAP, DOUBLE_MAP);
@Override
public boolean equals(Object o) {
@@ -611,12 +612,17 @@ public class SelectTest {
return false;
}
PartialRowNestedArraysAndMaps that = (PartialRowNestedArraysAndMaps) o;
- return Objects.equals(field1, that.field1);
+ return Objects.equals(field1, that.field1) && Objects.equals(field3,
that.field3);
}
@Override
public int hashCode() {
- return Objects.hash(field1);
+ return Objects.hash(field1, field3);
+ }
+
+ @Override
+ public String toString() {
+ return "PartialRowNestedArraysAndMaps{" + "field1=" + field1 + ",
field3=" + field3 + '}';
}
}
@@ -637,6 +643,7 @@ public class SelectTest {
.apply("convert2",
Convert.to(PartialRowNestedArraysAndMaps.class));
PAssert.that(selected).containsInAnyOrder(new
PartialRowNestedArraysAndMaps());
+ PAssert.that(selected2).containsInAnyOrder(new
PartialRowNestedArraysAndMaps());
pipeline.run();
}
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java
new file mode 100644
index 0000000..77183bf
--- /dev/null
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SelectHelpersTest.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.utils;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.values.Row;
+import
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
+import org.junit.Test;
+
+/** Tests for {@link SelectHelpers}. */
+public class SelectHelpersTest {
+ static final Schema FLAT_SCHEMA =
+ Schema.builder()
+ .addStringField("field1")
+ .addInt32Field("field2")
+ .addDoubleField("field3")
+ .build();
+ static final Row FLAT_ROW = Row.withSchema(FLAT_SCHEMA).addValues("first",
42, 3.14).build();
+
+ static final Schema NESTED_SCHEMA =
+ Schema.builder().addRowField("nested",
FLAT_SCHEMA).addStringField("foo").build();
+ static final Row NESTED_ROW =
Row.withSchema(NESTED_SCHEMA).addValues(FLAT_ROW, "").build();
+
+ static final Schema DOUBLE_NESTED_SCHEMA =
+ Schema.builder().addRowField("nested2", NESTED_SCHEMA).build();
+ static final Row DOUBLE_NESTED_ROW =
+ Row.withSchema(DOUBLE_NESTED_SCHEMA).addValue(NESTED_ROW).build();
+
+ static final Schema ARRAY_SCHEMA =
+ Schema.builder()
+ .addArrayField("primitiveArray", FieldType.INT32)
+ .addArrayField("rowArray", FieldType.row(FLAT_SCHEMA))
+ .addArrayField("arrayOfRowArray",
FieldType.array(FieldType.row(FLAT_SCHEMA)))
+ .addArrayField("nestedRowArray", FieldType.row(NESTED_SCHEMA))
+ .build();
+ static final Row ARRAY_ROW =
+ Row.withSchema(ARRAY_SCHEMA)
+ .addArray(1, 2)
+ .addArray(FLAT_ROW, FLAT_ROW)
+ .addArray(ImmutableList.of(FLAT_ROW), ImmutableList.of(FLAT_ROW))
+ .addArray(NESTED_ROW, NESTED_ROW)
+ .build();
+
+ static final Schema MAP_SCHEMA =
+ Schema.builder().addMapField("map", FieldType.INT32,
FieldType.row(FLAT_SCHEMA)).build();
+ static final Row MAP_ROW =
+ Row.withSchema(MAP_SCHEMA).addValue(ImmutableMap.of(1,
FLAT_ROW)).build();
+
+ static final Schema MAP_ARRAY_SCHEMA =
+ Schema.builder()
+ .addMapField("map", FieldType.INT32,
FieldType.array(FieldType.row(FLAT_SCHEMA)))
+ .build();
+ static final Row MAP_ARRAY_ROW =
+ Row.withSchema(MAP_ARRAY_SCHEMA)
+ .addValue(ImmutableMap.of(1, ImmutableList.of(FLAT_ROW)))
+ .build();
+
+ @Test
+ public void testSelectAll() {
+ FieldAccessDescriptor fieldAccessDescriptor =
+ FieldAccessDescriptor.withFieldNames("*").resolve(FLAT_SCHEMA);
+ Schema outputSchema = SelectHelpers.getOutputSchema(FLAT_SCHEMA,
fieldAccessDescriptor);
+ assertEquals(FLAT_SCHEMA, outputSchema);
+
+ Row row = SelectHelpers.selectRow(FLAT_ROW, fieldAccessDescriptor,
FLAT_SCHEMA, outputSchema);
+ assertEquals(FLAT_ROW, row);
+ }
+
+ @Test
+ public void testsSimpleSelectSingle() {
+ FieldAccessDescriptor fieldAccessDescriptor =
+ FieldAccessDescriptor.withFieldNames("field1").resolve(FLAT_SCHEMA);
+ Schema outputSchema = SelectHelpers.getOutputSchema(FLAT_SCHEMA,
fieldAccessDescriptor);
+ Schema expectedSchema = Schema.builder().addStringField("field1").build();
+ assertEquals(expectedSchema, outputSchema);
+
+ Row row = SelectHelpers.selectRow(FLAT_ROW, fieldAccessDescriptor,
FLAT_SCHEMA, outputSchema);
+ Row expectedRow = Row.withSchema(expectedSchema).addValue("first").build();
+ assertEquals(expectedRow, row);
+ }
+
+ @Test
+ public void testsSimpleSelectMultiple() {
+ FieldAccessDescriptor fieldAccessDescriptor =
+ FieldAccessDescriptor.withFieldNames("field1",
"field3").resolve(FLAT_SCHEMA);
+ Schema outputSchema = SelectHelpers.getOutputSchema(FLAT_SCHEMA,
fieldAccessDescriptor);
+ Schema expectedSchema =
+
Schema.builder().addStringField("field1").addDoubleField("field3").build();
+ assertEquals(expectedSchema, outputSchema);
+
+ Row row = SelectHelpers.selectRow(FLAT_ROW, fieldAccessDescriptor,
FLAT_SCHEMA, outputSchema);
+ Row expectedRow = Row.withSchema(expectedSchema).addValues("first",
3.14).build();
+ assertEquals(expectedRow, row);
+ }
+
+ @Test
+ public void testSelectedNested() {
+ FieldAccessDescriptor fieldAccessDescriptor =
+ FieldAccessDescriptor.withFieldNames("nested").resolve(NESTED_SCHEMA);
+ Schema outputSchema = SelectHelpers.getOutputSchema(NESTED_SCHEMA,
fieldAccessDescriptor);
+ Schema expectedSchema = Schema.builder().addRowField("nested",
FLAT_SCHEMA).build();
+ assertEquals(expectedSchema, outputSchema);
+
+ Row row =
+ SelectHelpers.selectRow(NESTED_ROW, fieldAccessDescriptor,
NESTED_SCHEMA, outputSchema);
+ Row expectedRow =
Row.withSchema(expectedSchema).addValue(FLAT_ROW).build();
+ assertEquals(expectedRow, row);
+ }
+
+ @Test
+ public void testSelectedNestedSingle() {
+ FieldAccessDescriptor fieldAccessDescriptor =
+
FieldAccessDescriptor.withFieldNames("nested.field1").resolve(NESTED_SCHEMA);
+ Schema outputSchema = SelectHelpers.getOutputSchema(NESTED_SCHEMA,
fieldAccessDescriptor);
+ Schema expectedSchema = Schema.builder().addStringField("field1").build();
+ assertEquals(expectedSchema, outputSchema);
+
+ Row row =
+ SelectHelpers.selectRow(NESTED_ROW, fieldAccessDescriptor,
NESTED_SCHEMA, outputSchema);
+ Row expectedRow = Row.withSchema(expectedSchema).addValue("first").build();
+ assertEquals(expectedRow, row);
+ }
+
+ @Test
+ public void testSelectedNestedWildcard() {
+ FieldAccessDescriptor fieldAccessDescriptor =
+
FieldAccessDescriptor.withFieldNames("nested.*").resolve(NESTED_SCHEMA);
+ Schema outputSchema = SelectHelpers.getOutputSchema(NESTED_SCHEMA,
fieldAccessDescriptor);
+ assertEquals(FLAT_SCHEMA, outputSchema);
+
+ Row row =
+ SelectHelpers.selectRow(NESTED_ROW, fieldAccessDescriptor,
NESTED_SCHEMA, outputSchema);
+ assertEquals(FLAT_ROW, row);
+ }
+
+ @Test
+ public void testSelectDoubleNested() {
+ FieldAccessDescriptor fieldAccessDescriptor =
+
FieldAccessDescriptor.withFieldNames("nested2.nested.field1").resolve(DOUBLE_NESTED_SCHEMA);
+ Schema outputSchema =
+ SelectHelpers.getOutputSchema(DOUBLE_NESTED_SCHEMA,
fieldAccessDescriptor);
+ Schema expectedSchema = Schema.builder().addStringField("field1").build();
+ assertEquals(expectedSchema, outputSchema);
+
+ Row row =
+ SelectHelpers.selectRow(
+ DOUBLE_NESTED_ROW, fieldAccessDescriptor, DOUBLE_NESTED_SCHEMA,
outputSchema);
+ Row expectedRow = Row.withSchema(expectedSchema).addValue("first").build();
+ assertEquals(expectedRow, row);
+ }
+
+ @Test
+ public void testSelectArrayOfPrimitive() {
+ FieldAccessDescriptor fieldAccessDescriptor =
+
FieldAccessDescriptor.withFieldNames("primitiveArray").resolve(ARRAY_SCHEMA);
+ Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA,
fieldAccessDescriptor);
+ Schema expectedSchema =
+ Schema.builder().addArrayField("primitiveArray",
FieldType.INT32).build();
+ assertEquals(expectedSchema, outputSchema);
+
+ Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor,
ARRAY_SCHEMA, outputSchema);
+ Row expectedRow = Row.withSchema(expectedSchema).addArray(1, 2).build();
+ assertEquals(expectedRow, row);
+ }
+
+ @Test
+ public void testSelectArrayOfRow() {
+ FieldAccessDescriptor fieldAccessDescriptor =
+ FieldAccessDescriptor.withFieldNames("rowArray").resolve(ARRAY_SCHEMA);
+ Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA,
fieldAccessDescriptor);
+ Schema expectedSchema =
+ Schema.builder().addArrayField("rowArray",
FieldType.row(FLAT_SCHEMA)).build();
+ assertEquals(expectedSchema, outputSchema);
+
+ Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor,
ARRAY_SCHEMA, outputSchema);
+ Row expectedRow = Row.withSchema(expectedSchema).addArray(FLAT_ROW,
FLAT_ROW).build();
+ assertEquals(expectedRow, row);
+ }
+
+ @Test
+ public void testSelectArrayOfRowPartial() {
+ FieldAccessDescriptor fieldAccessDescriptor =
+
FieldAccessDescriptor.withFieldNames("rowArray[].field1").resolve(ARRAY_SCHEMA);
+ Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA,
fieldAccessDescriptor);
+
+ Schema expectedSchema = Schema.builder().addArrayField("field1",
FieldType.STRING).build();
+ assertEquals(expectedSchema, outputSchema);
+
+ Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor,
ARRAY_SCHEMA, outputSchema);
+ Row expectedRow = Row.withSchema(expectedSchema).addArray("first",
"first").build();
+ assertEquals(expectedRow, row);
+ }
+
+ @Test
+ public void testSelectArrayOfRowArray() {
+ FieldAccessDescriptor fieldAccessDescriptor =
+
FieldAccessDescriptor.withFieldNames("arrayOfRowArray[][].field1").resolve(ARRAY_SCHEMA);
+ Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA,
fieldAccessDescriptor);
+
+ Schema expectedSchema =
+ Schema.builder().addArrayField("field1",
FieldType.array(FieldType.STRING)).build();
+ assertEquals(expectedSchema, outputSchema);
+
+ Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor,
ARRAY_SCHEMA, outputSchema);
+
+ Row expectedRow =
+ Row.withSchema(expectedSchema)
+ .addArray(ImmutableList.of("first"), ImmutableList.of("first"))
+ .build();
+ assertEquals(expectedRow, row);
+ }
+
+ @Test
+ public void testSelectArrayOfNestedRow() {
+ FieldAccessDescriptor fieldAccessDescriptor =
+ FieldAccessDescriptor.withFieldNames("nestedRowArray[].nested.field1")
+ .resolve(ARRAY_SCHEMA);
+ Schema outputSchema = SelectHelpers.getOutputSchema(ARRAY_SCHEMA,
fieldAccessDescriptor);
+
+ Schema expectedElementSchema =
Schema.builder().addStringField("field1").build();
+ Schema expectedSchema = Schema.builder().addArrayField("field1",
FieldType.STRING).build();
+ assertEquals(expectedSchema, outputSchema);
+
+ Row row = SelectHelpers.selectRow(ARRAY_ROW, fieldAccessDescriptor,
ARRAY_SCHEMA, outputSchema);
+ Row expectedRow = Row.withSchema(expectedSchema).addArray("first",
"first").build();
+ assertEquals(expectedRow, row);
+ }
+
+ @Test
+ public void testSelectMapOfRowSelectSingle() {
+ FieldAccessDescriptor fieldAccessDescriptor =
+
FieldAccessDescriptor.withFieldNames("map{}.field1").resolve(MAP_SCHEMA);
+ Schema outputSchema = SelectHelpers.getOutputSchema(MAP_SCHEMA,
fieldAccessDescriptor);
+
+ Schema expectedValueSchema =
Schema.builder().addStringField("field1").build();
+ Schema expectedSchema =
+ Schema.builder().addMapField("field1", FieldType.INT32,
FieldType.STRING).build();
+ assertEquals(expectedSchema, outputSchema);
+
+ Row row = SelectHelpers.selectRow(MAP_ROW, fieldAccessDescriptor,
MAP_SCHEMA, outputSchema);
+ Row expectedRow =
Row.withSchema(expectedSchema).addValue(ImmutableMap.of(1, "first")).build();
+ assertEquals(expectedRow, row);
+ }
+
+ @Test
+ public void testSelectMapOfRowSelectAll() {
+ FieldAccessDescriptor fieldAccessDescriptor =
+ FieldAccessDescriptor.withFieldNames("map{}.*").resolve(MAP_SCHEMA);
+ Schema outputSchema = SelectHelpers.getOutputSchema(MAP_SCHEMA,
fieldAccessDescriptor);
+ Schema expectedSchema =
+ Schema.builder()
+ .addMapField("field1", FieldType.INT32, FieldType.STRING)
+ .addMapField("field2", FieldType.INT32, FieldType.INT32)
+ .addMapField("field3", FieldType.INT32, FieldType.DOUBLE)
+ .build();
+ assertEquals(expectedSchema, outputSchema);
+
+ Row row = SelectHelpers.selectRow(MAP_ROW, fieldAccessDescriptor,
MAP_SCHEMA, outputSchema);
+ Row expectedRow =
+ Row.withSchema(expectedSchema)
+ .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(0)))
+ .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(1)))
+ .addValue(ImmutableMap.of(1, FLAT_ROW.getValue(2)))
+ .build();
+ assertEquals(expectedRow, row);
+ }
+
+ @Test
+ public void testSelectMapOfArray() {
+ FieldAccessDescriptor fieldAccessDescriptor =
+
FieldAccessDescriptor.withFieldNames("map.field1").resolve(MAP_ARRAY_SCHEMA);
+ Schema outputSchema = SelectHelpers.getOutputSchema(MAP_ARRAY_SCHEMA,
fieldAccessDescriptor);
+
+ Schema expectedSchema =
+ Schema.builder()
+ .addMapField("field1", FieldType.INT32,
FieldType.array(FieldType.STRING))
+ .build();
+ assertEquals(expectedSchema, outputSchema);
+
+ Row row =
+ SelectHelpers.selectRow(
+ MAP_ARRAY_ROW, fieldAccessDescriptor, MAP_ARRAY_SCHEMA,
outputSchema);
+
+ Row expectedRow =
+ Row.withSchema(expectedSchema)
+ .addValue(ImmutableMap.of(1, ImmutableList.of("first")))
+ .build();
+ assertEquals(expectedRow, row);
+ }
+
+ @Test
+ public void testSelectFieldOfRecord() {
+ Schema f1 = Schema.builder().addInt64Field("f0").build();
+ Schema f2 = Schema.builder().addRowField("f1", f1).build();
+ Schema f3 = Schema.builder().addRowField("f2", f2).build();
+
+ Row r1 = Row.withSchema(f1).addValue(42L).build(); // {"f0": 42}
+ Row r2 = Row.withSchema(f2).addValue(r1).build(); // {"f1": {"f0": 42}}
+ Row r3 = Row.withSchema(f3).addValue(r2).build(); // {"f2": {"f1": {"f0":
42}}}
+
+ FieldAccessDescriptor fieldAccessDescriptor =
+ FieldAccessDescriptor.withFieldNames("f2.f1").resolve(f3);
+
+ Schema outputSchema = SelectHelpers.getOutputSchema(f3,
fieldAccessDescriptor);
+
+ Row out = SelectHelpers.selectRow(r3, fieldAccessDescriptor,
r3.getSchema(), outputSchema);
+
+ assertEquals(f2, outputSchema);
+ assertEquals(r2, out);
+ }
+
+ @Test
+ public void testSelectFieldOfRecordOrRecord() {
+ Schema f1 = Schema.builder().addInt64Field("f0").build();
+ Schema f2 = Schema.builder().addRowField("f1", f1).build();
+ Schema f3 = Schema.builder().addRowField("f2", f2).build();
+ Schema f4 = Schema.builder().addRowField("f3", f3).build();
+
+ Row r1 = Row.withSchema(f1).addValue(42L).build(); // {"f0": 42}
+ Row r2 = Row.withSchema(f2).addValue(r1).build(); // {"f1": {"f0": 42}}
+ Row r3 = Row.withSchema(f3).addValue(r2).build(); // {"f2": {"f1": {"f0":
42}}}
+ Row r4 = Row.withSchema(f4).addValue(r3).build(); // {"f3": {"f2": {"f1":
{"f0": 42}}}}
+
+ FieldAccessDescriptor fieldAccessDescriptor =
+ FieldAccessDescriptor.withFieldNames("f3.f2").resolve(f4);
+
+ Schema outputSchema = SelectHelpers.getOutputSchema(f4,
fieldAccessDescriptor);
+
+ Row out = SelectHelpers.selectRow(r4, fieldAccessDescriptor,
r4.getSchema(), outputSchema);
+
+ assertEquals(f3, outputSchema);
+ assertEquals(r3, out);
+ }
+
+ @Test
+ public void testArrayRowArray() {
+ Schema f1 = Schema.builder().addStringField("f0").build();
+ Schema f2 = Schema.builder().addArrayField("f1",
FieldType.row(f1)).build();
+ Schema f3 = Schema.builder().addRowField("f2", f2).build();
+ Schema f4 = Schema.builder().addArrayField("f3",
FieldType.row(f3)).build();
+
+ Row r1 = Row.withSchema(f1).addValue("first").build();
+ Row r2 = Row.withSchema(f2).addArray(r1, r1).build();
+ Row r3 = Row.withSchema(f3).addValue(r2).build();
+ Row r4 = Row.withSchema(f4).addArray(r3, r3).build();
+
+ FieldAccessDescriptor fieldAccessDescriptor =
+ FieldAccessDescriptor.withFieldNames("f3.f2.f1.f0").resolve(f4);
+ Schema outputSchema = SelectHelpers.getOutputSchema(f4,
fieldAccessDescriptor);
+ Schema expectedSchema =
+ Schema.builder().addArrayField("f0",
FieldType.array(FieldType.STRING)).build();
+ assertEquals(expectedSchema, outputSchema);
+ Row out = SelectHelpers.selectRow(r4, fieldAccessDescriptor,
r4.getSchema(), outputSchema);
+ Row expected =
+ Row.withSchema(outputSchema)
+ .addArray(Lists.newArrayList("first", "first"),
Lists.newArrayList("first", "first"))
+ .build();
+ assertEquals(expected, out);
+ }
+}