GitHub user bohdan-pd edited a comment on the discussion: Accessing 
TaskInstances from DagRun context after Airflow 3.x API changes

@Wolferise, here is a small example that works for me:

```
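import logging

from airflow.sdk import BaseOperator, Context
from airflow.sdk.api.client import Client


# Assumption: the base class extends BaseOperator, since the subclass below
# forwards *args/**kwargs to super().__init__() and implements execute().
class BaseAirflowAPIOperator(BaseOperator):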
    def get_client(self):
        """Get authenticated API client."""
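        # get_api_credentials() is not shown in this snippet; it must return
        # a (base_url, token) pair for the Airflow API.
        base_url, token = self.get_api_credentials()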
        return Client(base_url=base_url, token=token)

    def make_api_request(self, method: str, endpoint: str, params=None):
        """Make API request with built-in retry and error handling."""
        with self.get_client() as client:
            response = client.request(method, endpoint, params=params)
            return response.json()

class GetTaskInstanceOperator(BaseAirflowAPIOperator):
    """Operator to get task instance for specified DAG run and task."""

    def __init__(
        self,
        target_dag_id: str,
        target_run_id: str,
        target_task_id: str,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.target_dag_id = target_dag_id
        self.target_run_id = target_run_id
        self.target_task_id = target_task_id

    def execute(self, context: Context):
        task_instance = self.make_api_request(
            "GET",
            f"/api/v2/dags/{self.target_dag_id}/dagRuns/{self.target_run_id}"
            f"/taskInstances/{self.target_task_id}",
        )

        if task_instance:
            logging.info(
                f"Found task instance: {task_instance['task_id']} "
                f"with state {task_instance.get('state')}"
            )

        return task_instance
```
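
One thing to watch with the endpoint built in `execute`: run IDs frequently contain characters such as `:` and `+` (for example in timestamp-based IDs), and these are safer percent-encoded when placed in a URL path. Below is a minimal sketch of how the path could be built with the standard library. The helper name `task_instance_endpoint` is hypothetical and is not part of Airflow or of the example above.

```python
from urllib.parse import quote


def task_instance_endpoint(dag_id: str, run_id: str, task_id: str) -> str:
    """Build the task-instance path, percent-encoding every path segment.

    Hypothetical helper for illustration; not an Airflow API.
    """
    # safe="" also encodes "/", so a segment can never add extra path levels.
    dag, run, task = (quote(part, safe="") for part in (dag_id, run_id, task_id))
    return f"/api/v2/dags/{dag}/dagRuns/{run}/taskInstances/{task}"


print(task_instance_endpoint("my_dag", "manual__2024-01-01T00:00:00+00:00", "my_task"))
```

The returned string could then be passed as the `endpoint` argument of `make_api_request` in place of the inline f-string.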

GitHub link: 
https://github.com/apache/airflow/discussions/56037#discussioncomment-15095738
