robertwb commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r852260611


##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -126,12 +127,33 @@ PCollection<OutputT> toOutputCollection(Map<TupleTag<?>, PCollection> output) {
 
     public MultiOutputExpandableTransform<InputT> withMultiOutputs() {
       return new MultiOutputExpandableTransform<>(
-          getUrn(), getPayload(), getEndpoint(), getClientFactory(), getNamespaceIndex());
+          getUrn(),
+          getPayload(),
+          getEndpoint(),
+          getClientFactory(),
+          getNamespaceIndex(),
+          getOutputCoders());
     }
 
-    public <T> SingleOutputExpandableTransform<InputT, T> withOutputType() {
+    public SingleOutputExpandableTransform<InputT, OutputT> withOutputCoder(Coder outputCoder) {
       return new SingleOutputExpandableTransform<>(
-          getUrn(), getPayload(), getEndpoint(), getClientFactory(), getNamespaceIndex());
+          getUrn(),
+          getPayload(),
+          getEndpoint(),
+          getClientFactory(),
+          getNamespaceIndex(),
+          ImmutableMap.of("0", outputCoder));
+    }
+
+    public SingleOutputExpandableTransform<InputT, OutputT> withOutputCoder(

Review Comment:
   Does this make sense on SingleOutputExpandableTransform? Likely it should only be on the Multi variant.



##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -433,6 +434,9 @@ def __init__(self, urn, payload, expansion_service=None):
     self._inputs = {}  # type: Dict[str, pvalue.PCollection]
     self._outputs = {}  # type: Dict[str, pvalue.PCollection]
 
+  def with_output_types(self, *args, **kwargs):

Review Comment:
   Is this needed? (Shouldn't it be inherited?)



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -225,9 +251,25 @@ public OutputT expand(InputT input) {
         }
       }
 
+      ExpansionApi.ExpansionRequest.Builder requestBuilder =
+          ExpansionApi.ExpansionRequest.newBuilder();
+      if (!outputCoders.isEmpty()) {

Review Comment:
   No need to gate on this--if it's empty it'll still do the right thing.



##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -498,11 +502,24 @@ def expand(self, pvalueish):
               spec=beam_runner_api_pb2.FunctionSpec(
                   urn=common_urns.primitives.IMPULSE.urn),
               outputs={'out': transform_proto.inputs[tag]}))
+    output_coder = None
+    if self._type_hints.output_types:
+      if self._type_hints.output_types[0]:
+        output_coder = dict((str(k), context.coder_id_from_element_type(v))
+                            for k,

Review Comment:
   You can do `for (k, v) in ...` to make yapf happy. (Same below.)
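
   A minimal sketch of the suggested form, assuming stand-in names: `coder_id_for` and `output_types` below are hypothetical and only imitate the shape of the code under review (they are not the Beam API). Parenthesizing the loop target keeps `k, v` together as one unit, which seems to be what the comment is getting at about yapf's line breaking.

```python
# Hypothetical stand-in for context.coder_id_from_element_type; not the Beam API.
def coder_id_for(element_type):
    return 'coder_' + element_type.__name__


# Hypothetical mapping from output tag to element type.
output_types = {0: int, 'side': str}

# Parenthesized target: `for (k, v) in ...` rather than `for k, v in ...`.
output_coders = dict((str(k), coder_id_for(v))
                     for (k, v) in output_types.items())
```

   Both spellings are equivalent Python; the parentheses only affect formatting.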



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -225,9 +251,25 @@ public OutputT expand(InputT input) {
         }
       }
 
+      ExpansionApi.ExpansionRequest.Builder requestBuilder =
+          ExpansionApi.ExpansionRequest.newBuilder();
+      if (!outputCoders.isEmpty()) {
+        requestBuilder.putAllOutputCoderOverride(
+            outputCoders.entrySet().stream()
+                .collect(
+                    Collectors.toMap(
+                        Map.Entry::getKey,
+                        v -> {

Review Comment:
   The variable `v` makes it sound like it's just the value. Maybe `kv` or `e` for entry?



##########
model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_expansion_api.proto:
##########
@@ -46,6 +46,12 @@ message ExpansionRequest {
   // A namespace (prefix) to use for the id of any newly created
   // components.
   string namespace = 3;
+
+  // (Optional) Map from a local output tag to a coder id.
+  // If it is set, asks the expansion service to use the given
+  // coders for the output PCollections. Note that the request
+  // may not be fulfilled.
+  map<string, string> output_coder_override = 4;

Review Comment:
   Maybe call this output_coder_requests? 



##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -498,11 +502,24 @@ def expand(self, pvalueish):
               spec=beam_runner_api_pb2.FunctionSpec(
                   urn=common_urns.primitives.IMPULSE.urn),
               outputs={'out': transform_proto.inputs[tag]}))
+    output_coder = None

Review Comment:
   Nit: output_coder*s*. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
