[ 
https://issues.apache.org/jira/browse/BEAM-12795?focusedWorklogId=667189&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-667189
 ]

ASF GitHub Bot logged work on BEAM-12795:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Oct/21 18:25
            Start Date: 19/Oct/21 18:25
    Worklog Time Spent: 10m 
      Work Description: reuvenlax commented on a change in pull request #15465:
URL: https://github.com/apache/beam/pull/15465#discussion_r732134710



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
##########
@@ -1015,12 +1303,129 @@ public TupleTagList getAdditionalOutputTags() {
       return PCollectionViews.toAdditionalInputs(sideInputs.values());
     }
 
+    public FieldAccessDescriptor getKeyFieldsDescriptor() {
+      return keyFieldsDescriptor;
+    }
+
+    @Internal
+    public DoFnSchemaInformation getDoFnSchemaInformation() {
+      return doFnSchemaInformation;
+    }
+
     @Override
     public String toString() {
       return fn.toString();
     }
   }
 
+  public static <T> PTransform<PCollection<T>, PCollection<KV<Row, T>>> getWithSchemaKeys(
+      FieldAccessDescriptor fieldAccessDescriptor) {
+    return new SchemaToKv<>(fieldAccessDescriptor);
+  }
+
+  private static class SchemaToKv<T> extends PTransform<PCollection<T>, PCollection<KV<Row, T>>> {
+    private RowSelector rowSelector;
+    private final FieldAccessDescriptor fieldAccessDescriptor;
+
+    SchemaToKv(FieldAccessDescriptor fieldAccessDescriptor) {
+      this.fieldAccessDescriptor = fieldAccessDescriptor;
+    }
+
+    @Override
+    public PCollection<KV<Row, T>> expand(PCollection<T> input) {
+      Schema schema = input.getSchema();
+      TypeDescriptor<T> typeDescriptor = input.getTypeDescriptor();
+      SerializableFunction<T, Row> toRowFunction = input.getToRowFunction();
+      SerializableFunction<Row, T> fromRowFunction = input.getFromRowFunction();
+
+      FieldAccessDescriptor resolved = fieldAccessDescriptor.resolve(schema);
+      rowSelector = new RowSelectorContainer(schema, resolved, true);
+      Schema keySchema = SelectHelpers.getOutputSchema(schema, resolved);
+
+      return input
+          .apply(
+              "selectKeys",
+              ParDo.of(
+                  new DoFn<T, KV<Row, T>>() {
+                    @ProcessElement
+                    public void process(
+                        @Element Row row, // Beam will convert the element to a row.
+                        @Element T element, // Beam will return the original element.
+                        OutputReceiver<KV<Row, T>> o) {
+                      o.output(KV.of(rowSelector.select(row), element));
+                    }
+                  }))
+          .setCoder(
+              KvCoder.of(
+                  SchemaCoder.of(keySchema),
+                  SchemaCoder.of(schema, typeDescriptor, toRowFunction, fromRowFunction)));
+    }
+  }
+
+  public static class MultiOutputSchemaKeyFields<InputT, DoFnInputT, OutputT>

Review comment:
       @lukecwik do you think it's worth investigating this approach further, 
despite the typing problems?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 667189)
    Time Spent: 2h 40m  (was: 2.5h)

> KVs should not be needed when using schemas
> -------------------------------------------
>
>                 Key: BEAM-12795
>                 URL: https://issues.apache.org/jira/browse/BEAM-12795
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-core
>            Reporter: Reuven Lax
>            Priority: P2
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> When using schemas there should be no need to require a KV, as any key or 
> keys can act as the K or a V in a KV. Users can use schemas without the 
> need for KV in all cases except for state/timers DoFns, which today require 
> that the input have type KV. We would like to eliminate this requirement. 
> Users should be able to specify a key field extraction in their ParDo, e.g.
> ParDo.of(...).withKeyFields("user.location.city");
> Or in the DoFn itself.
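
For illustration only, the dotted key path in the issue description ("user.location.city") can be read as a walk through nested fields. The sketch below uses plain nested Java maps rather than Beam schemas or Rows; the class name KeyFieldSketch and the method extractKey are hypothetical, are not part of Beam or of the pull request, and only show what selecting a key field by path could mean.

```java
import java.util.Map;

// Hypothetical illustration; not Beam code and not the PR's implementation.
public class KeyFieldSketch {
    // Walks a dotted path such as "user.location.city" through nested maps.
    // Returns null if a segment is missing or an intermediate value is not a map.
    static Object extractKey(Map<String, Object> record, String path) {
        Object current = record;
        for (String segment : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }

    public static void main(String[] args) {
        // A stand-in for a schema-aware element with a nested "user" field.
        Map<String, Object> element =
            Map.of("user", Map.of("location", Map.of("city", "Seattle")), "clicks", 3);
        System.out.println(extractKey(element, "user.location.city")); // prints Seattle
    }
}
```

In the pull request itself, the equivalent selection is done against a Beam Schema via FieldAccessDescriptor and RowSelector, as shown in the diff above.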



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
