This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new f4513c0  Revert "KubernetesJobWatcher no longer inherits from Process 
(#11017)" (#11065)
f4513c0 is described below

commit f4513c03890f544655c7ff65de8f3db979bef80b
Author: Daniel Imberman <[email protected]>
AuthorDate: Mon Sep 21 15:28:00 2020 -0700

    Revert "KubernetesJobWatcher no longer inherits from Process (#11017)" 
(#11065)
    
    This reverts commit 1539bd051cfbc41c1c7aa317fc7df82dab28f9f8.
---
 airflow/executors/kubernetes_executor.py    | 27 +-------------
 tests/executors/test_kubernetes_executor.py | 55 +----------------------------
 2 files changed, 2 insertions(+), 80 deletions(-)

diff --git a/airflow/executors/kubernetes_executor.py 
b/airflow/executors/kubernetes_executor.py
index 2c639d0..889c73d 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -126,7 +126,7 @@ class KubeConfig:  # pylint: 
disable=too-many-instance-attributes
             return int(val)
 
 
-class KubernetesJobWatcher(LoggingMixin):
+class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
     """Watches for Kubernetes jobs"""
 
     def __init__(self,
@@ -143,31 +143,6 @@ class KubernetesJobWatcher(LoggingMixin):
         self.watcher_queue = watcher_queue
         self.resource_version = resource_version
         self.kube_config = kube_config
-        self.watcher_process = multiprocessing.Process(target=self.run, 
args=())
-
-    def start(self):
-        """
-        Start the watcher process
-        """
-        self.watcher_process.start()
-
-    def is_alive(self):
-        """
-        Check if the watcher process is alive
-        """
-        self.watcher_process.is_alive()
-
-    def join(self):
-        """
-        Join watcher process
-        """
-        self.watcher_process.join()
-
-    def terminate(self):
-        """
-        Terminate watcher process
-        """
-        self.watcher_process.terminate()
 
     def run(self) -> None:
         """Performs watching"""
diff --git a/tests/executors/test_kubernetes_executor.py 
b/tests/executors/test_kubernetes_executor.py
index 038ec28..9bd1868 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -15,7 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-import multiprocessing
 import random
 import re
 import string
@@ -31,9 +30,7 @@ from tests.test_utils.config import conf_vars
 try:
     from kubernetes.client.rest import ApiException
 
-    from airflow.executors.kubernetes_executor import (
-        AirflowKubernetesScheduler, KubernetesExecutor, KubernetesJobWatcher,
-    )
+    from airflow.executors.kubernetes_executor import 
AirflowKubernetesScheduler, KubernetesExecutor
     from airflow.kubernetes import pod_generator
     from airflow.kubernetes.pod_generator import PodGenerator
     from airflow.utils.state import State
@@ -41,56 +38,6 @@ except ImportError:
     AirflowKubernetesScheduler = None  # type: ignore
 
 
-class TestKubernetesJobWatcher(unittest.TestCase):
-    def setUp(self) -> None:
-        self.watcher_queue = multiprocessing.Manager().Queue()
-        self.watcher = KubernetesJobWatcher(
-            namespace="namespace",
-            multi_namespace_mode=False,
-            watcher_queue=self.watcher_queue,
-            resource_version="0",
-            worker_uuid="0",
-            kube_config=None,
-        )
-
-    def test_running_task(self):
-        self.watcher.process_status(
-            pod_id="pod_id",
-            namespace="namespace",
-            status="Running",
-            annotations={"foo": "bar"},
-            resource_version="5",
-            event={"type": "ADDED"}
-        )
-        self.assertTrue(self.watcher_queue.empty())
-
-    def test_succeeded_task(self):
-        self.watcher.process_status(
-            pod_id="pod_id",
-            namespace="namespace",
-            status="Succeeded",
-            annotations={"foo": "bar"},
-            resource_version="5",
-            event={"type": "ADDED"}
-        )
-        result = self.watcher_queue.get_nowait()
-        self.assertEqual(('pod_id', 'namespace', None, {'foo': 'bar'}, '5'), 
result)
-        self.assertTrue(self.watcher_queue.empty())
-
-    def test_failed_task(self):
-        self.watcher.process_status(
-            pod_id="pod_id",
-            namespace="namespace",
-            status="Failed",
-            annotations={"foo": "bar"},
-            resource_version="5",
-            event={"type": "ADDED"}
-        )
-        result = self.watcher_queue.get_nowait()
-        self.assertEqual(('pod_id', 'namespace', "failed", {'foo': 'bar'}, 
'5'), result)
-        self.assertTrue(self.watcher_queue.empty())
-
-
 # pylint: disable=unused-argument
 class TestAirflowKubernetesScheduler(unittest.TestCase):
     @staticmethod

Reply via email to