lianliangzhang commented on PR #33087:
URL: https://github.com/apache/beam/pull/33087#issuecomment-2562316519

   @jrmccluskey, I ran into an issue while using BigQueryEnrichmentHandler as a
   reference for writing a SpannerEnrichmentHandler.

   Could you please help me understand how a batch (a list of rows) gets
   passed to the

       def __call__(self, request: Union[beam.Row, list[beam.Row]], *args, **kwargs):

   method of SpannerEnrichmentHandler?

   I want the enrichment handler to process elements in batches rather than
   one beam.Row at a time, but when I pass the list below as input, the rows
   still reach the handler one by one.

       requests = [
           beam.Row(sale_id=1,
                    Consumer_Id='0b47c98f-58bf-4ae6-8577-9257f14ee7e6', product_id=1, quantity=1),
           beam.Row(sale_id=3,
                    Consumer_Id='e687e75a-d4be-411b-9301-4777183b2611', product_id=2, quantity=3),
           beam.Row(sale_id=5,
                    Consumer_Id='5c17e957-ceea-4ab9-a9d9-2d9f84c91380', product_id=4, quantity=2)
       ]
   
   I create the pipeline as follows:
   
       spanner_handler = SpannerMultiLineEnrichmentHandler(
           project=project_id,
           instance_id=instance_id,
           database=database,
           row_restriction_template=row_key + " ='{}'",
           table_name=table_id,
           fields=fields,
           min_batch_size=3,
           max_batch_size=3)

       with beam.Pipeline() as p:
           _ = (
               p
               | "Create" >> beam.Create(requests)
               | "Enrich W/ Spanner" >> Enrichment(spanner_handler)
               | "Print" >> beam.Map(print))
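
To make the intended behaviour concrete, here is a minimal, Beam-free sketch of a handler that accepts either a single row or a list of rows. Everything in it is hypothetical: `Row` is a stand-in for beam.Row, `BatchAwareHandler` and `where_clause` are illustrative names, and no Spanner query is executed. It also assumes, without having confirmed it for this PR, that the handler opts into batching the way BigQueryEnrichmentHandler appears to, by returning its batch sizes from a `batch_elements_kwargs()` override.

```python
from typing import Any, List, Mapping, NamedTuple, Union


class Row(NamedTuple):
    """Stand-in for beam.Row so the sketch runs without Apache Beam."""
    sale_id: int
    Consumer_Id: str
    product_id: int
    quantity: int


class BatchAwareHandler:
    """Hypothetical handler; not the actual Beam or Spanner implementation."""

    def __init__(self, row_key: str, min_batch_size: int, max_batch_size: int):
        self.row_key = row_key
        # Same template shape as in the pipeline above: row_key + " ='{}'"
        self.row_restriction_template = row_key + " ='{}'"
        self._batching_kwargs = {
            "min_batch_size": min_batch_size,
            "max_batch_size": max_batch_size,
        }

    def batch_elements_kwargs(self) -> Mapping[str, Any]:
        # Assumption: the Enrichment transform consults this mapping to decide
        # whether to group elements into lists before calling the handler.
        return self._batching_kwargs

    def where_clause(self, request: Union[Row, List[Row]]) -> str:
        # Normalise a single row to a one-element list, then OR the
        # per-row restrictions together so one query covers the batch.
        rows = request if isinstance(request, list) else [request]
        return " OR ".join(
            self.row_restriction_template.format(getattr(row, self.row_key))
            for row in rows)

    def __call__(self, request: Union[Row, List[Row]], *args, **kwargs) -> str:
        # A real handler would run the lookup here; the sketch only
        # returns the filter it would send.
        return self.where_clause(request)
```

With this shape, a single row and a batch of three rows both go through the same `__call__`; whether the batch actually arrives as a list depends on the batching step upstream of the handler.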


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
