robertwb commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r844532488
##########
model/job-management/src/main/proto/beam_expansion_api.proto:
##########
@@ -46,6 +46,9 @@ message ExpansionRequest {
// A namespace (prefix) to use for the id of any newly created
// components.
string namespace = 3;
+
+ // (Optional) If this coder is set, overrides the output type of ExternalTransform
+ org.apache.beam.model.pipeline.v1.Coder output_coder_override = 4;
Review Comment:
This needs to be a map<tag, Coder> as a transform may have many outputs.
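A minimal, Beam-free sketch of what a per-tag override could look like on the receiving side. The function name, the tag names, and the use of plain strings in place of coder protos are all assumptions made for illustration; nothing here is the actual expansion API.

```python
from typing import Dict, Iterable


def resolve_output_coders(
    output_tags: Iterable[str],
    overrides: Dict[str, str],
    default: str = "inferred",
) -> Dict[str, str]:
    """Pick a coder label for every output tag (hypothetical helper).

    Tags without an override keep the default; an override that names a
    tag the transform does not produce is rejected.
    """
    tags = list(output_tags)
    unknown = set(overrides) - set(tags)
    if unknown:
        raise ValueError(f"override for unknown output tag(s): {sorted(unknown)}")
    return {tag: overrides.get(tag, default) for tag in tags}


# Two outputs, only one of which has an override.
coders = resolve_output_coders(["main", "errors"], {"errors": "StrUtf8Coder"})
```

Keying by tag lets a caller override one output and leave the others alone, which a single `Coder` field cannot express.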
##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -113,12 +115,12 @@ private static int getFreshNamespaceIndex() {
public MultiOutputExpandableTransform<InputT> withMultiOutputs() {
return new MultiOutputExpandableTransform<>(
- getUrn(), getPayload(), getEndpoint(), getNamespaceIndex());
+ getUrn(), getPayload(), getEndpoint(), getNamespaceIndex(), getOutputCoder());
Review Comment:
I'm not sure what this means. Is the coder supposed to be used for all outputs?
We should probably disallow a non-null coder here.
##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -431,10 +431,15 @@ def __init__(self, urn, payload, expansion_service=None):
self._payload = (
payload.payload() if isinstance(payload, PayloadBuilder) else payload)
self._expansion_service = expansion_service
+ self._output_coder = None
self._external_namespace = self._fresh_namespace()
self._inputs = {} # type: Dict[str, pvalue.PCollection]
self._outputs = {} # type: Dict[str, pvalue.PCollection]
+ def with_output_coder(self, output_coder):
Review Comment:
PTransforms already have with_output_types. Let's grab that and (if
present) infer the coder from it rather than adding a new method.
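As a rough illustration of that idea, here is a Beam-free sketch. `SketchTransform` and `CODER_FOR_TYPE` are hypothetical stand-ins for a PTransform and a coder registry, and the coder names are used only as labels.

```python
class SketchTransform:
    """Hypothetical stand-in for a PTransform; not Beam's class."""

    def __init__(self):
        self._output_type = None

    def with_output_types(self, type_hint):
        # Record the declared output type and return self for chaining.
        self._output_type = type_hint
        return self


# Hypothetical type -> coder-label table standing in for a coder registry.
CODER_FOR_TYPE = {int: "VarIntCoder", str: "StrUtf8Coder", bytes: "BytesCoder"}


def infer_output_coder(transform):
    """Return a coder label for the declared output type, or None if unset/unknown."""
    if transform._output_type is None:
        return None
    return CODER_FOR_TYPE.get(transform._output_type)


declared = infer_output_coder(SketchTransform().with_output_types(int))
undeclared = infer_output_coder(SketchTransform())
```

With this shape, callers keep using the existing type-hint method and no separate coder setter is needed.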
##########
sdks/python/apache_beam/runners/portability/expansion_service_test.py:
##########
@@ -270,6 +270,20 @@ def from_runner_api_parameter(unused_ptransform, payload, unused_context):
return PayloadTransform(payload.decode('ascii'))
+@ptransform.PTransform.register_urn('sum_without_type', None)
+class SumWithoutTypeTransform(ptransform.PTransform):
+ def expand(self, pcoll):
+ return pcoll | beam.CombineGlobally(sum)
Review Comment:
Are we sure this can never be inferred? It would be better to use a type that
we know we'll never be able to infer, e.g. one with a data dependency like
"Map(lambda x: str_literal if x else int_literal)", which would technically be
a union type but which we could ensure is not one in practice.
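To make the data dependency concrete, here is a small plain-Python sketch (no Beam involved; the function name and literals are illustrative). The return type depends on the value of the input, so static inference can at best produce a union, while the test data can be chosen so that only one branch is ever taken.

```python
def data_dependent(x):
    # The branch, and therefore the return type, depends on the value of x.
    return "nonzero" if x else 0


# Over arbitrary inputs both branches occur, so the result is a union of types.
types_over_all_inputs = {type(data_dependent(x)) for x in [0, 1, 2]}

# If the test only feeds truthy inputs, every result is a str in practice.
types_over_truthy_inputs = {type(data_dependent(x)) for x in [1, 2]}
```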
##########
sdks/python/apache_beam/runners/portability/expansion_service.py:
##########
@@ -63,6 +64,10 @@ def with_pipeline(component, pcoll_id=None):
}
transform = with_pipeline(
ptransform.PTransform.from_runner_api(request.transform, context))
+ if request.output_coder_override.spec.urn:
+ output_coder = Coder.from_runner_api(
+ request.output_coder_override, context)
+ transform = transform.with_output_types(output_coder.to_type_hint())
Review Comment:
This could be unsafe. Should we at least make sure it's compatible? (This
would have to happen after application...)
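One way to picture such a check, as a sketch only: compare the requested type against whatever type was inferred once the transform has been applied. `is_compatible` is a hypothetical helper that uses plain Python classes in place of Beam type hints.

```python
def is_compatible(requested, inferred):
    """Return True if declaring `requested` as the output type looks safe.

    `inferred` is the type found after applying the transform, or None
    when nothing could be inferred (in which case nothing contradicts
    the request).
    """
    if inferred is None:
        return True
    # Safe when every inferred value is also a value of the requested type.
    return issubclass(inferred, requested)


ok_unknown = is_compatible(int, None)
ok_subclass = is_compatible(int, bool)
bad_mismatch = is_compatible(int, str)
```

A real check would need to handle composite and union type hints, which this sketch deliberately ignores.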
##########
model/job-management/src/main/proto/beam_expansion_api.proto:
##########
@@ -46,6 +46,9 @@ message ExpansionRequest {
// A namespace (prefix) to use for the id of any newly created
// components.
string namespace = 3;
+
+ // (Optional) If this coder is set, overrides the output type of ExternalTransform
Review Comment:
Let's make this a request that the outputs have the given coders, rather than
an override. This way the expansion service can do its best to satisfy the
request, and if it can't, the caller has one more chance to do its best to
interpret the results.
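The request semantics could be sketched like this, using plain dictionaries and string labels. Both function names, the `"service_default"` label, and the tag names are assumptions for illustration and do not reflect the real expansion service.

```python
def expand_with_requests(requested, supported):
    """Service side (hypothetical): honour each requested coder if it can,
    otherwise fall back to the service's own choice for that output."""
    return {
        tag: (coder if coder in supported else "service_default")
        for tag, coder in requested.items()
    }


def unmet_requests(requested, actual):
    """Caller side (hypothetical): list the tags whose request was not satisfied,
    so the caller can decide how to interpret those outputs."""
    return sorted(tag for tag, coder in requested.items() if actual.get(tag) != coder)


requested = {"main": "VarIntCoder", "errors": "CustomCoder"}
actual = expand_with_requests(requested, supported={"VarIntCoder", "StrUtf8Coder"})
needs_fallback = unmet_requests(requested, actual)
```

The caller inspects the response rather than assuming the request was honoured, which is the extra chance described above.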