lianliangzhang commented on PR #33087:
URL: https://github.com/apache/beam/pull/33087#issuecomment-2562316519
@jrmccluskey, I ran into an issue while using BigQueryEnrichmentHandler as a
reference for writing a SpannerEnrichmentHandler.
Could you please help check how a batched list of rows can be received in the
    def __call__(self, request: Union[beam.Row, list[beam.Row]], *args, **kwargs):
method of SpannerEnrichmentHandler?
I want the enrichment handler to process elements in batches instead of one
beam.Row element at a time, but when I pass the list below as input, the rows
still reach the handler one by one.
requests = [
    beam.Row(sale_id=1,
             Consumer_Id='0b47c98f-58bf-4ae6-8577-9257f14ee7e6', product_id=1, quantity=1),
    beam.Row(sale_id=3,
             Consumer_Id='e687e75a-d4be-411b-9301-4777183b2611', product_id=2, quantity=3),
    beam.Row(sale_id=5,
             Consumer_Id='5c17e957-ceea-4ab9-a9d9-2d9f84c91380', product_id=4, quantity=2)
]
I create the pipeline as follows:
spanner_handler = SpannerMultiLineEnrichmentHandler(
    project=project_id,
    instance_id=instance_id,
    database=database,
    row_restriction_template=row_key + " ='{}'",
    table_name=table_id,
    fields=fields,
    min_batch_size=3,
    max_batch_size=3)
with beam.Pipeline() as p:
    _ = (
        p
        | "Create" >> beam.Create(requests)
        | "Enrich W/ Spanner" >> Enrichment(spanner_handler)
        | "Print" >> beam.Map(print))
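
A minimal sketch of the handler shape described above, written without Apache Beam so that it runs on its own. Everything in it is hypothetical: `Row` is a namedtuple standing in for beam.Row, `FAKE_TABLE` stands in for the Spanner lookup, and `BatchAwareHandler` is an illustrative name. The `batch_elements_kwargs()` hook is an assumption modelled on what BigQueryEnrichmentHandler appears to expose; it should be verified against the Beam version in use.

```python
from collections import namedtuple
from typing import Any, Dict, List, Tuple, Union

# Stand-in for beam.Row so the sketch runs without Apache Beam installed.
Row = namedtuple("Row", ["sale_id", "product_id", "quantity"])

# Hypothetical in-memory table standing in for the Spanner lookup.
FAKE_TABLE = {1: {"name": "widget"}, 2: {"name": "gadget"}, 4: {"name": "gizmo"}}

Enriched = Tuple[Row, Dict[str, Any]]


class BatchAwareHandler:
    """Hypothetical handler that accepts either one row or a list of rows."""

    def __init__(self, min_batch_size: int = 1, max_batch_size: int = 1):
        self._batching_kwargs = {
            "min_batch_size": min_batch_size,
            "max_batch_size": max_batch_size,
        }

    def batch_elements_kwargs(self) -> Dict[str, Any]:
        # Assumption: the surrounding transform reads this hook to decide
        # whether to group elements into lists before calling the handler.
        return self._batching_kwargs

    def __call__(
        self, request: Union[Row, List[Row]], *args, **kwargs
    ) -> Union[Enriched, List[Enriched]]:
        if isinstance(request, list):
            # Batched path: resolve every distinct key in one lookup pass.
            keys = {r.product_id for r in request}
            found = {k: FAKE_TABLE.get(k, {}) for k in keys}
            return [(r, found[r.product_id]) for r in request]
        # Single-element path: one lookup per row.
        return request, FAKE_TABLE.get(request.product_id, {})
```

If the rows still arrive one at a time, one thing that may be worth checking is whether the custom handler overrides batch_elements_kwargs(); accepting min_batch_size and max_batch_size in the constructor may not be enough on its own. This is an assumption about how the transform decides to batch, not something confirmed here.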