[ 
https://issues.apache.org/jira/browse/BEAM-14251?focusedWorklogId=758091&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-758091
 ]

ASF GitHub Bot logged work on BEAM-14251:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Apr/22 19:37
            Start Date: 18/Apr/22 19:37
    Worklog Time Spent: 10m 
      Work Description: ihji commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r852359639


##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -126,12 +127,33 @@ PCollection<OutputT> toOutputCollection(Map<TupleTag<?>, 
PCollection> output) {
 
     public MultiOutputExpandableTransform<InputT> withMultiOutputs() {
       return new MultiOutputExpandableTransform<>(
-          getUrn(), getPayload(), getEndpoint(), getClientFactory(), 
getNamespaceIndex());
+          getUrn(),
+          getPayload(),
+          getEndpoint(),
+          getClientFactory(),
+          getNamespaceIndex(),
+          getOutputCoders());
     }
 
-    public <T> SingleOutputExpandableTransform<InputT, T> withOutputType() {
+    public SingleOutputExpandableTransform<InputT, OutputT> 
withOutputCoder(Coder outputCoder) {
       return new SingleOutputExpandableTransform<>(
-          getUrn(), getPayload(), getEndpoint(), getClientFactory(), 
getNamespaceIndex());
+          getUrn(),
+          getPayload(),
+          getEndpoint(),
+          getClientFactory(),
+          getNamespaceIndex(),
+          ImmutableMap.of("0", outputCoder));
+    }
+
+    public SingleOutputExpandableTransform<InputT, OutputT> withOutputCoder(

Review Comment:
   Ah, good catch. It doesn't make sense on single output transform. Moved to 
multi output variant.



##########
model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_expansion_api.proto:
##########
@@ -46,6 +46,12 @@ message ExpansionRequest {
   // A namespace (prefix) to use for the id of any newly created
   // components.
   string namespace = 3;
+
+  // (Optional) Map from a local output tag to a coder id.
+  // If it is set, asks the expansion service to use the given
+  // coders for the output PCollections. Note that the request
+  // may not be fulfilled.
+  map<string, string> output_coder_override = 4;

Review Comment:
   Done.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 758091)
    Time Spent: 3h  (was: 2h 50m)

> add output_coder_override to ExpansionRequest
> ---------------------------------------------
>
>                 Key: BEAM-14251
>                 URL: https://issues.apache.org/jira/browse/BEAM-14251
>             Project: Beam
>          Issue Type: Improvement
>          Components: cross-language
>            Reporter: Heejong Lee
>            Assignee: Heejong Lee
>            Priority: P2
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> add output_coder_override to ExpansionRequest



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to