dstandish commented on code in PR #28523:
URL: https://github.com/apache/airflow/pull/28523#discussion_r1054683380
##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -731,3 +736,137 @@ def __exit__(self, exctype, excinst, exctb):
return True
else:
return True
+
+
+class PodNotFoundException(AirflowException):
+ """Expected pod does not exist in kube-api."""
+
+
+class KubernetesPodAsyncOperator(KubernetesPodOperator):
+ """
+ Async (deferring) version of KubernetesPodOperator
+
+ .. warning::
+ By default, logs will not be available in the Airflow Webserver until
the task completes. However,
+ you can configure ``KubernetesPodAsyncOperator`` to periodically
resume and fetch logs. This behavior
+ is controlled by param ``logging_interval``.
+
+ :param poll_interval: interval in seconds to sleep between checking pod
status
+ :param logging_interval: max time in seconds that task should be in
deferred state before
+ resuming to fetch latest logs. If ``None``, then the task will remain
in deferred state until pod
+ is done, and no logs will be visible until that time.
+ """
+
+ def __init__(self, *, poll_interval: int = 5, logging_interval: int | None
= None, **kwargs: Any):
+ self.poll_interval = poll_interval
+ self.logging_interval = logging_interval
+ super().__init__(**kwargs)
+
+ @staticmethod
+ def raise_for_trigger_status(event: dict[str, Any]) -> None:
+ """Raise exception if pod is not in expected state."""
+ if event["status"] == "error":
+ error_type = event["error_type"]
+ description = event["description"]
+ if error_type == "PodLaunchTimeoutException":
+ raise PodLaunchTimeoutException(description)
+ else:
+ raise AirflowException(description)
+
+ def defer(self, last_log_time: DateTime | None = None, **kwargs: Any) ->
None:
+ """Defers to ``WaitContainerTrigger`` optionally with last log time."""
+ if kwargs:
+ raise ValueError(
+ f"Received keyword arguments {list(kwargs.keys())} but "
+ f"they are not used in this implementation of `defer`."
+ )
+ super().defer(
+ trigger=WaitContainerTrigger(
+ kubernetes_conn_id=self.kubernetes_conn_id,
+ hook_params={
+ "cluster_context": self.cluster_context,
+ "config_file": self.config_file,
+ "in_cluster": self.in_cluster,
+ },
+ pod_name=self.pod.metadata.name,
+ container_name=self.BASE_CONTAINER_NAME,
+ pod_namespace=self.pod.metadata.namespace,
+ pending_phase_timeout=self.startup_timeout_seconds,
+ poll_interval=self.poll_interval,
+ logging_interval=self.logging_interval,
+ last_log_time=last_log_time,
Review Comment:
Very soon, we will have triggerer logs in the web UI in Airflow, so it might be
better to remove the support for ping-ponging, i.e. the logging interval / last
log time.
##########
airflow/providers/cncf/kubernetes/hooks/kubernetes.py:
##########
@@ -406,3 +408,102 @@ def _get_bool(val) -> bool | None:
elif val.strip().lower() == "false":
return False
return None
+
+
+class KubernetesAsyncHook(KubernetesHook):
+ """
+ Creates Async Kubernetes API connection.
+
+ - use in cluster configuration by using extra field ``in_cluster`` in
connection
+ - use custom config by providing path to the file using extra field
``kube_config_path`` in connection
+ - use custom configuration by providing content of kubeconfig file via
+ extra field ``kube_config`` in connection
+ - use default config by providing no extras
+
+ This hook checks for configuration options in the above order. Once an
option is present, it will
+ use that configuration.
+
+ .. seealso::
+ For more information about Kubernetes connection:
+ :doc:`/connections/kubernetes`
+
+ :param conn_id: The :ref:`kubernetes connection
<howto/connection:kubernetes>`
+ to Kubernetes cluster.
+ :param client_configuration: Optional dictionary of client configuration
params.
+ Passed on to kubernetes client.
+ :param cluster_context: Optionally specify a context to use (e.g. if you
have multiple
+ in your kubeconfig).
+ :param config_file: Path to kubeconfig file.
+ :param in_cluster: Set to ``True`` if running from within a kubernetes
cluster.
+ :param disable_verify_ssl: Set to ``True`` if SSL verification should be
disabled.
+ :param disable_tcp_keepalive: Set to ``True`` if you want to disable
keepalive logic.
+ """
+
+ async def _load_config(self) -> client.ApiClient:
+ """
+ Load config to interact with Kubernetes
+
+ cluster_context: Optional[str] = None,
+ config_file: Optional[str] = None,
+ in_cluster: Optional[bool] = None,
+
+ """
+ if self.conn_id:
+ connection = await sync_to_async(self.get_connection)(self.conn_id)
+ extras = connection.extra_dejson
+ else:
+ extras = {}
+ in_cluster = self._coalesce_param(self.in_cluster,
extras.get("extra__kubernetes__in_cluster"))
+ cluster_context = self._coalesce_param(
+ self.cluster_context,
extras.get("extra__kubernetes__cluster_context")
Review Comment:
These prefixes (`extra__kubernetes__`) are deprecated. There is a helper
method, `_get_field`, that you can use for backward compatibility.
##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -731,3 +736,137 @@ def __exit__(self, exctype, excinst, exctb):
return True
else:
return True
+
+
+class PodNotFoundException(AirflowException):
+ """Expected pod does not exist in kube-api."""
+
+
+class KubernetesPodAsyncOperator(KubernetesPodOperator):
+ """
+ Async (deferring) version of KubernetesPodOperator
+
+ .. warning::
+ By default, logs will not be available in the Airflow Webserver until
the task completes. However,
+ you can configure ``KubernetesPodAsyncOperator`` to periodically
resume and fetch logs. This behavior
+ is controlled by param ``logging_interval``.
+
+ :param poll_interval: interval in seconds to sleep between checking pod
status
+ :param logging_interval: max time in seconds that task should be in
deferred state before
+ resuming to fetch latest logs. If ``None``, then the task will remain
in deferred state until pod
+ is done, and no logs will be visible until that time.
+ """
+
+ def __init__(self, *, poll_interval: int = 5, logging_interval: int | None
= None, **kwargs: Any):
+ self.poll_interval = poll_interval
+ self.logging_interval = logging_interval
+ super().__init__(**kwargs)
+
+ @staticmethod
+ def raise_for_trigger_status(event: dict[str, Any]) -> None:
+ """Raise exception if pod is not in expected state."""
+ if event["status"] == "error":
+ error_type = event["error_type"]
+ description = event["description"]
+ if error_type == "PodLaunchTimeoutException":
+ raise PodLaunchTimeoutException(description)
+ else:
+ raise AirflowException(description)
+
+ def defer(self, last_log_time: DateTime | None = None, **kwargs: Any) ->
None:
+ """Defers to ``WaitContainerTrigger`` optionally with last log time."""
+ if kwargs:
+ raise ValueError(
+ f"Received keyword arguments {list(kwargs.keys())} but "
+ f"they are not used in this implementation of `defer`."
+ )
+ super().defer(
+ trigger=WaitContainerTrigger(
+ kubernetes_conn_id=self.kubernetes_conn_id,
+ hook_params={
+ "cluster_context": self.cluster_context,
+ "config_file": self.config_file,
+ "in_cluster": self.in_cluster,
+ },
+ pod_name=self.pod.metadata.name,
+ container_name=self.BASE_CONTAINER_NAME,
+ pod_namespace=self.pod.metadata.namespace,
+ pending_phase_timeout=self.startup_timeout_seconds,
+ poll_interval=self.poll_interval,
+ logging_interval=self.logging_interval,
+ last_log_time=last_log_time,
+ ),
+ method_name=self.trigger_reentry.__name__,
+ )
+
+ def execute(self, context: Context) -> None:
+ self.pod_request_obj = self.build_pod_request_obj(context)
+ self.pod: k8s.V1Pod = self.get_or_create_pod(self.pod_request_obj,
context)
+ self.defer()
+
+ def execute_complete(self, context: Context, event: dict[str, Any]) -> Any:
Review Comment:
Since this is a new operator, we don't need any deprecated methods.
##########
airflow/providers/cncf/kubernetes/triggers/wait_container.py:
##########
@@ -0,0 +1,163 @@
+# Licensed to the Apache Software Foundation (ASF) under one
Review Comment:
So the names are driven by what the things do. This is a trigger that waits
until the container is done. It is used by KPO now, but that doesn't mean
something else couldn't use it.
Then, think about when you are reading KPO code:
if you see "now let's defer to wait container trigger", you know what it is
doing;
if you see "now let's defer to kubernetes pod operator trigger", then you
don't know quite as well what it's doing.
##########
airflow/providers/cncf/kubernetes/hooks/kubernetes.py:
##########
@@ -406,3 +408,102 @@ def _get_bool(val) -> bool | None:
elif val.strip().lower() == "false":
return False
return None
+
+
+class KubernetesAsyncHook(KubernetesHook):
+ """
+ Creates Async Kubernetes API connection.
+
+ - use in cluster configuration by using extra field ``in_cluster`` in
connection
+ - use custom config by providing path to the file using extra field
``kube_config_path`` in connection
+ - use custom configuration by providing content of kubeconfig file via
+ extra field ``kube_config`` in connection
+ - use default config by providing no extras
+
+ This hook checks for configuration options in the above order. Once an
option is present, it will
+ use that configuration.
+
+ .. seealso::
+ For more information about Kubernetes connection:
+ :doc:`/connections/kubernetes`
+
+ :param conn_id: The :ref:`kubernetes connection
<howto/connection:kubernetes>`
+ to Kubernetes cluster.
+ :param client_configuration: Optional dictionary of client configuration
params.
+ Passed on to kubernetes client.
+ :param cluster_context: Optionally specify a context to use (e.g. if you
have multiple
+ in your kubeconfig).
+ :param config_file: Path to kubeconfig file.
+ :param in_cluster: Set to ``True`` if running from within a kubernetes
cluster.
+ :param disable_verify_ssl: Set to ``True`` if SSL verification should be
disabled.
+ :param disable_tcp_keepalive: Set to ``True`` if you want to disable
keepalive logic.
+ """
+
+ async def _load_config(self) -> client.ApiClient:
+ """
+ Load config to interact with Kubernetes
+
+ cluster_context: Optional[str] = None,
+ config_file: Optional[str] = None,
+ in_cluster: Optional[bool] = None,
+
+ """
+ if self.conn_id:
+ connection = await sync_to_async(self.get_connection)(self.conn_id)
+ extras = connection.extra_dejson
+ else:
+ extras = {}
+ in_cluster = self._coalesce_param(self.in_cluster,
extras.get("extra__kubernetes__in_cluster"))
+ cluster_context = self._coalesce_param(
+ self.cluster_context,
extras.get("extra__kubernetes__cluster_context")
+ )
+ kubeconfig_path = self._coalesce_param(
+ self.config_file, extras.get("extra__kubernetes__kube_config_path")
+ )
+ kubeconfig = extras.get("extra__kubernetes__kube_config") or None
+ num_selected_configuration = len([o for o in [in_cluster, kubeconfig,
kubeconfig_path] if o])
+
+ if num_selected_configuration > 1:
+ raise AirflowException(
+ "Invalid connection configuration. Options kube_config_path, "
+ "kube_config, in_cluster are mutually exclusive. "
+ "You can only use one option at a time."
+ )
+ if in_cluster:
+ self.log.debug("loading kube_config from: in_cluster
configuration")
+ config.load_incluster_config()
+ return client.ApiClient()
+
+ if kubeconfig_path is not None:
+ self.log.debug("loading kube_config from: %s", kubeconfig_path)
+ await config.load_kube_config(
+ config_file=kubeconfig_path,
+ client_configuration=self.client_configuration,
+ context=cluster_context,
+ )
Review Comment:
You're referring to `async def _load_config(self)`, right?
Well, it does do file operations, which are technically blocking (even though
they might be fast),
so you might as well handle them asynchronously ... just because... why not?
But now that you mention it, one thing that is a bit curious is ... how are
we able to await `config.load_kube_config`? Because it looks like that is
not an async method... I must be missing something.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]