AutomationDev85 commented on code in PR #47140:
URL: https://github.com/apache/airflow/pull/47140#discussion_r1973188482
##########
providers/edge/src/airflow/providers/edge/cli/edge_command.py:
##########
@@ -488,45 +504,99 @@ def worker(args):
@providers_configuration_loaded
def status(args):
"""Check for Airflow Edge Worker status."""
- pid = read_pid_from_pidfile(_pid_file_path(args.pid))
- # Send SIGINT
- if pid:
- logger.debug("Sending SIGUSR2 to worker pid %i.", pid)
- status_min_date = time() - 1
- status_path = Path(_status_file_path(args.pid))
- worker_process = psutil.Process(pid)
- worker_process.send_signal(SIG_STATUS)
- while psutil.pid_exists(pid) and (
- not status_path.exists() or status_path.stat().st_mtime <
status_min_date
- ):
- sleep(0.1)
- if not psutil.pid_exists(pid):
- logger.warning("PID of worker dis-appeared while checking for
status.")
- sys.exit(2)
- if not status_path.exists() or status_path.stat().st_mtime <
status_min_date:
- logger.warning("Could not read status of worker.")
- sys.exit(3)
- status = json.loads(status_path.read_text())
- print(json.dumps(status, indent=4))
+ pid = _get_pid(args.pid)
+
+ # Send Signal as notification to drop status JSON
+ logger.debug("Sending SIGUSR2 to worker pid %i.", pid)
+ status_min_date = time() - 1
+ status_path = Path(_status_file_path(args.pid))
+ worker_process = psutil.Process(pid)
+ worker_process.send_signal(SIG_STATUS)
+ while psutil.pid_exists(pid) and (
+ not status_path.exists() or status_path.stat().st_mtime <
status_min_date
+ ):
+ sleep(0.1)
+ if not psutil.pid_exists(pid):
+ logger.warning("PID of worker dis-appeared while checking for status.")
+ sys.exit(2)
+ if not status_path.exists() or status_path.stat().st_mtime <
status_min_date:
+ logger.warning("Could not read status of worker.")
+ sys.exit(3)
+ status = WorkerStatus.from_json(status_path.read_text())
+ print(json.dumps(asdict(status), indent=4))
- else:
- logger.warning("Could not find PID of worker.")
- sys.exit(1)
+
+@cli_utils.action_cli(check_db=False)
+@providers_configuration_loaded
+def maintenance(args):
+ """Set or Unset maintenance mode of worker."""
+ if args.maintenance == "on" and not args.comments:
+ logger.error("Comments are required when setting maintenance mode.")
+ sys.exit(4)
+
+ pid = _get_pid(args.pid)
+
+ # Write marker JSON file
+ from getpass import getuser
+
+ marker_path = Path(_maintenance_marker_file_path(args.pid))
+ logger.debug("Writing maintenance marker file to %s.", marker_path)
+ marker_path.write_text(
+ MaintenanceMarker(
+ maintenance=args.maintenance,
+ comments=f'[{datetime.now().strftime("%Y-%m-%d %H:%M")}] -
{getuser()} put '
+ f'node into maintenance mode via cli\nComment: {args.comments}'
+ if args.maintenance == "on"
+ else None,
+ ).json
+ )
+
+ # Send Signal as notification to fetch maintenance marker
+ logger.debug("Sending SIGUSR2 to worker pid %i.", pid)
+ status_min_date = time() - 1
+ status_path = Path(_status_file_path(args.pid))
+ worker_process = psutil.Process(pid)
+ worker_process.send_signal(SIG_STATUS)
+ while psutil.pid_exists(pid) and (
+ not status_path.exists() or status_path.stat().st_mtime <
status_min_date
+ ):
+ sleep(0.1)
+ if not psutil.pid_exists(pid):
+ logger.warning("PID of worker dis-appeared while checking for status.")
+ sys.exit(2)
+ if not status_path.exists() or status_path.stat().st_mtime <
status_min_date:
+ logger.warning("Could not read status of worker.")
+ sys.exit(3)
+ status = WorkerStatus.from_json(status_path.read_text())
+
+ if args.wait:
+ if args.maintenance == "on" and status.state !=
EdgeWorkerState.MAINTENANCE_MODE:
+ logger.info("Waiting for worker to be drained...")
+ while status.state != EdgeWorkerState.MAINTENANCE_MODE:
+ sleep(4.5)
+ worker_process.send_signal(SIG_STATUS)
+ sleep(0.5)
+ status = WorkerStatus.from_json(status_path.read_text())
+ if args.maintenance == "off" and status.state ==
EdgeWorkerState.MAINTENANCE_MODE:
+ logger.info("Waiting for worker to exit maintenance...")
+ while status.state == EdgeWorkerState.MAINTENANCE_MODE:
Review Comment:
One small improvement: also check here for
EdgeWorkerState.MAINTENANCE_EXIT, because the worker has only left maintenance
mode completely once all maintenance states have been left.
##########
providers/edge/src/airflow/providers/edge/cli/edge_command.py:
##########
@@ -488,45 +504,99 @@ def worker(args):
@providers_configuration_loaded
def status(args):
"""Check for Airflow Edge Worker status."""
- pid = read_pid_from_pidfile(_pid_file_path(args.pid))
- # Send SIGINT
- if pid:
- logger.debug("Sending SIGUSR2 to worker pid %i.", pid)
- status_min_date = time() - 1
- status_path = Path(_status_file_path(args.pid))
- worker_process = psutil.Process(pid)
- worker_process.send_signal(SIG_STATUS)
- while psutil.pid_exists(pid) and (
- not status_path.exists() or status_path.stat().st_mtime <
status_min_date
- ):
- sleep(0.1)
- if not psutil.pid_exists(pid):
- logger.warning("PID of worker dis-appeared while checking for
status.")
- sys.exit(2)
- if not status_path.exists() or status_path.stat().st_mtime <
status_min_date:
- logger.warning("Could not read status of worker.")
- sys.exit(3)
- status = json.loads(status_path.read_text())
- print(json.dumps(status, indent=4))
+ pid = _get_pid(args.pid)
+
+ # Send Signal as notification to drop status JSON
+ logger.debug("Sending SIGUSR2 to worker pid %i.", pid)
+ status_min_date = time() - 1
+ status_path = Path(_status_file_path(args.pid))
+ worker_process = psutil.Process(pid)
+ worker_process.send_signal(SIG_STATUS)
+ while psutil.pid_exists(pid) and (
+ not status_path.exists() or status_path.stat().st_mtime <
status_min_date
+ ):
+ sleep(0.1)
+ if not psutil.pid_exists(pid):
+ logger.warning("PID of worker dis-appeared while checking for status.")
+ sys.exit(2)
+ if not status_path.exists() or status_path.stat().st_mtime <
status_min_date:
+ logger.warning("Could not read status of worker.")
+ sys.exit(3)
+ status = WorkerStatus.from_json(status_path.read_text())
+ print(json.dumps(asdict(status), indent=4))
- else:
- logger.warning("Could not find PID of worker.")
- sys.exit(1)
+
+@cli_utils.action_cli(check_db=False)
+@providers_configuration_loaded
+def maintenance(args):
+ """Set or Unset maintenance mode of worker."""
+ if args.maintenance == "on" and not args.comments:
+ logger.error("Comments are required when setting maintenance mode.")
+ sys.exit(4)
+
+ pid = _get_pid(args.pid)
+
+ # Write marker JSON file
+ from getpass import getuser
+
+ marker_path = Path(_maintenance_marker_file_path(args.pid))
+ logger.debug("Writing maintenance marker file to %s.", marker_path)
+ marker_path.write_text(
+ MaintenanceMarker(
+ maintenance=args.maintenance,
+ comments=f'[{datetime.now().strftime("%Y-%m-%d %H:%M")}] -
{getuser()} put '
+ f'node into maintenance mode via cli\nComment: {args.comments}'
+ if args.maintenance == "on"
+ else None,
+ ).json
+ )
+
+ # Send Signal as notification to fetch maintenance marker
+ logger.debug("Sending SIGUSR2 to worker pid %i.", pid)
+ status_min_date = time() - 1
+ status_path = Path(_status_file_path(args.pid))
+ worker_process = psutil.Process(pid)
+ worker_process.send_signal(SIG_STATUS)
+ while psutil.pid_exists(pid) and (
+ not status_path.exists() or status_path.stat().st_mtime <
status_min_date
+ ):
+ sleep(0.1)
+ if not psutil.pid_exists(pid):
+ logger.warning("PID of worker dis-appeared while checking for status.")
+ sys.exit(2)
+ if not status_path.exists() or status_path.stat().st_mtime <
status_min_date:
+ logger.warning("Could not read status of worker.")
+ sys.exit(3)
+ status = WorkerStatus.from_json(status_path.read_text())
+
+ if args.wait:
+ if args.maintenance == "on" and status.state !=
EdgeWorkerState.MAINTENANCE_MODE:
+ logger.info("Waiting for worker to be drained...")
+ while status.state != EdgeWorkerState.MAINTENANCE_MODE:
Review Comment:
If someone uses the web UI to exit maintenance while the CLI is waiting in
the MAINTENANCE_REQUEST state, we could end up in an endless loop here. Maybe
check whether the request state was entered and then left again, and inform the
user in the CLI about this state change.
##########
providers/edge/src/airflow/providers/edge/worker_api/routes/worker.py:
##########
@@ -150,7 +148,15 @@ def register(
worker: EdgeWorkerModel = session.scalar(query)
if not worker:
worker = EdgeWorkerModel(worker_name=worker_name, state=body.state,
queues=body.queues)
- worker.state = redefine_state_if_maintenance(worker.state, body.state)
+ worker.state = redefine_state(worker.state, body.state)
+ if body.maintenance_comments:
Review Comment:
Maybe move this logic into a separate function, since the code is duplicated
and the max-length value is defined twice.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]