This is an automated email from the ASF dual-hosted git repository.

aviemzur pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git


The following commit(s) were added to refs/heads/master by this push:
     new 02913bf  add missing execute method to end_operator
02913bf is described below

commit 02913bf714837038cdf961b1bf1703ddcd2d37c0
Author: zionrubin <[email protected]>
AuthorDate: Thu Nov 5 14:35:54 2020 +0200

    add missing execute method to end_operator
---
 .../airflow/operators/job_status_operator.py       | 35 ++++++++++++----------
 1 file changed, 20 insertions(+), 15 deletions(-)

diff --git a/liminal/runners/airflow/operators/job_status_operator.py b/liminal/runners/airflow/operators/job_status_operator.py
index 8ea997d..ca12524 100644
--- a/liminal/runners/airflow/operators/job_status_operator.py
+++ b/liminal/runners/airflow/operators/job_status_operator.py
@@ -100,34 +100,39 @@ class JobEndOperator(JobStatusOperator):
         self.application_name = application_name
         self.__job_result = 0
 
+    def execute(self, context):
+        self.__calculate_job_result(context)
+        super().execute(context)
+        
     def metrics(self, context):
-        duration = round((pytz.utc.localize(datetime.utcnow()) - context[
-            'ti'].get_dagrun().start_date).total_seconds())
+        duration = round((pytz.utc.localize(datetime.utcnow()) - context['ti'].get_dagrun().start_date).total_seconds())
 
         self.log.info('Elapsed time: %s' % duration)
 
-        task_instances = context['dag_run'].get_task_instances()
-
-        task_states = [self.__log_and_get_state(task_instance)
-                       for task_instance in task_instances
-                       if task_instance.task_id != context['task_instance'].task_id]
-
-        if all((state == State.SUCCESS or state == State.SKIPPED) for state in task_states):
-            self.__job_result = 1
+        self.log.info(f'dag final job result: {self.__job_result}')
 
         return [
-            Metric(self.namespace, 'JobResult', self.__job_result,
-                   [Tag('ApplicationName', self.application_name)]),
-            Metric(self.namespace, 'JobDuration', duration,
-                   [Tag('ApplicationName', self.application_name)])
+            Metric(self.namespace, 'JobResult', self.__job_result, [Tag('ApplicationName', self.application_name)]),
+            Metric(self.namespace, 'JobDuration', duration, [Tag('ApplicationName', self.application_name)])
         ]
 
     def __log_and_get_state(self, task_instance):
         state = task_instance.state
 
-        self.log.info(f'{task_instance.task_id} finished with state: {state}')
+        self.log.info(f'Task {task_instance.task_id} finished with state = {state}')
 
         return state
+    
+    def __calculate_job_result(self, context):
+        self.log.info('scanning task instances states.. ')
+        task_instances = context['dag_run'].get_task_instances()
+        task_states = [self.__log_and_get_state(task_instance)
+                       for task_instance in task_instances
+                       if task_instance.task_id != context['task_instance'].task_id]
+
+        self.__job_result = 0
+        if all((state == State.SUCCESS or state == State.SKIPPED) for state in task_states):
+            self.__job_result = 1
 
     @apply_lineage
     @provide_session

Reply via email to