jedcunningham commented on code in PR #20578:
URL: https://github.com/apache/airflow/pull/20578#discussion_r878521528
##########
airflow/providers/cncf/kubernetes/CHANGELOG.rst:
##########
@@ -19,6 +19,19 @@
Changelog
---------
+main
+....
+
+Features
+~~~~~~~~
+
+KubernetesPodOperator now uses KubernetesHook
+`````````````````````````````````````````````
+
+Previously, KubernetesPodOperator relied on core Airflow configuration (namely
settings for the Kubernetes executor) for certain settings used in client
generation. Now KubernetesPodOperator uses KubernetesHook, and the
consideration of core k8s settings is officially deprecated.
+
+If you are using the Airflow configuration settings (e.g. as opposed to
operator params) to configure the kubernetes client, then prior to the next
major release you will need to add an Airflow connection and set your KPO
processes to use that connection.
Review Comment:
```suggestion
If you are using the Airflow configuration settings (e.g. as opposed to
operator params) to configure the kubernetes client, then prior to the next
major release you will need to add an Airflow connection and set your KPO tasks
to use that connection.
```
nit
##########
kubernetes_tests/test_kubernetes_pod_operator.py:
##########
@@ -582,10 +586,12 @@ def test_xcom_push(self, xcom_push):
self.expected_pod['spec']['containers'].append(container)
assert self.expected_pod == actual_pod
-
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.create_pod")
-
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion")
- @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
- def test_envs_from_secrets(self, mock_client, await_pod_completion_mock,
create_pod):
+ @mock.patch(f"{POD_MANAGER_CLASS}.create_pod")
Review Comment:
👍
##########
tests/providers/cncf/kubernetes/hooks/test_kubernetes.py:
##########
@@ -131,6 +136,76 @@ def test_get_default_client(
mock_loader.assert_not_called()
assert isinstance(api_conn, kubernetes.client.api_client.ApiClient)
+ @pytest.mark.parametrize(
+ 'disable_verify_ssl, conn_id, disable_called',
+ (
+ (True, None, True),
+ (None, None, False),
+ (False, None, False),
+ (None, 'disable_verify_ssl', True),
+ (True, 'disable_verify_ssl', True),
+ (False, 'disable_verify_ssl', False),
+ (None, 'disable_verify_ssl_empty', False),
+ (True, 'disable_verify_ssl_empty', True),
+ (False, 'disable_verify_ssl_empty', False),
+ ),
+ )
+ @patch("kubernetes.config.incluster_config.InClusterConfigLoader",
new=MagicMock())
+ @patch(f"{HOOK_MODULE}._disable_verify_ssl")
+ def test_disable_verify_ssl(
+ self,
+ mock_disable,
+ disable_verify_ssl,
+ conn_id,
+ disable_called,
+ ):
+ """
+ Verifies whether disable verify ssl is called depending on combination
of hook param and
+ connection extra. Hook param should beat extra.
+ """
+ kubernetes_hook = KubernetesHook(conn_id=conn_id,
disable_verify_ssl=disable_verify_ssl)
+ api_conn = kubernetes_hook.get_conn()
+ if disable_called:
+ assert mock_disable.called
+ else:
+ assert not mock_disable.called
Review Comment:
```suggestion
assert mock_disable.called is disable_called
```
nit
##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -561,6 +569,39 @@ def dry_run(self) -> None:
pod = self.build_pod_request_obj()
print(yaml.dump(prune_dict(pod.to_dict(), mode='strict')))
+ def _patch_deprecated_k8s_settings(self, hook: KubernetesHook):
+ """
+ Here we read config from core Airflow config [kubernetes] section.
+ In a future release we will stop looking at this section and require
users
+ to use Airflow connections to configure KPO.
+
+ When we find values there that we need to apply on the hook, we patch
special
+ hook attributes here.
+ """
+
+ # default for enable_tcp_keepalive is True; patch if False
+ if conf.getboolean('kubernetes', 'enable_tcp_keepalive',
fallback=None) is False:
Review Comment:
I'm curious, why do we need `fallback=None`?
##########
airflow/providers/cncf/kubernetes/hooks/kubernetes.py:
##########
@@ -108,37 +121,77 @@ def __init__(
cluster_context: Optional[str] = None,
config_file: Optional[str] = None,
in_cluster: Optional[bool] = None,
+ disable_verify_ssl: Optional[bool] = None,
+ disable_tcp_keepalive: Optional[bool] = None,
) -> None:
super().__init__()
self.conn_id = conn_id
self.client_configuration = client_configuration
self.cluster_context = cluster_context
self.config_file = config_file
self.in_cluster = in_cluster
+ self.disable_verify_ssl = disable_verify_ssl
+ self.disable_tcp_keepalive = disable_tcp_keepalive
+
+ # these params used for transition in KPO to K8s hook
+ # for a deprecation period we will continue to consider k8s settings
from airflow.cfg
+ self._deprecated_core_disable_tcp_keepalive: Optional[bool] = None
+ self._deprecated_core_disable_verify_ssl: Optional[bool] = None
+ self._deprecated_core_in_cluster: Optional[bool] = None
+ self._deprecated_core_cluster_context: Optional[str] = None
+ self._deprecated_core_config_file: Optional[str] = None
@staticmethod
def _coalesce_param(*params):
for param in params:
if param is not None:
return param
- def get_conn(self) -> Any:
- """Returns kubernetes api session for use with requests"""
+ @cached_property
+ def conn_extras(self):
if self.conn_id:
connection = self.get_connection(self.conn_id)
extras = connection.extra_dejson
else:
extras = {}
+ return extras
+
+ def _get_field(self, field_name):
+ if field_name.startswith('extra_'):
+ raise ValueError(
+ f"Got prefixed name {field_name}; please remove the
'extra__kubernetes__' prefix "
+ f"when using this method."
+ )
+ if field_name in self.conn_extras:
+ return self.conn_extras[field_name] or None
+ prefixed_name = f"extra__kubernetes__{field_name}"
+ return self.conn_extras.get(prefixed_name) or None
+
+ @staticmethod
+ def _deprecation_warning_core_param(deprecation_warnings):
+ settings_list_str = ''.join([f"\n\t{k}={v!r}" for k, v in
deprecation_warnings])
+ warnings.warn(
+ f"\nApplying core Airflow settings from section [kubernetes] with
the following keys:"
+ f"{settings_list_str}\n"
+ "In a future release, KubernetesPodOperator will no longer
consider core\n"
+ "airflow settings; define an Airflow connection instead.",
Review Comment:
```suggestion
"Airflow settings; define an Airflow connection instead.",
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]