This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d23fe09ab1d [Edge]Add child processes to separate process group than 
main (#43927)
d23fe09ab1d is described below

commit d23fe09ab1d870ec6024c537b0c53588df6df80a
Author: majorosdonat <[email protected]>
AuthorDate: Tue Nov 12 15:46:49 2024 +0100

    [Edge]Add child processes to separate process group than main (#43927)
    
    * Block digint for child processes
    
    * try start_new_session
    
    * Add shutdown handling for SIGTERM
    
    * Update version
    
    * Update providers/src/airflow/providers/edge/CHANGELOG.rst
    
    * Delete line from docs
    
    ---------
    
    Co-authored-by: Majoros Donat (XC-DX/EET2-Bp) <[email protected]>
    Co-authored-by: Jens Scheffler <[email protected]>
---
 docs/apache-airflow-providers-edge/edge_executor.rst     |  1 -
 providers/src/airflow/providers/edge/CHANGELOG.rst       | 10 ++++++++++
 providers/src/airflow/providers/edge/__init__.py         |  2 +-
 providers/src/airflow/providers/edge/cli/edge_command.py |  9 ++++++++-
 providers/src/airflow/providers/edge/provider.yaml       |  2 +-
 5 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/docs/apache-airflow-providers-edge/edge_executor.rst 
b/docs/apache-airflow-providers-edge/edge_executor.rst
index d27cb5bc698..a4e49d650d3 100644
--- a/docs/apache-airflow-providers-edge/edge_executor.rst
+++ b/docs/apache-airflow-providers-edge/edge_executor.rst
@@ -232,7 +232,6 @@ The following features are known missing and will be 
implemented in increments:
 - Edge Worker CLI
 
   - Use WebSockets instead of HTTP calls for communication
-  - Handle SIG-INT/CTRL+C and gracefully terminate and complete job (``airflow 
edge stop`` is working though)
   - Send logs also to TaskFileHandler if external logging services are used
   - Integration into telemetry to send metrics from remote site
   - Allow ``airflow edge stop`` to wait until completed to terminated
diff --git a/providers/src/airflow/providers/edge/CHANGELOG.rst 
b/providers/src/airflow/providers/edge/CHANGELOG.rst
index ae77d32b7ee..6f2fda5db9a 100644
--- a/providers/src/airflow/providers/edge/CHANGELOG.rst
+++ b/providers/src/airflow/providers/edge/CHANGELOG.rst
@@ -27,6 +27,16 @@
 Changelog
 ---------
 
+0.5.4pre0
+.........
+
+Misc
+~~~~
+
+* ``Fix SIGINT handling of child processes. Ensure graceful shutdown when 
SIGINT in received (not killing working tasks).``
+* ``Fix SIGTERM handling of child processes. Ensure all childs are terminated 
on SIGTERM.``
+
+
 0.5.3pre0
 .........
 
diff --git a/providers/src/airflow/providers/edge/__init__.py 
b/providers/src/airflow/providers/edge/__init__.py
index 2998b9d6a90..996e57a229c 100644
--- a/providers/src/airflow/providers/edge/__init__.py
+++ b/providers/src/airflow/providers/edge/__init__.py
@@ -29,7 +29,7 @@ from airflow import __version__ as airflow_version
 
 __all__ = ["__version__"]
 
-__version__ = "0.5.3pre0"
+__version__ = "0.5.4pre0"
 
 if 
packaging.version.parse(packaging.version.parse(airflow_version).base_version) 
< packaging.version.parse(
     "2.10.0"
diff --git a/providers/src/airflow/providers/edge/cli/edge_command.py 
b/providers/src/airflow/providers/edge/cli/edge_command.py
index 69f12acc3fa..be9a739dc29 100644
--- a/providers/src/airflow/providers/edge/cli/edge_command.py
+++ b/providers/src/airflow/providers/edge/cli/edge_command.py
@@ -159,6 +159,12 @@ class _EdgeWorkerCli:
         logger.info("Request to show down Edge Worker received, waiting for 
jobs to complete.")
         _EdgeWorkerCli.drain = True
 
+    def shutdown_handler(self, sig, frame):
+        logger.info("SIGTERM received. Terminating all jobs and quit")
+        for job in self.jobs:
+            os.killpg(job.process.pid, signal.SIGTERM)
+        _EdgeWorkerCli.drain = True
+
     def _get_sysinfo(self) -> dict:
         """Produce the sysinfo from worker to post to central site."""
         return {
@@ -182,6 +188,7 @@ class _EdgeWorkerCli:
             raise SystemExit(str(e))
         _write_pid_to_pidfile(self.pid_file_path)
         signal.signal(signal.SIGINT, _EdgeWorkerCli.signal_handler)
+        signal.signal(signal.SIGTERM, self.shutdown_handler)
         try:
             while not _EdgeWorkerCli.drain or self.jobs:
                 self.loop()
@@ -218,7 +225,7 @@ class _EdgeWorkerCli:
             env["AIRFLOW__CORE__DATABASE_ACCESS_ISOLATION"] = "True"
             env["AIRFLOW__CORE__INTERNAL_API_URL"] = conf.get("edge", 
"api_url")
             env["_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK"] = "1"
-            process = Popen(edge_job.command, close_fds=True, env=env)
+            process = Popen(edge_job.command, close_fds=True, env=env, 
start_new_session=True)
             logfile = EdgeLogs.logfile_path(edge_job.key)
             self.jobs.append(_Job(edge_job, process, logfile, 0))
             EdgeJob.set_state(edge_job.key, TaskInstanceState.RUNNING)
diff --git a/providers/src/airflow/providers/edge/provider.yaml 
b/providers/src/airflow/providers/edge/provider.yaml
index 34e787c528d..fd0e55fd6f6 100644
--- a/providers/src/airflow/providers/edge/provider.yaml
+++ b/providers/src/airflow/providers/edge/provider.yaml
@@ -27,7 +27,7 @@ source-date-epoch: 1729683247
 
 # note that those versions are maintained by release manager - do not update 
them manually
 versions:
-  - 0.5.3pre0
+  - 0.5.4pre0
 
 dependencies:
   - apache-airflow>=2.10.0

Reply via email to