phanikumv commented on code in PR #28523:
URL: https://github.com/apache/airflow/pull/28523#discussion_r1070908307


##########
airflow/providers/cncf/kubernetes/hooks/kubernetes.py:
##########
@@ -431,3 +433,102 @@ def _get_bool(val) -> bool | None:
         elif val.strip().lower() == "false":
             return False
     return None
+
+
+class KubernetesAsyncHook(KubernetesHook):
+    """
+    Creates Async Kubernetes API connection.
+
+    - use in cluster configuration by using extra field ``in_cluster`` in 
connection
+    - use custom config by providing path to the file using extra field 
``kube_config_path`` in connection
+    - use custom configuration by providing content of kubeconfig file via
+        extra field ``kube_config`` in connection
+    - use default config by providing no extras
+
+    This hook check for configuration option in the above order. Once an 
option is present it will
+    use this configuration.
+
+    .. seealso::
+        For more information about Kubernetes connection:
+        :doc:`/connections/kubernetes`
+
+    :param conn_id: The :ref:`kubernetes connection 
<howto/connection:kubernetes>`
+        to Kubernetes cluster.
+    :param client_configuration: Optional dictionary of client configuration 
params.
+        Passed on to kubernetes client.
+    :param cluster_context: Optionally specify a context to use (e.g. if you 
have multiple
+        in your kubeconfig.
+    :param config_file: Path to kubeconfig file.
+    :param in_cluster: Set to ``True`` if running from within a kubernetes 
cluster.
+    :param disable_verify_ssl: Set to ``True`` if SSL verification should be 
disabled.
+    :param disable_tcp_keepalive: Set to ``True`` if you want to disable 
keepalive logic.
+    """
+
+    async def _load_config(self) -> client.ApiClient:
+        """
+        Load config to interact with Kubernetes
+
+        cluster_context: Optional[str] = None,
+        config_file: Optional[str] = None,
+        in_cluster: Optional[bool] = None,
+
+        """
+        if self.conn_id:
+            connection = await sync_to_async(self.get_connection)(self.conn_id)
+            extras = connection.extra_dejson
+        else:
+            extras = {}
+        in_cluster = self._coalesce_param(self.in_cluster, 
extras.get("extra__kubernetes__in_cluster"))
+        cluster_context = self._coalesce_param(
+            self.cluster_context, 
extras.get("extra__kubernetes__cluster_context")
+        )
+        kubeconfig_path = self._coalesce_param(
+            self.config_file, extras.get("extra__kubernetes__kube_config_path")
+        )
+        kubeconfig = extras.get("extra__kubernetes__kube_config") or None
+        num_selected_configuration = len([o for o in [in_cluster, kubeconfig, 
kubeconfig_path] if o])
+
+        if num_selected_configuration > 1:
+            raise AirflowException(
+                "Invalid connection configuration. Options kube_config_path, "
+                "kube_config, in_cluster are mutually exclusive. "
+                "You can only use one option at a time."
+            )
+        if in_cluster:
+            self.log.debug("loading kube_config from: in_cluster 
configuration")
+            config.load_incluster_config()
+            return client.ApiClient()
+
+        if kubeconfig_path is not None:
+            self.log.debug("loading kube_config from: %s", kubeconfig_path)
+            await config.load_kube_config(
+                config_file=kubeconfig_path,
+                client_configuration=self.client_configuration,
+                context=cluster_context,
+            )
+            return client.ApiClient()
+
+        if kubeconfig is not None:
+            async with aiofiles.tempfile.NamedTemporaryFile() as temp_config:
+                self.log.debug("loading kube_config from: connection 
kube_config")
+                await temp_config.write(kubeconfig.encode())
+                await temp_config.flush()
+                await config.load_kube_config(
+                    config_file=temp_config.name,
+                    client_configuration=self.client_configuration,
+                    context=cluster_context,
+                )
+            return client.ApiClient()

Review Comment:
   fixed it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to