ferruzzi opened a new issue, #32159:
URL: https://github.com/apache/airflow/issues/32159

   ### Apache Airflow version
   
   2.6.2
   
   ### What happened
   
   There is an issue with the `airflow_ti.start.<dag_id>.<task_id>.<state>` and `airflow_ti.finish.<dag_id>.<task_id>.<state>` metrics when running Airflow with OpenTelemetry. Both are emitted reliably when running with StatsD, but only intermittently under OTel.
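For reference, the per-state names can be expanded mechanically. The sketch below is hypothetical (`finish_metric_names` and `as_prometheus_name` are not Airflow APIs). The state list is copied from the scraped OTel output in this issue, and the Prometheus form (an `airflow_` prefix, dots turned into underscores, everything lowercased) is only inferred from those scraped names, not taken from Airflow or exporter source.

```python
# States as they appear in the scraped Prometheus output in this issue.
OBSERVED_STATES = (
    "deferred", "failed", "none", "queued", "removed", "restarting",
    "running", "scheduled", "shutdown", "skipped", "success",
    "up_for_reschedule", "up_for_retry", "upstream_failed",
)


def finish_metric_names(dag_id: str, task_id: str, states=OBSERVED_STATES) -> list:
    """Hypothetical helper: expand ti.finish.<dag_id>.<task_id>.<state> for each state."""
    return [f"ti.finish.{dag_id}.{task_id}.{state}" for state in states]


def as_prometheus_name(stat: str, prefix: str = "airflow") -> str:
    """Assumed mapping inferred from the scraped names: prefix, dots to underscores, lowercase."""
    return f"{prefix}_{stat}".replace(".", "_").lower()


if __name__ == "__main__":
    for name in finish_metric_names("fail_S_L_A", "sleep_30"):
        print(as_prometheus_name(name))
```

Printing these for `fail_S_L_A` / `sleep_30` gives one name per state, which is the set a complete scrape would be expected to contain.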
   
   I am submitting this as an issue since I will be a little distracted for the next bit and figured someone may be able to have a look in the meantime. Please do not assign it to me; I'll get to it when I can if nobody else does.
   
   ### What you think should happen instead
   
   Behavior should be consistent.
   
   ### How to reproduce
   
   To reproduce, run Breeze with the statsd or the otel integration (for example `breeze start-airflow --integration otel`), run one or more of the following DAGs, then open the [OTel](http://localhost:28889/metrics) or [StatsD](http://localhost:29102/metrics) raw data view to verify.
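A small script can stand in for eyeballing the raw views. This is a sketch using only the standard library; the endpoint URLs are the ones linked above, and the `airflow_ti_finish` prefix is an assumption based on the OTel output in this issue (the StatsD exporter may name things differently).

```python
import urllib.request

# Raw metrics endpoints linked in this issue (Breeze defaults at the time of writing).
OTEL_URL = "http://localhost:28889/metrics"
STATSD_URL = "http://localhost:29102/metrics"


def fetch_metrics(url: str, timeout: float = 5.0) -> str:
    """Download the Prometheus text exposition from a metrics endpoint."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return resp.read().decode("utf-8")


def matching_samples(text: str, prefix: str) -> list:
    """Return sample lines (skipping # HELP / # TYPE comments) whose name starts with prefix."""
    samples = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        if line.startswith(prefix):
            samples.append(line)
    return samples


if __name__ == "__main__":
    for url in (OTEL_URL, STATSD_URL):
        try:
            body = fetch_metrics(url)
        except OSError as exc:  # URLError subclasses OSError; endpoint not up
            print(f"{url}: unreachable ({exc})")
            continue
        hits = matching_samples(body, "airflow_ti_finish")
        print(f"{url}: {len(hits)} airflow_ti_finish samples")
```

Run it after triggering a DAG; a non-zero count on one endpoint and zero on the other would reproduce the inconsistency.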
   
   
   These two DAGs don't generate any `airflow_ti_finish.*` metrics:
   
   ```python
   from airflow import DAG
   from airflow.decorators import task
   from airflow.utils.timezone import datetime
   
   
   @task
   def task1():
       return 'Hello'
   
   
   @task
   def task2():
       return 'World!'
   
   @task
   def task3(in1, in2):
       print(f'{in1} {in2}')
   
   
   with DAG(
       dag_id='taskflow_demo',
       start_date=datetime(2021, 1, 1),
       schedule=None,
       catchup=False
   ) as dag:
   
       task3(task1(), task2())
   ```
   
   ```python
   import time
   
   from airflow import DAG
   from airflow.decorators import task
   from airflow.utils.timezone import datetime
   
   
   @task
   def task1():
       time.sleep(10)
   
   
   with DAG(
       dag_id='sleep_10',
       start_date=datetime(2021, 1, 1),
       schedule=None,
       catchup=False
   ) as dag:
   
       task1()
   ```
   
   but this one:
   
   ```python
   import time
   from datetime import timedelta
   
   from airflow import DAG
   from airflow.decorators import task
   from airflow.utils.timezone import datetime
   
   
   def sla_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
       print(
           "The callback arguments are: ",
           {
               "dag": dag,
               "task_list": task_list,
               "blocking_task_list": blocking_task_list,
               "slas": slas,
               "blocking_tis": blocking_tis,
           },
       )
   
   
   @task(sla=timedelta(seconds=10))
   def sleep_20():
       """Sleep for 20 seconds"""
       time.sleep(20)
   
   
   @task
   def sleep_30():
       """Sleep for 30 seconds"""
       time.sleep(30)
   
   
   with DAG(
       dag_id='fail_S_L_A',
       start_date=datetime(2021, 1, 1),
       schedule="*/2 * * * *",
       catchup=False,
       sla_miss_callback=sla_callback,
   ) as dag:
   
       sleep_20() >> sleep_30()
   ```
   
   produces all of the following:
   
   ```
   # HELP airflow_ti_finish_fail_s_l_a_sleep_30_deferred
   # TYPE airflow_ti_finish_fail_s_l_a_sleep_30_deferred counter
   airflow_ti_finish_fail_s_l_a_sleep_30_deferred{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
   # HELP airflow_ti_finish_fail_s_l_a_sleep_30_failed
   # TYPE airflow_ti_finish_fail_s_l_a_sleep_30_failed counter
   airflow_ti_finish_fail_s_l_a_sleep_30_failed{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
   # HELP airflow_ti_finish_fail_s_l_a_sleep_30_none
   # TYPE airflow_ti_finish_fail_s_l_a_sleep_30_none counter
   airflow_ti_finish_fail_s_l_a_sleep_30_none{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
   # HELP airflow_ti_finish_fail_s_l_a_sleep_30_queued
   # TYPE airflow_ti_finish_fail_s_l_a_sleep_30_queued counter
   airflow_ti_finish_fail_s_l_a_sleep_30_queued{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
   # HELP airflow_ti_finish_fail_s_l_a_sleep_30_removed
   # TYPE airflow_ti_finish_fail_s_l_a_sleep_30_removed counter
   airflow_ti_finish_fail_s_l_a_sleep_30_removed{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
   # HELP airflow_ti_finish_fail_s_l_a_sleep_30_restarting
   # TYPE airflow_ti_finish_fail_s_l_a_sleep_30_restarting counter
   airflow_ti_finish_fail_s_l_a_sleep_30_restarting{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
   # HELP airflow_ti_finish_fail_s_l_a_sleep_30_running
   # TYPE airflow_ti_finish_fail_s_l_a_sleep_30_running counter
   airflow_ti_finish_fail_s_l_a_sleep_30_running{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
   # HELP airflow_ti_finish_fail_s_l_a_sleep_30_scheduled
   # TYPE airflow_ti_finish_fail_s_l_a_sleep_30_scheduled counter
   airflow_ti_finish_fail_s_l_a_sleep_30_scheduled{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
   # HELP airflow_ti_finish_fail_s_l_a_sleep_30_shutdown
   # TYPE airflow_ti_finish_fail_s_l_a_sleep_30_shutdown counter
   airflow_ti_finish_fail_s_l_a_sleep_30_shutdown{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
   # HELP airflow_ti_finish_fail_s_l_a_sleep_30_skipped
   # TYPE airflow_ti_finish_fail_s_l_a_sleep_30_skipped counter
   airflow_ti_finish_fail_s_l_a_sleep_30_skipped{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
   # HELP airflow_ti_finish_fail_s_l_a_sleep_30_success
   # TYPE airflow_ti_finish_fail_s_l_a_sleep_30_success counter
   airflow_ti_finish_fail_s_l_a_sleep_30_success{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
   # HELP airflow_ti_finish_fail_s_l_a_sleep_30_up_for_reschedule
   # TYPE airflow_ti_finish_fail_s_l_a_sleep_30_up_for_reschedule counter
   airflow_ti_finish_fail_s_l_a_sleep_30_up_for_reschedule{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
   # HELP airflow_ti_finish_fail_s_l_a_sleep_30_up_for_retry
   # TYPE airflow_ti_finish_fail_s_l_a_sleep_30_up_for_retry counter
   airflow_ti_finish_fail_s_l_a_sleep_30_up_for_retry{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
   # HELP airflow_ti_finish_fail_s_l_a_sleep_30_upstream_failed
   # TYPE airflow_ti_finish_fail_s_l_a_sleep_30_upstream_failed counter
   airflow_ti_finish_fail_s_l_a_sleep_30_upstream_failed{dag_id="fail_S_L_A",job="Airflow",task_id="sleep_30"} 0
   ```
   
   Of note: it reported a counter for every task state, but only for that one task (`sleep_30`), so perhaps I misunderstand when and why the metric gets emitted.
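One way to check this across runs is to group the scraped counters by task and state. The sketch below assumes the Prometheus text format and the `airflow_ti_finish_<dag_id>_<task_id>_<state>` naming seen in the output above; `parse_sample` and `finish_states_by_task` are hypothetical helpers, and the regexes cover only the simple sample lines shown here, not the full exposition grammar.

```python
import re

# Simple Prometheus sample line: name{labels} value
SAMPLE_RE = re.compile(
    r"^(?P<name>[a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{(?P<labels>[^}]*)\})?\s+(?P<value>\S+)"
)
LABEL_RE = re.compile(r'(\w+)="([^"]*)"')


def parse_sample(line: str):
    """Return (name, labels, value) for a sample line, or None if it does not match."""
    m = SAMPLE_RE.match(line.strip())
    if m is None:
        return None
    labels = dict(LABEL_RE.findall(m.group("labels") or ""))
    return m.group("name"), labels, float(m.group("value"))


def finish_states_by_task(text: str, prefix: str = "airflow_ti_finish_") -> dict:
    """Group ti.finish counters as {(dag_id, task_id): {state: value}}."""
    result = {}
    for line in text.splitlines():
        if line.lstrip().startswith("#"):
            continue  # skip # HELP / # TYPE comments
        parsed = parse_sample(line)
        if parsed is None:
            continue
        name, labels, value = parsed
        if not name.startswith(prefix):
            continue
        dag_id, task_id = labels.get("dag_id", ""), labels.get("task_id", "")
        # Assumed naming: prefix + lowercased "<dag_id>_<task_id>_" + state.
        stem = f"{prefix}{dag_id}_{task_id}_".lower()
        state = name[len(stem):] if name.startswith(stem) else name[len(prefix):]
        result.setdefault((dag_id, task_id), {})[state] = value
    return result
```

Feeding it the raw view after running all three DAGs shows at a glance which `(dag_id, task_id)` pairs have any `ti.finish` counters at all.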
   
   ### Things I have tried and (possibly?) ruled out
   - The DAG that emits the metric has a `schedule`, but scheduling the other two DAGs does not get the metric emitted, so this is unlikely to be a scheduled-vs-manual-run issue.
   - Even if that is the only DAG ever run in a fresh environment, the result is the same, so it does not appear to be a name/key collision in the MetricsMap storage object (i.e., task1 overwriting task2 or something like that).
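The collision hypothesis can be pictured with a toy store keyed by full metric name. `NameKeyedCounters` is hypothetical and is not Airflow's MetricsMap; it only illustrates that `task1` and `task2` would overwrite each other only if their complete metric names were identical.

```python
from collections import defaultdict


class NameKeyedCounters:
    """Hypothetical stand-in for a name-keyed metric store (not Airflow's MetricsMap)."""

    def __init__(self):
        self._counts = defaultdict(int)

    def incr(self, name: str, count: int = 1) -> None:
        # Each distinct full metric name gets its own slot; only identical names share one.
        self._counts[name] += count

    def get(self, name: str) -> int:
        # dict.get does not insert a default entry, so lookups have no side effects.
        return self._counts.get(name, 0)

    def names(self) -> list:
        return sorted(self._counts)


if __name__ == "__main__":
    store = NameKeyedCounters()
    store.incr("ti.finish.taskflow_demo.task1.success")
    store.incr("ti.finish.taskflow_demo.task2.success")
    print(store.names())
```

Because the task ID is part of the key, the two increments land in separate slots; a collision would require two tasks to produce the exact same name.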
   
   
   ### Operating System
   
   Ubuntu
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

