AutomationDev85 commented on code in PR #47140:
URL: https://github.com/apache/airflow/pull/47140#discussion_r1973188482


##########
providers/edge/src/airflow/providers/edge/cli/edge_command.py:
##########
@@ -488,45 +504,99 @@ def worker(args):
 @providers_configuration_loaded
 def status(args):
     """Check for Airflow Edge Worker status."""
-    pid = read_pid_from_pidfile(_pid_file_path(args.pid))
-    # Send SIGINT
-    if pid:
-        logger.debug("Sending SIGUSR2 to worker pid %i.", pid)
-        status_min_date = time() - 1
-        status_path = Path(_status_file_path(args.pid))
-        worker_process = psutil.Process(pid)
-        worker_process.send_signal(SIG_STATUS)
-        while psutil.pid_exists(pid) and (
-            not status_path.exists() or status_path.stat().st_mtime < 
status_min_date
-        ):
-            sleep(0.1)
-        if not psutil.pid_exists(pid):
-            logger.warning("PID of worker dis-appeared while checking for 
status.")
-            sys.exit(2)
-        if not status_path.exists() or status_path.stat().st_mtime < 
status_min_date:
-            logger.warning("Could not read status of worker.")
-            sys.exit(3)
-        status = json.loads(status_path.read_text())
-        print(json.dumps(status, indent=4))
+    pid = _get_pid(args.pid)
+
+    # Send Signal as notification to drop status JSON
+    logger.debug("Sending SIGUSR2 to worker pid %i.", pid)
+    status_min_date = time() - 1
+    status_path = Path(_status_file_path(args.pid))
+    worker_process = psutil.Process(pid)
+    worker_process.send_signal(SIG_STATUS)
+    while psutil.pid_exists(pid) and (
+        not status_path.exists() or status_path.stat().st_mtime < 
status_min_date
+    ):
+        sleep(0.1)
+    if not psutil.pid_exists(pid):
+        logger.warning("PID of worker dis-appeared while checking for status.")
+        sys.exit(2)
+    if not status_path.exists() or status_path.stat().st_mtime < 
status_min_date:
+        logger.warning("Could not read status of worker.")
+        sys.exit(3)
+    status = WorkerStatus.from_json(status_path.read_text())
+    print(json.dumps(asdict(status), indent=4))
 
-    else:
-        logger.warning("Could not find PID of worker.")
-        sys.exit(1)
+
+@cli_utils.action_cli(check_db=False)
+@providers_configuration_loaded
+def maintenance(args):
+    """Set or Unset maintenance mode of worker."""
+    if args.maintenance == "on" and not args.comments:
+        logger.error("Comments are required when setting maintenance mode.")
+        sys.exit(4)
+
+    pid = _get_pid(args.pid)
+
+    # Write marker JSON file
+    from getpass import getuser
+
+    marker_path = Path(_maintenance_marker_file_path(args.pid))
+    logger.debug("Writing maintenance marker file to %s.", marker_path)
+    marker_path.write_text(
+        MaintenanceMarker(
+            maintenance=args.maintenance,
+            comments=f'[{datetime.now().strftime("%Y-%m-%d %H:%M")}] - 
{getuser()} put '
+            f'node into maintenance mode via cli\nComment: {args.comments}'
+            if args.maintenance == "on"
+            else None,
+        ).json
+    )
+
+    # Send Signal as notification to fetch maintenance marker
+    logger.debug("Sending SIGUSR2 to worker pid %i.", pid)
+    status_min_date = time() - 1
+    status_path = Path(_status_file_path(args.pid))
+    worker_process = psutil.Process(pid)
+    worker_process.send_signal(SIG_STATUS)
+    while psutil.pid_exists(pid) and (
+        not status_path.exists() or status_path.stat().st_mtime < 
status_min_date
+    ):
+        sleep(0.1)
+    if not psutil.pid_exists(pid):
+        logger.warning("PID of worker dis-appeared while checking for status.")
+        sys.exit(2)
+    if not status_path.exists() or status_path.stat().st_mtime < 
status_min_date:
+        logger.warning("Could not read status of worker.")
+        sys.exit(3)
+    status = WorkerStatus.from_json(status_path.read_text())
+
+    if args.wait:
+        if args.maintenance == "on" and status.state != 
EdgeWorkerState.MAINTENANCE_MODE:
+            logger.info("Waiting for worker to be drained...")
+            while status.state != EdgeWorkerState.MAINTENANCE_MODE:
+                sleep(4.5)
+                worker_process.send_signal(SIG_STATUS)
+                sleep(0.5)
+                status = WorkerStatus.from_json(status_path.read_text())
+        if args.maintenance == "off" and status.state == 
EdgeWorkerState.MAINTENANCE_MODE:
+            logger.info("Waiting for worker to exit maintenance...")
+            while status.state == EdgeWorkerState.MAINTENANCE_MODE:

Review Comment:
   One small improvement: also check here for EdgeWorkerState.MAINTENANCE_EXIT.
The worker has only left maintenance mode completely once it has left all of the
maintenance states, so waiting only on MAINTENANCE_MODE is not sufficient.



##########
providers/edge/src/airflow/providers/edge/cli/edge_command.py:
##########
@@ -488,45 +504,99 @@ def worker(args):
 @providers_configuration_loaded
 def status(args):
     """Check for Airflow Edge Worker status."""
-    pid = read_pid_from_pidfile(_pid_file_path(args.pid))
-    # Send SIGINT
-    if pid:
-        logger.debug("Sending SIGUSR2 to worker pid %i.", pid)
-        status_min_date = time() - 1
-        status_path = Path(_status_file_path(args.pid))
-        worker_process = psutil.Process(pid)
-        worker_process.send_signal(SIG_STATUS)
-        while psutil.pid_exists(pid) and (
-            not status_path.exists() or status_path.stat().st_mtime < 
status_min_date
-        ):
-            sleep(0.1)
-        if not psutil.pid_exists(pid):
-            logger.warning("PID of worker dis-appeared while checking for 
status.")
-            sys.exit(2)
-        if not status_path.exists() or status_path.stat().st_mtime < 
status_min_date:
-            logger.warning("Could not read status of worker.")
-            sys.exit(3)
-        status = json.loads(status_path.read_text())
-        print(json.dumps(status, indent=4))
+    pid = _get_pid(args.pid)
+
+    # Send Signal as notification to drop status JSON
+    logger.debug("Sending SIGUSR2 to worker pid %i.", pid)
+    status_min_date = time() - 1
+    status_path = Path(_status_file_path(args.pid))
+    worker_process = psutil.Process(pid)
+    worker_process.send_signal(SIG_STATUS)
+    while psutil.pid_exists(pid) and (
+        not status_path.exists() or status_path.stat().st_mtime < 
status_min_date
+    ):
+        sleep(0.1)
+    if not psutil.pid_exists(pid):
+        logger.warning("PID of worker dis-appeared while checking for status.")
+        sys.exit(2)
+    if not status_path.exists() or status_path.stat().st_mtime < 
status_min_date:
+        logger.warning("Could not read status of worker.")
+        sys.exit(3)
+    status = WorkerStatus.from_json(status_path.read_text())
+    print(json.dumps(asdict(status), indent=4))
 
-    else:
-        logger.warning("Could not find PID of worker.")
-        sys.exit(1)
+
+@cli_utils.action_cli(check_db=False)
+@providers_configuration_loaded
+def maintenance(args):
+    """Set or Unset maintenance mode of worker."""
+    if args.maintenance == "on" and not args.comments:
+        logger.error("Comments are required when setting maintenance mode.")
+        sys.exit(4)
+
+    pid = _get_pid(args.pid)
+
+    # Write marker JSON file
+    from getpass import getuser
+
+    marker_path = Path(_maintenance_marker_file_path(args.pid))
+    logger.debug("Writing maintenance marker file to %s.", marker_path)
+    marker_path.write_text(
+        MaintenanceMarker(
+            maintenance=args.maintenance,
+            comments=f'[{datetime.now().strftime("%Y-%m-%d %H:%M")}] - 
{getuser()} put '
+            f'node into maintenance mode via cli\nComment: {args.comments}'
+            if args.maintenance == "on"
+            else None,
+        ).json
+    )
+
+    # Send Signal as notification to fetch maintenance marker
+    logger.debug("Sending SIGUSR2 to worker pid %i.", pid)
+    status_min_date = time() - 1
+    status_path = Path(_status_file_path(args.pid))
+    worker_process = psutil.Process(pid)
+    worker_process.send_signal(SIG_STATUS)
+    while psutil.pid_exists(pid) and (
+        not status_path.exists() or status_path.stat().st_mtime < 
status_min_date
+    ):
+        sleep(0.1)
+    if not psutil.pid_exists(pid):
+        logger.warning("PID of worker dis-appeared while checking for status.")
+        sys.exit(2)
+    if not status_path.exists() or status_path.stat().st_mtime < 
status_min_date:
+        logger.warning("Could not read status of worker.")
+        sys.exit(3)
+    status = WorkerStatus.from_json(status_path.read_text())
+
+    if args.wait:
+        if args.maintenance == "on" and status.state != 
EdgeWorkerState.MAINTENANCE_MODE:
+            logger.info("Waiting for worker to be drained...")
+            while status.state != EdgeWorkerState.MAINTENANCE_MODE:

Review Comment:
   If someone leaves maintenance via the web UI while the CLI is waiting here in
the MAINTENANCE_REQUEST state, this loop could run forever. Maybe detect when the
request state was entered and then left again, and inform the user in the CLI
about this state change.



##########
providers/edge/src/airflow/providers/edge/worker_api/routes/worker.py:
##########
@@ -150,7 +148,15 @@ def register(
     worker: EdgeWorkerModel = session.scalar(query)
     if not worker:
         worker = EdgeWorkerModel(worker_name=worker_name, state=body.state, 
queues=body.queues)
-    worker.state = redefine_state_if_maintenance(worker.state, body.state)
+    worker.state = redefine_state(worker.state, body.state)
+    if body.maintenance_comments:

Review Comment:
   Maybe move this logic into a separate function: the code is duplicated, and
the max-length value is defined twice.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to