tvalentyn commented on issue #28199:
URL: https://github.com/apache/beam/issues/28199#issuecomment-1843684078
I just tried this and I can't repro the error. My pipeline is:
```
def bq_apply_28199(p):
from apache_beam.io.gcp.internal.clients import bigquery
class SplitDataFn(beam.DoFn):
def process(self, element):
# for each input element, emit output elements for BigQuery and
Logging
yield beam.pvalue.TaggedOutput('bigquery', element)
yield beam.pvalue.TaggedOutput('log', element)
input_data = [
{
'STRING_FIELD': 'abc'
},
]
data = p | beam.Create(input_data)
split_data = data | 'Split Flow' >>
beam.ParDo(SplitDataFn()).with_outputs('bigquery', 'log', main='main')
bigquery_output = split_data.bigquery | 'Write to BigQuery' >>
beam.io.WriteToBigQuery(
table=<...>,
schema={
'fields': [{
'name': 'STRING_FIELD', 'type': 'STRING', 'mode':
'NULLABLE'
}]
},
method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
triggering_frequency=1
)
def run(argv=None):
    """Parse the command line and apply the repro steps to a new pipeline.

    Args:
        argv: Argument list handed to ``parse_known_args``. With ``None``,
            argparse reads the arguments from ``sys.argv``.
    """
    parser = argparse.ArgumentParser()
    # No custom options are declared on the parser, so the parsed namespace
    # is discarded and only the unrecognized remainder is forwarded to Beam.
    _, pipeline_args = parser.parse_known_args(argv)
    with beam.Pipeline(argv=pipeline_args) as p:
        bq_apply_28199(p)
if __name__ == '__main__':
    # Set the root logger to INFO before starting the pipeline.
    logging.getLogger().setLevel(logging.INFO)
    run()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]