GitHub user bohdan-pd edited a comment on the discussion: Accessing TaskInstances from DagRun context after Airflow 3.x API changes
@Wolferise, here is a small example that works for me. You can check endpoints [here](https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html)

```
from airflow.sdk.api.client import Client


class BaseAirflowAPIOperator:
    def get_client(self):
        """Get authenticated API client."""
        base_url, token = self.get_api_credentials()
        return Client(base_url=base_url, token=token)

    def make_api_request(self, method: str, endpoint: str, params=None):
        """Make API request with built-in retry and error handling."""
        with self.get_client() as client:
            response = client.request(method, endpoint, params=params)
            return response.json()


class GetTaskInstanceOperator(BaseAirflowAPIOperator):
    """Operator to get task instance for specified DAG run and task."""

    def __init__(self, target_dag_id: str, target_run_id: str, target_task_id: str, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.target_dag_id = target_dag_id
        self.target_run_id = target_run_id
        self.target_task_id = target_task_id

    def execute(self, context: Context):
        task_instance = self.make_api_request(
            "GET",
            f"/api/v2/dags/{self.target_dag_id}/dagRuns/{self.target_run_id}/taskInstances/{self.target_task_id}"
        )
        if task_instance:
            logging.info(f"Found task instance: {task_instance['task_id']} with state {task_instance.get('state')}")
        return task_instance
```

GitHub link: https://github.com/apache/airflow/discussions/56037#discussioncomment-15095738 ---- This is an automatically sent email for [email protected]. To unsubscribe, please send an email to: [email protected]
