cebasfu93 commented on issue #30177:
URL: https://github.com/apache/beam/issues/30177#issuecomment-2908992591

   Can't share the whole thing, but here is the main logic:
   
   ```
   import logging
   
   from typing import Iterable, Tuple
   
   import apache_beam as beam
   
   from apache_beam.pvalue import PCollection, TaggedOutput
   from apache_beam.transforms import window
   from pydantic import BaseModel, ConfigDict, Field
   
   
   class Result(BaseModel):
       model_config = ConfigDict(arbitrary_types_allowed=True)
       # Payload is a Pydantic model defined elsewhere
       payload: list[Payload] = Field(...)
   
   
   class MyFn(beam.DoFn):
   
       def __init__(
           self,
       ) -> None:
           super().__init__()
   
       def process(
           self, my_input: MyInput,  # MyInput is a Pydantic model defined elsewhere
       ) -> Iterable[Result | TaggedOutput]:  # type: ignore
           try:
               # build a Payload object from the input
               yield Payload(my_input)
           except Exception as e:  # noqa: BLE001
               error_msg = f"Failed to build Payload {e}"
               logging.error(error_msg)
               yield TaggedOutput("errors", my_input)
   
   
   class MyTransform(beam.PTransform):
       def __init__(self) -> None:
           super().__init__()
           self.my_fn = MyFn()
   
       def expand(self, pcoll: MyInput) -> None:
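           # with_outputs("errors", main="payloads") returns the main output
           # first, followed by the tagged outputs, so the unpacking below
           # yields (payloads, errors).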
           payloads, errors = (
               pcoll
               | "Build payloads" >> beam.ParDo(self.my_fn)
               .with_outputs("errors", main="payloads")
               .with_output_types(Tuple[Payload, bytes])
           )
           _ = (
               errors
               | "Log Errors" >> MySink(
                   # a custom Transform to save errors to GCS
               )
           )
   
           _ = (
               payloads
               | "Decode Entities" >> beam.Map(lambda ent: ent.model_dump())
               | "write to BQ" >> beam.io.WriteToBigQuery(   # type: ignore
                   table=my_table,  # a google.cloud.bigquery.Table instance
                   schema=my_table.to_api_repr()["schema"],
                   write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,  # type: ignore
                   create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,  # type: ignore
                   method="STORAGE_WRITE_API",
               )
           )
   
   
   if __name__ == "__main__":
       pipeline = beam.Pipeline(options=options)
   
       # Read from Pub/Sub and decode into JSON
       my_inputs: PCollection[MyInput] = (
           pipeline
           | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(  # type: ignore
               subscription=f"projects/<project>/subscriptions/<subscription>",
           )
           | "Fixed-size Windows" >> 
beam.WindowInto(window.FixedWindows(size=5, offset=0))
       )
   
       # NER with Spacy and upload to BigQuery
       _ = (
           my_inputs
           | "Spacy NER" >> NerTransform(
               ner_table=ner_table,
               model_type=NerModelTypes.SPACY,
               errors_location=config.errors_location,
               errors_webhook_url=webhook_url,
               job_name=config.job_name,
           )
       )
       pipeline.run()
   ```
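   For context on the "Decode Entities" step, here is a minimal, self-contained sketch of how a Pydantic payload turns into the dict rows handed to `WriteToBigQuery`. The `Payload` fields below are hypothetical (the real model is defined elsewhere); the point is only the `model_dump()` round-trip:
   ```
   from pydantic import BaseModel


   # Hypothetical stand-in for the real Payload model defined elsewhere.
   class Payload(BaseModel):
       entity: str
       label: str


   row = Payload(entity="Apache Beam", label="ORG").model_dump()
   # row == {"entity": "Apache Beam", "label": "ORG"}, i.e. a plain dict whose
   # keys must line up with the schema passed to WriteToBigQuery.
   ```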
   The header of the Dockerfile is:
   ```
   FROM python:3.12-slim
   COPY --from=apache/beam_python3.12_sdk:2.65.0 /opt/apache/beam /opt/apache/beam
   COPY --from=gcr.io/dataflow-templates-base/python312-template-launcher-base:20250124-rc00 /opt/google/dataflow/python_template_launcher /opt/google/dataflow/python_template_launcher
   # ...
   ENTRYPOINT ["/opt/apache/beam/boot"]
   ```
   
   Some relevant pipeline options are: 
   `dataflow_service_options=streaming_mode_at_least_once`
   `pickle_library="cloudpickle"`
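   For reference, a minimal sketch of setting those options programmatically (illustrative only, not the exact construction used; project, region, runner and other flags are omitted):
   ```
   from apache_beam.options.pipeline_options import PipelineOptions

   options = PipelineOptions(
       flags=[],
       streaming=True,
       pickle_library="cloudpickle",
       dataflow_service_options=["streaming_mode_at_least_once"],
   )
   ```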
   
   Let me know if you need more info :) 

