dheerajturaga commented on PR #51738:
URL: https://github.com/apache/airflow/pull/51738#issuecomment-3015519235

   Just for reference, in Airflow 3, retrieving the triggering user from event 
logs requires the following approach. I've encapsulated the access_token 
handling within the `get_airflow_client_configuration` method for clarity.
   
   That said, there may be scenarios where the logical date does not align as 
expected, which introduces additional complexity—particularly in our unit tests 
where we need to mock API responses. Given that over 50% of our DAGs rely on 
this functionality, its absence could significantly delay our adoption of 
Airflow 3.
   
   I hope this concern is understandable and that accommodating this request is 
feasible. I truly appreciate your consideration.
   
   ```python
   def _find_owner_v3(dag_run=None) -> str | None:
       """
       This is only for Airflow3, use the Airflow Client API
       to fetch the event logs
       """
       # Only run for manual runs
       if dag_run.run_type.name.upper() != 'MANUAL':
           logger.error(f"Not manually triggered. run_type: {dag_run.run_type}")
           return
   
       # Can't correlate the event if the DAG run has no logical date
       if not dag_run.logical_date:
           logger.error("No logical date available for this run; can't find owner")
           return
   
       logical_date = dag_run.logical_date.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
   
       import json
       import airflow_client.client
       from airflow_client.client.rest import ApiException
       from custom.api.utils import get_airflow_client_configuration
   
       configuration = get_airflow_client_configuration()
   
       # Enter a context with an instance of the API client
       with airflow_client.client.ApiClient(configuration) as api_client:
           # Create an instance of the API class
           api_instance = airflow_client.client.EventLogApi(api_client)
           event = 'trigger_dag_run'
           try:
               logger.info("#################################################")
               logger.info(f"DAG ID: {dag_run.dag_id}")
               logger.info(f"RUN ID: {dag_run.run_id}")
               logger.info(f"Logical Date: {logical_date}")
               logger.info("#################################################")
   
               # Get Event Logs
               api_response = api_instance.get_event_logs(dag_id=dag_run.dag_id, event=event)
   
               if not api_response:
                   logger.error("No trigger events found!")
                   return
   
               # Use a distinct loop variable so the `event` filter string above is not shadowed
               for log_entry in reversed(api_response.event_logs):
                   logger.info(log_entry)
                   if log_entry.extra:
                       event_info = json.loads(log_entry.extra)
                       if "logical_date" in event_info:
                           if logical_date == event_info["logical_date"]:
                               logger.info(f"Matching Event: {log_entry}")
                               logger.info(f"Dag triggered by: {log_entry.owner}")
                               return log_entry.owner
           except Exception as e:
               raise AirflowException("Exception when calling EventLogApi->get_event_logs: %s\n" % e)
   ```
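
The logical-date mismatch mentioned above may come partly from comparing formatted strings: the same instant can be written as `...123Z` or `...123000+00:00`. A minimal sketch of one workaround is to parse both sides into timezone-aware datetimes before comparing. The helper names `parse_utc` and `find_trigger_owner` are hypothetical, and the sketch assumes the `logical_date` stored in `extra` is an ISO 8601 string, which has not been verified here. The `SimpleNamespace` objects stand in for the event log entries, which is also one way to stub the API response in unit tests.

```python
import json
from datetime import datetime, timezone
from types import SimpleNamespace


def parse_utc(value):
    """Parse an ISO 8601 string or a datetime into an aware UTC datetime."""
    if isinstance(value, str):
        # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on
        value = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if value.tzinfo is None:
        # Assumption: naive timestamps are already in UTC
        value = value.replace(tzinfo=timezone.utc)
    return value.astimezone(timezone.utc)


def find_trigger_owner(event_logs, logical_date):
    """Return the owner of the first matching entry, scanning in reverse order."""
    target = parse_utc(logical_date)
    for entry in reversed(event_logs):
        if not entry.extra:
            continue
        try:
            extra = json.loads(entry.extra)
            candidate = extra.get("logical_date")
            if candidate and parse_utc(candidate) == target:
                return entry.owner
        except (ValueError, AttributeError, TypeError):
            # Skip entries whose extra payload is not the expected JSON shape
            continue
    return None


# Fake event log entries (hypothetical data) in place of the API response
fake_logs = [
    SimpleNamespace(owner="alice", extra='{"logical_date": "2025-06-27T10:00:00+00:00"}'),
    SimpleNamespace(owner="bob", extra='{"logical_date": "2025-06-27T12:30:00.123000+00:00"}'),
    SimpleNamespace(owner="carol", extra=None),
]

run_date = datetime(2025, 6, 27, 12, 30, 0, 123000, tzinfo=timezone.utc)
owner = find_trigger_owner(fake_logs, run_date)
print(owner)
```

With the string comparison in `_find_owner_v3`, the run date above would be formatted as `2025-06-27T12:30:00.123Z` and would not match the second entry, even though both describe the same instant.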

