cebasfu93 commented on issue #30177:
URL: https://github.com/apache/beam/issues/30177#issuecomment-2908992591
Can't share the whole thing, but here is the main logic:
```
import logging
from typing import Iterable, Tuple

import apache_beam as beam
from apache_beam.pvalue import TaggedOutput
from pydantic import BaseModel, ConfigDict, Field


class Result(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Payload is a Pydantic model defined elsewhere
    payload: list[Payload] = Field(...)


class MyFn(beam.DoFn):
    def __init__(self) -> None:
        super().__init__()

    def process(
        self,
        my_input: MyInput,  # MyInput is a Pydantic model defined elsewhere
    ) -> Iterable[Result | TaggedOutput]:  # type: ignore
        try:
            # build a Payload object from the input
            yield Payload(my_input)
        except Exception as e:  # noqa: BLE001
            error_msg = f"Failed to build Payload {e}"
            logging.error(error_msg)
            yield TaggedOutput("errors", my_input)


class MyTransform(beam.PTransform):
    def __init__(self) -> None:
        super().__init__()
        self.my_fn = MyFn()

    def expand(self, pcoll: MyInput) -> None:
        payloads, errors = (
            pcoll
            | "Build payloads"
            >> beam.ParDo(self.my_fn)
            .with_outputs("errors", main="payloads")
            .with_output_types(Tuple[Payload, bytes])
        )
        _ = (
            errors
            | "Log Errors" >> MySink(
                # a custom PTransform that saves errors to GCS
            )
        )
        _ = (
            payloads
            | "Decode Entities" >> beam.Map(lambda ent: ent.model_dump())
            | "write to BQ"
            >> beam.io.WriteToBigQuery(  # type: ignore
                table=my_table,  # a google.cloud.bigquery.Table instance
                schema=my_table.to_api_repr()["schema"],
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,  # type: ignore
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,  # type: ignore
                method="STORAGE_WRITE_API",
            )
        )


if __name__ == "__main__":
    pipeline = beam.Pipeline(options=options)

    # Read from Pub/Sub and decode into JSON
    my_inputs: PCollection[MyInput] = (
        pipeline
        | "Read from Pub/Sub"
        >> beam.io.ReadFromPubSub(  # type: ignore
            subscription=f"projects/<project>/subscriptions/<subscription>",
        )
        | "Fixed-size Windows" >> beam.WindowInto(window.FixedWindows(size=5, offset=0))
    )

    # NER with Spacy and upload to BigQuery
    _ = (
        my_inputs
        | "Spacy NER" >> NerTransform(
            ner_table=ner_table,
            model_type=NerModelTypes.SPACY,
            errors_location=config.errors_location,
            errors_webhook_url=webhook_url,
            job_name=config.job_name,
        )
    )

    pipeline.run()
```
The header of the Dockerfile is:
```
FROM python:3.12-slim

COPY --from=apache/beam_python3.12_sdk:2.65.0 /opt/apache/beam /opt/apache/beam
COPY --from=gcr.io/dataflow-templates-base/python312-template-launcher-base:20250124-rc00 /opt/google/dataflow/python_template_launcher /opt/google/dataflow/python_template_launcher

# ...

ENTRYPOINT ["/opt/apache/beam/boot"]
```
Some relevant pipeline options are:
`dataflow_service_options=streaming_mode_at_least_once`
`pickle_library="cloudpickle"`
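For context, roughly how those options are passed when the pipeline is built (a simplified sketch, not the exact code; other flags such as project, region, and temp_location are omitted):
```
from apache_beam.options.pipeline_options import PipelineOptions

# Simplified sketch: only the options mentioned above, everything else omitted.
options = PipelineOptions(
    streaming=True,
    dataflow_service_options=["streaming_mode_at_least_once"],
    pickle_library="cloudpickle",
)
```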
Let me know if you need more info :)