utkarsharma2 commented on code in PR #44238:
URL: https://github.com/apache/airflow/pull/44238#discussion_r1851928603
##########
tests/api_fastapi/core_api/routes/public/test_task_instances.py:
##########
@@ -1671,3 +1683,354 @@ def test_raises_404_for_nonexistent_task_instance(self,
test_client, session):
assert response.json() == {
"detail": "The Task Instance with dag_id:
`example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id:
`nonexistent_task`, try_number: `0` and map_index: `-1` was not found"
}
+
+
+class TestTaskInstancesLog:
+ DAG_ID = "dag_for_testing_log_endpoint"
+ RUN_ID = "dag_run_id_for_testing_log_endpoint"
+ TASK_ID = "task_for_testing_log_endpoint"
+ MAPPED_TASK_ID = "mapped_task_for_testing_log_endpoint"
+ TRY_NUMBER = 1
+
+ default_time = "2020-06-10T20:00:00+00:00"
+
+ @pytest.fixture(autouse=True)
+ def setup_attrs(self, configure_loggers, dag_maker, session) -> None:
+ self.app = create_app()
+ self.client = TestClient(self.app)
+ # Make sure that the configure_logging is not cached
+ self.old_modules = dict(sys.modules)
+
+ with dag_maker(self.DAG_ID,
start_date=timezone.parse(self.default_time), session=session) as dag:
+ EmptyOperator(task_id=self.TASK_ID)
+
+ @task(task_id=self.MAPPED_TASK_ID)
+ def add_one(x: int):
+ return x + 1
+
+ add_one.expand(x=[1, 2, 3])
+
+ dr = dag_maker.create_dagrun(
+ run_id=self.RUN_ID,
+ run_type=DagRunType.SCHEDULED,
+ logical_date=timezone.parse(self.default_time),
+ start_date=timezone.parse(self.default_time),
+ )
+
+ self.app.state.dag_bag.bag_dag(dag)
+
+ # Add dummy dag for checking picking correct log with same task_id and
different dag_id case.
+ with dag_maker(
+ f"{self.DAG_ID}_copy",
start_date=timezone.parse(self.default_time), session=session
+ ) as dummy_dag:
+ EmptyOperator(task_id=self.TASK_ID)
+ dr2 = dag_maker.create_dagrun(
+ run_id=self.RUN_ID,
+ run_type=DagRunType.SCHEDULED,
+ logical_date=timezone.parse(self.default_time),
+ start_date=timezone.parse(self.default_time),
+ )
+ self.app.state.dag_bag.bag_dag(dummy_dag)
+
+ for ti in dr.task_instances:
+ ti.try_number = 1
+ ti.hostname = "localhost"
+ session.merge(ti)
+ for ti in dr2.task_instances:
+ ti.try_number = 1
+ ti.hostname = "localhost"
+ session.merge(ti)
+ session.flush()
+ dag.clear()
+ dummy_dag.clear()
+ for ti in dr.task_instances:
+ ti.try_number = 2
+ ti.hostname = "localhost"
+ session.merge(ti)
+ for ti in dr2.task_instances:
+ ti.try_number = 2
+ ti.hostname = "localhost"
+ session.merge(ti)
+ session.flush()
+
+ @pytest.fixture
+ def configure_loggers(self, tmp_path, create_log_template):
+ self.log_dir = tmp_path
+
+ # TASK_ID
+ dir_path = tmp_path / f"dag_id={self.DAG_ID}" /
f"run_id={self.RUN_ID}" / f"task_id={self.TASK_ID}"
+ dir_path.mkdir(parents=True)
+
+ log = dir_path / "attempt=1.log"
+ log.write_text("Log for testing.")
+
+ # try number 2
+ log = dir_path / "attempt=2.log"
+ log.write_text("Log for testing 2.")
+
+ # MAPPED_TASK_ID
+ for map_index in range(3):
+ dir_path = (
+ tmp_path
+ / f"dag_id={self.DAG_ID}"
+ / f"run_id={self.RUN_ID}"
+ / f"task_id={self.MAPPED_TASK_ID}"
+ / f"map_index={map_index}"
+ )
+
+ dir_path.mkdir(parents=True)
+
+ log = dir_path / "attempt=1.log"
+ log.write_text("Log for testing.")
+
+ # try number 2
+ log = dir_path / "attempt=2.log"
+ log.write_text("Log for testing 2.")
+
+ # Create a custom logging configuration
+ logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
+ logging_config["handlers"]["task"]["base_log_folder"] = self.log_dir
+
+ logging.config.dictConfig(logging_config)
+
+ yield
+
+ logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
+
+ def teardown_method(self):
+ clear_db_runs()
+
+ @pytest.mark.parametrize("try_number", [1, 2])
+ def test_should_respond_200_json(self, try_number):
+ key = self.app.state.secret_key
+ serializer = URLSafeSerializer(key)
+ token = serializer.dumps({"download_logs": False})
+ response = self.client.get(
+
f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/{try_number}",
+ params={"token": token},
+ headers={"Accept": "application/json"},
+ # environ_overrides={"REMOTE_USER": "test"},
+ )
+ expected_filename =
f"{self.log_dir}/dag_id={self.DAG_ID}/run_id={self.RUN_ID}/task_id={self.TASK_ID}/attempt={try_number}.log"
+ log_content = "Log for testing." if try_number == 1 else "Log for
testing 2."
+ print("\n\n\n response.json()", response.json())
+ assert "[('localhost'," in response.json()["content"]
+ assert f"*** Found local files:\\n*** * {expected_filename}\\n" in
response.json()["content"]
+ assert f"{log_content}')]" in response.json()["content"]
+
+ info = serializer.loads(response.json()["continuation_token"])
+ assert info == {"end_of_log": True, "log_pos": 16 if try_number == 1
else 18}
+ assert 200 == response.status_code
+
+ @pytest.mark.parametrize(
+ "request_url, expected_filename, extra_query_string, try_number",
+ [
+ (
+
f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/logs/1",
+
f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={TASK_ID}/attempt=1.log",
+ {},
+ 1,
+ ),
+ (
+
f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{MAPPED_TASK_ID}/logs/1",
+
f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={MAPPED_TASK_ID}/map_index=0/attempt=1.log",
+ {"map_index": 0},
+ 1,
+ ),
+ # try_number 2
+ (
+
f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/logs/2",
+
f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={TASK_ID}/attempt=2.log",
+ {},
+ 2,
+ ),
+ (
+
f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{MAPPED_TASK_ID}/logs/2",
+
f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={MAPPED_TASK_ID}/map_index=0/attempt=2.log",
+ {"map_index": 0},
+ 2,
+ ),
+ ],
+ )
+ def test_should_respond_200_text_plain(
+ self, request_url, expected_filename, extra_query_string, try_number
+ ):
+ expected_filename = expected_filename.replace("LOG_DIR",
str(self.log_dir))
+
+ key = self.app.state.secret_key
+ serializer = URLSafeSerializer(key)
+ token = serializer.dumps({"download_logs": True})
+
+ response = self.client.get(
+ request_url,
+ params={"token": token, **extra_query_string},
+ headers={"Accept": "text/plain"},
+ # environ_overrides={"REMOTE_USER": "test"},
+ )
+ assert 200 == response.status_code
+
+ log_content = "Log for testing." if try_number == 1 else "Log for
testing 2."
+ assert "localhost\n" in response.content.decode("utf-8")
+ assert f"*** Found local files:\n*** * {expected_filename}\n" in
response.content.decode("utf-8")
+ assert f"{log_content}\n" in response.content.decode("utf-8")
+
+ @pytest.mark.parametrize(
+ "request_url, expected_filename, extra_query_string, try_number",
+ [
+ (
+
f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/logs/1",
+
f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={TASK_ID}/attempt=1.log",
+ {},
+ 1,
+ ),
+ (
+
f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{MAPPED_TASK_ID}/logs/1",
+
f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={MAPPED_TASK_ID}/map_index=0/attempt=1.log",
+ {"map_index": 0},
+ 1,
+ ),
+ (
+
f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/logs/2",
+
f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={TASK_ID}/attempt=2.log",
+ {},
+ 2,
+ ),
+ (
+
f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{MAPPED_TASK_ID}/logs/2",
+
f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={MAPPED_TASK_ID}/map_index=0/attempt=2.log",
+ {"map_index": 0},
+ 2,
+ ),
+ ],
+ )
+ def test_get_logs_of_removed_task(self, request_url, expected_filename,
extra_query_string, try_number):
+ expected_filename = expected_filename.replace("LOG_DIR",
str(self.log_dir))
+
+ # Recreate DAG without tasks
+ dagbag = self.app.state.dag_bag
+ dag = DAG(self.DAG_ID, schedule=None,
start_date=timezone.parse(self.default_time))
+ del dagbag.dags[self.DAG_ID]
+ dagbag.bag_dag(dag=dag)
+
+ key = self.app.state.secret_key
+ serializer = URLSafeSerializer(key)
+ token = serializer.dumps({"download_logs": True})
+
+ response = self.client.get(
+ request_url,
+ params={"token": token, **extra_query_string},
+ headers={"Accept": "text/plain"},
+ # environ_overrides={"REMOTE_USER": "test"},
+ )
+
+ assert 200 == response.status_code
+
+ log_content = "Log for testing." if try_number == 1 else "Log for
testing 2."
+ assert "localhost\n" in response.content.decode("utf-8")
+ assert f"*** Found local files:\n*** * {expected_filename}\n" in
response.content.decode("utf-8")
+ assert f"{log_content}\n" in response.content.decode("utf-8")
+
+ @pytest.mark.parametrize("try_number", [1, 2])
+ def test_get_logs_response_with_ti_equal_to_none(self, try_number):
+ key = self.app.state.secret_key
+ serializer = URLSafeSerializer(key)
+ token = serializer.dumps({"download_logs": True})
+
+ response = self.client.get(
+
f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/Invalid-Task-ID/logs/{try_number}",
+ params={"token": token},
+ # environ_overrides={"REMOTE_USER": "test"},
Review Comment:
Removed the commented-out code. Please take another look (PTAL).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org