jscheffl commented on code in PR #43153:
URL: https://github.com/apache/airflow/pull/43153#discussion_r1808896025


##########
providers/src/airflow/providers/edge/cli/edge_command.py:
##########
@@ -94,6 +94,33 @@ def _hostname() -> str:
 def _pid_file_path(pid_file: str | None) -> str:
     return cli_utils.setup_locations(process=EDGE_WORKER_PROCESS_NAME, pid=pid_file)[0]
 
+def _write_pid_to_pidfile(pid_file_path):
+    """
+    Write PIDs for Edge Workers to disk,
+    ensuring that orphanded PID files from crashed instance are handled.
+    """
+    if pid_file_path.exists():
+        # Handle existing PID files on disk
+        logger.info("An existing PID file has been found: %s.", pid_file_path)
+        pid_stored_in_pid_file = read_pid_from_pidfile(pid_file_path)
+        if os.getpid() == pid_stored_in_pid_file:

Review Comment:
   I'd prefer to leverage the logic already implemented in the `lockfile` package, namely `i_am_locking()`.
   See `<dist-packages>/lockfile/pidlockfile.py`.
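The PR's `os.getpid() == pid_stored_in_pid_file` branch appears to duplicate what `PIDLockFile.i_am_locking()` does in `pidlockfile.py`: the lock is held by the current process if the PID file exists and stores `os.getpid()`. For illustration, a stdlib-only sketch of that check, so it runs without `lockfile` installed; `read_pid` and `i_am_locking` here are hypothetical stand-ins, not the library's functions:

```python
from __future__ import annotations

import os
import tempfile
from pathlib import Path


def read_pid(pid_file: Path) -> int | None:
    """Return the PID on the first line of pid_file, or None if missing or unparsable."""
    try:
        return int(pid_file.read_text().splitlines()[0].strip())
    except (OSError, IndexError, ValueError):
        return None


def i_am_locking(pid_file: Path) -> bool:
    """Hypothetical mirror of PIDLockFile.i_am_locking(): file exists and holds our PID."""
    return pid_file.exists() and read_pid(pid_file) == os.getpid()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        pid_file = Path(tmp) / "edge_worker.pid"
        print(i_am_locking(pid_file))  # no file yet, so False
        pid_file.write_text(f"{os.getpid()}\n")
        print(i_am_locking(pid_file))  # our own PID, so True
```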



##########
providers/src/airflow/providers/edge/cli/edge_command.py:
##########
@@ -94,6 +94,33 @@ def _hostname() -> str:
 def _pid_file_path(pid_file: str | None) -> str:
     return cli_utils.setup_locations(process=EDGE_WORKER_PROCESS_NAME, pid=pid_file)[0]
 
+def _write_pid_to_pidfile(pid_file_path):
+    """
+    Write PIDs for Edge Workers to disk,
+    ensuring that orphanded PID files from crashed instance are handled.
+    """
+    if pid_file_path.exists():
+        # Handle existing PID files on disk
+        logger.info("An existing PID file has been found: %s.", pid_file_path)
+        pid_stored_in_pid_file = read_pid_from_pidfile(pid_file_path)
+        if os.getpid() == pid_stored_in_pid_file:
+            raise SystemExit("A PID file has already been written")
+        else:
+            # PID file was written by dead or already running instance
+            logger.info("PID file does not belong to this process.")
+            if psutil.pid_exists(pid_stored_in_pid_file):
+                # case 1: another instance uses the same path for its PID file
+                raise SystemExit(
+                    f"The PID file {pid_file_path} contains the PID of another running process. "
+                    "Configuration issue: edge worker instance must use different PID file paths!"
+                )
+            else:
+                # case 2: previous instance crashed without cleaning up its PID file
+                logger.info("PID file is orphaned. Cleaning up.")
+                pid_file_path.unlink()
+    logger.info("PID file written to %s.", pid_file_path)

Review Comment:
   Please log at INFO level only when the message is useful to the user; this one can go to DEBUG:
   ```suggestion
       logger.debug("PID file written to %s.", pid_file_path)
   ```



##########
providers/src/airflow/providers/edge/cli/edge_command.py:
##########
@@ -94,6 +94,33 @@ def _hostname() -> str:
 def _pid_file_path(pid_file: str | None) -> str:
     return cli_utils.setup_locations(process=EDGE_WORKER_PROCESS_NAME, pid=pid_file)[0]
 
+def _write_pid_to_pidfile(pid_file_path):
+    """
+    Write PIDs for Edge Workers to disk,
+    ensuring that orphanded PID files from crashed instance are handled.
+    """
+    if pid_file_path.exists():
+        # Handle existing PID files on disk
+        logger.info("An existing PID file has been found: %s.", pid_file_path)
+        pid_stored_in_pid_file = read_pid_from_pidfile(pid_file_path)
+        if os.getpid() == pid_stored_in_pid_file:
+            raise SystemExit("A PID file has already been written")
+        else:
+            # PID file was written by dead or already running instance
+            logger.info("PID file does not belong to this process.")

Review Comment:
   If you raise an exception that is printed to the console, I assume the additional info is not needed.
   ```suggestion
   ```
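Backing up this point: when `SystemExit` is raised with a string argument, Python prints that string to stderr (without a traceback) and exits with status 1, so the user already sees the message. A quick stdlib check, run in a child interpreter so the demo itself does not exit; the message text is copied from the PR:

```python
import subprocess
import sys

# Run a child interpreter that exits the same way the PR's code does.
result = subprocess.run(
    [sys.executable, "-c", 'raise SystemExit("A PID file has already been written")'],
    capture_output=True,
    text=True,
)
print(result.returncode)      # a non-integer SystemExit argument means exit status 1
print(result.stderr.strip())  # only the message itself, no traceback
```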



##########
providers/src/airflow/providers/edge/cli/edge_command.py:
##########
@@ -94,6 +94,33 @@ def _hostname() -> str:
 def _pid_file_path(pid_file: str | None) -> str:
     return cli_utils.setup_locations(process=EDGE_WORKER_PROCESS_NAME, pid=pid_file)[0]
 
+def _write_pid_to_pidfile(pid_file_path):
+    """
+    Write PIDs for Edge Workers to disk,
+    ensuring that orphanded PID files from crashed instance are handled.
+    """
+    if pid_file_path.exists():
+        # Handle existing PID files on disk
+        logger.info("An existing PID file has been found: %s.", pid_file_path)
+        pid_stored_in_pid_file = read_pid_from_pidfile(pid_file_path)
+        if os.getpid() == pid_stored_in_pid_file:
+            raise SystemExit("A PID file has already been written")
+        else:
+            # PID file was written by dead or already running instance
+            logger.info("PID file does not belong to this process.")
+            if psutil.pid_exists(pid_stored_in_pid_file):

Review Comment:
   +1 here, can you change this to `is_locked()`?
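One caveat worth checking before switching: if I read `pidlockfile.py` correctly, `PIDLockFile.is_locked()` only tests whether the PID file exists, so on its own it cannot tell case 1 (live owner) from case 2 (orphaned file); a liveness check on the stored PID is still needed. A stdlib-only sketch of that classification, assuming a POSIX host where `os.kill(pid, 0)` probes a process without signalling it; `pid_is_alive` and `classify_pid_file` are hypothetical names:

```python
import os
from pathlib import Path


def pid_is_alive(pid: int) -> bool:
    """POSIX-only liveness probe: signal 0 does error checking but sends nothing."""
    if pid <= 0:
        return False  # 0 and negatives address process groups, not a single PID
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False  # no such process
    except PermissionError:
        return True  # process exists but belongs to another user
    return True


def classify_pid_file(pid_file: Path) -> str:
    """Return 'free', 'mine', 'held' (live owner, case 1) or 'stale' (orphaned, case 2)."""
    if not pid_file.exists():  # roughly what is_locked() checks
        return "free"
    try:
        pid = int(pid_file.read_text().split()[0])
    except (OSError, IndexError, ValueError):
        return "stale"  # assumption: unreadable content is treated as orphaned
    if pid == os.getpid():  # roughly what i_am_locking() adds
        return "mine"
    return "held" if pid_is_alive(pid) else "stale"
```

With this, the PR's case 1 maps to `"held"` and case 2 to `"stale"`; treating unparsable content as stale is a judgment call.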



##########
providers/src/airflow/providers/edge/cli/edge_command.py:
##########
@@ -94,6 +94,33 @@ def _hostname() -> str:
 def _pid_file_path(pid_file: str | None) -> str:
     return cli_utils.setup_locations(process=EDGE_WORKER_PROCESS_NAME, pid=pid_file)[0]
 
+def _write_pid_to_pidfile(pid_file_path):
+    """
+    Write PIDs for Edge Workers to disk,
+    ensuring that orphanded PID files from crashed instance are handled.
+    """
+    if pid_file_path.exists():
+        # Handle existing PID files on disk
+        logger.info("An existing PID file has been found: %s.", pid_file_path)
+        pid_stored_in_pid_file = read_pid_from_pidfile(pid_file_path)
+        if os.getpid() == pid_stored_in_pid_file:
+            raise SystemExit("A PID file has already been written")
+        else:
+            # PID file was written by dead or already running instance
+            logger.info("PID file does not belong to this process.")
+            if psutil.pid_exists(pid_stored_in_pid_file):
+                # case 1: another instance uses the same path for its PID file
+                raise SystemExit(
+                    f"The PID file {pid_file_path} contains the PID of another running process. "
+                    "Configuration issue: edge worker instance must use different PID file paths!"
+                )
+            else:
+                # case 2: previous instance crashed without cleaning up its PID file
+                logger.info("PID file is orphaned. Cleaning up.")
+                pid_file_path.unlink()

Review Comment:
   Could this be replaced by `break_lock()` from the library?
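As far as I can tell from `pidlockfile.py`, `break_lock()` simply removes the PID file and ignores a missing one, and the module's `write_pid_to_pidfile()` creates the file with `O_CREAT | O_EXCL`. A stdlib-only sketch of that cleanup-then-write sequence (`break_lock` and `write_pid_exclusively` are hypothetical stand-ins), showing why the exclusive create matters: if two workers both see an orphaned file and break it, only one of them can win the subsequent create.

```python
import os
from pathlib import Path


def break_lock(pid_file: Path) -> None:
    """Remove the PID file if present; a missing file is not an error (Python 3.8+)."""
    pid_file.unlink(missing_ok=True)


def write_pid_exclusively(pid_file: Path) -> None:
    """Create the PID file atomically; raises FileExistsError if it already exists."""
    fd = os.open(pid_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
    with os.fdopen(fd, "w") as fh:
        fh.write(f"{os.getpid()}\n")
```

Note that a plain `pid_file_path.unlink()`, as in the PR, raises `FileNotFoundError` if another process removed the file in the meantime; `missing_ok=True` avoids that.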



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
