GitHub user bohdan-pd edited a comment on the discussion: Accessing
TaskInstances from DagRun context after Airflow 3.x API changes
@Wolferise, here is a small example that works for me
```
from airflow.sdk.api.client import Client
class BaseAirflowAPIOperator:
def get_client(self):
"""Get authenticated API client."""
base_url, token = self.get_api_credentials()
return Client(base_url=base_url, token=token)
def make_api_request(self, method: str, endpoint: str, params=None):
"""Make API request with built-in retry and error handling."""
with self.get_client() as client:
response = client.request(method, endpoint, params=params)
return response.json()
class GetTaskInstanceOperator(BaseAirflowAPIOperator):
"""Operator to get task instance for specified DAG run and task."""
def __init__(self, target_dag_id: str, target_run_id: str, target_task_id:
str, *args, **kwargs):
super().__init__(*args, **kwargs)
self.target_dag_id = target_dag_id
self.target_run_id = target_run_id
self.target_task_id = target_task_id
def execute(self, context: Context):
task_instance = self.make_api_request(
"GET",
f"/api/v2/dags/{self.target_dag_id}/dagRuns/{self.target_run_id}/taskInstances/{self.target_task_id}"
)
if task_instance:
logging.info(f"Found task instance: {task_instance['task_id']} with
state {task_instance.get('state')}")
return task_instance
```
GitHub link:
https://github.com/apache/airflow/discussions/56037#discussioncomment-15095738
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]