XD-DENG commented on a change in pull request #15197:
URL: https://github.com/apache/airflow/pull/15197#discussion_r607011484
##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -215,6 +217,79 @@ def test_invalid_executor_config(self,
mock_get_kube_client, mock_kubernetes_job
assert list(executor.event_buffer.values())[0][1] == "Invalid
executor_config passed"
+ @pytest.mark.execution_timeout(10)
+ @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python
package is not installed')
+ @mock.patch('airflow.kubernetes.pod_launcher.PodLauncher.run_pod_async')
+ @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+ def test_pod_template_file_override_in_executor_config(self,
mock_get_kube_client, mock_run_pod_async):
+ current_folder = pathlib.Path(__file__).parent.absolute()
+ template_file = str(
+ (current_folder / "kubernetes_executor_template_files" /
"basic_template.yaml").absolute()
+ )
+
+ mock_kube_client = mock.patch('kubernetes.client.CoreV1Api',
autospec=True)
+ mock_get_kube_client.return_value = mock_kube_client
+
+ with conf_vars({('kubernetes', 'pod_template_file'): ''}):
+ executor = self.kubernetes_executor
+ executor.start()
+
+ assert executor.event_buffer == {}
+ assert executor.task_queue.empty()
+
+ executor.execute_async(
+ key=('dag', 'task', datetime.utcnow(), 1),
+ queue=None,
+ command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
+ executor_config={
+ "pod_template_file": template_file,
+ "pod_override":
k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
+ },
+ )
+
+ assert not executor.task_queue.empty()
+ task = executor.task_queue.get_nowait()
+ _, _, expected_executor_config, expected_pod_template_file = task
+
+ # Test that the correct values have been put to queue
+ assert expected_executor_config.metadata.labels == {'release':
'stable'}
+ assert expected_pod_template_file == template_file
+
+ self.kubernetes_executor.kube_scheduler.run_next(task)
+ mock_run_pod_async.assert_called_once_with(
+ k8s.V1Pod(
+ api_version="v1",
+ kind="Pod",
+ metadata=k8s.V1ObjectMeta(
+ name=mock.ANY,
+ namespace="default",
+ annotations={
+ 'dag_id': 'dag',
+ 'execution_date': mock.ANY,
+ 'task_id': 'task',
+ 'try_number': '1',
+ },
+ labels={
+ 'airflow-worker': '5',
+ 'airflow_version': mock.ANY,
+ 'dag_id': 'dag',
+ 'execution_date': mock.ANY,
+ 'kubernetes_executor': 'True',
+ 'mylabel': 'foo',
+ 'release': 'stable',
+ 'task_id': 'task',
+ 'try_number': '1',
+ },
+ ),
+ spec=k8s.V1PodSpec(
+ containers=mock.ANY,
Review comment:
Possible to make the assertion more granular? Container spec is worth
testing carefully, e.g. the image used.
##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -215,6 +217,79 @@ def test_invalid_executor_config(self,
mock_get_kube_client, mock_kubernetes_job
assert list(executor.event_buffer.values())[0][1] == "Invalid
executor_config passed"
+ @pytest.mark.execution_timeout(10)
+ @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python
package is not installed')
+ @mock.patch('airflow.kubernetes.pod_launcher.PodLauncher.run_pod_async')
+ @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+ def test_pod_template_file_override_in_executor_config(self,
mock_get_kube_client, mock_run_pod_async):
+ current_folder = pathlib.Path(__file__).parent.absolute()
+ template_file = str(
+ (current_folder / "kubernetes_executor_template_files" /
"basic_template.yaml").absolute()
+ )
+
+ mock_kube_client = mock.patch('kubernetes.client.CoreV1Api',
autospec=True)
+ mock_get_kube_client.return_value = mock_kube_client
+
+ with conf_vars({('kubernetes', 'pod_template_file'): ''}):
+ executor = self.kubernetes_executor
+ executor.start()
+
+ assert executor.event_buffer == {}
+ assert executor.task_queue.empty()
+
+ executor.execute_async(
+ key=('dag', 'task', datetime.utcnow(), 1),
Review comment:
May be redundant, but to be on more safe side, we may want to assert the
execution date here as well, rather than just having `mock.ANY`.
We have have something like below here
```python
execution_date = datetime.utcnow()
executor.execute_async(
key=('dag', 'task', execution_date, 1),
... ...
```
Then in `mock_run_pod_async.assert_called_once_with()`, have
```python
annotations={
'dag_id': 'dag',
'execution_date': execution_date.isoformat(),
... ...
}
```
and
```python
labels={
'airflow-worker': '5',
'airflow_version': mock.ANY,
'dag_id': 'dag',
'execution_date':
pod_generator.datetime_to_label_safe_datestring(execution_date),
... ...
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]