[ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=383100&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-383100
 ]

ASF GitHub Bot logged work on BEAM-4461:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Feb/20 19:33
            Start Date: 06/Feb/20 19:33
    Worklog Time Spent: 10m 
    Work Description: reuvenlax commented on pull request #10766: [BEAM-4461] Add Selected.flattenedSchema
URL: https://github.com/apache/beam/pull/10766#discussion_r376038435
 
 

 ##########
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
 ##########
 @@ -69,55 +75,117 @@
  * }</pre>
  */
 @Experimental(Kind.SCHEMAS)
-public class Select<T> extends PTransform<PCollection<T>, PCollection<Row>> {
-  private final FieldAccessDescriptor fieldAccessDescriptor;
-
-  private Select(FieldAccessDescriptor fieldAccessDescriptor) {
-    this.fieldAccessDescriptor = fieldAccessDescriptor;
+public class Select {
+  public static <T> Select.Fields<T> create() {
+    return new Fields<>(FieldAccessDescriptor.create());
   }
 
   /** Select a set of top-level field ids from the row. */
-  public static <T> Select<T> fieldIds(Integer... ids) {
-    return new Select<>(FieldAccessDescriptor.withFieldIds(ids));
+  public static <T> Fields<T> fieldIds(Integer... ids) {
+    return new Fields<>(FieldAccessDescriptor.withFieldIds(ids));
   }
 
   /** Select a set of top-level field names from the row. */
-  public static <T> Select<T> fieldNames(String... names) {
-    return new Select<>(FieldAccessDescriptor.withFieldNames(names));
+  public static <T> Fields<T> fieldNames(String... names) {
+    return new Fields<>(FieldAccessDescriptor.withFieldNames(names));
   }
 
   /**
    * Select a set of fields described in a {@link FieldAccessDescriptor}.
    *
    * <p>This allows for nested fields to be selected as well.
    */
-  public static <T> Select<T> fieldAccess(FieldAccessDescriptor fieldAccessDescriptor) {
-    return new Select<>(fieldAccessDescriptor);
+  public static <T> Fields<T> fieldAccess(FieldAccessDescriptor fieldAccessDescriptor) {
+    return new Fields<>(fieldAccessDescriptor);
+  }
+
+  /**
+   * Selects every leaf-level field. This results in a nested schema being flattened into a single
+   * top-level schema.
+   */
+  public static <T> Flattened<T> flattenedSchema() {
+    return new Flattened<>();
+  }
+
+  private static class SelectDoFn<T> extends DoFn<T, Row> {
+    private FieldAccessDescriptor fieldAccessDescriptor;
+    private Schema inputSchema;
+    private Schema outputSchema;
+
+    // TODO: This should be the same as resolved so that Beam knows which 
fields
+    // are being accessed. Currently Beam only supports wildcard descriptors.
+    // Once BEAM-4457 is fixed, fix this.
+    @FieldAccess("selectFields")
+    final FieldAccessDescriptor fieldAccess = 
FieldAccessDescriptor.withAllFields();
+
+    public SelectDoFn(
+        FieldAccessDescriptor fieldAccessDescriptor, Schema inputSchema, 
Schema outputSchema) {
+      this.fieldAccessDescriptor = fieldAccessDescriptor;
+      this.inputSchema = inputSchema;
+      this.outputSchema = outputSchema;
+    }
+
+    @ProcessElement
+    public void process(@FieldAccess("selectFields") @Element Row row, 
OutputReceiver<Row> r) {
+      r.output(SelectHelpers.selectRow(row, fieldAccessDescriptor, 
inputSchema, outputSchema));
+    }
+  }
+
+  public static class Fields<T> extends PTransform<PCollection<T>, 
PCollection<Row>> {
+    private FieldAccessDescriptor fieldAccessDescriptor;
+
+    public Fields(FieldAccessDescriptor fieldAccessDescriptor) {
+      this.fieldAccessDescriptor = fieldAccessDescriptor;
+    }
+
+    @Override
+    public PCollection<Row> expand(PCollection<T> input) {
+      Schema inputSchema = input.getSchema();
+      FieldAccessDescriptor resolved = 
fieldAccessDescriptor.resolve(inputSchema);
+      Schema outputSchema = SelectHelpers.getOutputSchema(inputSchema, 
resolved);
+      return input
+          .apply(ParDo.of(new SelectDoFn<>(resolved, inputSchema, 
outputSchema)))
+          .setRowSchema(outputSchema);
+    }
   }
 
-  @Override
-  public PCollection<Row> expand(PCollection<T> input) {
-    Schema inputSchema = input.getSchema();
-    FieldAccessDescriptor resolved = 
fieldAccessDescriptor.resolve(inputSchema);
-    Schema outputSchema = SelectHelpers.getOutputSchema(inputSchema, resolved);
-
-    return input
-        .apply(
-            ParDo.of(
-                new DoFn<T, Row>() {
-                  // TODO: This should be the same as resolved so that Beam 
knows which fields
-                  // are being accessed. Currently Beam only supports wildcard 
descriptors.
-                  // Once BEAM-4457 is fixed, fix this.
-                  @FieldAccess("selectFields")
-                  final FieldAccessDescriptor fieldAccessDescriptor =
-                      FieldAccessDescriptor.withAllFields();
-
-                  @ProcessElement
-                  public void process(
-                      @FieldAccess("selectFields") @Element Row row, 
OutputReceiver<Row> r) {
-                    r.output(SelectHelpers.selectRow(row, resolved, 
inputSchema, outputSchema));
-                  }
-                }))
-        .setRowSchema(outputSchema);
+  /** A {@link PTransform} representing a flattened schema. */
+  public static class Flattened<T> extends PTransform<PCollection<T>, 
PCollection<Row>> {
+    private SerializableFunction<List<String>, String> nameFn;
+
+    Flattened() {
+      this(CONCAT_FIELD_NAMES);
+    }
+
+    Flattened(SerializableFunction<List<String>, String> nameFn) {
+      this.nameFn = nameFn;
+    }
+
+    /**
+     * For nested fields, concatenate all the names separated by a _ character in the flattened
+     * schema.
 
 Review comment:
   Added withFieldNameAs to allow the user to rename fields.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 383100)
    Time Spent: 29h 50m  (was: 29h 40m)

> Create a library of useful transforms that use schemas
> ------------------------------------------------------
>
>                 Key: BEAM-4461
>                 URL: https://issues.apache.org/jira/browse/BEAM-4461
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Reuven Lax
>            Assignee: Reuven Lax
>            Priority: Major
>          Time Spent: 29h 50m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to