CYarros10 opened a new issue, #30007:
URL: https://github.com/apache/airflow/issues/30007

   ### Apache Airflow version
   
   2.5.1
   
   ### What happened
   
   
   [BeamRunPythonPipelineOperator](https://github.com/apache/airflow/blob/main/airflow/providers/apache/beam/operators/beam.py#L343) does not push any values to XCom when the pipeline starts. But Dataflow sensors work like this:
   
   ```
            discover_cancelling_jobs = DataflowJobStatusSensor(
                task_id="discover_cancelling_jobs",
                job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_config']['job_id']}}",
                expected_statuses={DataflowJobStatus.JOB_STATE_CANCELLING},
                location="{{region}}",
                mode="poke"
            )
   ```
   
   Since the only way to retrieve the Dataflow job ID from a BeamRunPythonPipelineOperator is through XCom, and BeamRunPythonPipelineOperator does not push this XCom until the pipeline ends, the sensor can't "sense": it can only ever see jobs that are already done.
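
   To make the timing concrete, here is a small illustration (my own sketch, not code from the provider) of what the sensor's templated `job_id` has to work with while the async pipeline is still running:

   ```
   # xcom_pull('start_python_job_async') returns None until the operator's
   # execute() has returned, because its return-value XCom does not exist yet.
   upstream_xcom = None  # what the pull yields while the job is still running

   try:
       job_id = upstream_xcom["dataflow_job_config"]["job_id"]
   except TypeError:
       job_id = None  # nothing for the sensor to poke against

   print(job_id)  # None until the pipeline has finished
   ```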
   
   
   
   
   ### What you think should happen instead
   
   The Dataflow job ID should be pushed to XCom when, or before, the pipeline starts.
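
   One way this could look (a rough sketch under my own assumptions, not the provider's actual implementation): drive the pipeline through `BeamHook` and use the Dataflow hook's `process_line_and_extract_dataflow_job_id_callback` helper to push the job ID to XCom as soon as it is parsed from the Beam process output. The bucket, file paths, task ID, and pipeline options below are placeholders.

   ```
   from airflow.operators.python import PythonOperator
   from airflow.providers.apache.beam.hooks.beam import BeamHook, BeamRunnerType
   from airflow.providers.google.cloud.hooks.dataflow import (
       process_line_and_extract_dataflow_job_id_callback,
   )


   def run_pipeline_and_push_job_id(ti, **_):
       """Run the Beam pipeline and push the Dataflow job ID as soon as it is known."""
       beam_hook = BeamHook(runner=BeamRunnerType.DataflowRunner)
       # Push the job ID to XCom the moment it appears in the process output,
       # instead of waiting for the pipeline to finish.
       callback = process_line_and_extract_dataflow_job_id_callback(
           on_new_job_id_callback=lambda job_id: ti.xcom_push(key="dataflow_job_id", value=job_id)
       )
       beam_hook.start_python_pipeline(
           # Placeholder pipeline options; a real Dataflow run also needs
           # project, region, job_name, temp_location, etc.
           variables={"output": "gs://my-bucket/dataflow_output"},
           py_file="gs://my-bucket/my_pipeline.py",
           py_options=[],
           py_interpreter="python3",
           py_requirements=["apache-beam[gcp]==2.36.0"],
           py_system_site_packages=False,
           process_line_callback=callback,
       )


   # Declared inside the DAG / TaskGroup, alongside the tasks below.
   push_job_id_early = PythonOperator(
       task_id="push_job_id_early",
       python_callable=run_pipeline_and_push_job_id,
   )
   ```

   The sensors below could then pull the `dataflow_job_id` key from such a task rather than the operator's return value.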
   
   ### How to reproduce
   
   Sample Code
   
   ```
        # -------------------------------------------------------------------------
        # Dataflow
        # -------------------------------------------------------------------------
   
       with TaskGroup(group_id="dataflow_tg1") as dataflow_tg1:
   
   
           start_python_job = BeamRunPythonPipelineOperator(
               task_id="start_python_job",
               runner="DataflowRunner",
               py_file="gs://{{gcs_download_bucket}}/{{df_python_script}}",
               py_options=[],
               pipeline_options={
                   "output": "gs://{{gcs_download_bucket}}/dataflow_output",
               },
               py_requirements=["apache-beam[gcp]==2.36.0"],
               py_interpreter="python3",
               py_system_site_packages=False,
               dataflow_config={
                   "job_name": "{{df_job}}-python",
                   "wait_until_finished": False,
               },
           )
   
           start_python_job_async = BeamRunPythonPipelineOperator(
               task_id="start_python_job_async",
               runner="DataflowRunner",
               py_file="gs://{{gcs_download_bucket}}/{{df_python_script}}",
               py_options=[],
               pipeline_options={
                   "output": "gs://{{gcs_download_bucket}}/dataflow_output",
               },
               py_requirements=["apache-beam[gcp]==2.36.0"],
               py_interpreter="python3",
               py_system_site_packages=False,
               dataflow_config={
                   "job_name": "{{df_job}}-aysnc",
                   "wait_until_finished": False,
               },
           )
   
           start_template_job = DataflowTemplatedJobStartOperator(
               task_id="start_template_job",
               job_name="{{df_job}}-template",
               project_id="{{ project_id }}",
               template="gs://dataflow-templates/latest/Word_Count",
               parameters={"inputFile": 
"gs://{{gcs_download_bucket}}/{{gcs_download_obj}}", "output": 
"gs://{{gcs_download_bucket}}/dataflow_output"},
               location="{{region}}",
           )
   
   
           wait_for_python_job_async_done = DataflowJobStatusSensor(
               task_id="wait_for_python_job_async_done",
               
job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_config']['job_id']}}",
               expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
               location="{{region}}",
               mode="reschedule",
               poke_interval=60
           )
   
            def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
                """Check whether the metric is greater than or equal to the given value."""
   
               def callback(metrics) -> bool:
                   dag.log.info("Looking for '%s' >= %d", metric_name, value)
                   for metric in metrics:
                       context = metric.get("name", {}).get("context", {})
                       original_name = context.get("original_name", "")
                       tentative = context.get("tentative", "")
                       if original_name == "Service-cpu_num_seconds" and not 
tentative:
                           return metric["scalar"] >= value
                   raise AirflowException(f"Metric '{metric_name}' not found in 
metrics")
   
               return callback
   
           wait_for_python_job_async_metric = DataflowJobMetricsSensor(
               task_id="wait_for_python_job_async_metric",
               
job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_config']['job_id']}}",
 # this doesnt work
               location="{{region}}",
               
callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", 
value=100),
               fail_on_terminal_state=False,
               mode="reschedule",
               poke_interval=60
           )
   
   
           def check_autoscaling_event(autoscaling_events) -> bool:
               """Check autoscaling event"""
               for autoscaling_event in autoscaling_events:
                   if "Worker pool started." in 
autoscaling_event.get("description", {}).get("messageText", ""):
                       return True
               return False
   
            wait_for_python_job_async_autoscaling_event = DataflowJobAutoScalingEventsSensor(
                task_id="wait_for_python_job_async_autoscaling_event",
                job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_config']['job_id']}}",  # this doesn't work
               location="{{region}}",
               callback=check_autoscaling_event,
               fail_on_terminal_state=False,
               mode="reschedule",
               poke_interval=60
           )
   
           stop_python_job = DataflowStopJobOperator(
               task_id="stop_python_dataflow_job",
               location="{{region}}",
               
job_name_prefix="{{task_instance.xcom_pull('start_python_job')['dataflow_job_config']['job_id']}}",
           )
   
           stop_template_job = DataflowStopJobOperator(
               task_id="stop_dataflow_job",
               location="{{region}}",
               job_name_prefix="{{df_job}}-template",
           )
   
           stop_async_job = DataflowStopJobOperator(
               task_id="stop_async_job",
               location="{{region}}",
               
job_name_prefix="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_config']['job_id']}}",
           )
   
           start_python_job >> stop_python_job
       
           start_template_job >> stop_template_job
   
           start_python_job_async >> stop_async_job
   
           wait_for_python_job_async_metric
   
           wait_for_python_job_async_autoscaling_event
           
           wait_for_python_job_async_done
   ```
   
   ### Operating System
   
   composer-2.1.5-airflow-2.4.3
   
   ### Versions of Apache Airflow Providers
   
   2.4.3
   
   ### Deployment
   
   Google Cloud Composer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   Occurs every time
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

