pierrejeambrun commented on code in PR #44238:
URL: https://github.com/apache/airflow/pull/44238#discussion_r1851736606


##########
airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -482,3 +486,91 @@ def get_mapped_task_instance_try_details(
         map_index=map_index,
         session=session,
     )
+
+
+@task_instances_router.get(
+    "/{task_id}/logs/{task_try_number}",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    response_model=None,
+)
+def get_log(
+    *,
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    task_try_number: int,
+    request: Request,
+    session: Annotated[Session, Depends(get_session)],
+    full_content: bool = False,
+    map_index: int = -1,
+    token: str | None = None,
+) -> Response | dict:
+    """Get logs for specific task instance."""
+    if not token:
+        metadata = {}
+    else:
+        try:
+            metadata = 
URLSafeSerializer(request.app.state.secret_key).loads(token)
+        except BadSignature:
+            raise HTTPException(
+                status.HTTP_400_BAD_REQUEST, "Bad Signature. Please use only 
the tokens provided by the API."
+            )
+
+    if metadata.get("download_logs") and metadata["download_logs"]:
+        full_content = True
+
+    if full_content:
+        metadata["download_logs"] = True
+    else:
+        metadata["download_logs"] = False
+
+    task_log_reader = TaskLogReader()
+
+    if not task_log_reader.supports_read:
+        raise HTTPException(status.HTTP_400_BAD_REQUEST, "Task log handler 
does not support read logs.")
+
+    query = (
+        select(TaskInstance)
+        .where(
+            TaskInstance.task_id == task_id,
+            TaskInstance.dag_id == dag_id,
+            TaskInstance.run_id == dag_run_id,
+            TaskInstance.map_index == map_index,
+        )
+        .join(TaskInstance.dag_run)
+        
.options(joinedload(TaskInstance.trigger).joinedload(Trigger.triggerer_job))
+    )
+    ti = session.scalar(query)
+    if ti is None:
+        query = select(TaskInstanceHistory).where(
+            TaskInstanceHistory.task_id == task_id,
+            TaskInstanceHistory.dag_id == dag_id,
+            TaskInstanceHistory.run_id == dag_run_id,
+            TaskInstanceHistory.map_index == map_index,
+            TaskInstanceHistory.try_number == task_try_number,
+        )
+        ti = session.scalar(query)
+
+    if ti is None:
+        metadata["end_of_log"] = True
+        raise HTTPException(status.HTTP_404_NOT_FOUND, "TaskInstance not 
found")
+
+    dag = request.app.state.dag_bag.get_dag(dag_id)
+    if dag:
+        try:
+            ti.task = dag.get_task(ti.task_id)
+        except TaskNotFound:
+            pass
+
+    return_type = request.headers["accept"]
+    # return_type would be either the above two or None
+    logs: Any
+    if return_type == "application/json" or return_type is None:  # default
+        logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, 
metadata)
+        logs = logs[0] if task_try_number is not None else logs
+        # we must have token here, so we can safely ignore it
+        token = 
URLSafeSerializer(request.app.state.secret_key).dumps(metadata)  # type: 
ignore[assignment]
+        return TaskInstancesLogResponseObject(continuation_token=token, 
content=str(logs)).model_dump()
+    # text/plain. Stream
+    logs = task_log_reader.read_log_stream(ti, task_try_number, metadata)
+    return Response(media_type="text/plain", content="".join(list(logs)))

Review Comment:
   For the `Accept` header handling, you can take a look at 
`HeaderAcceptJsonOrText` and its usage. This way you can get rid of some manual 
handling and hard coded strings.



##########
airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########


Review Comment:
   You put the endpoint in `task_instances`. This file is already really big, 
and originally it was in a separate `log_endpoint`, that might be better to 
keep it like this and create a new `log.py` file in the public routes.



##########
airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -482,3 +486,91 @@ def get_mapped_task_instance_try_details(
         map_index=map_index,
         session=session,
     )
+
+
+@task_instances_router.get(
+    "/{task_id}/logs/{task_try_number}",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    response_model=None,

Review Comment:
   ```suggestion
   ```



##########
airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -482,3 +486,91 @@ def get_mapped_task_instance_try_details(
         map_index=map_index,
         session=session,
     )
+
+
+@task_instances_router.get(
+    "/{task_id}/logs/{task_try_number}",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    response_model=None,
+)
+def get_log(
+    *,

Review Comment:
   ```suggestion
   ```



##########
airflow/api_fastapi/core_api/datamodels/task_instances.py:
##########
@@ -150,3 +150,11 @@ class TaskInstanceHistoryCollectionResponse(BaseModel):
 
     task_instances: list[TaskInstanceHistoryResponse]
     total_entries: int
+
+
+# Response Models

Review Comment:
   ```suggestion
   ```



##########
tests/api_fastapi/core_api/routes/public/test_task_instances.py:
##########
@@ -1671,3 +1683,354 @@ def test_raises_404_for_nonexistent_task_instance(self, 
test_client, session):
         assert response.json() == {
             "detail": "The Task Instance with dag_id: 
`example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: 
`nonexistent_task`, try_number: `0` and map_index: `-1` was not found"
         }
+
+
+class TestTaskInstancesLog:
+    DAG_ID = "dag_for_testing_log_endpoint"
+    RUN_ID = "dag_run_id_for_testing_log_endpoint"
+    TASK_ID = "task_for_testing_log_endpoint"
+    MAPPED_TASK_ID = "mapped_task_for_testing_log_endpoint"
+    TRY_NUMBER = 1
+
+    default_time = "2020-06-10T20:00:00+00:00"
+
+    @pytest.fixture(autouse=True)
+    def setup_attrs(self, configure_loggers, dag_maker, session) -> None:
+        self.app = create_app()

Review Comment:
   You can access `app` via the `test_client`
   ```suggestion
   ```



##########
airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -482,3 +486,91 @@ def get_mapped_task_instance_try_details(
         map_index=map_index,
         session=session,
     )
+
+
+@task_instances_router.get(
+    "/{task_id}/logs/{task_try_number}",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    response_model=None,

Review Comment:
   You can take a look at `get_config` or `get_dag_source` endpoint.
   
   Those endpoints are also returning a plain `Response`. There is no way for 
FastAPI to populate properly the documentation. You can check there how this 
can be achieved. (appropriate `response_model` for the `application/json` 
response, and custom one through `responses` for the `text/plain`).



##########
tests/api_fastapi/core_api/routes/public/test_task_instances.py:
##########
@@ -1671,3 +1683,354 @@ def test_raises_404_for_nonexistent_task_instance(self, 
test_client, session):
         assert response.json() == {
             "detail": "The Task Instance with dag_id: 
`example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: 
`nonexistent_task`, try_number: `0` and map_index: `-1` was not found"
         }
+
+
+class TestTaskInstancesLog:
+    DAG_ID = "dag_for_testing_log_endpoint"
+    RUN_ID = "dag_run_id_for_testing_log_endpoint"
+    TASK_ID = "task_for_testing_log_endpoint"
+    MAPPED_TASK_ID = "mapped_task_for_testing_log_endpoint"
+    TRY_NUMBER = 1
+
+    default_time = "2020-06-10T20:00:00+00:00"
+
+    @pytest.fixture(autouse=True)
+    def setup_attrs(self, configure_loggers, dag_maker, session) -> None:
+        self.app = create_app()
+        self.client = TestClient(self.app)
+        # Make sure that the configure_logging is not cached
+        self.old_modules = dict(sys.modules)
+
+        with dag_maker(self.DAG_ID, 
start_date=timezone.parse(self.default_time), session=session) as dag:
+            EmptyOperator(task_id=self.TASK_ID)
+
+            @task(task_id=self.MAPPED_TASK_ID)
+            def add_one(x: int):
+                return x + 1
+
+            add_one.expand(x=[1, 2, 3])
+
+        dr = dag_maker.create_dagrun(
+            run_id=self.RUN_ID,
+            run_type=DagRunType.SCHEDULED,
+            logical_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+        )
+
+        self.app.state.dag_bag.bag_dag(dag)
+
+        # Add dummy dag for checking picking correct log with same task_id and 
different dag_id case.
+        with dag_maker(
+            f"{self.DAG_ID}_copy", 
start_date=timezone.parse(self.default_time), session=session
+        ) as dummy_dag:
+            EmptyOperator(task_id=self.TASK_ID)
+        dr2 = dag_maker.create_dagrun(
+            run_id=self.RUN_ID,
+            run_type=DagRunType.SCHEDULED,
+            logical_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+        )
+        self.app.state.dag_bag.bag_dag(dummy_dag)
+
+        for ti in dr.task_instances:
+            ti.try_number = 1
+            ti.hostname = "localhost"
+            session.merge(ti)
+        for ti in dr2.task_instances:
+            ti.try_number = 1
+            ti.hostname = "localhost"
+            session.merge(ti)
+        session.flush()
+        dag.clear()
+        dummy_dag.clear()
+        for ti in dr.task_instances:
+            ti.try_number = 2
+            ti.hostname = "localhost"
+            session.merge(ti)
+        for ti in dr2.task_instances:
+            ti.try_number = 2
+            ti.hostname = "localhost"
+            session.merge(ti)
+        session.flush()
+
+    @pytest.fixture
+    def configure_loggers(self, tmp_path, create_log_template):
+        self.log_dir = tmp_path
+
+        # TASK_ID
+        dir_path = tmp_path / f"dag_id={self.DAG_ID}" / 
f"run_id={self.RUN_ID}" / f"task_id={self.TASK_ID}"
+        dir_path.mkdir(parents=True)
+
+        log = dir_path / "attempt=1.log"
+        log.write_text("Log for testing.")
+
+        # try number 2
+        log = dir_path / "attempt=2.log"
+        log.write_text("Log for testing 2.")
+
+        # MAPPED_TASK_ID
+        for map_index in range(3):
+            dir_path = (
+                tmp_path
+                / f"dag_id={self.DAG_ID}"
+                / f"run_id={self.RUN_ID}"
+                / f"task_id={self.MAPPED_TASK_ID}"
+                / f"map_index={map_index}"
+            )
+
+            dir_path.mkdir(parents=True)
+
+            log = dir_path / "attempt=1.log"
+            log.write_text("Log for testing.")
+
+            # try number 2
+            log = dir_path / "attempt=2.log"
+            log.write_text("Log for testing 2.")
+
+        # Create a custom logging configuration
+        logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
+        logging_config["handlers"]["task"]["base_log_folder"] = self.log_dir
+
+        logging.config.dictConfig(logging_config)
+
+        yield
+
+        logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
+
+    def teardown_method(self):
+        clear_db_runs()
+
+    @pytest.mark.parametrize("try_number", [1, 2])
+    def test_should_respond_200_json(self, try_number):
+        key = self.app.state.secret_key
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": False})
+        response = self.client.get(
+            
f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/{try_number}",
+            params={"token": token},
+            headers={"Accept": "application/json"},
+            # environ_overrides={"REMOTE_USER": "test"},
+        )
+        expected_filename = 
f"{self.log_dir}/dag_id={self.DAG_ID}/run_id={self.RUN_ID}/task_id={self.TASK_ID}/attempt={try_number}.log"
+        log_content = "Log for testing." if try_number == 1 else "Log for 
testing 2."
+        print("\n\n\n response.json()", response.json())
+        assert "[('localhost'," in response.json()["content"]
+        assert f"*** Found local files:\\n***   * {expected_filename}\\n" in 
response.json()["content"]
+        assert f"{log_content}')]" in response.json()["content"]
+
+        info = serializer.loads(response.json()["continuation_token"])
+        assert info == {"end_of_log": True, "log_pos": 16 if try_number == 1 
else 18}
+        assert 200 == response.status_code
+
+    @pytest.mark.parametrize(
+        "request_url, expected_filename, extra_query_string, try_number",
+        [
+            (
+                
f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/logs/1",
+                
f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={TASK_ID}/attempt=1.log",
+                {},
+                1,
+            ),
+            (
+                
f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{MAPPED_TASK_ID}/logs/1",
+                
f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={MAPPED_TASK_ID}/map_index=0/attempt=1.log",
+                {"map_index": 0},
+                1,
+            ),
+            # try_number 2
+            (
+                
f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/logs/2",
+                
f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={TASK_ID}/attempt=2.log",
+                {},
+                2,
+            ),
+            (
+                
f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{MAPPED_TASK_ID}/logs/2",
+                
f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={MAPPED_TASK_ID}/map_index=0/attempt=2.log",
+                {"map_index": 0},
+                2,
+            ),
+        ],
+    )
+    def test_should_respond_200_text_plain(
+        self, request_url, expected_filename, extra_query_string, try_number
+    ):
+        expected_filename = expected_filename.replace("LOG_DIR", 
str(self.log_dir))
+
+        key = self.app.state.secret_key
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": True})
+
+        response = self.client.get(
+            request_url,
+            params={"token": token, **extra_query_string},
+            headers={"Accept": "text/plain"},
+            # environ_overrides={"REMOTE_USER": "test"},
+        )
+        assert 200 == response.status_code
+
+        log_content = "Log for testing." if try_number == 1 else "Log for 
testing 2."
+        assert "localhost\n" in response.content.decode("utf-8")
+        assert f"*** Found local files:\n***   * {expected_filename}\n" in 
response.content.decode("utf-8")
+        assert f"{log_content}\n" in response.content.decode("utf-8")
+
+    @pytest.mark.parametrize(
+        "request_url, expected_filename, extra_query_string, try_number",
+        [
+            (
+                
f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/logs/1",
+                
f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={TASK_ID}/attempt=1.log",
+                {},
+                1,
+            ),
+            (
+                
f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{MAPPED_TASK_ID}/logs/1",
+                
f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={MAPPED_TASK_ID}/map_index=0/attempt=1.log",
+                {"map_index": 0},
+                1,
+            ),
+            (
+                
f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/logs/2",
+                
f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={TASK_ID}/attempt=2.log",
+                {},
+                2,
+            ),
+            (
+                
f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{MAPPED_TASK_ID}/logs/2",
+                
f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={MAPPED_TASK_ID}/map_index=0/attempt=2.log",
+                {"map_index": 0},
+                2,
+            ),
+        ],
+    )
+    def test_get_logs_of_removed_task(self, request_url, expected_filename, 
extra_query_string, try_number):
+        expected_filename = expected_filename.replace("LOG_DIR", 
str(self.log_dir))
+
+        # Recreate DAG without tasks
+        dagbag = self.app.state.dag_bag
+        dag = DAG(self.DAG_ID, schedule=None, 
start_date=timezone.parse(self.default_time))
+        del dagbag.dags[self.DAG_ID]
+        dagbag.bag_dag(dag=dag)
+
+        key = self.app.state.secret_key
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": True})
+
+        response = self.client.get(
+            request_url,
+            params={"token": token, **extra_query_string},
+            headers={"Accept": "text/plain"},
+            # environ_overrides={"REMOTE_USER": "test"},
+        )
+
+        assert 200 == response.status_code
+
+        log_content = "Log for testing." if try_number == 1 else "Log for 
testing 2."
+        assert "localhost\n" in response.content.decode("utf-8")
+        assert f"*** Found local files:\n***   * {expected_filename}\n" in 
response.content.decode("utf-8")
+        assert f"{log_content}\n" in response.content.decode("utf-8")
+
+    @pytest.mark.parametrize("try_number", [1, 2])
+    def test_get_logs_response_with_ti_equal_to_none(self, try_number):
+        key = self.app.state.secret_key
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": True})
+
+        response = self.client.get(
+            
f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/Invalid-Task-ID/logs/{try_number}",
+            params={"token": token},
+            # environ_overrides={"REMOTE_USER": "test"},
+        )
+        print("response.json(): ", response.json())

Review Comment:
   ```suggestion
   ```



##########
tests/api_fastapi/core_api/routes/public/test_task_instances.py:
##########
@@ -1671,3 +1683,354 @@ def test_raises_404_for_nonexistent_task_instance(self, 
test_client, session):
         assert response.json() == {
             "detail": "The Task Instance with dag_id: 
`example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: 
`nonexistent_task`, try_number: `0` and map_index: `-1` was not found"
         }
+
+
+class TestTaskInstancesLog:
+    DAG_ID = "dag_for_testing_log_endpoint"
+    RUN_ID = "dag_run_id_for_testing_log_endpoint"
+    TASK_ID = "task_for_testing_log_endpoint"
+    MAPPED_TASK_ID = "mapped_task_for_testing_log_endpoint"
+    TRY_NUMBER = 1
+
+    default_time = "2020-06-10T20:00:00+00:00"
+
+    @pytest.fixture(autouse=True)
+    def setup_attrs(self, configure_loggers, dag_maker, session) -> None:
+        self.app = create_app()
+        self.client = TestClient(self.app)
+        # Make sure that the configure_logging is not cached
+        self.old_modules = dict(sys.modules)
+
+        with dag_maker(self.DAG_ID, 
start_date=timezone.parse(self.default_time), session=session) as dag:
+            EmptyOperator(task_id=self.TASK_ID)
+
+            @task(task_id=self.MAPPED_TASK_ID)
+            def add_one(x: int):
+                return x + 1
+
+            add_one.expand(x=[1, 2, 3])
+
+        dr = dag_maker.create_dagrun(
+            run_id=self.RUN_ID,
+            run_type=DagRunType.SCHEDULED,
+            logical_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+        )
+
+        self.app.state.dag_bag.bag_dag(dag)
+
+        # Add dummy dag for checking picking correct log with same task_id and 
different dag_id case.
+        with dag_maker(
+            f"{self.DAG_ID}_copy", 
start_date=timezone.parse(self.default_time), session=session
+        ) as dummy_dag:
+            EmptyOperator(task_id=self.TASK_ID)
+        dr2 = dag_maker.create_dagrun(
+            run_id=self.RUN_ID,
+            run_type=DagRunType.SCHEDULED,
+            logical_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+        )
+        self.app.state.dag_bag.bag_dag(dummy_dag)
+
+        for ti in dr.task_instances:
+            ti.try_number = 1
+            ti.hostname = "localhost"
+            session.merge(ti)
+        for ti in dr2.task_instances:
+            ti.try_number = 1
+            ti.hostname = "localhost"
+            session.merge(ti)
+        session.flush()
+        dag.clear()
+        dummy_dag.clear()
+        for ti in dr.task_instances:
+            ti.try_number = 2
+            ti.hostname = "localhost"
+            session.merge(ti)
+        for ti in dr2.task_instances:
+            ti.try_number = 2
+            ti.hostname = "localhost"
+            session.merge(ti)
+        session.flush()
+
+    @pytest.fixture
+    def configure_loggers(self, tmp_path, create_log_template):
+        self.log_dir = tmp_path
+
+        # TASK_ID
+        dir_path = tmp_path / f"dag_id={self.DAG_ID}" / 
f"run_id={self.RUN_ID}" / f"task_id={self.TASK_ID}"
+        dir_path.mkdir(parents=True)
+
+        log = dir_path / "attempt=1.log"
+        log.write_text("Log for testing.")
+
+        # try number 2
+        log = dir_path / "attempt=2.log"
+        log.write_text("Log for testing 2.")
+
+        # MAPPED_TASK_ID
+        for map_index in range(3):
+            dir_path = (
+                tmp_path
+                / f"dag_id={self.DAG_ID}"
+                / f"run_id={self.RUN_ID}"
+                / f"task_id={self.MAPPED_TASK_ID}"
+                / f"map_index={map_index}"
+            )
+
+            dir_path.mkdir(parents=True)
+
+            log = dir_path / "attempt=1.log"
+            log.write_text("Log for testing.")
+
+            # try number 2
+            log = dir_path / "attempt=2.log"
+            log.write_text("Log for testing 2.")
+
+        # Create a custom logging configuration
+        logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
+        logging_config["handlers"]["task"]["base_log_folder"] = self.log_dir
+
+        logging.config.dictConfig(logging_config)
+
+        yield
+
+        logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
+
+    def teardown_method(self):
+        clear_db_runs()
+
+    @pytest.mark.parametrize("try_number", [1, 2])
+    def test_should_respond_200_json(self, try_number):
+        key = self.app.state.secret_key
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": False})
+        response = self.client.get(
+            
f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/{try_number}",
+            params={"token": token},
+            headers={"Accept": "application/json"},
+            # environ_overrides={"REMOTE_USER": "test"},
+        )
+        expected_filename = 
f"{self.log_dir}/dag_id={self.DAG_ID}/run_id={self.RUN_ID}/task_id={self.TASK_ID}/attempt={try_number}.log"
+        log_content = "Log for testing." if try_number == 1 else "Log for 
testing 2."
+        print("\n\n\n response.json()", response.json())
+        assert "[('localhost'," in response.json()["content"]
+        assert f"*** Found local files:\\n***   * {expected_filename}\\n" in 
response.json()["content"]
+        assert f"{log_content}')]" in response.json()["content"]
+
+        info = serializer.loads(response.json()["continuation_token"])
+        assert info == {"end_of_log": True, "log_pos": 16 if try_number == 1 
else 18}
+        assert 200 == response.status_code
+
+    @pytest.mark.parametrize(
+        "request_url, expected_filename, extra_query_string, try_number",
+        [
+            (
+                
f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/logs/1",
+                
f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={TASK_ID}/attempt=1.log",
+                {},
+                1,
+            ),
+            (
+                
f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{MAPPED_TASK_ID}/logs/1",
+                
f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={MAPPED_TASK_ID}/map_index=0/attempt=1.log",
+                {"map_index": 0},
+                1,
+            ),
+            # try_number 2
+            (
+                
f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/logs/2",
+                
f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={TASK_ID}/attempt=2.log",
+                {},
+                2,
+            ),
+            (
+                
f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{MAPPED_TASK_ID}/logs/2",
+                
f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={MAPPED_TASK_ID}/map_index=0/attempt=2.log",
+                {"map_index": 0},
+                2,
+            ),
+        ],
+    )
+    def test_should_respond_200_text_plain(
+        self, request_url, expected_filename, extra_query_string, try_number
+    ):
+        expected_filename = expected_filename.replace("LOG_DIR", 
str(self.log_dir))
+
+        key = self.app.state.secret_key
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": True})
+
+        response = self.client.get(
+            request_url,
+            params={"token": token, **extra_query_string},
+            headers={"Accept": "text/plain"},
+            # environ_overrides={"REMOTE_USER": "test"},
+        )
+        assert 200 == response.status_code
+
+        log_content = "Log for testing." if try_number == 1 else "Log for 
testing 2."
+        assert "localhost\n" in response.content.decode("utf-8")
+        assert f"*** Found local files:\n***   * {expected_filename}\n" in 
response.content.decode("utf-8")
+        assert f"{log_content}\n" in response.content.decode("utf-8")
+
+    @pytest.mark.parametrize(
+        "request_url, expected_filename, extra_query_string, try_number",
+        [
+            (
+                
f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/logs/1",
+                
f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={TASK_ID}/attempt=1.log",
+                {},
+                1,
+            ),
+            (
+                
f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{MAPPED_TASK_ID}/logs/1",
+                
f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={MAPPED_TASK_ID}/map_index=0/attempt=1.log",
+                {"map_index": 0},
+                1,
+            ),
+            (
+                
f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/logs/2",
+                
f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={TASK_ID}/attempt=2.log",
+                {},
+                2,
+            ),
+            (
+                
f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{MAPPED_TASK_ID}/logs/2",
+                
f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={MAPPED_TASK_ID}/map_index=0/attempt=2.log",
+                {"map_index": 0},
+                2,
+            ),
+        ],
+    )
+    def test_get_logs_of_removed_task(self, request_url, expected_filename, 
extra_query_string, try_number):
+        expected_filename = expected_filename.replace("LOG_DIR", 
str(self.log_dir))
+
+        # Recreate DAG without tasks
+        dagbag = self.app.state.dag_bag
+        dag = DAG(self.DAG_ID, schedule=None, 
start_date=timezone.parse(self.default_time))
+        del dagbag.dags[self.DAG_ID]
+        dagbag.bag_dag(dag=dag)
+
+        key = self.app.state.secret_key
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": True})
+
+        response = self.client.get(
+            request_url,
+            params={"token": token, **extra_query_string},
+            headers={"Accept": "text/plain"},
+            # environ_overrides={"REMOTE_USER": "test"},
+        )
+
+        assert 200 == response.status_code
+
+        log_content = "Log for testing." if try_number == 1 else "Log for 
testing 2."
+        assert "localhost\n" in response.content.decode("utf-8")
+        assert f"*** Found local files:\n***   * {expected_filename}\n" in 
response.content.decode("utf-8")
+        assert f"{log_content}\n" in response.content.decode("utf-8")
+
+    @pytest.mark.parametrize("try_number", [1, 2])
+    def test_get_logs_response_with_ti_equal_to_none(self, try_number):
+        key = self.app.state.secret_key
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": True})
+
+        response = self.client.get(
+            
f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/Invalid-Task-ID/logs/{try_number}",
+            params={"token": token},
+            # environ_overrides={"REMOTE_USER": "test"},

Review Comment:
   to remove and others



##########
tests/api_fastapi/core_api/routes/public/test_task_instances.py:
##########
@@ -1671,3 +1683,354 @@ def test_raises_404_for_nonexistent_task_instance(self, 
test_client, session):
         assert response.json() == {
             "detail": "The Task Instance with dag_id: 
`example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: 
`nonexistent_task`, try_number: `0` and map_index: `-1` was not found"
         }
+
+
+class TestTaskInstancesLog:
+    DAG_ID = "dag_for_testing_log_endpoint"
+    RUN_ID = "dag_run_id_for_testing_log_endpoint"
+    TASK_ID = "task_for_testing_log_endpoint"
+    MAPPED_TASK_ID = "mapped_task_for_testing_log_endpoint"
+    TRY_NUMBER = 1
+
+    default_time = "2020-06-10T20:00:00+00:00"
+
+    @pytest.fixture(autouse=True)
+    def setup_attrs(self, configure_loggers, dag_maker, session) -> None:
+        self.app = create_app()
+        self.client = TestClient(self.app)

Review Comment:
   ```suggestion
   ```
   There is already a `test_client` fixture you can directly use in the tests.



##########
tests/api_fastapi/core_api/routes/public/test_task_instances.py:
##########
@@ -1671,3 +1683,354 @@ def test_raises_404_for_nonexistent_task_instance(self, 
test_client, session):
         assert response.json() == {
             "detail": "The Task Instance with dag_id: 
`example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: 
`nonexistent_task`, try_number: `0` and map_index: `-1` was not found"
         }
+
+
+class TestTaskInstancesLog:
+    DAG_ID = "dag_for_testing_log_endpoint"
+    RUN_ID = "dag_run_id_for_testing_log_endpoint"
+    TASK_ID = "task_for_testing_log_endpoint"
+    MAPPED_TASK_ID = "mapped_task_for_testing_log_endpoint"
+    TRY_NUMBER = 1
+
+    default_time = "2020-06-10T20:00:00+00:00"
+
+    @pytest.fixture(autouse=True)
+    def setup_attrs(self, configure_loggers, dag_maker, session) -> None:
+        self.app = create_app()
+        self.client = TestClient(self.app)
+        # Make sure that the configure_logging is not cached
+        self.old_modules = dict(sys.modules)
+
+        with dag_maker(self.DAG_ID, 
start_date=timezone.parse(self.default_time), session=session) as dag:
+            EmptyOperator(task_id=self.TASK_ID)
+
+            @task(task_id=self.MAPPED_TASK_ID)
+            def add_one(x: int):
+                return x + 1
+
+            add_one.expand(x=[1, 2, 3])
+
+        dr = dag_maker.create_dagrun(
+            run_id=self.RUN_ID,
+            run_type=DagRunType.SCHEDULED,
+            logical_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+        )
+
+        self.app.state.dag_bag.bag_dag(dag)
+
+        # Add dummy dag for checking picking correct log with same task_id and 
different dag_id case.
+        with dag_maker(
+            f"{self.DAG_ID}_copy", 
start_date=timezone.parse(self.default_time), session=session
+        ) as dummy_dag:
+            EmptyOperator(task_id=self.TASK_ID)
+        dr2 = dag_maker.create_dagrun(
+            run_id=self.RUN_ID,
+            run_type=DagRunType.SCHEDULED,
+            logical_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+        )
+        self.app.state.dag_bag.bag_dag(dummy_dag)
+
+        for ti in dr.task_instances:
+            ti.try_number = 1
+            ti.hostname = "localhost"
+            session.merge(ti)
+        for ti in dr2.task_instances:
+            ti.try_number = 1
+            ti.hostname = "localhost"
+            session.merge(ti)
+        session.flush()
+        dag.clear()
+        dummy_dag.clear()
+        for ti in dr.task_instances:
+            ti.try_number = 2
+            ti.hostname = "localhost"
+            session.merge(ti)
+        for ti in dr2.task_instances:
+            ti.try_number = 2
+            ti.hostname = "localhost"
+            session.merge(ti)
+        session.flush()
+
+    @pytest.fixture
+    def configure_loggers(self, tmp_path, create_log_template):
+        self.log_dir = tmp_path
+
+        # TASK_ID
+        dir_path = tmp_path / f"dag_id={self.DAG_ID}" / 
f"run_id={self.RUN_ID}" / f"task_id={self.TASK_ID}"
+        dir_path.mkdir(parents=True)
+
+        log = dir_path / "attempt=1.log"
+        log.write_text("Log for testing.")
+
+        # try number 2
+        log = dir_path / "attempt=2.log"
+        log.write_text("Log for testing 2.")
+
+        # MAPPED_TASK_ID
+        for map_index in range(3):
+            dir_path = (
+                tmp_path
+                / f"dag_id={self.DAG_ID}"
+                / f"run_id={self.RUN_ID}"
+                / f"task_id={self.MAPPED_TASK_ID}"
+                / f"map_index={map_index}"
+            )
+
+            dir_path.mkdir(parents=True)
+
+            log = dir_path / "attempt=1.log"
+            log.write_text("Log for testing.")
+
+            # try number 2
+            log = dir_path / "attempt=2.log"
+            log.write_text("Log for testing 2.")
+
+        # Create a custom logging configuration
+        logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
+        logging_config["handlers"]["task"]["base_log_folder"] = self.log_dir
+
+        logging.config.dictConfig(logging_config)
+
+        yield
+
+        logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
+
+    def teardown_method(self):
+        clear_db_runs()
+
+    @pytest.mark.parametrize("try_number", [1, 2])
+    def test_should_respond_200_json(self, try_number):
+        key = self.app.state.secret_key
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": False})
+        response = self.client.get(
+            
f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/{try_number}",
+            params={"token": token},
+            headers={"Accept": "application/json"},
+            # environ_overrides={"REMOTE_USER": "test"},
+        )
+        expected_filename = 
f"{self.log_dir}/dag_id={self.DAG_ID}/run_id={self.RUN_ID}/task_id={self.TASK_ID}/attempt={try_number}.log"
+        log_content = "Log for testing." if try_number == 1 else "Log for 
testing 2."
+        print("\n\n\n response.json()", response.json())

Review Comment:
   ```suggestion
   ```



##########
tests/api_fastapi/core_api/routes/public/test_task_instances.py:
##########
@@ -1671,3 +1683,354 @@ def test_raises_404_for_nonexistent_task_instance(self, 
test_client, session):
         assert response.json() == {
             "detail": "The Task Instance with dag_id: 
`example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: 
`nonexistent_task`, try_number: `0` and map_index: `-1` was not found"
         }
+
+
+class TestTaskInstancesLog:
+    DAG_ID = "dag_for_testing_log_endpoint"
+    RUN_ID = "dag_run_id_for_testing_log_endpoint"
+    TASK_ID = "task_for_testing_log_endpoint"
+    MAPPED_TASK_ID = "mapped_task_for_testing_log_endpoint"
+    TRY_NUMBER = 1
+
+    default_time = "2020-06-10T20:00:00+00:00"
+
+    @pytest.fixture(autouse=True)
+    def setup_attrs(self, configure_loggers, dag_maker, session) -> None:
+        self.app = create_app()
+        self.client = TestClient(self.app)
+        # Make sure that the configure_logging is not cached
+        self.old_modules = dict(sys.modules)
+
+        with dag_maker(self.DAG_ID, 
start_date=timezone.parse(self.default_time), session=session) as dag:
+            EmptyOperator(task_id=self.TASK_ID)
+
+            @task(task_id=self.MAPPED_TASK_ID)
+            def add_one(x: int):
+                return x + 1
+
+            add_one.expand(x=[1, 2, 3])
+
+        dr = dag_maker.create_dagrun(
+            run_id=self.RUN_ID,
+            run_type=DagRunType.SCHEDULED,
+            logical_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+        )
+
+        self.app.state.dag_bag.bag_dag(dag)
+
+        # Add dummy dag for checking picking correct log with same task_id and 
different dag_id case.
+        with dag_maker(
+            f"{self.DAG_ID}_copy", 
start_date=timezone.parse(self.default_time), session=session
+        ) as dummy_dag:
+            EmptyOperator(task_id=self.TASK_ID)
+        dr2 = dag_maker.create_dagrun(
+            run_id=self.RUN_ID,
+            run_type=DagRunType.SCHEDULED,
+            logical_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+        )
+        self.app.state.dag_bag.bag_dag(dummy_dag)
+
+        for ti in dr.task_instances:
+            ti.try_number = 1
+            ti.hostname = "localhost"
+            session.merge(ti)
+        for ti in dr2.task_instances:
+            ti.try_number = 1
+            ti.hostname = "localhost"
+            session.merge(ti)
+        session.flush()
+        dag.clear()
+        dummy_dag.clear()
+        for ti in dr.task_instances:
+            ti.try_number = 2
+            ti.hostname = "localhost"
+            session.merge(ti)
+        for ti in dr2.task_instances:
+            ti.try_number = 2
+            ti.hostname = "localhost"
+            session.merge(ti)
+        session.flush()
+
+    @pytest.fixture
+    def configure_loggers(self, tmp_path, create_log_template):
+        self.log_dir = tmp_path
+
+        # TASK_ID
+        dir_path = tmp_path / f"dag_id={self.DAG_ID}" / 
f"run_id={self.RUN_ID}" / f"task_id={self.TASK_ID}"
+        dir_path.mkdir(parents=True)
+
+        log = dir_path / "attempt=1.log"
+        log.write_text("Log for testing.")
+
+        # try number 2
+        log = dir_path / "attempt=2.log"
+        log.write_text("Log for testing 2.")
+
+        # MAPPED_TASK_ID
+        for map_index in range(3):
+            dir_path = (
+                tmp_path
+                / f"dag_id={self.DAG_ID}"
+                / f"run_id={self.RUN_ID}"
+                / f"task_id={self.MAPPED_TASK_ID}"
+                / f"map_index={map_index}"
+            )
+
+            dir_path.mkdir(parents=True)
+
+            log = dir_path / "attempt=1.log"
+            log.write_text("Log for testing.")
+
+            # try number 2
+            log = dir_path / "attempt=2.log"
+            log.write_text("Log for testing 2.")
+
+        # Create a custom logging configuration
+        logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
+        logging_config["handlers"]["task"]["base_log_folder"] = self.log_dir
+
+        logging.config.dictConfig(logging_config)
+
+        yield
+
+        logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
+
+    def teardown_method(self):
+        clear_db_runs()
+
+    @pytest.mark.parametrize("try_number", [1, 2])
+    def test_should_respond_200_json(self, try_number):
+        key = self.app.state.secret_key
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": False})
+        response = self.client.get(
+            
f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/{try_number}",
+            params={"token": token},
+            headers={"Accept": "application/json"},
+            # environ_overrides={"REMOTE_USER": "test"},

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to