[ https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=131078&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-131078 ]

ASF GitHub Bot logged work on BEAM-4461:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Aug/18 21:26
            Start Date: 03/Aug/18 21:26
    Worklog Time Spent: 10m 
      Work Description: reuvenlax closed pull request #6133: [BEAM-4461] Add Select and Filter transforms for schema types.
URL: https://github.com/apache/beam/pull/6133
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java
index 637091105e7..ed33673a91c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java
@@ -106,7 +106,7 @@ public static FieldAccessDescriptor withFieldNames(String... names) {
    * in a recursive {@link FieldAccessDescriptor}.
    */
   public static FieldAccessDescriptor withFieldNames(Iterable<String> fieldNames) {
-    return builder().setFieldNamesAccessed(Sets.newHashSet(fieldNames)).build();
+    return builder().setFieldNamesAccessed(Sets.newTreeSet(fieldNames)).build();
   }
 
   /**
@@ -128,7 +128,12 @@ public static FieldAccessDescriptor withFieldIds(Integer... ids) {
    * in a recursive {@link FieldAccessDescriptor}.
    */
   public static FieldAccessDescriptor withFieldIds(Iterable<Integer> ids) {
-    return builder().setFieldIdsAccessed(Sets.newHashSet(ids)).build();
+    return builder().setFieldIdsAccessed(Sets.newTreeSet(ids)).build();
+  }
+
+  /** Return an empty {@link FieldAccessDescriptor}. */
+  public static FieldAccessDescriptor create() {
+    return builder().build();
   }
 
   /**
@@ -194,7 +199,7 @@ public FieldAccessDescriptor resolve(Schema schema) {
   }
 
   private Set<Integer> resolveFieldIdsAccessed(Schema schema) {
-    Set<Integer> fieldIds = Sets.newHashSetWithExpectedSize(getFieldIdsAccessed().size());
+    Set<Integer> fieldIds = Sets.newTreeSet();
     for (int fieldId : getFieldIdsAccessed()) {
       fieldIds.add(validateFieldId(schema, fieldId));
     }
@@ -229,7 +234,7 @@ private FieldAccessDescriptor resolvedNestedFieldsHelper(
   }
 
   private Map<Integer, FieldAccessDescriptor> resolveNestedFieldsAccessed(Schema schema) {
-    Map<Integer, FieldAccessDescriptor> nestedFields = Maps.newHashMap();
+    Map<Integer, FieldAccessDescriptor> nestedFields = Maps.newTreeMap();
 
     nestedFields.putAll(
         getNestedFieldsAccessedByName()
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
new file mode 100644
index 00000000000..4f3da61d700
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A {@link PTransform} for filtering a collection of schema types.
+ *
+ * <p>Separate predicates can be registered for different schema fields, and an element passes
+ * the filter only if all registered predicates return true. The output type is the same as the
+ * input type.
+ *
+ * <p>For example, consider the following schema type:
+ *
+ * <pre>{@code
+ * public class Location {
+ *   public double latitude;
+ *   public double longitude;
+ * }
+ * }</pre>
+ *
+ * To examine only locations in south Manhattan, you would write:
+ *
+ * <pre>{@code
+ * PCollection<Location> locations = readLocations();
+ * locations.apply(Filter
+ *    .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
+ *    .whereFieldName("longitude", lng -> lng < -73.969 && lng > -74.747));
+ * }</pre>
+ *
+ * Predicates that require examining multiple fields at once are also supported. For example,
+ * consider the following class representing a user account:
+ *
+ * <pre>{@code
+ * class UserAccount {
+ *   public double spendOnBooks;
+ *   public double spendOnMovies;
+ *       ...
+ * }
+ * }</pre>
+ *
+ * Say you want to examine only users whose total spend is above $100. You could write:
+ *
+ * <pre>{@code
+ * PCollection<UserAccount> users = readUsers();
+ * users.apply(Filter
+ *    .whereFieldNames(Lists.newArrayList("spendOnBooks", "spendOnMovies"),
+ *        row -> row.getDouble("spendOnBooks") + row.getDouble("spendOnMovies") > 100.00));
+ * }</pre>
+ */
+@Experimental(Kind.SCHEMAS)
+public class Filter {
+  public static <T> Inner<T> create() {
+    return new Inner<T>();
+  }
+
+  /** Implementation of the filter. */
+  public static class Inner<T> extends PTransform<PCollection<T>, 
PCollection<T>> {
+    private final Map<String, SerializableFunction<?, Boolean>> 
fieldNameFilters =
+        Maps.newHashMap();
+    private final Map<Integer, SerializableFunction<?, Boolean>> 
fieldIdFilters = Maps.newHashMap();
+    private final Map<List<String>, SerializableFunction<Row, Boolean>> 
fieldNamesFilters =
+        Maps.newHashMap();
+    private final Map<List<Integer>, SerializableFunction<Row, Boolean>> 
fieldIdsFilters =
+        Maps.newHashMap();
+
+    /** Set a predicate based on the value of a field, where the field is 
specified by name. */
+    public Inner<T> whereFieldName(String fieldName, SerializableFunction<?, 
Boolean> predicate) {
+      fieldNameFilters.put(fieldName, predicate);
+      return this;
+    }
+
+    /** Set a predicate based on the value of a field, where the field is 
specified by id. */
+    public Inner<T> whereFieldId(int fieldId, SerializableFunction<?, Boolean> 
predicate) {
+      fieldIdFilters.put(fieldId, predicate);
+      return this;
+    }
+
+    /** Set a predicate based on the values of multiple fields, specified by name. */
+    public Inner<T> whereFieldNames(
+        List<String> fieldNames, SerializableFunction<Row, Boolean> predicate) 
{
+      fieldNamesFilters.put(fieldNames, predicate);
+      return this;
+    }
+
+    /** Set a predicate based on the values of multiple fields, specified by id. */
+    public Inner<T> whereFieldIds(
+        List<Integer> fieldIds, SerializableFunction<Row, Boolean> predicate) {
+      fieldIdsFilters.put(fieldIds, predicate);
+      return this;
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<T> input) {
+      // Validate that all referenced fields are in the schema.
+      Schema schema = input.getSchema();
+      for (String fieldName :
+          Sets.union(
+              fieldNameFilters.keySet(),
+              fieldNamesFilters
+                  .keySet()
+                  .stream()
+                  .flatMap(List::stream)
+                  .collect(Collectors.toSet()))) {
+        schema.getField(fieldName);
+      }
+      for (int fieldIndex :
+          Sets.union(
+              fieldIdFilters.keySet(),
+              fieldIdsFilters
+                  .keySet()
+                  .stream()
+                  .flatMap(List::stream)
+                  .collect(Collectors.toSet()))) {
+        if (fieldIndex >= schema.getFieldCount() || fieldIndex < 0) {
+          throw new IllegalArgumentException(
+              "Field index " + fieldIndex + " does not exist in the schema.");
+        }
+      }
+
+      // TODO: Once BEAM-4457 is fixed, tag this ParDo with a 
FieldAccessDescriptor so that Beam
+      // knows which fields are being accessed.
+
+      return input.apply(
+          ParDo.of(
+              new DoFn<T, T>() {
+                @ProcessElement
+                public void process(@Element Row row, OutputReceiver<Row> o) {
+                  for (Map.Entry<String, SerializableFunction<?, Boolean>> 
entry :
+                      fieldNameFilters.entrySet()) {
+                    if (!entry.getValue().apply(row.getValue(entry.getKey()))) 
{
+                      return;
+                    }
+                  }
+
+                  for (Map.Entry<Integer, SerializableFunction<?, Boolean>> 
entry :
+                      fieldIdFilters.entrySet()) {
+                    if (!entry.getValue().apply(row.getValue(entry.getKey()))) 
{
+                      return;
+                    }
+                  }
+
+                  for (SerializableFunction<Row, Boolean> predicate : 
fieldNamesFilters.values()) {
+                    if (!predicate.apply(row)) {
+                      return;
+                    }
+                  }
+
+                  for (SerializableFunction<Row, Boolean> predicate : 
fieldIdsFilters.values()) {
+                    if (!predicate.apply(row)) {
+                      return;
+                    }
+                  }
+                  // All filters passed. Output the row.
+                  o.output(row);
+                }
+              }));
+    }
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
new file mode 100644
index 00000000000..a5024ef7400
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A {@link PTransform} for selecting a subset of fields from a schema type.
+ *
+ * <p>This transform allows projecting out a subset of fields from a schema type. The output of
+ * this transform is of type {@link Row}, though that can be converted into any other type with
+ * a matching schema using the {@link Convert} transform.
+ *
+ * <p>For example, consider the following POJO type:
+ *
+ * <pre>{@code
+ * {@literal @}DefaultSchema(JavaFieldSchema.class)
+ * public class UserEvent {
+ *   public String userId;
+ *   public String eventId;
+ *   public int eventType;
+ *   public Location location;
+ * }
+ *
+ * {@literal @}DefaultSchema(JavaFieldSchema.class)
+ * public class Location {
+ *   public double latitude;
+ *   public double longitude;
+ * }
+ * }</pre>
+ *
+ * Say you want to select just the userId, eventId pair from each element. You would write the
+ * following:
+ *
+ * <pre>{@code
+ * PCollection<UserEvent> events = readUserEvents();
+ * PCollection<Row> rows = events.apply(Select.fieldNames("userId", "eventId"));
+ * }</pre>
+ *
+ * It's possible to select a nested field as well. For example, if you want just the location
+ * information from each element:
+ *
+ * <pre>{@code
+ * PCollection<UserEvent> events = readUserEvents();
+ * PCollection<Row> rows = events.apply(Select.fieldAccess(FieldAccessDescriptor.create()
+ *      .withNestedField("location",
+ *          FieldAccessDescriptor.withAllFields())));
+ * }</pre>
+ */
+@Experimental(Kind.SCHEMAS)
+public class Select<T> extends PTransform<PCollection<T>, PCollection<Row>> {
+  private final FieldAccessDescriptor fieldAccessDescriptor;
+
+  private Select(FieldAccessDescriptor fieldAccessDescriptor) {
+    this.fieldAccessDescriptor = fieldAccessDescriptor;
+  }
+
+  /** Select a set of top-level field ids from the row. */
+  public static <T> Select<T> fieldIds(Integer... ids) {
+    return new Select(FieldAccessDescriptor.withFieldIds(ids));
+  }
+
+  /** Select a set of top-level field names from the row. */
+  public static <T> Select<T> fieldNames(String... names) {
+    return new Select(FieldAccessDescriptor.withFieldNames(names));
+  }
+
+  /**
+   * Select a set of fields described in a {@link FieldAccessDescriptor}.
+   *
+   * <p>This allows for nested fields to be selected as well.
+   */
+  public static <T> Select<T> fieldAccess(FieldAccessDescriptor 
fieldAccessDescriptor) {
+    return new Select(fieldAccessDescriptor);
+  }
+
+  // TODO: Support Xpath or JsonPath as a way of describing fields.
+
+  @Override
+  public PCollection<Row> expand(PCollection<T> input) {
+    Schema inputSchema = input.getSchema();
+    FieldAccessDescriptor resolved = 
fieldAccessDescriptor.resolve(inputSchema);
+    Schema outputSchema = getOutputSchema(inputSchema, resolved);
+
+    PCollection<Row> selected =
+        input
+            .apply(
+                ParDo.of(
+                    new DoFn<T, Row>() {
+                      // TODO: This should be the same as resolved so that 
Beam knows which fields
+                      // are being accessed. Currently Beam only supports 
wildcard descriptors.
+                      // Once BEAM-4457 is fixed, fix this.
+                      @FieldAccess("filterFields")
+                      final FieldAccessDescriptor fieldAccessDescriptor =
+                          FieldAccessDescriptor.withAllFields();
+
+                      @ProcessElement
+                      public void process(
+                          @FieldAccess("filterFields") Row row, 
OutputReceiver<Row> r) {
+                        r.output(selectRow(row, resolved, inputSchema, 
outputSchema));
+                      }
+                    }))
+            .setRowSchema(outputSchema);
+
+    return selected;
+  }
+
+  static Schema getOutputSchema(Schema inputSchema, FieldAccessDescriptor 
fieldAccessDescriptor) {
+    if (fieldAccessDescriptor.allFields()) {
+      return inputSchema;
+    }
+    Schema.Builder builder = new Schema.Builder();
+    for (int fieldId : fieldAccessDescriptor.fieldIdsAccessed()) {
+      builder.addField(inputSchema.getField(fieldId));
+    }
+    for (Map.Entry<Integer, FieldAccessDescriptor> nested :
+        fieldAccessDescriptor.nestedFields().entrySet()) {
+      Field field = inputSchema.getField(nested.getKey());
+      FieldAccessDescriptor nestedDescriptor = nested.getValue();
+      FieldType nestedType =
+          FieldType.row(getOutputSchema(field.getType().getRowSchema(), 
nestedDescriptor));
+      if (field.getNullable()) {
+        builder.addNullableField(field.getName(), nestedType);
+      } else {
+        builder.addField(field.getName(), nestedType);
+      }
+    }
+    return builder.build();
+  }
+
+  static Row selectRow(
+      Row input,
+      FieldAccessDescriptor fieldAccessDescriptor,
+      Schema inputSchema,
+      Schema outputSchema) {
+    if (fieldAccessDescriptor.allFields()) {
+      return input;
+    } else {
+      Row.Builder output = Row.withSchema(outputSchema);
+      for (int fieldId : fieldAccessDescriptor.fieldIdsAccessed()) {
+        output.addValue(input.getValue(fieldId));
+      }
+      for (Map.Entry<Integer, FieldAccessDescriptor> nested :
+          fieldAccessDescriptor.nestedFields().entrySet()) {
+        String fieldName = inputSchema.nameOf(nested.getKey());
+        Schema nestedInputSchema = 
inputSchema.getField(nested.getKey()).getType().getRowSchema();
+        Schema nestedOutputSchema = 
outputSchema.getField(fieldName).getType().getRowSchema();
+        output.addValue(
+            selectRow(
+                input.getValue(fieldName),
+                nested.getValue(),
+                nestedInputSchema,
+                nestedOutputSchema));
+      }
+      return output.build();
+    }
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
index 57c0abed728..e94dc4b8bb4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
@@ -325,7 +325,7 @@ public int hashCode() {
 
   @Override
   public String toString() {
-    return Arrays.deepToString(Iterables.toArray(getValues(), Object.class));
+    return "Row:" + Arrays.deepToString(Iterables.toArray(getValues(), Object.class));
   }
 
   /**
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java
index 267295cb43d..98ef0b84879 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java
@@ -181,7 +181,6 @@ public boolean equals(Object o) {
 
     @Override
     public int hashCode() {
-
       return Objects.hash(yard2, yard1);
     }
   }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/FilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/FilterTest.java
new file mode 100644
index 00000000000..f303b3ec46c
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/FilterTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.common.collect.Lists;
+import java.util.Objects;
+import org.apache.beam.sdk.schemas.DefaultSchema;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+/** Test for {@link Filter}. */
+public class FilterTest {
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  /** POJO used to test schemas. */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class POJO {
+    public String field1;
+    public int field2;
+    public int field3;
+
+    public POJO(String field1, int field2, int field3) {
+      this.field1 = field1;
+      this.field2 = field2;
+      this.field3 = field3;
+    }
+
+    public POJO() {}
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      POJO pojo = (POJO) o;
+      return Objects.equals(field1, pojo.field1)
+          && Objects.equals(field2, pojo.field2)
+          && Objects.equals(field3, pojo.field3);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(field1, field2, field3);
+    }
+  };
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testMissingFieldName() {
+    thrown.expect(IllegalArgumentException.class);
+    pipeline
+        .apply(Create.of(new POJO("pass", 52, 2)))
+        .apply(Filter.<POJO>create().whereFieldName("missing", f -> true));
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testMissingFieldIndex() {
+    thrown.expect(IllegalArgumentException.class);
+    pipeline
+        .apply(Create.of(new POJO("pass", 52, 2)))
+        .apply(Filter.<POJO>create().whereFieldId(23, f -> true));
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testFilterFieldsByName() {
+    // Pass only elements where field1 == "pass" && field2 > 50.
+    PCollection<POJO> filtered =
+        pipeline
+            .apply(
+                Create.of(
+                    new POJO("pass", 52, 2), new POJO("pass", 2, 2), new 
POJO("fail", 100, 100)))
+            .apply(
+                Filter.<POJO>create()
+                    .whereFieldName("field1", s -> "pass".equals(s))
+                    .whereFieldName("field2", i -> (Integer) i > 50));
+    PAssert.that(filtered).containsInAnyOrder(new POJO("pass", 52, 2));
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testFilterMultipleFields() {
+    // Pass only elements where field1 + field2 >= 100.
+    PCollection<POJO> filtered =
+        pipeline
+            .apply(Create.of(new POJO("", 52, 48), new POJO("", 52, 2), new 
POJO("", 70, 33)))
+            .apply(
+                Filter.<POJO>create()
+                    .whereFieldNames(
+                        Lists.newArrayList("field2", "field3"),
+                        r -> r.getInt32("field2") + r.getInt32("field3") >= 
100));
+    PAssert.that(filtered).containsInAnyOrder(new POJO("", 52, 48), new 
POJO("", 70, 33));
+    pipeline.run();
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java
new file mode 100644
index 00000000000..3b6aed68497
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.transforms;
+
+import java.util.Objects;
+import org.apache.beam.sdk.schemas.DefaultSchema;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+/** Test for {@link Select}. */
+public class SelectTest {
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  /** A flat POJO to select from. */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class POJO1 {
+    public String field1 = "field1";
+    public Integer field2 = 42;
+    public Double field3 = 3.14;
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      POJO1 pojo1 = (POJO1) o;
+      return Objects.equals(field1, pojo1.field1)
+          && Objects.equals(field2, pojo1.field2)
+          && Objects.equals(field3, pojo1.field3);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(field1, field2, field3);
+    }
+
+    @Override
+    public String toString() {
+      return "POJO1{"
+          + "field1='"
+          + field1
+          + '\''
+          + ", field2="
+          + field2
+          + ", field3="
+          + field3
+          + '}';
+    }
+  };
+
+  /** A POJO matching the schema resulting from selecting field1, field3. */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class POJO1Selected {
+    public String field1 = "field1";
+    public Double field3 = 3.14;
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      POJO1Selected that = (POJO1Selected) o;
+      return Objects.equals(field1, that.field1) && Objects.equals(field3, 
that.field3);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(field1, field3);
+    }
+  }
+
+  /** A nested POJO. */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class POJO2 {
+    public String field1 = "field1";
+    public POJO1 field2 = new POJO1();
+  }
+
+  /** A POJO matching the schema resulting from selecting field2.*. */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class POJO2NestedAll {
+    public POJO1 field2 = new POJO1();
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      POJO2NestedAll that = (POJO2NestedAll) o;
+      return Objects.equals(field2, that.field2);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(field2);
+    }
+  }
+
+  /** A POJO matching the schema resulting from selecting field2.field1, field2.field3. */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class POJO2NestedPartial {
+    public POJO1Selected field2 = new POJO1Selected();
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      POJO2NestedPartial that = (POJO2NestedPartial) o;
+      return Objects.equals(field2, that.field2);
+    }
+
+    @Override
+    public int hashCode() {
+
+      return Objects.hash(field2);
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testSelectMissingFieldName() {
+    thrown.expect(IllegalArgumentException.class);
+    pipeline.apply(Create.of(new POJO1())).apply(Select.fieldNames("missing"));
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testSelectMissingFieldIndex() {
+    thrown.expect(IllegalArgumentException.class);
+    pipeline.apply(Create.of(new POJO1())).apply(Select.fieldIds(42));
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testSelectAll() {
+    PCollection<POJO1> pojos =
+        pipeline
+            .apply(Create.of(new POJO1()))
+            .apply(Select.fieldAccess(FieldAccessDescriptor.withAllFields()))
+            .apply(Convert.to(POJO1.class));
+    PAssert.that(pojos).containsInAnyOrder(new POJO1());
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testSimpleSelect() {
+    PCollection<POJO1Selected> pojos =
+        pipeline
+            .apply(Create.of(new POJO1()))
+            .apply(Select.fieldNames("field1", "field3"))
+            .apply(Convert.to(POJO1Selected.class));
+    PAssert.that(pojos).containsInAnyOrder(new POJO1Selected());
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testSelectNestedAll() {
+    PCollection<POJO2NestedAll> pojos =
+        pipeline
+            .apply(Create.of(new POJO2()))
+            .apply(
+                Select.fieldAccess(
+                    FieldAccessDescriptor.create()
+                        .withNestedField("field2", 
FieldAccessDescriptor.withAllFields())))
+            .apply(Convert.to(POJO2NestedAll.class));
+    PAssert.that(pojos).containsInAnyOrder(new POJO2NestedAll());
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testSelectNestedPartial() {
+    PCollection<POJO2NestedPartial> pojos =
+        pipeline
+            .apply(Create.of(new POJO2()))
+            .apply(
+                Select.fieldAccess(
+                    FieldAccessDescriptor.create()
+                        .withNestedField(
+                            "field2", 
FieldAccessDescriptor.withFieldNames("field1", "field3"))))
+            .apply(Convert.to(POJO2NestedPartial.class));
+    PAssert.that(pojos).containsInAnyOrder(new POJO2NestedPartial());
+    pipeline.run();
+  }
+}
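
For anyone trying out the new transforms, here is a minimal end-to-end sketch
combining Filter and Select as they appear in the diff above. The
SchemaTransformsExample class, the UserEvent/Location POJOs, the Create-based
source, and the pipeline scaffolding are illustrative placeholders; the
Filter, Select, and FieldAccessDescriptor calls mirror the ones exercised in
FilterTest and SelectTest.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.DefaultSchema;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.transforms.Filter;
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class SchemaTransformsExample {

  /** Illustrative nested POJO, modeled on the Location example in Select's javadoc. */
  @DefaultSchema(JavaFieldSchema.class)
  public static class Location {
    public double latitude = 40.71;
    public double longitude = -74.0;
  }

  /** Illustrative top-level POJO, modeled on the UserEvent example in Select's javadoc. */
  @DefaultSchema(JavaFieldSchema.class)
  public static class UserEvent {
    public String userId = "user-1";
    public String eventId = "event-1";
    public int eventType = 2;
    public Location location = new Location();
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Placeholder in-memory source; a real pipeline would read from an actual source.
    PCollection<UserEvent> events = p.apply(Create.of(new UserEvent()));

    // Keep only events whose eventType is greater than 1; whereFieldName registers a
    // per-field predicate, following the pattern used in FilterTest.
    PCollection<UserEvent> filtered =
        events.apply(
            Filter.<UserEvent>create().whereFieldName("eventType", t -> (Integer) t > 1));

    // Project out just the (userId, eventId) pair; the output is a PCollection<Row>
    // whose schema contains only the selected fields.
    PCollection<Row> pairs = filtered.apply(Select.fieldNames("userId", "eventId"));

    // Select only the nested location record, using the FieldAccessDescriptor.create()
    // entry point added by this PR together with withNestedField(...).
    PCollection<Row> locations =
        filtered.apply(
            Select.fieldAccess(
                FieldAccessDescriptor.create()
                    .withNestedField("location", FieldAccessDescriptor.withAllFields())));

    p.run().waitUntilFinish();
  }
}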


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 131078)
    Time Spent: 2.5h  (was: 2h 20m)

> Create a library of useful transforms that use schemas
> ------------------------------------------------------
>
>                 Key: BEAM-4461
>                 URL: https://issues.apache.org/jira/browse/BEAM-4461
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Reuven Lax
>            Assignee: Reuven Lax
>            Priority: Major
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
