dheerajturaga commented on code in PR #60675:
URL: https://github.com/apache/airflow/pull/60675#discussion_r2744588631
##########
providers/celery/src/airflow/providers/celery/cli/definition.py:
##########
@@ -101,6 +101,10 @@
     help="Don't subscribe to other workers events",
     action="store_true",
 )
+ARG_TEAM = Arg(
+    ("-t", "--team"),
+    help="Team name for team-specific multi-team configuration (requires Airflow 3.1+)",
Review Comment:
```suggestion
    help="Team name for team-specific multi-team configuration (requires Airflow 3.2+)",
```
##########
providers/celery/src/airflow/providers/celery/cli/celery_command.py:
##########
@@ -189,8 +189,32 @@ def filter(self, record):
 @_providers_configuration_loaded
 def worker(args):
     """Start Airflow Celery worker."""
-    # This needs to be imported locally to not trigger Providers Manager initialization
-    from airflow.providers.celery.executors.celery_executor import app as celery_app
+    team_config = None
+    if hasattr(args, "team") and args.team:
+        # Multi-team is enabled, create team-specific Celery app and use team based config
+        # This requires Airflow 3.2+, and core.multi_team config to be true to be enabled.
+        if not AIRFLOW_V_3_2_PLUS:
+            raise AirflowConfigException(
Review Comment:
Can we raise `SystemExit` instead, to be consistent with the other exceptions?
The current approach prints a stack trace, which can be confusing to the user.
```suggestion
            raise SystemExit(
```
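For context, a minimal sketch of the difference in what the user sees. `ConfigError` is a hypothetical stand-in for `AirflowConfigException`, and the message text and `run_snippet` helper are illustrative only, not taken from the PR:

```python
import subprocess
import sys


def run_snippet(source: str) -> tuple[int, str]:
    """Run `source` in a fresh interpreter and return (exit code, stderr)."""
    proc = subprocess.run(
        [sys.executable, "-c", source],
        capture_output=True,
        text=True,
    )
    return proc.returncode, proc.stderr


# An uncaught ordinary exception (stand-in for AirflowConfigException)
# makes the interpreter print a full traceback to stderr.
code_exc, err_exc = run_snippet(
    "class ConfigError(Exception): pass\n"
    "raise ConfigError('--team requires Airflow 3.2+')"
)

# SystemExit with a string argument prints only that string to stderr
# and exits with status 1, with no traceback.
code_exit, err_exit = run_snippet(
    "raise SystemExit('--team requires Airflow 3.2+')"
)

print("exception:", code_exc, "traceback shown:", "Traceback" in err_exc)
print("SystemExit:", code_exit, "traceback shown:", "Traceback" in err_exit)
```

Both paths exit non-zero, so scripts that check the return code behave the same; only the noise on stderr changes.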
##########
providers/celery/src/airflow/providers/celery/cli/celery_command.py:
##########
@@ -189,8 +189,32 @@ def filter(self, record):
 @_providers_configuration_loaded
 def worker(args):
     """Start Airflow Celery worker."""
-    # This needs to be imported locally to not trigger Providers Manager initialization
-    from airflow.providers.celery.executors.celery_executor import app as celery_app
+    team_config = None
+    if hasattr(args, "team") and args.team:
+        # Multi-team is enabled, create team-specific Celery app and use team based config
+        # This requires Airflow 3.2+, and core.multi_team config to be true to be enabled.
+        if not AIRFLOW_V_3_2_PLUS:
+            raise AirflowConfigException(
+                "Multi-team Celery workers require Airflow version 3.2 or higher. "
+                "Please upgrade your Airflow installation or remove the --team argument."
+            )
+        if not conf.getboolean("core", "multi_team", fallback=False):
+            raise AirflowConfigException(
Review Comment:
Same here: raise `SystemExit` instead of `AirflowConfigException`.
```suggestion
            raise SystemExit(
```
##########
providers/celery/src/airflow/providers/celery/cli/celery_command.py:
##########
@@ -189,8 +189,32 @@ def filter(self, record):
 @_providers_configuration_loaded
 def worker(args):
     """Start Airflow Celery worker."""
-    # This needs to be imported locally to not trigger Providers Manager initialization
-    from airflow.providers.celery.executors.celery_executor import app as celery_app
+    team_config = None
+    if hasattr(args, "team") and args.team:
+        # Multi-team is enabled, create team-specific Celery app and use team based config
+        # This requires Airflow 3.2+, and core.multi_team config to be true to be enabled.
+        if not AIRFLOW_V_3_2_PLUS:
+            raise AirflowConfigException(
+                "Multi-team Celery workers require Airflow version 3.2 or higher. "
+                "Please upgrade your Airflow installation or remove the --team argument."
+            )
+        if not conf.getboolean("core", "multi_team", fallback=False):
+            raise AirflowConfigException(
+                "Multi-team Celery workers require core.multi_team configuration to be enabled. "
Review Comment:
```suggestion
                "Error: Multi-team Celery workers require core.multi_team configuration to be enabled. "
```
##########
providers/celery/src/airflow/providers/celery/cli/celery_command.py:
##########
@@ -189,8 +189,32 @@ def filter(self, record):
 @_providers_configuration_loaded
 def worker(args):
     """Start Airflow Celery worker."""
-    # This needs to be imported locally to not trigger Providers Manager initialization
-    from airflow.providers.celery.executors.celery_executor import app as celery_app
+    team_config = None
+    if hasattr(args, "team") and args.team:
+        # Multi-team is enabled, create team-specific Celery app and use team based config
+        # This requires Airflow 3.2+, and core.multi_team config to be true to be enabled.
+        if not AIRFLOW_V_3_2_PLUS:
+            raise AirflowConfigException(
+                "Multi-team Celery workers require Airflow version 3.2 or higher. "
Review Comment:
```suggestion
                "Error: Multi-team Celery workers require Airflow version 3.2 or higher. "
```
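Taken together, the suggestions on this file would leave the guard looking roughly like the sketch below. This is only an illustration under stated assumptions: `validate_team_option` and its boolean parameters are hypothetical stand-ins for the inline checks against `AIRFLOW_V_3_2_PLUS` and `conf.getboolean("core", "multi_team", fallback=False)`, and the second message is cut where the quoted hunk ends.

```python
def validate_team_option(team, airflow_is_3_2_plus, multi_team_enabled):
    """Hypothetical helper mirroring the --team checks in worker().

    Raises SystemExit with an "Error: ..." message instead of a
    configuration exception, so the CLI user sees one line, not a traceback.
    """
    if not team:
        # No --team given: nothing to validate.
        return None
    if not airflow_is_3_2_plus:
        raise SystemExit(
            "Error: Multi-team Celery workers require Airflow version 3.2 or higher. "
            "Please upgrade your Airflow installation or remove the --team argument."
        )
    if not multi_team_enabled:
        # The remainder of this message is not shown in the review hunk.
        raise SystemExit(
            "Error: Multi-team Celery workers require core.multi_team configuration to be enabled."
        )
    return None


if __name__ == "__main__":
    try:
        validate_team_option("team_a", airflow_is_3_2_plus=False, multi_team_enabled=True)
    except SystemExit as exc:
        # exc.code carries the message that would be printed to stderr.
        print(exc.code)
```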