This is an automated email from the ASF dual-hosted git repository.
aviemzur pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git
The following commit(s) were added to refs/heads/master by this push:
new 02913bf add missing execute method to end_operator
02913bf is described below
commit 02913bf714837038cdf961b1bf1703ddcd2d37c0
Author: zionrubin <[email protected]>
AuthorDate: Thu Nov 5 14:35:54 2020 +0200
add missing execute method to end_operator
---
.../airflow/operators/job_status_operator.py | 35 ++++++++++++----------
1 file changed, 20 insertions(+), 15 deletions(-)
diff --git a/liminal/runners/airflow/operators/job_status_operator.py
b/liminal/runners/airflow/operators/job_status_operator.py
index 8ea997d..ca12524 100644
--- a/liminal/runners/airflow/operators/job_status_operator.py
+++ b/liminal/runners/airflow/operators/job_status_operator.py
@@ -100,34 +100,39 @@ class JobEndOperator(JobStatusOperator):
self.application_name = application_name
self.__job_result = 0
+ def execute(self, context):
+ self.__calculate_job_result(context)
+ super().execute(context)
+
def metrics(self, context):
- duration = round((pytz.utc.localize(datetime.utcnow()) - context[
- 'ti'].get_dagrun().start_date).total_seconds())
+ duration = round((pytz.utc.localize(datetime.utcnow()) -
context['ti'].get_dagrun().start_date).total_seconds())
self.log.info('Elapsed time: %s' % duration)
- task_instances = context['dag_run'].get_task_instances()
-
- task_states = [self.__log_and_get_state(task_instance)
- for task_instance in task_instances
- if task_instance.task_id !=
context['task_instance'].task_id]
-
- if all((state == State.SUCCESS or state == State.SKIPPED) for state in
task_states):
- self.__job_result = 1
+ self.log.info(f'dag final job result: {self.__job_result}')
return [
- Metric(self.namespace, 'JobResult', self.__job_result,
- [Tag('ApplicationName', self.application_name)]),
- Metric(self.namespace, 'JobDuration', duration,
- [Tag('ApplicationName', self.application_name)])
+ Metric(self.namespace, 'JobResult', self.__job_result,
[Tag('ApplicationName', self.application_name)]),
+ Metric(self.namespace, 'JobDuration', duration,
[Tag('ApplicationName', self.application_name)])
]
def __log_and_get_state(self, task_instance):
state = task_instance.state
- self.log.info(f'{task_instance.task_id} finished with state: {state}')
+ self.log.info(f'Task {task_instance.task_id} finished with state =
{state}')
return state
+
+ def __calculate_job_result(self, context):
+ self.log.info('scanning task instances states.. ')
+ task_instances = context['dag_run'].get_task_instances()
+ task_states = [self.__log_and_get_state(task_instance)
+ for task_instance in task_instances
+ if task_instance.task_id !=
context['task_instance'].task_id]
+
+ self.__job_result = 0
+ if all((state == State.SUCCESS or state == State.SKIPPED) for state in
task_states):
+ self.__job_result = 1
@apply_lineage
@provide_session