uranusjr commented on a change in pull request #17209:
URL: https://github.com/apache/airflow/pull/17209#discussion_r680306905
##########
File path: airflow/providers/amazon/aws/operators/ecs.py
##########
@@ -136,13 +136,17 @@ class ECSOperator(BaseOperator):
Only required if you want logs to be shown in the Airflow UI after your job has
finished.
:type awslogs_stream_prefix: str
+ :param quota_retry: Config if and how to retry _start_task() for transient errors.
+ :type quota_retry: dict
:param reattach: If set to True, will check if the task previously launched by the task_instance
is already running. If so, the operator will attach to it instead of starting a new task.
This is to avoid relaunching a new task when the connection drops between Airflow and ECS while
the task is running (when the Airflow worker is restarted for example).
:type reattach: bool
- :param quota_retry: Config if and how to retry _start_task() for transient errors.
- :type quota_retry: dict
+ :param number_logs_exception: number of lines from the last Cloudwatch logs to return in the
+ AirflowException if an ECS task is stopped (to receive Airflow alerts with the logs of what
+ failed in the code running in ECS)
Review comment:
```suggestion
:param number_logs_exception: Number of lines from the last Cloudwatch logs to return in the
AirflowException if an ECS task is stopped (to receive Airflow alerts with the logs of what
failed in the code running in ECS).
```
Nit: the description should be a complete sentence, so capitalize the first word and end with a period.
##########
File path: tests/providers/amazon/aws/operators/test_ecs.py
##########
@@ -228,11 +229,11 @@ def test_check_success_tasks_raises(self):
with pytest.raises(Exception) as ctx:
self.ecs._check_success_task()
- # Ordering of str(dict) is not guaranteed.
- assert "This task is not in success state " in str(ctx.value)
- assert "'name': 'foo'" in str(ctx.value)
- assert "'lastStatus': 'STOPPED'" in str(ctx.value)
- assert "'exitCode': 1" in str(ctx.value)
+ expected_last_logs = "\n".join(mock_last_log_messages.return_value)
+ assert (
+ str(ctx.value)
+ == f"This task is not in success state - last logs from Cloudwatch:\n{expected_last_logs}"
+ )
Review comment:
```suggestion
assert str(ctx.value) == (
"This task is not in success state - last logs from Cloudwatch:\n1\n2\n3\n4\n5"
)
```
Test code is encouraged to duplicate literal values rather than derive them,
so that the assertion reads as close as possible to what a human expects to see.
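To illustrate the point, here is a minimal sketch comparing the two styles. `format_failure` is a hypothetical stand-in for the operator's message formatting, not the actual `ECSOperator` code, and the log lines are made up for the example.

```python
def format_failure(last_logs):
    # Hypothetical stand-in for the message built in _check_success_task().
    joined = "\n".join(last_logs)
    return f"This task is not in success state - last logs from Cloudwatch:\n{joined}"


def test_derived_expectation():
    # The expected value repeats the production logic, so a mistake in the
    # join would be mirrored here and the test would still pass.
    logs = ["1", "2", "3", "4", "5"]
    expected = "\n".join(logs)
    assert format_failure(logs) == (
        f"This task is not in success state - last logs from Cloudwatch:\n{expected}"
    )


def test_literal_expectation():
    # The expected value is written out exactly as a reader would see it.
    logs = ["1", "2", "3", "4", "5"]
    assert format_failure(logs) == (
        "This task is not in success state - last logs from Cloudwatch:\n1\n2\n3\n4\n5"
    )
```

Both tests pass against this stand-in, but only the second one shows the final string at a glance and does not depend on re-deriving it.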
##########
File path: tests/providers/amazon/aws/operators/test_ecs.py
##########
@@ -415,17 +416,17 @@ def test_reattach_save_task_arn_xcom(
xcom_del_mock.assert_called_once()
assert self.ecs.arn == 'arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55'
- @mock.patch.object(ECSOperator, '_last_log_message', return_value="Log output")
+ @mock.patch.object(ECSOperator, '_last_log_messages', return_value=["Log output"])
def test_execute_xcom_with_log(self, mock_cloudwatch_log_message):
self.ecs.do_xcom_push = True
- assert self.ecs.execute(None) == mock_cloudwatch_log_message.return_value
+ assert self.ecs.execute(None) == mock_cloudwatch_log_message.return_value[-1]
Review comment:
```suggestion
assert self.ecs.execute(None) == "Log output"
```
Same idea.
##########
File path: airflow/providers/amazon/aws/operators/ecs.py
##########
@@ -378,7 +387,10 @@ def _check_success_task(self) -> None:
containers = task['containers']
for container in containers:
if container.get('lastStatus') == 'STOPPED' and container['exitCode'] != 0:
- raise AirflowException(f'This task is not in success state {task}')
+ last_logs = "\n".join(self._last_log_messages(self.number_logs_exception))
+ raise AirflowException(
+ f"This task is not in success state - last logs from Cloudwatch:\n{last_logs}"
+ )
Review comment:
How about including the count in the message?
```suggestion
raise AirflowException(
f"This task is not in success state - last {self.number_logs_exception} "
f"logs from Cloudwatch:\n{last_logs}"
)
```
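For reference, a rough standalone sketch of how the resulting message would read. The `AirflowException` class below is a local stand-in so the snippet runs without Airflow installed, and `raise_task_failure` and its log lines are hypothetical names used only for illustration.

```python
class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException (assumption)."""


def raise_task_failure(log_messages, number_logs_exception=10):
    # Keep only the trailing N lines; guard N <= 0 because [-0:] returns the whole list.
    tail = log_messages[-number_logs_exception:] if number_logs_exception > 0 else []
    last_logs = "\n".join(tail)
    raise AirflowException(
        f"This task is not in success state - last {number_logs_exception} "
        f"logs from Cloudwatch:\n{last_logs}"
    )


try:
    raise_task_failure(["start", "step 1", "boom"], number_logs_exception=2)
except AirflowException as exc:
    message = str(exc)
```

One thing to keep in mind: when fewer than `number_logs_exception` lines are available, the message still states the configured count, not the number of lines actually shown.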
##########
File path: airflow/providers/amazon/aws/operators/ecs.py
##########
@@ -178,6 +182,7 @@ def __init__(
propagate_tags: Optional[str] = None,
quota_retry: Optional[dict] = None,
reattach: bool = False,
+ number_logs_exception=10,
Review comment:
```suggestion
number_logs_exception: int = 10,
```