kaxil commented on a change in pull request #15197:
URL: https://github.com/apache/airflow/pull/15197#discussion_r607088711



##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -215,6 +217,79 @@ def test_invalid_executor_config(self, 
mock_get_kube_client, mock_kubernetes_job
 
         assert list(executor.event_buffer.values())[0][1] == "Invalid 
executor_config passed"
 
+    @pytest.mark.execution_timeout(10)
+    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python 
package is not installed')
+    @mock.patch('airflow.kubernetes.pod_launcher.PodLauncher.run_pod_async')
+    @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+    def test_pod_template_file_override_in_executor_config(self, 
mock_get_kube_client, mock_run_pod_async):
+        current_folder = pathlib.Path(__file__).parent.absolute()
+        template_file = str(
+            (current_folder / "kubernetes_executor_template_files" / 
"basic_template.yaml").absolute()
+        )
+
+        mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', 
autospec=True)
+        mock_get_kube_client.return_value = mock_kube_client
+
+        with conf_vars({('kubernetes', 'pod_template_file'): ''}):
+            executor = self.kubernetes_executor
+            executor.start()
+
+            assert executor.event_buffer == {}
+            assert executor.task_queue.empty()
+
+            executor.execute_async(
+                key=('dag', 'task', datetime.utcnow(), 1),
+                queue=None,
+                command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
+                executor_config={
+                    "pod_template_file": template_file,
+                    "pod_override": 
k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
+                },
+            )
+
+            assert not executor.task_queue.empty()
+            task = executor.task_queue.get_nowait()
+            _, _, expected_executor_config, expected_pod_template_file = task
+
+            # Test that the correct values have been put to queue
+            assert expected_executor_config.metadata.labels == {'release': 
'stable'}
+            assert expected_pod_template_file == template_file
+
+            self.kubernetes_executor.kube_scheduler.run_next(task)
+            mock_run_pod_async.assert_called_once_with(
+                k8s.V1Pod(
+                    api_version="v1",
+                    kind="Pod",
+                    metadata=k8s.V1ObjectMeta(
+                        name=mock.ANY,
+                        namespace="default",
+                        annotations={
+                            'dag_id': 'dag',
+                            'execution_date': mock.ANY,
+                            'task_id': 'task',
+                            'try_number': '1',
+                        },
+                        labels={
+                            'airflow-worker': '5',
+                            'airflow_version': mock.ANY,
+                            'dag_id': 'dag',
+                            'execution_date': mock.ANY,
+                            'kubernetes_executor': 'True',
+                            'mylabel': 'foo',
+                            'release': 'stable',
+                            'task_id': 'task',
+                            'try_number': '1',
+                        },
+                    ),
+                    spec=k8s.V1PodSpec(
+                        containers=mock.ANY,

Review comment:
       I was in two minds with that --  I added it back in 
https://github.com/apache/airflow/pull/15197/commits/855d401ccadfd96c2798ecdf458bd9cb82f44e50

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -215,6 +217,79 @@ def test_invalid_executor_config(self, 
mock_get_kube_client, mock_kubernetes_job
 
         assert list(executor.event_buffer.values())[0][1] == "Invalid 
executor_config passed"
 
+    @pytest.mark.execution_timeout(10)
+    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python 
package is not installed')
+    @mock.patch('airflow.kubernetes.pod_launcher.PodLauncher.run_pod_async')
+    @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+    def test_pod_template_file_override_in_executor_config(self, 
mock_get_kube_client, mock_run_pod_async):
+        current_folder = pathlib.Path(__file__).parent.absolute()
+        template_file = str(
+            (current_folder / "kubernetes_executor_template_files" / 
"basic_template.yaml").absolute()
+        )
+
+        mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', 
autospec=True)
+        mock_get_kube_client.return_value = mock_kube_client
+
+        with conf_vars({('kubernetes', 'pod_template_file'): ''}):
+            executor = self.kubernetes_executor
+            executor.start()
+
+            assert executor.event_buffer == {}
+            assert executor.task_queue.empty()
+
+            executor.execute_async(
+                key=('dag', 'task', datetime.utcnow(), 1),

Review comment:
       Done in 
https://github.com/apache/airflow/pull/15197/commits/855d401ccadfd96c2798ecdf458bd9cb82f44e50




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to