[ 
https://issues.apache.org/jira/browse/BEAM-14251?focusedWorklogId=758002&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-758002
 ]

ASF GitHub Bot logged work on BEAM-14251:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Apr/22 17:09
            Start Date: 18/Apr/22 17:09
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r852260611


##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -126,12 +127,33 @@ PCollection<OutputT> toOutputCollection(Map<TupleTag<?>, 
PCollection> output) {
 
     public MultiOutputExpandableTransform<InputT> withMultiOutputs() {
       return new MultiOutputExpandableTransform<>(
-          getUrn(), getPayload(), getEndpoint(), getClientFactory(), 
getNamespaceIndex());
+          getUrn(),
+          getPayload(),
+          getEndpoint(),
+          getClientFactory(),
+          getNamespaceIndex(),
+          getOutputCoders());
     }
 
-    public <T> SingleOutputExpandableTransform<InputT, T> withOutputType() {
+    public SingleOutputExpandableTransform<InputT, OutputT> 
withOutputCoder(Coder outputCoder) {
       return new SingleOutputExpandableTransform<>(
-          getUrn(), getPayload(), getEndpoint(), getClientFactory(), 
getNamespaceIndex());
+          getUrn(),
+          getPayload(),
+          getEndpoint(),
+          getClientFactory(),
+          getNamespaceIndex(),
+          ImmutableMap.of("0", outputCoder));
+    }
+
+    public SingleOutputExpandableTransform<InputT, OutputT> withOutputCoder(

Review Comment:
   Does this overload make sense on SingleOutputExpandableTransform? Likely it 
should only be on the MultiOutputExpandableTransform variant. 



##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -433,6 +434,9 @@ def __init__(self, urn, payload, expansion_service=None):
     self._inputs = {}  # type: Dict[str, pvalue.PCollection]
     self._outputs = {}  # type: Dict[str, pvalue.PCollection]
 
+  def with_output_types(self, *args, **kwargs):

Review Comment:
   Is this needed? (Shouldn't it be inherited?)



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -225,9 +251,25 @@ public OutputT expand(InputT input) {
         }
       }
 
+      ExpansionApi.ExpansionRequest.Builder requestBuilder =
+          ExpansionApi.ExpansionRequest.newBuilder();
+      if (!outputCoders.isEmpty()) {

Review Comment:
   No need to gate on `!outputCoders.isEmpty()` -- if the map is empty, the code below will still do the right thing.



##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -498,11 +502,24 @@ def expand(self, pvalueish):
               spec=beam_runner_api_pb2.FunctionSpec(
                   urn=common_urns.primitives.IMPULSE.urn),
               outputs={'out': transform_proto.inputs[tag]}))
+    output_coder = None
+    if self._type_hints.output_types:
+      if self._type_hints.output_types[0]:
+        output_coder = dict((str(k), context.coder_id_from_element_type(v))
+                            for k,

Review Comment:
   You can do `for (k, v) in ...` to make yapf happy. (Same below.)



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -225,9 +251,25 @@ public OutputT expand(InputT input) {
         }
       }
 
+      ExpansionApi.ExpansionRequest.Builder requestBuilder =
+          ExpansionApi.ExpansionRequest.newBuilder();
+      if (!outputCoders.isEmpty()) {
+        requestBuilder.putAllOutputCoderOverride(
+            outputCoders.entrySet().stream()
+                .collect(
+                    Collectors.toMap(
+                        Map.Entry::getKey,
+                        v -> {

Review Comment:
   The variable name `v` makes it sound like it's just the value. Maybe use 
`kv`, or `e` (for entry)?



##########
model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_expansion_api.proto:
##########
@@ -46,6 +46,12 @@ message ExpansionRequest {
   // A namespace (prefix) to use for the id of any newly created
   // components.
   string namespace = 3;
+
+  // (Optional) Map from a local output tag to a coder id.
+  // If it is set, asks the expansion service to use the given
+  // coders for the output PCollections. Note that the request
+  // may not be fulfilled.
+  map<string, string> output_coder_override = 4;

Review Comment:
   Maybe call this `output_coder_requests`, since (per the comment above) the request may not be fulfilled?



##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -498,11 +502,24 @@ def expand(self, pvalueish):
               spec=beam_runner_api_pb2.FunctionSpec(
                   urn=common_urns.primitives.IMPULSE.urn),
               outputs={'out': transform_proto.inputs[tag]}))
+    output_coder = None

Review Comment:
   Nit: name this `output_coders` (plural), since it holds a dict of coders.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 758002)
    Time Spent: 2h 50m  (was: 2h 40m)

> add output_coder_override to ExpansionRequest
> ---------------------------------------------
>
>                 Key: BEAM-14251
>                 URL: https://issues.apache.org/jira/browse/BEAM-14251
>             Project: Beam
>          Issue Type: Improvement
>          Components: cross-language
>            Reporter: Heejong Lee
>            Assignee: Heejong Lee
>            Priority: P2
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> add output_coder_override to ExpansionRequest



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to