ahmedabu98 commented on code in PR #28540:
URL: https://github.com/apache/beam/pull/28540#discussion_r1331946360
##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -414,7 +414,7 @@ def __init__(
def expand(self, pcolls):
# Expand the transform using the expansion service.
- return pcolls | ExternalTransform(
+ return pcolls | self._payload_builder._identifier >> ExternalTransform(
Review Comment:
I tried the following on Dataflow and it worked fine:
```
with beam.Pipeline(options=opts) as p:
input = p | beam.Create([{"hi": "hello"}])
input | "write1" >> beam.io.WriteToBigQuery(table="...1",
method="STORAGE_WRITE_API",
schema="hi:STRING")
input | "write2" >> beam.io.WriteToBigQuery(table="...2",
method="STORAGE_WRITE_API",
schema="hi:STRING")
```
I think it just cares about the top-level transform naming. Our IOs and
other composites have consistent naming and it's up to the user to label top
level transforms where needed. Without the top-level labels, I get the
following error:
`A transform with label "WriteToBigQuery" already exists in the pipeline. To
apply a transform with a specified label write pvalue | "label" >> transform`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]