This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push:
new c2e1270 Fix displaying Executor Class Name in "Base Job" table (#8679)
c2e1270 is described below
commit c2e12700c801b393fcbd06f275b67ca5391d0652
Author: Kaxil Naik <[email protected]>
AuthorDate: Sat May 2 18:30:45 2020 +0100
Fix displaying Executor Class Name in "Base Job" table (#8679)
(cherry picked from commit 0a7b5004ac5df830389e16f9344ef549d27e0353)
---
airflow/jobs/base_job.py | 2 +-
tests/jobs/test_base_job.py | 24 +++++++++++++++++++++++-
2 files changed, 24 insertions(+), 2 deletions(-)
diff --git a/airflow/jobs/base_job.py b/airflow/jobs/base_job.py
index 1135696..94f5969 100644
--- a/airflow/jobs/base_job.py
+++ b/airflow/jobs/base_job.py
@@ -84,7 +84,7 @@ class BaseJob(Base, LoggingMixin):
*args, **kwargs):
self.hostname = get_hostname()
self.executor = executor or executors.get_default_executor()
- self.executor_class = executor.__class__.__name__
+ self.executor_class = self.executor.__class__.__name__
self.start_date = timezone.utcnow()
self.latest_heartbeat = timezone.utcnow()
if heartrate is not None:
diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py
index 7487b13..8ffa5bb 100644
--- a/tests/jobs/test_base_job.py
+++ b/tests/jobs/test_base_job.py
@@ -24,10 +24,12 @@ import unittest
from sqlalchemy.exc import OperationalError
from airflow.jobs import BaseJob
+from airflow.executors.sequential_executor import SequentialExecutor
from airflow.utils import timezone
from airflow.utils.db import create_session
from airflow.utils.state import State
from tests.compat import Mock, patch
+from tests.test_utils.config import conf_vars
class BaseJobTest(unittest.TestCase):
@@ -119,4 +121,24 @@ class BaseJobTest(unittest.TestCase):
job.heartbeat()
- self.assertEqual(job.latest_heartbeat, when, "attriubte not
updated when heartbeat fails")
+ self.assertEqual(job.latest_heartbeat, when, "attribute not
updated when heartbeat fails")
+
+ @conf_vars({('scheduler', 'max_tis_per_query'): '100'})
+ @patch('airflow.jobs.base_job.executors.get_default_executor')
+ @patch('airflow.jobs.base_job.get_hostname')
+ @patch('airflow.jobs.base_job.getpass.getuser')
+ def test_essential_attr(self, mock_getuser, mock_hostname,
mock_default_executor):
+ mock_sequential_executor = SequentialExecutor()
+ mock_hostname.return_value = "test_hostname"
+ mock_getuser.return_value = "testuser"
+ mock_default_executor.return_value = mock_sequential_executor
+
+ test_job = self.TestJob(None, heartrate=10, dag_id="example_dag",
state=State.RUNNING)
+ self.assertEqual(test_job.executor_class, "SequentialExecutor")
+ self.assertEqual(test_job.heartrate, 10)
+ self.assertEqual(test_job.dag_id, "example_dag")
+ self.assertEqual(test_job.hostname, "test_hostname")
+ self.assertEqual(test_job.max_tis_per_query, 100)
+ self.assertEqual(test_job.unixname, "testuser")
+ self.assertEqual(test_job.state, "running")
+ self.assertEqual(test_job.executor, mock_sequential_executor)