[
https://issues.apache.org/jira/browse/BEAM-14251?focusedWorklogId=758002&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-758002
]
ASF GitHub Bot logged work on BEAM-14251:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 18/Apr/22 17:09
Start Date: 18/Apr/22 17:09
Worklog Time Spent: 10m
Work Description: robertwb commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r852260611
##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -126,12 +127,33 @@ PCollection<OutputT> toOutputCollection(Map<TupleTag<?>, PCollection> output) {
public MultiOutputExpandableTransform<InputT> withMultiOutputs() {
return new MultiOutputExpandableTransform<>(
- getUrn(), getPayload(), getEndpoint(), getClientFactory(), getNamespaceIndex());
+ getUrn(),
+ getPayload(),
+ getEndpoint(),
+ getClientFactory(),
+ getNamespaceIndex(),
+ getOutputCoders());
}
- public <T> SingleOutputExpandableTransform<InputT, T> withOutputType() {
+ public SingleOutputExpandableTransform<InputT, OutputT> withOutputCoder(Coder outputCoder) {
return new SingleOutputExpandableTransform<>(
- getUrn(), getPayload(), getEndpoint(), getClientFactory(), getNamespaceIndex());
+ getUrn(),
+ getPayload(),
+ getEndpoint(),
+ getClientFactory(),
+ getNamespaceIndex(),
+ ImmutableMap.of("0", outputCoder));
+ }
+
+ public SingleOutputExpandableTransform<InputT, OutputT> withOutputCoder(
Review Comment:
Does this make sense on SingleOutputExpandableTransform? Likely it should
only be on the Multi variant.
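A minimal sketch of the split the reviewer suggests, written in Python for brevity and using hypothetical class names (`SingleOutputSketch`, `MultiOutputSketch`) rather than Beam's actual `External` classes. The single-output builder only ever needs one coder, which it files under the tag "0" as the diff does with `ImmutableMap.of("0", outputCoder)`; a per-tag coder map belongs on the multi-output variant.

```python
from dataclasses import dataclass, field, replace
from typing import Dict


@dataclass(frozen=True)
class MultiOutputSketch:
    """Hypothetical multi-output builder: accepts one coder id per output tag."""
    urn: str
    output_coders: Dict[str, str] = field(default_factory=dict)

    def with_output_coders(self, coders: Dict[str, str]) -> "MultiOutputSketch":
        # Return a new immutable builder instead of mutating this one.
        return replace(self, output_coders=dict(coders))


@dataclass(frozen=True)
class SingleOutputSketch:
    """Hypothetical single-output builder: one output, so exactly one coder."""
    urn: str
    output_coders: Dict[str, str] = field(default_factory=dict)

    def with_output_coder(self, coder_id: str) -> "SingleOutputSketch":
        # The lone output is keyed by the tag "0", mirroring the Java diff.
        return replace(self, output_coders={"0": coder_id})

    def with_multi_outputs(self) -> MultiOutputSketch:
        # Carry any coder already chosen over to the multi-output variant,
        # as withMultiOutputs() does by passing getOutputCoders().
        return MultiOutputSketch(self.urn, dict(self.output_coders))
```

Keeping per-tag overrides off the single-output type means a caller cannot name a tag that a one-output transform does not have.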
##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -433,6 +434,9 @@ def __init__(self, urn, payload, expansion_service=None):
self._inputs = {} # type: Dict[str, pvalue.PCollection]
self._outputs = {} # type: Dict[str, pvalue.PCollection]
+ def with_output_types(self, *args, **kwargs):
Review Comment:
Is this needed? (Shouldn't it be inherited?)
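A self-contained illustration of the reviewer's point, using hypothetical stand-in classes rather than Beam's real type-hint and `PTransform` hierarchy. A method defined on a base class is already available on subclasses without being redeclared, so a redefinition in the subclass only earns its keep if it changes the signature or behavior.

```python
from typing import Any, Dict, Tuple


class TypeHintsBase:
    """Hypothetical stand-in for a base class that records output type hints."""

    def __init__(self) -> None:
        self._output_types: Tuple[Tuple[Any, ...], Dict[str, Any]] = ((), {})

    def with_output_types(self, *args: Any, **kwargs: Any) -> "TypeHintsBase":
        # Record positional and keyword hints, then return self for chaining.
        self._output_types = (args, kwargs)
        return self


class ExternalTransformSketch(TypeHintsBase):
    """Hypothetical subclass that does NOT redefine with_output_types."""

    def __init__(self, urn: str) -> None:
        super().__init__()
        self.urn = urn
```

Calling `ExternalTransformSketch("beam:x").with_output_types(int, tag_b=str)` works through inheritance alone.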
##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -225,9 +251,25 @@ public OutputT expand(InputT input) {
}
}
+ ExpansionApi.ExpansionRequest.Builder requestBuilder =
+ ExpansionApi.ExpansionRequest.newBuilder();
+ if (!outputCoders.isEmpty()) {
Review Comment:
No need to gate on this--if it's empty it'll still do the right thing.
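The reviewer's reasoning, sketched with a plain Python dict standing in for the request's proto map field (an assumption for illustration; the diff itself uses the Java builder's `putAllOutputCoderOverride`). Merging an empty mapping leaves the target unchanged, so the unconditional call behaves the same as the guarded one.

```python
from typing import Dict


def build_request_guarded(output_coders: Dict[str, str]) -> Dict[str, str]:
    # Version with the emptiness check the reviewer flagged.
    request: Dict[str, str] = {}
    if output_coders:
        request.update(output_coders)
    return request


def build_request_unguarded(output_coders: Dict[str, str]) -> Dict[str, str]:
    # Version without the check: update({}) simply adds no entries.
    request: Dict[str, str] = {}
    request.update(output_coders)
    return request
```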
##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -498,11 +502,24 @@ def expand(self, pvalueish):
spec=beam_runner_api_pb2.FunctionSpec(
urn=common_urns.primitives.IMPULSE.urn),
outputs={'out': transform_proto.inputs[tag]}))
+ output_coder = None
+ if self._type_hints.output_types:
+ if self._type_hints.output_types[0]:
+ output_coder = dict((str(k), context.coder_id_from_element_type(v))
+ for k,
Review Comment:
You can do `for (k, v) in ...` to make yapf happy. (Same below.)
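A sketch of the suggested spelling, with a hypothetical `coder_id_for` function standing in for `context.coder_id_from_element_type` and a hypothetical `kw_output_types` dict standing in for the iterable the diff cuts off. Per the review comment, parenthesizing the unpacked pair keeps `k` and `v` together instead of letting yapf split `for k,` from `v in`; a dict comprehension also reads more directly than `dict(...)` over a generator of tuples.

```python
from typing import Any, Dict


def coder_id_for(element_type: Any) -> str:
    # Hypothetical stand-in for context.coder_id_from_element_type.
    return "coder_" + getattr(element_type, "__name__", str(element_type))


def build_output_coders(kw_output_types: Dict[Any, Any]) -> Dict[str, str]:
    # Tags are stringified because the proto map is keyed by string.
    return {str(k): coder_id_for(v) for (k, v) in kw_output_types.items()}
```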
##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -225,9 +251,25 @@ public OutputT expand(InputT input) {
}
}
+ ExpansionApi.ExpansionRequest.Builder requestBuilder =
+ ExpansionApi.ExpansionRequest.newBuilder();
+ if (!outputCoders.isEmpty()) {
+ requestBuilder.putAllOutputCoderOverride(
+ outputCoders.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ v -> {
Review Comment:
The variable `v` makes it sound like it's just the value. Maybe `kv` or `e`
for entry?
##########
model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_expansion_api.proto:
##########
@@ -46,6 +46,12 @@ message ExpansionRequest {
// A namespace (prefix) to use for the id of any newly created
// components.
string namespace = 3;
+
+ // (Optional) Map from a local output tag to a coder id.
+ // If it is set, asks the expansion service to use the given
+ // coders for the output PCollections. Note that the request
+ // may not be fulfilled.
+ map<string, string> output_coder_override = 4;
Review Comment:
Maybe call this output_coder_requests?
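One way an expansion service could treat this field as a best-effort request, sketched in Python with plain dicts and hypothetical names (`honor_output_coder_requests`, `known_coder_ids`). The proto comment only says the request may not be fulfilled; the rule used here for declining (an unknown output tag or an unknown coder id) is an assumption.

```python
from typing import Dict, Set


def honor_output_coder_requests(
    output_pcollections: Dict[str, str],
    pcollection_coders: Dict[str, str],
    requests: Dict[str, str],
    known_coder_ids: Set[str],
) -> Dict[str, str]:
    """Return a new PCollection-id -> coder-id map with honorable requests applied.

    output_pcollections maps a local output tag to a PCollection id;
    pcollection_coders maps a PCollection id to its default coder id.
    """
    result = dict(pcollection_coders)
    for tag, coder_id in requests.items():
        pcoll_id = output_pcollections.get(tag)
        # Decline silently when the tag or coder is unknown; the caller must
        # inspect the returned coders rather than assume the request was honored.
        if pcoll_id is None or coder_id not in known_coder_ids:
            continue
        result[pcoll_id] = coder_id
    return result
```

Because the result may differ from what was asked for, the "requests" naming suggested above matches these semantics more closely than "override".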
##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -498,11 +502,24 @@ def expand(self, pvalueish):
spec=beam_runner_api_pb2.FunctionSpec(
urn=common_urns.primitives.IMPULSE.urn),
outputs={'out': transform_proto.inputs[tag]}))
+ output_coder = None
Review Comment:
Nit: output_coder*s*.
Issue Time Tracking
-------------------
Worklog Id: (was: 758002)
Time Spent: 2h 50m (was: 2h 40m)
> add output_coder_override to ExpansionRequest
> ---------------------------------------------
>
> Key: BEAM-14251
> URL: https://issues.apache.org/jira/browse/BEAM-14251
> Project: Beam
> Issue Type: Improvement
> Components: cross-language
> Reporter: Heejong Lee
> Assignee: Heejong Lee
> Priority: P2
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
> add output_coder_override to ExpansionRequest
--
This message was sent by Atlassian Jira
(v8.20.1#820001)