akshaychitneni commented on code in PR #34840:
URL: https://github.com/apache/airflow/pull/34840#discussion_r1385347383
##########
airflow/providers/papermill/hooks/kernel.py:
##########
@@ -0,0 +1,48 @@
+from airflow.hooks.base import BaseHook
+
+JUPYTER_KERNEL_SHELL_PORT = 60316
+JUPYTER_KERNEL_IOPUB_PORT = 60317
+JUPYTER_KERNEL_STDIN_PORT = 60318
+JUPYTER_KERNEL_CONTROL_PORT = 60319
+JUPYTER_KERNEL_HB_PORT = 60320
+
+
+def register_remote_engine():
+ """
+ Registers ``RemoteKernelEngine`` papermill engine
+ """
+ from papermill.engines import papermill_engines
+ from airflow.providers.papermill.operators.papermill import RemoteKernelEngine, REMOTE_KERNEL_ENGINE
+
+ papermill_engines.register(REMOTE_KERNEL_ENGINE, RemoteKernelEngine)
+
+
+class KernelHook(BaseHook):
+ """
+ The KernelHook can be used to interact with remote jupyter kernel.
+
+ Takes kernel host/ip from connection and refers to jupyter kernel ports and session_key
+ from ``extra`` field.
+
+ :param kernel_conn_id: connection that has kernel host/ip
+ """
+
+ conn_name_attr = "kernel_conn_id"
+ default_conn_name = "jupyter_kernel_default"
+ conn_type = "jupyter_kernel"
+ hook_name = "Jupyter Kernel"
+
+ def __init__(
+ self, kernel_conn_id: str = default_conn_name, *args, **kwargs
+ ) -> None:
+ self.kernel_conn = self.get_connection(kernel_conn_id)
+ self.ip = self.kernel_conn.host
+ self.shell_port = self.kernel_conn.extra_dejson.get("shell_port", JUPYTER_KERNEL_SHELL_PORT)
+ self.iopub_port = self.kernel_conn.extra_dejson.get("iopub_port", JUPYTER_KERNEL_IOPUB_PORT)
+ self.stdin_port = self.kernel_conn.extra_dejson.get("stdin_port", JUPYTER_KERNEL_STDIN_PORT)
+ self.control_port = self.kernel_conn.extra_dejson.get("control_port", JUPYTER_KERNEL_CONTROL_PORT)
+ self.hb_port = self.kernel_conn.extra_dejson.get("hb_port", JUPYTER_KERNEL_HB_PORT)
+ self.session_key = self.kernel_conn.extra_dejson.get("session_key", '')
+
+ register_remote_engine()
Review Comment:
Updated.
##########
airflow/providers/papermill/operators/papermill.py:
##########
@@ -17,17 +17,102 @@
# under the License.
from __future__ import annotations
+from functools import cached_property
from typing import TYPE_CHECKING, ClassVar, Collection, Sequence
import attr
+from papermill.utils import remove_args, merge_kwargs
+from pydantic import typing
+
import papermill as pm
+from papermill.engines import NBClientEngine
+from papermill.clientwrap import PapermillNotebookClient
+from jupyter_client.manager import AsyncKernelManager
+from jupyter_client.client import KernelClient
+from traitlets import Unicode
from airflow.lineage.entities import File
from airflow.models import BaseOperator
+from airflow.providers.papermill.hooks.kernel import KernelHook
if TYPE_CHECKING:
from airflow.utils.context import Context
+REMOTE_KERNEL_ENGINE = "remote_kernel_engine"
+
+
+class RemoteKernelManager(AsyncKernelManager):
+ """
+ Jupyter kernel manager that connects to a remote kernel.
+ """
+ session_key = Unicode('', config=True, help="Session key to connect to remote kernel")
+
+ @property
+ def has_kernel(self) -> bool:
+ return True
+
+ async def _async_is_alive(self) -> bool:
+ return True
+
+ def shutdown_kernel(self, now: bool = False, restart: bool = False) -> None:
+ pass
+
+ def client(self, **kwargs: typing.Any) -> KernelClient:
+ """Create a client configured to connect to our kernel"""
+ kernel_client = super().client(**kwargs)
+ # load connection info to set session_key
+ config = dict(
+ ip=self.ip,
+ shell_port=self.shell_port,
+ iopub_port=self.iopub_port,
+ stdin_port=self.stdin_port,
+ control_port=self.control_port,
+ hb_port=self.hb_port,
+ key=self.session_key,
+ transport="tcp",
+ signature_scheme="hmac-sha256",
+ )
+ kernel_client.load_connection_info(config)
+ return kernel_client
+
+
+class RemoteKernelEngine(NBClientEngine):
+ """
+ Papermill engine to use ``RemoteKernelManager`` to connect to remote kernel and execute notebook
+ """
+ @classmethod
+ def execute_managed_notebook(cls, nb_man,
Review Comment:
Updated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]