o-nikolas commented on code in PR #42048: URL: https://github.com/apache/airflow/pull/42048#discussion_r1762034842
########## airflow/providers/edge/executors/edge_executor.py: ########## Review Comment: What PR (maybe already merged?) contains the code where tasks are actually sent over API to the workers? ########## airflow/providers/edge/executors/edge_executor.py: ########## @@ -0,0 +1,175 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +from datetime import datetime, timedelta +from typing import TYPE_CHECKING, Any + +from sqlalchemy import delete + +from airflow.cli.cli_config import GroupCommand +from airflow.configuration import conf +from airflow.executors.base_executor import BaseExecutor +from airflow.models.abstractoperator import DEFAULT_QUEUE +from airflow.models.taskinstance import TaskInstanceState +from airflow.providers.edge.models.edge_job import EdgeJobModel +from airflow.providers.edge.models.edge_logs import EdgeLogsModel +from airflow.providers.edge.models.edge_worker import EdgeWorkerModel +from airflow.utils.db import DBLocks, create_global_lock +from airflow.utils.session import NEW_SESSION, provide_session + +if TYPE_CHECKING: + import argparse + + from sqlalchemy.orm import Session + + from airflow.executors.base_executor import CommandType + from airflow.models.taskinstance import TaskInstance + from airflow.models.taskinstancekey import TaskInstanceKey + +PARALLELISM: int = conf.getint("core", "PARALLELISM") + + +class EdgeExecutor(BaseExecutor): + """Implementation of the EdgeExecutor to distribute work to Edge Workers via HTTP.""" + + def __init__(self, parallelism: int = PARALLELISM): + super().__init__(parallelism=parallelism) + self.last_reported_state: dict[TaskInstanceKey, TaskInstanceState] = {} + + @provide_session + def start(self, session: Session = NEW_SESSION): + """If EdgeExecutor provider is loaded first time, ensure table exists.""" + with create_global_lock(session=session, lock=DBLocks.MIGRATIONS): + engine = session.get_bind().engine + EdgeJobModel.metadata.create_all(engine) + EdgeLogsModel.metadata.create_all(engine) + EdgeWorkerModel.metadata.create_all(engine) + + @provide_session + def execute_async( + self, + key: TaskInstanceKey, + command: CommandType, + queue: str | None = None, + executor_config: Any | None = None, + session: Session = NEW_SESSION, + ) -> None: + """Execute asynchronously.""" + 
self.validate_airflow_tasks_run_command(command) + session.add( + EdgeJobModel( + dag_id=key.dag_id, + task_id=key.task_id, + run_id=key.run_id, + map_index=key.map_index, + try_number=key.try_number, + state=TaskInstanceState.QUEUED, + queue=queue or DEFAULT_QUEUE, + command=str(command), + ) + ) + + @provide_session + def sync(self, session: Session = NEW_SESSION) -> None: + """Sync will get called periodically by the heartbeat method.""" + jobs: list[EdgeJobModel] = session.query(EdgeJobModel).all() + for job in jobs: + if job.key in self.running: + if job.state == TaskInstanceState.RUNNING: + if ( + job.key not in self.last_reported_state + or self.last_reported_state[job.key] != job.state + ): + self.running_state(job.key) + self.last_reported_state[job.key] = job.state + elif job.state == TaskInstanceState.SUCCESS: + if job.key in self.last_reported_state: + del self.last_reported_state[job.key] + self.success(job.key) + elif job.state == TaskInstanceState.FAILED: + if job.key in self.last_reported_state: + del self.last_reported_state[job.key] + self.fail(job.key) + else: + self.last_reported_state[job.key] = job.state + job_success_purge = conf.getint("edge", "job_success_purge") + job_fail_purge = conf.getint("edge", "job_fail_purge") + if ( + job.state == TaskInstanceState.SUCCESS + and job.last_update_t < (datetime.now() - timedelta(minutes=job_success_purge)).timestamp() + ) or ( + job.state == TaskInstanceState.FAILED + and job.last_update_t < (datetime.now() - timedelta(minutes=job_fail_purge)).timestamp() + ): + if job.key in self.last_reported_state: + del self.last_reported_state[job.key] + session.delete(job) + session.execute( + delete(EdgeLogsModel).where( + EdgeLogsModel.dag_id == job.dag_id, + EdgeLogsModel.run_id == job.run_id, + EdgeLogsModel.task_id == job.task_id, + EdgeLogsModel.map_index == job.map_index, + EdgeLogsModel.try_number == job.try_number, + ) + ) + session.commit() + + def end(self) -> None: + """End the executor.""" + 
self.log.info("Shutting down EdgeExecutor") Review Comment: Shouldn't this do something to block and flush running tasks? ########## docs/apache-airflow-providers-edge/edge_executor.rst: ########## @@ -0,0 +1,248 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Edge Executor +============= + +.. note:: + + The Edge Provider Package is an experimental preview. Features and stability is limited + and needs to be improved over time. Target is to have full support in Airflow 3. + Once Airflow 3 support contains Edge Provider, maintenance of the Airflow 2 package will + be dis-continued. + + +.. note:: + + As of Airflow 2.10.0, you can install the ``edge`` provider package to use this executor. + This can be done by installing ``apache-airflow-providers-edge`` or by installing Airflow + with the ``edge`` extra: ``pip install 'apache-airflow[edge]'``. + + +``EdgeExecutor`` is an option if you want to distribute tasks to workers distributed in different locations. +You can use it also in parallel with other executors if needed. Change your ``airflow.cfg`` to point +the executor parameter to ``EdgeExecutor`` and provide the related settings. 
+ +The configuration parameters of the Edge Executor can be found in the Edge provider's :doc:`configurations-ref`. + +Here are a few imperative requirements for your workers: + +- ``airflow`` needs to be installed, and the CLI needs to be in the path +- Airflow configuration settings should be homogeneous across the cluster +- Operators that are executed on the Edge Worker need to have their dependencies + met in that context. Please take a look to the respective provider package + documentations +- The worker needs to have access to its ``DAGS_FOLDER``, and you need to + synchronize the filesystems by your own means. A common setup would be to + store your ``DAGS_FOLDER`` in a Git repository and sync it across machines using + Chef, Puppet, Ansible, or whatever you use to configure machines in your + environment. If all your boxes have a common mount point, having your + pipelines files shared there should work as well + + +Minimum configuration for the Edge Worker to make it running is: + +- Section ``[core]`` + + - ``executor``: Executor must be set or added to be ``airflow.providers.edge.executors.EdgeExecutor`` + - ``internal_api_secret_key``: An encryption key must be set on webserver and Edge Worker component as + shared secret to authenticate traffic. It should be a random string like the fernet key + (but preferably not the same). + +- Section ``[edge]`` + + - ``api_enabled``: Must be set to true. It is disabled by intend not to expose + the endpoint by default. This is the endpoint the worker connects to. + In a future release a dedicated API server can be started. + - ``api_url``: Must be set to the URL which exposes the web endpoint + +To kick off a worker, you need to setup Airflow and kick off the worker +subcommand + +.. code-block:: bash + + airflow edge worker + +Your worker should start picking up tasks as soon as they get fired in +its direction. To stop a worker running on a machine you can use: + +.. 
code-block:: bash + + airflow edge stop + +It will try to stop the worker gracefully by sending a ``SIGINT`` signal to the main +process and waiting until all running tasks are completed. + +If you want to monitor the remote activity and worker, use the UI plugin which +is included in the provider package and install it on the webserver and use the +"Admin" - "Edge Worker Hosts" and "Edge Worker Jobs" pages. + + +Some caveats: Review Comment: Should this get some kind of heading or bolding?
+ + +``EdgeExecutor`` is an option if you want to distribute tasks to workers distributed in different locations. +You can use it also in parallel with other executors if needed. Change your ``airflow.cfg`` to point +the executor parameter to ``EdgeExecutor`` and provide the related settings. + +The configuration parameters of the Edge Executor can be found in the Edge provider's :doc:`configurations-ref`. + +Here are a few imperative requirements for your workers: + +- ``airflow`` needs to be installed, and the CLI needs to be in the path +- Airflow configuration settings should be homogeneous across the cluster +- Operators that are executed on the Edge Worker need to have their dependencies + met in that context. Please take a look to the respective provider package + documentations +- The worker needs to have access to its ``DAGS_FOLDER``, and you need to + synchronize the filesystems by your own means. A common setup would be to + store your ``DAGS_FOLDER`` in a Git repository and sync it across machines using + Chef, Puppet, Ansible, or whatever you use to configure machines in your + environment. If all your boxes have a common mount point, having your + pipelines files shared there should work as well + + +Minimum configuration for the Edge Worker to make it running is: + +- Section ``[core]`` + + - ``executor``: Executor must be set or added to be ``airflow.providers.edge.executors.EdgeExecutor`` + - ``internal_api_secret_key``: An encryption key must be set on webserver and Edge Worker component as + shared secret to authenticate traffic. It should be a random string like the fernet key + (but preferably not the same). + +- Section ``[edge]`` + + - ``api_enabled``: Must be set to true. It is disabled by intend not to expose Review Comment: ```suggestion - ``api_enabled``: Must be set to true. 
It is disabled intentionally to not expose ``` ########## airflow/providers/edge/executors/edge_executor.py: ########## @@ -0,0 +1,175 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from datetime import datetime, timedelta +from typing import TYPE_CHECKING, Any + +from sqlalchemy import delete + +from airflow.cli.cli_config import GroupCommand +from airflow.configuration import conf +from airflow.executors.base_executor import BaseExecutor +from airflow.models.abstractoperator import DEFAULT_QUEUE +from airflow.models.taskinstance import TaskInstanceState +from airflow.providers.edge.models.edge_job import EdgeJobModel +from airflow.providers.edge.models.edge_logs import EdgeLogsModel +from airflow.providers.edge.models.edge_worker import EdgeWorkerModel +from airflow.utils.db import DBLocks, create_global_lock +from airflow.utils.session import NEW_SESSION, provide_session + +if TYPE_CHECKING: + import argparse + + from sqlalchemy.orm import Session + + from airflow.executors.base_executor import CommandType + from airflow.models.taskinstance import TaskInstance + from airflow.models.taskinstancekey import TaskInstanceKey + +PARALLELISM: int = conf.getint("core", "PARALLELISM") + + 
+class EdgeExecutor(BaseExecutor): + """Implementation of the EdgeExecutor to distribute work to Edge Workers via HTTP.""" + + def __init__(self, parallelism: int = PARALLELISM): + super().__init__(parallelism=parallelism) + self.last_reported_state: dict[TaskInstanceKey, TaskInstanceState] = {} + + @provide_session + def start(self, session: Session = NEW_SESSION): + """If EdgeExecutor provider is loaded first time, ensure table exists.""" + with create_global_lock(session=session, lock=DBLocks.MIGRATIONS): + engine = session.get_bind().engine + EdgeJobModel.metadata.create_all(engine) + EdgeLogsModel.metadata.create_all(engine) + EdgeWorkerModel.metadata.create_all(engine) + + @provide_session + def execute_async( + self, + key: TaskInstanceKey, + command: CommandType, + queue: str | None = None, + executor_config: Any | None = None, + session: Session = NEW_SESSION, + ) -> None: + """Execute asynchronously.""" + self.validate_airflow_tasks_run_command(command) + session.add( Review Comment: Most executors use an in-memory queue of some kind to ingest tasks and track them for execution. Using the DB like this is certainly more durable (though you only call commit in the sync method) but we're going to get a lot of write load to the DB using the database like this (if you have an airflow environment with very high Task TPS). Have you done any benchmarking to see the impact on the DB? ########## docs/apache-airflow-providers-edge/edge_executor.rst: ########## @@ -0,0 +1,248 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. 
Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Edge Executor +============= + +.. note:: + + The Edge Provider Package is an experimental preview. Features and stability is limited + and needs to be improved over time. Target is to have full support in Airflow 3. + Once Airflow 3 support contains Edge Provider, maintenance of the Airflow 2 package will + be dis-continued. + + +.. note:: + + As of Airflow 2.10.0, you can install the ``edge`` provider package to use this executor. + This can be done by installing ``apache-airflow-providers-edge`` or by installing Airflow + with the ``edge`` extra: ``pip install 'apache-airflow[edge]'``. + + +``EdgeExecutor`` is an option if you want to distribute tasks to workers distributed in different locations. +You can use it also in parallel with other executors if needed. Change your ``airflow.cfg`` to point +the executor parameter to ``EdgeExecutor`` and provide the related settings. + +The configuration parameters of the Edge Executor can be found in the Edge provider's :doc:`configurations-ref`. + +Here are a few imperative requirements for your workers: + +- ``airflow`` needs to be installed, and the CLI needs to be in the path +- Airflow configuration settings should be homogeneous across the cluster +- Operators that are executed on the Edge Worker need to have their dependencies + met in that context. Please take a look to the respective provider package + documentations +- The worker needs to have access to its ``DAGS_FOLDER``, and you need to + synchronize the filesystems by your own means. 
A common setup would be to + store your ``DAGS_FOLDER`` in a Git repository and sync it across machines using + Chef, Puppet, Ansible, or whatever you use to configure machines in your + environment. If all your boxes have a common mount point, having your + pipelines files shared there should work as well + + +Minimum configuration for the Edge Worker to make it running is: + +- Section ``[core]`` + + - ``executor``: Executor must be set or added to be ``airflow.providers.edge.executors.EdgeExecutor`` Review Comment: I believe I saw in another PR you added this executor to the holy list of "core" executors. So folks should be able to use the short name instead of the module path right? ########## docs/apache-airflow-providers-edge/edge_executor.rst: ########## @@ -0,0 +1,248 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Edge Executor +============= + +.. note:: + + The Edge Provider Package is an experimental preview. Features and stability is limited + and needs to be improved over time. Target is to have full support in Airflow 3. + Once Airflow 3 support contains Edge Provider, maintenance of the Airflow 2 package will + be dis-continued. + + +.. 
note:: + + As of Airflow 2.10.0, you can install the ``edge`` provider package to use this executor. + This can be done by installing ``apache-airflow-providers-edge`` or by installing Airflow + with the ``edge`` extra: ``pip install 'apache-airflow[edge]'``. + + +``EdgeExecutor`` is an option if you want to distribute tasks to workers distributed in different locations. +You can use it also in parallel with other executors if needed. Change your ``airflow.cfg`` to point +the executor parameter to ``EdgeExecutor`` and provide the related settings. + +The configuration parameters of the Edge Executor can be found in the Edge provider's :doc:`configurations-ref`. + +Here are a few imperative requirements for your workers: + +- ``airflow`` needs to be installed, and the CLI needs to be in the path +- Airflow configuration settings should be homogeneous across the cluster +- Operators that are executed on the Edge Worker need to have their dependencies + met in that context. Please take a look to the respective provider package + documentations +- The worker needs to have access to its ``DAGS_FOLDER``, and you need to + synchronize the filesystems by your own means. A common setup would be to + store your ``DAGS_FOLDER`` in a Git repository and sync it across machines using + Chef, Puppet, Ansible, or whatever you use to configure machines in your + environment. If all your boxes have a common mount point, having your + pipelines files shared there should work as well + + +Minimum configuration for the Edge Worker to make it running is: + +- Section ``[core]`` + + - ``executor``: Executor must be set or added to be ``airflow.providers.edge.executors.EdgeExecutor`` + - ``internal_api_secret_key``: An encryption key must be set on webserver and Edge Worker component as + shared secret to authenticate traffic. It should be a random string like the fernet key + (but preferably not the same). + +- Section ``[edge]`` + + - ``api_enabled``: Must be set to true. 
It is disabled by intend not to expose + the endpoint by default. This is the endpoint the worker connects to. + In a future release a dedicated API server can be started. + - ``api_url``: Must be set to the URL which exposes the web endpoint + +To kick off a worker, you need to setup Airflow and kick off the worker +subcommand + +.. code-block:: bash + + airflow edge worker + +Your worker should start picking up tasks as soon as they get fired in +its direction. To stop a worker running on a machine you can use: + +.. code-block:: bash + + airflow edge stop + +It will try to stop the worker gracefully by sending ``SIGINT`` signal to main +process as and wait until all running tasks are completed. + +If you want to monitor the remote activity and worker, use the UI plugin which +is included in the provider package as install on the webserver and use the Review Comment: Is this what was meant here?: ```suggestion is included in the provider package and install it on the webserver and use the ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
