This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 99a3bf2318 Deprecate `skip_exit_code` in `DockerOperator` and `KubernetesPodOperator` (#30733) 99a3bf2318 is described below commit 99a3bf23182374699f437cfd8ed3b74af3dafba7 Author: eladkal <45845474+elad...@users.noreply.github.com> AuthorDate: Wed Apr 19 19:39:54 2023 +0300 Deprecate `skip_exit_code` in `DockerOperator` and `KubernetesPodOperator` (#30733) * Deprecate `skip_exit_code` in `DockerOperator` and `KubernetesPodOperator` * satisfy mypy --- airflow/providers/cncf/kubernetes/operators/pod.py | 18 +++++++++++++----- airflow/providers/docker/operators/docker.py | 15 +++++++++++---- tests/providers/cncf/kubernetes/operators/test_pod.py | 18 +++++++++--------- tests/providers/docker/decorators/test_docker.py | 6 +++--- tests/providers/docker/operators/test_docker.py | 6 +++--- 5 files changed, 39 insertions(+), 24 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index b37866c315..b91e75ac39 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -213,7 +213,7 @@ class KubernetesPodOperator(BaseOperator): to populate the environment variables with. The contents of the target ConfigMap's Data field will represent the key-value pairs as environment variables. Extends env_from. - :param skip_exit_code: If task exits with this exit code, leave the task + :param skip_on_exit_code: If task exits with this exit code, leave the task in ``skipped`` state (default: None). If set to ``None``, any non-zero exit code will be treated as a failure. :param base_container_name: The name of the base container in the pod. This container's logs @@ -292,6 +292,7 @@ class KubernetesPodOperator(BaseOperator): termination_grace_period: int | None = None, configmaps: list[str] | None = None, skip_exit_code: int | None = None, + skip_on_exit_code: int | None = None, base_container_name: str | None = None, deferrable: bool = False, poll_interval: float = 2, @@ -361,7 +362,13 @@ class KubernetesPodOperator(BaseOperator): self.termination_grace_period = termination_grace_period self.pod_request_obj: k8s.V1Pod | None = None self.pod: k8s.V1Pod | None = None - self.skip_exit_code = skip_exit_code + if skip_exit_code is not None: + warnings.warn( + "skip_exit_code is deprecated. Please use skip_on_exit_code", DeprecationWarning, stacklevel=2 + ) + self.skip_on_exit_code: int | None = skip_exit_code + else: + self.skip_on_exit_code = skip_on_exit_code self.base_container_name = base_container_name or self.BASE_CONTAINER_NAME self.deferrable = deferrable self.poll_interval = poll_interval @@ -675,7 +682,7 @@ class KubernetesPodOperator(BaseOperator): error_message = get_container_termination_message(remote_pod, self.base_container_name) error_message = "\n" + error_message if error_message else "" - if self.skip_exit_code is not None: + if self.skip_on_exit_code is not None: container_statuses = ( remote_pod.status.container_statuses if remote_pod and remote_pod.status else None ) or [] @@ -689,9 +696,10 @@ class KubernetesPodOperator(BaseOperator): and base_container_status.last_state.terminated else None ) - if exit_code == self.skip_exit_code: + if exit_code == self.skip_on_exit_code: raise AirflowSkipException( - f"Pod {pod and pod.metadata.name} returned exit code {self.skip_exit_code}. Skipping." + f"Pod {pod and pod.metadata.name} returned exit code " + f"{self.skip_on_exit_code}. Skipping." ) raise AirflowException( f"Pod {pod and pod.metadata.name} returned a failure:\n{error_message}\n" diff --git a/airflow/providers/docker/operators/docker.py b/airflow/providers/docker/operators/docker.py index a626375e21..5af1ba7542 100644 --- a/airflow/providers/docker/operators/docker.py +++ b/airflow/providers/docker/operators/docker.py @@ -155,7 +155,7 @@ class DockerOperator(BaseOperator): If rolling the logs creates excess files, the oldest file is removed. Only effective when max-size is also set. A positive integer. Defaults to 1. :param ipc_mode: Set the IPC mode for the container. - :param skip_exit_code: If task exits with this exit code, leave the task + :param skip_on_exit_code: If task exits with this exit code, leave the task in ``skipped`` state (default: None). If set to ``None``, any non-zero exit code will be treated as a failure. """ @@ -215,6 +215,7 @@ class DockerOperator(BaseOperator): log_opts_max_file: str | None = None, ipc_mode: str | None = None, skip_exit_code: int | None = None, + skip_on_exit_code: int | None = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -276,7 +277,13 @@ class DockerOperator(BaseOperator): self.log_opts_max_size = log_opts_max_size self.log_opts_max_file = log_opts_max_file self.ipc_mode = ipc_mode - self.skip_exit_code = skip_exit_code + if skip_exit_code is not None: + warnings.warn( + "skip_exit_code is deprecated. Please use skip_on_exit_code", DeprecationWarning, stacklevel=2 + ) + self.skip_on_exit_code: int | None = skip_exit_code + else: + self.skip_on_exit_code = skip_on_exit_code @cached_property def hook(self) -> DockerHook: @@ -377,9 +384,9 @@ class DockerOperator(BaseOperator): self.log.info("%s", log_chunk) result = self.cli.wait(self.container["Id"]) - if result["StatusCode"] == self.skip_exit_code: + if result["StatusCode"] == self.skip_on_exit_code: raise AirflowSkipException( - f"Docker container returned exit code {self.skip_exit_code}. Skipping." + f"Docker container returned exit code {self.skip_on_exit_code}. Skipping." ) elif result["StatusCode"] != 0: joined_log_lines = "\n".join(log_lines) diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py index 72493cd0f3..83c5c8bdcf 100644 --- a/tests/providers/cncf/kubernetes/operators/test_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_pod.py @@ -1089,16 +1089,16 @@ class TestKubernetesPodOperator: "extra_kwargs, actual_exit_code, expected_exc", [ (None, 99, AirflowException), - ({"skip_exit_code": 100}, 100, AirflowSkipException), - ({"skip_exit_code": 100}, 101, AirflowException), - ({"skip_exit_code": None}, 100, AirflowException), + ({"skip_on_exit_code": 100}, 100, AirflowSkipException), + ({"skip_on_exit_code": 100}, 101, AirflowException), + ({"skip_on_exit_code": None}, 100, AirflowException), ], ) @patch(f"{POD_MANAGER_CLASS}.await_pod_completion") def test_task_skip_when_pod_exit_with_certain_code( self, remote_pod, extra_kwargs, actual_exit_code, expected_exc ): - """Tests that an AirflowSkipException is raised when the container exits with the skip_exit_code""" + """Tests that an AirflowSkipException is raised when the container exits with the skip_on_exit_code""" k = KubernetesPodOperator( task_id="task", is_delete_operator_pod=True, **(extra_kwargs if extra_kwargs else {}) ) @@ -1284,13 +1284,13 @@ class TestKubernetesPodOperatorAsync: [ (None, 0, None, "Succeeded", "success"), (None, 99, AirflowException, "Failed", "error"), - ({"skip_exit_code": 100}, 100, AirflowSkipException, "Failed", "error"), - ({"skip_exit_code": 100}, 101, AirflowException, "Failed", "error"), - ({"skip_exit_code": None}, 100, AirflowException, "Failed", "error"), + ({"skip_on_exit_code": 100}, 100, AirflowSkipException, "Failed", "error"), + ({"skip_on_exit_code": 100}, 101, AirflowException, "Failed", "error"), + ({"skip_on_exit_code": None}, 100, AirflowException, "Failed", "error"), ], ) @patch(HOOK_CLASS) - def test_async_create_pod_with_skip_exit_code_should_skip( + def test_async_create_pod_with_skip_on_exit_code_should_skip( self, mocked_hook, extra_kwargs, @@ -1299,7 +1299,7 @@ class TestKubernetesPodOperatorAsync: pod_status, event_status, ): - """Tests that an AirflowSkipException is raised when the container exits with the skip_exit_code""" + """Tests that an AirflowSkipException is raised when the container exits with the skip_on_exit_code""" k = KubernetesPodOperator( task_id=TEST_TASK_ID, diff --git a/tests/providers/docker/decorators/test_docker.py b/tests/providers/docker/decorators/test_docker.py index 4f47333eea..e4aba52576 100644 --- a/tests/providers/docker/decorators/test_docker.py +++ b/tests/providers/docker/decorators/test_docker.py @@ -124,9 +124,9 @@ class TestDockerDecorator: "extra_kwargs, actual_exit_code, expected_state", [ (None, 99, TaskInstanceState.FAILED), - ({"skip_exit_code": 100}, 100, TaskInstanceState.SKIPPED), - ({"skip_exit_code": 100}, 101, TaskInstanceState.FAILED), - ({"skip_exit_code": None}, 0, TaskInstanceState.SUCCESS), + ({"skip_on_exit_code": 100}, 100, TaskInstanceState.SKIPPED), + ({"skip_on_exit_code": 100}, 101, TaskInstanceState.FAILED), + ({"skip_on_exit_code": None}, 0, TaskInstanceState.SUCCESS), ], ) def test_skip_docker_operator(self, extra_kwargs, actual_exit_code, expected_state, dag_maker): diff --git a/tests/providers/docker/operators/test_docker.py b/tests/providers/docker/operators/test_docker.py index d1ff2acbaf..6f20055893 100644 --- a/tests/providers/docker/operators/test_docker.py +++ b/tests/providers/docker/operators/test_docker.py @@ -516,9 +516,9 @@ class TestDockerOperator: "extra_kwargs, actual_exit_code, expected_exc", [ (None, 99, AirflowException), - ({"skip_exit_code": 100}, 100, AirflowSkipException), - ({"skip_exit_code": 100}, 101, AirflowException), - ({"skip_exit_code": None}, 100, AirflowException), + ({"skip_on_exit_code": 100}, 100, AirflowSkipException), + ({"skip_on_exit_code": 100}, 101, AirflowException), + ({"skip_on_exit_code": None}, 100, AirflowException), ], ) def test_skip(self, extra_kwargs, actual_exit_code, expected_exc):