dheerajturaga commented on PR #51738:
URL: https://github.com/apache/airflow/pull/51738#issuecomment-3015519235
Just for reference, in Airflow 3, retrieving the triggering user from event
logs requires the following approach. I've encapsulated the access_token
handling within the `get_airflow_client_configuration` method for clarity.
That said, there may be scenarios where the logical date does not align as
expected, which introduces additional complexity—particularly in our unit tests
where we need to mock API responses. Given that over 50% of our DAGs rely on
this functionality, its absence could significantly delay our adoption of
Airflow 3.
I hope this concern is understandable and that accommodating this request is
feasible. I truly appreciate your consideration.
```
def _find_owner_v3(dag_run=None) -> str | None:
    """
    Return the owner recorded on the event log entry that triggered ``dag_run``.

    This is only for Airflow 3: it uses the Airflow Client API to fetch the
    ``trigger_dag_run`` event logs of the DAG and picks the entry whose
    ``extra`` JSON carries a ``logical_date`` equal to the run's logical date.

    :param dag_run: the DAG run to inspect; it must expose ``run_type``,
        ``logical_date``, ``dag_id`` and ``run_id``.
    :return: the ``owner`` of the matching event log entry, or ``None`` when
        no ``dag_run`` is given, the run is not a manual run, the run has no
        logical date, or no matching event is found.
    :raises AirflowException: if the ``get_event_logs`` API call fails.
    """
    from datetime import timezone

    # The parameter defaults to None, so guard before touching attributes.
    if dag_run is None:
        logger.error("No dag_run supplied, cant find owner")
        return None

    # Only run for manual runs. ``run_type`` is read via ``.name`` when it has
    # one and via ``str()`` otherwise, so a plain string value also works.
    run_type = dag_run.run_type
    run_type_name = getattr(run_type, "name", None) or str(run_type)
    if run_type_name.upper() != "MANUAL":
        logger.error("Not manually triggered. run_type: %s", run_type)
        return None

    # Can't correlate if the logical date is missing for the dag run.
    logical_dt = dag_run.logical_date
    if not logical_dt:
        logger.error("No logical date available for this run, cant find owner")
        return None

    # The comparison string ends in a literal "Z", so an aware datetime must
    # be expressed in UTC first. Naive datetimes are formatted as they are.
    if logical_dt.utcoffset() is not None:
        logical_dt = logical_dt.astimezone(timezone.utc)
    # "%f" yields microseconds; dropping the last three digits leaves
    # milliseconds.
    # NOTE(review): assumes the event log stores logical_date in exactly this
    # millisecond/"Z" form — confirm against real event log entries.
    logical_date = logical_dt.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"

    # Imported inside the function, as in the original snippet.
    import airflow_client.client
    from custom.api.utils import get_airflow_client_configuration

    configuration = get_airflow_client_configuration()
    trigger_event = "trigger_dag_run"

    logger.info("#################################################")
    logger.info("DAG ID: %s", dag_run.dag_id)
    logger.info("RUN ID: %s", dag_run.run_id)
    logger.info("Logical Date: %s", logical_date)
    logger.info("#################################################")

    # Enter a context with an instance of the API client.
    with airflow_client.client.ApiClient(configuration) as api_client:
        api_instance = airflow_client.client.EventLogApi(api_client)
        try:
            # NOTE(review): no limit/offset is passed, so presumably only the
            # server's default first page of events is inspected — confirm.
            api_response = api_instance.get_event_logs(
                dag_id=dag_run.dag_id, event=trigger_event
            )
        except Exception as e:
            # Deliberately broad: any failure of the call is reported to the
            # caller as AirflowException, with the original error chained.
            raise AirflowException(
                f"Exception when calling EventLogApi->get_event_logs: {e}\n"
            ) from e

    if not api_response:
        logger.error("No trigger events found!")
        return None

    # Walk the returned events in reverse order, as the original snippet did.
    for log_entry in reversed(api_response.event_logs or []):
        logger.info(log_entry)
        if not log_entry.extra:
            continue
        try:
            event_info = json.loads(log_entry.extra)
        except (TypeError, ValueError):
            # A malformed ``extra`` on one entry should not abort the search.
            logger.warning("Skipping event with unparsable extra: %s", log_entry)
            continue
        if not isinstance(event_info, dict):
            continue
        if event_info.get("logical_date") == logical_date:
            logger.info("Matching Event: %s", log_entry)
            logger.info("Dag triggered by: %s", log_entry.owner)
            return log_entry.owner

    logger.error("No trigger event matched logical date %s", logical_date)
    return None
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]