jbandoro opened a new issue, #39026:
URL: https://github.com/apache/beam/issues/39026
### What happened?
We noticed this after upgrading from v2.66.0 to v2.74.0 for our Python SDK
Dataflow runner streaming jobs. We use side inputs from a `PeriodicImpulse`
that fire at intervals longer than our streaming PCollection windows.
With the upgrade to v2.74.0, the Dataflow streaming job's reported watermark age
for the stage with the side inputs, and the overall job's watermark age, now
follow the side input periodic impulse's interval.
So if we have a side input that fires every hour, the job's watermark age has
a sawtooth pattern climbing up to 1 hour, which previously didn't
happen. The issue for us is that we have alerts based on the job's
watermark age, which now follows the periodic impulse with the longest
firing interval.
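
To make the shape of the symptom concrete, here is a minimal sketch of the sawtooth described above. It is only an idealized model of what we observe on the dashboard, not Dataflow's actual watermark computation: it assumes the reported age resets to a baseline each time the impulse fires and then grows linearly until the next firing. The function names and the `baseline_age` parameter are hypothetical.

```python
def modeled_watermark_age(seconds_since_start, fire_interval, baseline_age=0.0):
    """Idealized watermark age (seconds) under the assumed sawtooth model.

    Assumption: the age resets to `baseline_age` at every impulse firing and
    climbs by one second per second until the next firing.
    """
    if fire_interval <= 0:
        raise ValueError("fire_interval must be positive")
    return baseline_age + (seconds_since_start % fire_interval)


def modeled_peak_age(fire_intervals):
    """Under the same assumption, the job-level peak tracks the longest interval."""
    return max(fire_intervals)


if __name__ == "__main__":
    # Sample the model for an hourly side input at a few points in time.
    for t in (0, 1800, 3599, 3600):
        print(t, modeled_watermark_age(t, fire_interval=3600))
```

Under this model, an alert threshold below the longest firing interval would trigger once per interval, which matches what we see with our alerts.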
To reproduce this, I created the minimal example below and ran it with the
Dataflow runner on both v2.66.0 and v2.74.0. I'm including screenshots of the
data freshness reported by Dataflow:
# Setup
Create a Pub/Sub topic:
```shell
gcloud pubsub topics create periodicimpulse-watermark --project=<project>
```
Run the following script to publish messages:
```sh
#!/bin/bash
while true; do
  gcloud pubsub topics publish \
    projects/aclima-lab/topics/periodicimpulse-watermark \
    --message="Hello!" --project=aclima-lab
  sleep 1
done
```
I created the following Python SDK pipeline, which uses a side input fed by a
periodic impulse firing every 300 s:
```python
import argparse
import time
import logging

import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.transforms.trigger import (
    AccumulationMode,
    AfterCount,
    AfterProcessingTime,
    Repeatedly,
)
from apache_beam.transforms.window import GlobalWindows

PUBSUB_TOPIC = "projects/aclima-lab/topics/periodicimpulse-watermark"
SIDE_INPUT_INTERVAL = 300  # seconds between side input firings


class JoinWithSideInput(beam.DoFn):
    def process(
        self,
        element,
        side_values=beam.DoFn.SideInputParam,
    ):
        # Record both event time (Pub/Sub publish time) and processing time
        # so that per-row latency can be checked in BigQuery.
        processing_time = time.time()
        event_time = element.publish_time.timestamp()
        yield {
            "message_id": element.message_id,
            "event_time": int(event_time),
            "processing_time": int(processing_time),
            "side_input_time": int(list(side_values)[0]) if side_values else None,
        }


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--output_table",
        required=True,
        help="BigQuery table to write results to, e.g. project:dataset.table",
    )
    known_args, pipeline_argv = parser.parse_known_args(argv)
    output_table = known_args.output_table

    options = PipelineOptions(pipeline_argv)
    options.view_as(StandardOptions).streaming = True

    schema = {
        "fields": [
            {"name": "message_id", "type": "STRING"},
            {"name": "event_time", "type": "INTEGER"},
            {"name": "processing_time", "type": "INTEGER"},
            {"name": "side_input_time", "type": "INTEGER"},
        ]
    }

    with beam.Pipeline(options=options) as p:
        # Side input: fires every SIDE_INPUT_INTERVAL seconds.
        side_input_pcol = (
            p
            | "SideInputImpulse" >> PeriodicImpulse(
                fire_interval=SIDE_INPUT_INTERVAL
            )
            | "SideInputWindow" >> beam.WindowInto(
                GlobalWindows(),
                trigger=Repeatedly(AfterCount(1)),
                accumulation_mode=AccumulationMode.DISCARDING,
            )
        )

        # Main input: Pub/Sub messages, triggered every 10 s of processing time.
        results = (
            p
            | "ReadPubSub" >> ReadFromPubSub(topic=PUBSUB_TOPIC, with_attributes=True)
            | "MainWindow" >> beam.WindowInto(
                GlobalWindows(),
                trigger=Repeatedly(AfterProcessingTime(10)),
                accumulation_mode=AccumulationMode.DISCARDING,
            )
            | "JoinWithSideInput" >> beam.ParDo(
                JoinWithSideInput(),
                side_values=beam.pvalue.AsIter(side_input_pcol),
            )
        )

        results | "WriteToBQ" >> beam.io.WriteToBigQuery(
            table=output_table,
            schema=schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS,
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()
```
# Submitting dataflow jobs
Submit with apache-beam v2.74.0:
```shell
python periodic_impulse_watermark.py \
--project=<project> \
--job_name=periodicimpulse-beam-274 \
--region=<region> \
--runner=DataflowRunner \
--temp_location=gs://<bucket>/temp \
--output_table=<project>:<dataset>.periodic_impulse_watermark_v2_74
```
Then submit another job with apache-beam v2.66.0:
```shell
python periodic_impulse_watermark.py \
--project=<project> \
--job_name=periodicimpulse-beam-266 \
--region=<region> \
--runner=DataflowRunner \
--temp_location=gs://<bucket>/temp \
--output_table=<project>:<dataset>.periodic_impulse_watermark_v2_66
```
# Result
Both jobs successfully write to their BigQuery tables. The difference is in
the job's watermark age reported by Dataflow.
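
Because each output row carries both `event_time` and `processing_time`, the rows can be used to check whether actual per-message latency differs between the two jobs, independently of the reported watermark age. The helper below is a hypothetical sketch (the name `summarize_lag` is mine, not part of Beam) and assumes the rows have already been fetched from BigQuery as dictionaries with the schema used in the pipeline above.

```python
from statistics import median


def summarize_lag(rows):
    """Summarize processing_time - event_time (seconds) over exported rows.

    `rows` is an iterable of dicts with integer "event_time" and
    "processing_time" fields, as written by the pipeline above.
    """
    lags = [row["processing_time"] - row["event_time"] for row in rows]
    if not lags:
        raise ValueError("no rows to summarize")
    return {
        "count": len(lags),
        "median_lag_s": median(lags),
        "max_lag_s": max(lags),
    }


if __name__ == "__main__":
    # Made-up sample rows, for illustration only.
    sample = [
        {"event_time": 100, "processing_time": 102},
        {"event_time": 101, "processing_time": 104},
        {"event_time": 102, "processing_time": 112},
    ]
    print(summarize_lag(sample))
```

Running this against the tables from both jobs would show whether the sawtooth reflects real added latency or only the reported metric.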
## v2.74.0
The watermark age is a sawtooth climbing to the 300s of the periodic impulse:
<img width="700" alt="Image"
src="https://github.com/user-attachments/assets/00b4180a-eedd-4072-bcd9-fb60a659839f"
/>
The same with the job's watermark age:
<img width="700" alt="Image"
src="https://github.com/user-attachments/assets/ae969c7b-9931-4ba5-b322-e3ccdade3c8e"
/>
## v2.66.0
There is no sawtooth pattern:
<img width="700" alt="Image"
src="https://github.com/user-attachments/assets/6bc7cb79-ceed-4e8d-a1c2-42c1dc26ba20"
/>
<img width="700" alt="Image"
src="https://github.com/user-attachments/assets/250ad513-f1af-4a38-af40-f3448c7419b7"
/>
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [x] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Prism Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [x] Component: Google Cloud Dataflow Runner