This is an automated email from the ASF dual-hosted git repository. jbonofre pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git
commit fac89af6108d5d426ed415a997eab88d5ada761a
Author: zionrubin <[email protected]>
AuthorDate: Tue Jun 23 15:16:55 2020 +0300

    fix jobEndStatus tasks state check
---
 .../airflow/operators/job_status_operator.py | 38 +++++++++++++++++++---
 1 file changed, 33 insertions(+), 5 deletions(-)

diff --git a/rainbow/runners/airflow/operators/job_status_operator.py b/rainbow/runners/airflow/operators/job_status_operator.py
index ae9382a..8ea997d 100644
--- a/rainbow/runners/airflow/operators/job_status_operator.py
+++ b/rainbow/runners/airflow/operators/job_status_operator.py
@@ -17,12 +17,16 @@
 # specific language governing permissions and limitations
 # under the License.
 from datetime import datetime
+from typing import Any
 
 import pytz
 from airflow.contrib.hooks.aws_hook import AwsHook
 from airflow.exceptions import AirflowException
+from airflow.lineage import apply_lineage
 from airflow.models import BaseOperator
+from airflow.utils.db import provide_session
 from airflow.utils.decorators import apply_defaults
+from airflow.utils.state import State
 
 
 class JobStatusOperator(BaseOperator):
@@ -94,6 +98,7 @@ class JobEndOperator(JobStatusOperator):
         super().__init__(backends=backends, *args, **kwargs)
         self.namespace = namespace
         self.application_name = application_name
+        self.__job_result = 0
 
     def metrics(self, context):
         duration = round((pytz.utc.localize(datetime.utcnow()) - context[
@@ -102,19 +107,40 @@ class JobEndOperator(JobStatusOperator):
         self.log.info('Elapsed time: %s' % duration)
 
         task_instances = context['dag_run'].get_task_instances()
-        task_states = [task_instance.state for task_instance in task_instances[:-1]]
 
-        job_result = 0
-        if all(state == 'success' for state in task_states):
-            job_result = 1
+        task_states = [self.__log_and_get_state(task_instance)
+                       for task_instance in task_instances
+                       if task_instance.task_id != context['task_instance'].task_id]
+
+        if all((state == State.SUCCESS or state == State.SKIPPED) for state in task_states):
+            self.__job_result = 1
 
         return [
-            Metric(self.namespace, 'JobResult', job_result,
+            Metric(self.namespace, 'JobResult', self.__job_result,
                    [Tag('ApplicationName', self.application_name)]),
             Metric(self.namespace, 'JobDuration', duration,
                    [Tag('ApplicationName', self.application_name)])
         ]
 
+    def __log_and_get_state(self, task_instance):
+        state = task_instance.state
+
+        self.log.info(f'{task_instance.task_id} finished with state: {state}')
+
+        return state
+
+    @apply_lineage
+    @provide_session
+    def post_execute(self, context: Any, result: Any = None, session=None):
+        if self.__job_result == 0:
+            self.log.info("Failing this DAG run due to task failure.")
+
+            dag_run = context['ti'].get_dagrun()
+            dag_run.end_date = datetime.utcnow()
+            dag_run.state = State.FAILED
+
+            session.merge(dag_run)
+
 
 # noinspection PyAbstractClass
 class CloudWatchHook(AwsHook):
@@ -150,6 +176,8 @@ class CloudWatchHook(AwsHook):
             ]
         )
 
+        self.log.info(f'Published metric: {metric.name} with value: {value}')
+
 
 class Metric:
     """
