JoeCMoore opened a new issue, #28199:
URL: https://github.com/apache/beam/issues/28199
### What happened?
When writing to BigQuery with the Storage Write API (`Method.STORAGE_WRITE_API`) in a streaming pipeline, I precede the write step with a transform that splits the pipeline flow using `TaggedOutput`, as follows:
```py
import apache_beam as beam


class SplitDataFn(beam.DoFn):
    def process(self, element):
        # For each input element, emit output elements for BigQuery and logging.
        yield beam.pvalue.TaggedOutput('bigquery', element)
        yield beam.pvalue.TaggedOutput('log', element)
```
I then connect my pipeline steps (stored in the `data` variable) to the `ParDo`:
```py
split_data = data | 'Split Flow' >> beam.ParDo(
    SplitDataFn()).with_outputs('bigquery', 'log', main='main')
```
From there, I reference the split `PCollection`s as follows:
```py
bigquery_output = split_data.bigquery | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
    '<table name>',
    dataset='<dataset name>',
    schema=TABLE_SCHEMA,
    method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    triggering_frequency=1,
)
```
This results in a `TypeError`:
```
TypeError: an integer is required [while running 'Write to BigQuery/Map(<lambda at bigquery.py:2157>)-ptransform-56']
```
Notably, the "Write to BigQuery" step now contains additional `Map` stages in the job graph that were not there previously.

### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [X] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [X] Component: IO connector
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [X] Component: Google Cloud Dataflow Runner