reuvenlax commented on a change in pull request #15465:
URL: https://github.com/apache/beam/pull/15465#discussion_r732134710



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
##########
@@ -1015,12 +1303,129 @@ public TupleTagList getAdditionalOutputTags() {
       return PCollectionViews.toAdditionalInputs(sideInputs.values());
     }
 
+    public FieldAccessDescriptor getKeyFieldsDescriptor() {
+      return keyFieldsDescriptor;
+    }
+
+    @Internal
+    public DoFnSchemaInformation getDoFnSchemaInformation() {
+      return doFnSchemaInformation;
+    }
+
     @Override
     public String toString() {
       return fn.toString();
     }
   }
 
+  public static <T> PTransform<PCollection<T>, PCollection<KV<Row, T>>> getWithSchemaKeys(
+      FieldAccessDescriptor fieldAccessDescriptor) {
+    return new SchemaToKv<>(fieldAccessDescriptor);
+  }
+
+  private static class SchemaToKv<T> extends PTransform<PCollection<T>, PCollection<KV<Row, T>>> {
+    private RowSelector rowSelector;
+    private final FieldAccessDescriptor fieldAccessDescriptor;
+
+    SchemaToKv(FieldAccessDescriptor fieldAccessDescriptor) {
+      this.fieldAccessDescriptor = fieldAccessDescriptor;
+    }
+
+    @Override
+    public PCollection<KV<Row, T>> expand(PCollection<T> input) {
+      Schema schema = input.getSchema();
+      TypeDescriptor<T> typeDescriptor = input.getTypeDescriptor();
+      SerializableFunction<T, Row> toRowFunction = input.getToRowFunction();
+      SerializableFunction<Row, T> fromRowFunction = input.getFromRowFunction();
+
+      FieldAccessDescriptor resolved = fieldAccessDescriptor.resolve(schema);
+      rowSelector = new RowSelectorContainer(schema, resolved, true);
+      Schema keySchema = SelectHelpers.getOutputSchema(schema, resolved);
+
+      return input
+          .apply(
+              "selectKeys",
+              ParDo.of(
+                  new DoFn<T, KV<Row, T>>() {
+                    @ProcessElement
+                    public void process(
+                        @Element Row row, // Beam will convert the element to a row.
+                        @Element T element, // Beam will return the original element.
+                        OutputReceiver<KV<Row, T>> o) {
+                      o.output(KV.of(rowSelector.select(row), element));
+                    }
+                  }))
+          .setCoder(
+              KvCoder.of(
+                  SchemaCoder.of(keySchema),
+                  SchemaCoder.of(schema, typeDescriptor, toRowFunction, fromRowFunction)));
+    }
+  }
+
+  public static class MultiOutputSchemaKeyFields<InputT, DoFnInputT, OutputT>

Review comment:
       @lukecwik do you think it's worth investigating this approach further, 
despite the typing problems?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to