[ https://issues.apache.org/jira/browse/AIRFLOW-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ash Berlin-Taylor resolved AIRFLOW-3046.
----------------------------------------
Resolution: Fixed
> ECS Operator mistakenly reports success when task is killed due to EC2 host termination
> ---------------------------------------------------------------------------------------
>
> Key: AIRFLOW-3046
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3046
> Project: Apache Airflow
> Issue Type: Bug
> Components: contrib, operators
> Reporter: Dan MacTough
> Priority: Major
> Fix For: 1.10.1
>
>
> We have ECS clusters made up of EC2 spot fleets. Among other things, this
> means hosts can be terminated on short notice. When that happens, all tasks
> on the host (and their containers) are terminated as well.
> We expect Airflow task instances that use the ECS Operator to be marked as
> failed and retried when this happens.
> Instead, they are marked as successful.
> As a result, the immediate downstream task fails, causing the scheduled DAG
> run to fail.
> Here's an example of the Airflow log output when this happens:
> {noformat}
> [2018-09-12 01:02:02,712] {ecs_operator.py:112} INFO - ECS Task stopped,
> check status: {'tasks': [{'taskArn':
> 'arn:aws:ecs:us-east-1:111111111111:task/32d43a1d-fbc7-4659-815d-9133bde11cdc',
> 'clusterArn': 'arn:aws:ecs:us-east-1:111111111111:cluster/processing',
> 'taskDefinitionArn':
> 'arn:aws:ecs:us-east-1:111111111111:task-definition/foobar-testing_dataEngineering_rd:76',
> 'containerInstanceArn':
> 'arn:aws:ecs:us-east-1:111111111111:container-instance/7431f0a6-8fc5-4eff-8196-32f77d286a61',
> 'overrides': {'containerOverrides': [{'name': 'foobar-testing', 'command':
> ['./bin/generate-features.sh', '2018-09-11']}]}, 'lastStatus': 'STOPPED',
> 'desiredStatus': 'STOPPED', 'cpu': '4096', 'memory': '60000', 'containers':
> [{'containerArn':
> 'arn:aws:ecs:us-east-1:111111111111:container/0d5cc553-f894-4f9a-b17c-9f80f7ce8d0a',
> 'taskArn':
> 'arn:aws:ecs:us-east-1:111111111111:task/32d43a1d-fbc7-4659-815d-9133bde11cdc',
> 'name': 'foobar-testing', 'lastStatus': 'RUNNING', 'networkBindings': [],
> 'networkInterfaces': [], 'healthStatus': 'UNKNOWN'}], 'startedBy': 'Airflow',
> 'version': 3, 'stoppedReason': 'Host EC2 (instance i-02cf23bbd5ae26194)
> terminated.', 'connectivity': 'CONNECTED', 'connectivityAt':
> datetime.datetime(2018, 9, 12, 0, 6, 30, 245000, tzinfo=tzlocal()),
> 'pullStartedAt': datetime.datetime(2018, 9, 12, 0, 6, 32, 748000,
> tzinfo=tzlocal()), 'pullStoppedAt': datetime.datetime(2018, 9, 12, 0, 6, 59,
> 748000, tzinfo=tzlocal()), 'createdAt': datetime.datetime(2018, 9, 12, 0, 6,
> 30, 245000, tzinfo=tzlocal()), 'startedAt': datetime.datetime(2018, 9, 12, 0,
> 7, 0, 748000, tzinfo=tzlocal()), 'stoppingAt': datetime.datetime(2018, 9, 12,
> 1, 2, 0, 91000, tzinfo=tzlocal()), 'stoppedAt': datetime.datetime(2018, 9,
> 12, 1, 2, 0, 91000, tzinfo=tzlocal()), 'group':
> 'family:foobar-testing_dataEngineering_rd', 'launchType': 'EC2',
> 'attachments': [], 'healthStatus': 'UNKNOWN'}], 'failures': [],
> 'ResponseMetadata': {'RequestId': '758c791f-b627-11e8-83f7-2b76f4796ed2',
> 'HTTPStatusCode': 200, 'HTTPHeaders': {'server': 'Server', 'date': 'Wed, 12
> Sep 2018 01:02:02 GMT', 'content-type': 'application/x-amz-json-1.1',
> 'content-length': '1412', 'connection': 'keep-alive', 'x-amzn-requestid':
> '758c791f-b627-11e8-83f7-2b76f4796ed2'}, 'RetryAttempts': 0}}
> {noformat}
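> I believe the function that checks whether the task succeeded
> ({{_check_success_task}}) needs at least one more check. In the response
> above, the task's {{stoppedReason}} says the host EC2 instance was
> terminated, but the container's {{lastStatus}} is still {{RUNNING}} and it has
> no {{exitCode}}, so none of the existing per-container checks raises an
> exception.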
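> To make the gap concrete, here is a minimal standalone sketch in plain Python
> (no Airflow or boto3 required). The helper {{container_check_failures}} is
> hypothetical: it mirrors the per-container checks in the
> {{_check_success_task}} function quoted below (using {{.get}} for
> {{exitCode}}), and the task dict is a trimmed copy of the response logged
> above. None of the checks fires, so the task is treated as a success.

```python
# Hypothetical stand-in for the per-container checks in _check_success_task
# (everything except the stoppedReason test). It collects reasons instead of
# raising, so the outcome is easy to inspect.
def container_check_failures(task):
    """Return the reasons the container checks would fail this task."""
    reasons = []
    for container in task.get('containers', []):
        status = container.get('lastStatus')
        if status == 'STOPPED' and container.get('exitCode') != 0:
            reasons.append('non-zero exit code')
        elif status == 'PENDING':
            reasons.append('still pending')
        elif 'error' in container.get('reason', '').lower():
            reasons.append('launch error')
    return reasons


# Trimmed copy of the task from the describe_tasks response in the log above.
terminated_task = {
    'lastStatus': 'STOPPED',
    'desiredStatus': 'STOPPED',
    'stoppedReason': 'Host EC2 (instance i-02cf23bbd5ae26194) terminated.',
    'containers': [
        {'name': 'foobar-testing', 'lastStatus': 'RUNNING',
         'healthStatus': 'UNKNOWN'},
    ],
}

# The container never reaches STOPPED and has no exitCode, so nothing fires.
print(container_check_failures(terminated_task))  # []
```

> Only the task-level {{stoppedReason}} carries the signal, which is why the
> extra check has to look at the task rather than its containers.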
> We are currently running a modified version of the ECS Operator that contains
> the following {{_check_success_task}} function to address this failure
> condition:
> {code}
> from airflow.exceptions import AirflowException
>
> # Method of the ECS Operator class.
> def _check_success_task(self):
>     response = self.client.describe_tasks(
>         cluster=self.cluster,
>         tasks=[self.arn]
>     )
>     self.log.info('ECS Task stopped, check status: %s', response)
>     if len(response.get('failures', [])) > 0:
>         raise AirflowException(response)
>     for task in response['tasks']:
>         # When the EC2 host goes away, the container can still report
>         # lastStatus RUNNING with no exitCode (see the log above), so only
>         # the task's stoppedReason reveals the failure.
>         if 'terminated' in task.get('stoppedReason', '').lower():
>             raise AirflowException(
>                 'The task was stopped because the host instance terminated: {}'
>                 .format(task.get('stoppedReason', '')))
>         containers = task['containers']
>         for container in containers:
>             # A missing exitCode on a STOPPED container is treated as failure.
>             if container.get('lastStatus') == 'STOPPED' and \
>                     container.get('exitCode') != 0:
>                 raise AirflowException(
>                     'This task is not in success state {}'.format(task))
>             elif container.get('lastStatus') == 'PENDING':
>                 raise AirflowException(
>                     'This task is still pending {}'.format(task))
>             elif 'error' in container.get('reason', '').lower():
>                 raise AirflowException(
>                     'This container encountered an error during launch: {}'
>                     .format(container.get('reason', '').lower()))
> {code}
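> The substring test {{'terminated' in stoppedReason}} is deliberately broad; it
> may also match unrelated reasons that happen to contain the word (for example,
> a free-form reason supplied when a task is stopped manually). A narrower
> alternative is sketched below. The helper name and the pattern are
> assumptions: the pattern only matches the wording seen in the log above, so
> it would miss any other phrasing ECS uses for host loss.

```python
import re

# Hypothetical pattern: matches only the host-termination wording observed in
# the log ("Host EC2 (instance <id>) terminated."). This is an assumption about
# ECS's message format, not a documented contract.
HOST_TERMINATED = re.compile(r'^Host EC2 \(instance [^)]+\) terminated\.$')


def stopped_by_host_termination(task):
    """Return True if the task's stoppedReason reports EC2 host termination."""
    return bool(HOST_TERMINATED.match(task.get('stoppedReason', '')))


logged = {'stoppedReason': 'Host EC2 (instance i-02cf23bbd5ae26194) terminated.'}
other = {'stoppedReason': 'Essential container in task exited'}

print(stopped_by_host_termination(logged))  # True
print(stopped_by_host_termination(other))   # False
```

> The trade-off is precision versus coverage: the regex avoids false positives
> but depends on the exact message text, while the substring check tolerates
> wording changes at the cost of occasionally failing a task for the wrong
> reason.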
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)