jbandoro opened a new issue, #39026:
URL: https://github.com/apache/beam/issues/39026

   ### What happened?
   
   We noticed this after upgrading our Python SDK streaming jobs on the
   Dataflow runner from v2.66.0 to v2.74.0. We use side inputs from a
   `PeriodicImpulse` that fires at intervals longer than our streaming
   PCollection windows. Since the upgrade to v2.74.0, the watermark age that
   Dataflow reports for the stage with the side inputs, and for the job
   overall, follows the firing interval of the side input's periodic impulse.
   
   So if we have a side input that fires every hour, the job's watermark age
   has a sawtooth pattern climbing up to 1 hour, which did not happen
   previously. This is a problem for us because we have alerts based on the
   job's watermark age, which now follows the periodic impulse with the
   longest firing interval.
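
The sawtooth described above can be illustrated with a toy model. This is only a sketch of the observed behavior, assuming the reported watermark jumps forward at each impulse firing and stands still in between; it is not taken from Beam or Dataflow internals, and the function names are hypothetical.

```python
def watermark_age(now_s, fire_interval_s, start_s=0.0):
    """Age of a watermark that only advances when the impulse fires.

    Toy model (assumption): the watermark jumps to "now" at every firing
    and then stands still until the next firing.
    """
    if fire_interval_s <= 0:
        raise ValueError("fire_interval_s must be positive")
    if now_s < start_s:
        raise ValueError("now_s must not precede start_s")
    elapsed = now_s - start_s
    # Time of the most recent firing at or before now_s.
    last_fire_s = start_s + (elapsed // fire_interval_s) * fire_interval_s
    return now_s - last_fire_s


def job_watermark_age(now_s, fire_intervals_s):
    """Job-level age in the toy model: the largest per-impulse age."""
    return max(watermark_age(now_s, interval) for interval in fire_intervals_s)


def peak_job_watermark_age(fire_intervals_s, horizon_s, step_s=1):
    """Largest job-level age seen when sampling every step_s seconds."""
    return max(
        job_watermark_age(t, fire_intervals_s)
        for t in range(0, horizon_s, step_s)
    )
```

Under this model, the peak age tracks the longest firing interval, so any alert threshold below that interval would trip once per cycle.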
   
   To reproduce this, I created the minimal example below, ran it on the
   Dataflow runner with both v2.66.0 and v2.74.0, and included screenshots of
   the data freshness reported by Dataflow:
   
   # Setup
   Create a Pub/Sub topic:
   ```shell
   gcloud pubsub topics create periodicimpulse-watermark --project=<project>
   ```
   Run the following script to publish messages:
   ```sh
   #!/bin/bash
   while true; do
       gcloud pubsub topics publish projects/aclima-lab/topics/periodicimpulse-watermark \
           --message="Hello!" --project=aclima-lab
       sleep 1
   done
   ```
   Create the following Python SDK pipeline (saved as
   `periodic_impulse_watermark.py`), which uses a side input fed by a
   periodic impulse firing every 300 s:
   ```python
   import argparse
   import time
   import logging
   
   import apache_beam as beam
   from apache_beam.io.gcp.pubsub import ReadFromPubSub
   from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
   from apache_beam.transforms.periodicsequence import PeriodicImpulse
   from apache_beam.transforms.trigger import (
       AccumulationMode,
       AfterCount,
       AfterProcessingTime,
       Repeatedly,
   )
   from apache_beam.transforms.window import GlobalWindows
   
   PUBSUB_TOPIC = "projects/aclima-lab/topics/periodicimpulse-watermark"
   SIDE_INPUT_INTERVAL = 300
   
   
   class JoinWithSideInput(beam.DoFn):
       def process(
           self,
           element,
           side_values=beam.DoFn.SideInputParam,
       ):
           processing_time = time.time()
           event_time = element.publish_time.timestamp()
   
           yield {
               "message_id": element.message_id,
               "event_time": int(event_time),
               "processing_time": int(processing_time),
               "side_input_time": int(list(side_values)[0]) if side_values else None,
           }
   
   
   def run(argv=None):
       parser = argparse.ArgumentParser()
       parser.add_argument(
           "--output_table",
           required=True,
           help="BigQuery table to write results to, e.g. project:dataset.table",
       )
       known_args, pipeline_argv = parser.parse_known_args(argv)
       output_table = known_args.output_table
   
       options = PipelineOptions(pipeline_argv)
       options.view_as(StandardOptions).streaming = True
   
       schema = {
           "fields": [
               {"name": "message_id", "type": "STRING"},
               {"name": "event_time", "type": "INTEGER"},
               {"name": "processing_time", "type": "INTEGER"},
               {"name": "side_input_time", "type": "INTEGER"},
           ]
       }
   
       with beam.Pipeline(options=options) as p:
           side_input_pcol = (
               p
               | "SideInputImpulse" >> PeriodicImpulse(
                   fire_interval=SIDE_INPUT_INTERVAL
               )
               | "SideInputWindow" >> beam.WindowInto(
                   GlobalWindows(),
                   trigger=Repeatedly(AfterCount(1)),
                   accumulation_mode=AccumulationMode.DISCARDING,
               )
           )
           results = (
               p
               | "ReadPubSub" >> ReadFromPubSub(topic=PUBSUB_TOPIC, with_attributes=True)
               | "MainWindow" >> beam.WindowInto(
                   GlobalWindows(),
                   trigger=Repeatedly(AfterProcessingTime(10)),
                   accumulation_mode=AccumulationMode.DISCARDING,
               )
               | "JoinWithSideInput" >> beam.ParDo(
                   JoinWithSideInput(),
                   side_values=beam.pvalue.AsIter(side_input_pcol),
               )
           )
   
           results | "WriteToBQ" >> beam.io.WriteToBigQuery(
               table=output_table,
               schema=schema,
               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
               method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS,
           )
   
   
   if __name__ == "__main__":
       logging.getLogger().setLevel(logging.INFO)
       run()
   ```
   # Submitting dataflow jobs
   Submit a job with apache-beam v2.74.0:
   ```shell
   python periodic_impulse_watermark.py \
     --project=<project> \
     --job_name=periodicimpulse-beam-274 \
     --region=<region> \
     --runner=DataflowRunner \
     --temp_location=gs://<bucket>/temp \
     --output_table=<project>:<dataset>.periodic_impulse_watermark_v2_74
   ```
   Then submit another job with apache-beam v2.66.0:
   ```shell
   python periodic_impulse_watermark.py \
     --project=<project> \
     --job_name=periodicimpulse-beam-266 \
     --region=<region> \
     --runner=DataflowRunner \
     --temp_location=gs://<bucket>/temp \
     --output_table=<project>:<dataset>.periodic_impulse_watermark_v2_66
   ```
   # Result
   Both jobs successfully write to their BigQuery tables. The difference is in
   the job's watermark age as reported by Dataflow.
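
To check from the output tables whether actual element latency differs between the two jobs, rather than only the reported watermark age, rows could be summarized along these lines. This is a minimal sketch: `summarize_rows` is a hypothetical helper, it expects rows already exported as dicts with the pipeline's schema, and it assumes `side_input_time` is an epoch-seconds value, as the `int()` cast in the pipeline suggests.

```python
from statistics import median


def summarize_rows(rows):
    """Summarize latency for rows shaped like the pipeline's output schema.

    Each row is a dict with integer "event_time" and "processing_time"
    (epoch seconds) and an optional "side_input_time" (assumed epoch seconds).
    """
    # Seconds between Pub/Sub publish time and processing in the DoFn.
    latencies = [r["processing_time"] - r["event_time"] for r in rows]
    # Seconds between the side input value and processing, where available.
    staleness = [
        r["processing_time"] - r["side_input_time"]
        for r in rows
        if r.get("side_input_time") is not None
    ]
    return {
        "rows": len(rows),
        "median_latency_s": median(latencies) if latencies else None,
        "max_latency_s": max(latencies, default=None),
        "max_side_input_staleness_s": max(staleness, default=None),
    }
```

If both tables show similar latency figures while only the v2.74.0 job shows the sawtooth, that would suggest the change affects the reported watermark rather than element processing.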
   
   ## v2.74.0
   The watermark age is a sawtooth climbing to the 300 s interval of the periodic impulse:
   <img width="700" alt="Image" src="https://github.com/user-attachments/assets/00b4180a-eedd-4072-bcd9-fb60a659839f" />
   The same with the job's watermark age:
   <img width="700" alt="Image" src="https://github.com/user-attachments/assets/ae969c7b-9931-4ba5-b322-e3ccdade3c8e" />
   
   ## v2.66.0
   There is no sawtooth pattern:
   
   <img width="700" alt="Image" src="https://github.com/user-attachments/assets/6bc7cb79-ceed-4e8d-a1c2-42c1dc26ba20" />
   
   <img width="700" alt="Image" src="https://github.com/user-attachments/assets/250ad513-f1af-4a38-af40-f3448c7419b7" />
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [x] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Prism Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [x] Component: Google Cloud Dataflow Runner

