This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new f4513c0 Revert "KubernetesJobWatcher no longer inherits from Process (#11017)" (#11065)
f4513c0 is described below
commit f4513c03890f544655c7ff65de8f3db979bef80b
Author: Daniel Imberman <[email protected]>
AuthorDate: Mon Sep 21 15:28:00 2020 -0700
Revert "KubernetesJobWatcher no longer inherits from Process (#11017)" (#11065)
This reverts commit 1539bd051cfbc41c1c7aa317fc7df82dab28f9f8.
---
airflow/executors/kubernetes_executor.py | 27 +-------------
tests/executors/test_kubernetes_executor.py | 55 +----------------------------
2 files changed, 2 insertions(+), 80 deletions(-)
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 2c639d0..889c73d 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -126,7 +126,7 @@ class KubeConfig: # pylint: disable=too-many-instance-attributes
return int(val)
-class KubernetesJobWatcher(LoggingMixin):
+class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
"""Watches for Kubernetes jobs"""
def __init__(self,
@@ -143,31 +143,6 @@ class KubernetesJobWatcher(LoggingMixin):
self.watcher_queue = watcher_queue
self.resource_version = resource_version
self.kube_config = kube_config
- self.watcher_process = multiprocessing.Process(target=self.run, args=())
-
- def start(self):
- """
- Start the watcher process
- """
- self.watcher_process.start()
-
- def is_alive(self):
- """
- Check if the watcher process is alive
- """
- self.watcher_process.is_alive()
-
- def join(self):
- """
- Join watcher process
- """
- self.watcher_process.join()
-
- def terminate(self):
- """
- Terminate watcher process
- """
- self.watcher_process.terminate()
def run(self) -> None:
"""Performs watching"""
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 038ec28..9bd1868 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -15,7 +15,6 @@
# specific language governing permissions and limitations
# under the License.
#
-import multiprocessing
import random
import re
import string
@@ -31,9 +30,7 @@ from tests.test_utils.config import conf_vars
try:
from kubernetes.client.rest import ApiException
- from airflow.executors.kubernetes_executor import (
- AirflowKubernetesScheduler, KubernetesExecutor, KubernetesJobWatcher,
- )
+ from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler, KubernetesExecutor
from airflow.kubernetes import pod_generator
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.utils.state import State
@@ -41,56 +38,6 @@ except ImportError:
AirflowKubernetesScheduler = None # type: ignore
-class TestKubernetesJobWatcher(unittest.TestCase):
- def setUp(self) -> None:
- self.watcher_queue = multiprocessing.Manager().Queue()
- self.watcher = KubernetesJobWatcher(
- namespace="namespace",
- multi_namespace_mode=False,
- watcher_queue=self.watcher_queue,
- resource_version="0",
- worker_uuid="0",
- kube_config=None,
- )
-
- def test_running_task(self):
- self.watcher.process_status(
- pod_id="pod_id",
- namespace="namespace",
- status="Running",
- annotations={"foo": "bar"},
- resource_version="5",
- event={"type": "ADDED"}
- )
- self.assertTrue(self.watcher_queue.empty())
-
- def test_succeeded_task(self):
- self.watcher.process_status(
- pod_id="pod_id",
- namespace="namespace",
- status="Succeeded",
- annotations={"foo": "bar"},
- resource_version="5",
- event={"type": "ADDED"}
- )
- result = self.watcher_queue.get_nowait()
- self.assertEqual(('pod_id', 'namespace', None, {'foo': 'bar'}, '5'), result)
- self.assertTrue(self.watcher_queue.empty())
-
- def test_failed_task(self):
- self.watcher.process_status(
- pod_id="pod_id",
- namespace="namespace",
- status="Failed",
- annotations={"foo": "bar"},
- resource_version="5",
- event={"type": "ADDED"}
- )
- result = self.watcher_queue.get_nowait()
- self.assertEqual(('pod_id', 'namespace', "failed", {'foo': 'bar'}, '5'), result)
- self.assertTrue(self.watcher_queue.empty())
-
-
# pylint: disable=unused-argument
class TestAirflowKubernetesScheduler(unittest.TestCase):
@staticmethod