dheerajturaga commented on code in PR #60675:
URL: https://github.com/apache/airflow/pull/60675#discussion_r2744588631


##########
providers/celery/src/airflow/providers/celery/cli/definition.py:
##########
@@ -101,6 +101,10 @@
     help="Don't subscribe to other workers events",
     action="store_true",
 )
+ARG_TEAM = Arg(
+    ("-t", "--team"),
+    help="Team name for team-specific multi-team configuration (requires 
Airflow 3.1+)",

Review Comment:
   ```suggestion
       help="Team name for team-specific multi-team configuration (requires Airflow 3.2+)",
   ```



##########
providers/celery/src/airflow/providers/celery/cli/celery_command.py:
##########
@@ -189,8 +189,32 @@ def filter(self, record):
 @_providers_configuration_loaded
 def worker(args):
     """Start Airflow Celery worker."""
-    # This needs to be imported locally to not trigger Providers Manager 
initialization
-    from airflow.providers.celery.executors.celery_executor import app as 
celery_app
+    team_config = None
+    if hasattr(args, "team") and args.team:
+        # Multi-team is enabled, create team-specific Celery app and use team 
based config
+        # This requires Airflow 3.2+, and core.multi_team config to be true to 
be enabled.
+        if not AIRFLOW_V_3_2_PLUS:
+            raise AirflowConfigException(

Review Comment:
   Can we raise SystemExit instead, to be consistent with the other exceptions? 
The current approach throws a stack trace, which can be confusing to the user.
   
   ```suggestion
               raise SystemExit(
   ```



##########
providers/celery/src/airflow/providers/celery/cli/celery_command.py:
##########
@@ -189,8 +189,32 @@ def filter(self, record):
 @_providers_configuration_loaded
 def worker(args):
     """Start Airflow Celery worker."""
-    # This needs to be imported locally to not trigger Providers Manager 
initialization
-    from airflow.providers.celery.executors.celery_executor import app as 
celery_app
+    team_config = None
+    if hasattr(args, "team") and args.team:
+        # Multi-team is enabled, create team-specific Celery app and use team 
based config
+        # This requires Airflow 3.2+, and core.multi_team config to be true to 
be enabled.
+        if not AIRFLOW_V_3_2_PLUS:
+            raise AirflowConfigException(
+                "Multi-team Celery workers require Airflow version 3.2 or 
higher. "
+                "Please upgrade your Airflow installation or remove the --team 
argument."
+            )
+        if not conf.getboolean("core", "multi_team", fallback=False):
+            raise AirflowConfigException(

Review Comment:
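   Same here: raise SystemExit instead of AirflowConfigException, so the user gets a clean error message rather than a stack trace.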
   ```suggestion
               raise SystemExit(
   ```



##########
providers/celery/src/airflow/providers/celery/cli/celery_command.py:
##########
@@ -189,8 +189,32 @@ def filter(self, record):
 @_providers_configuration_loaded
 def worker(args):
     """Start Airflow Celery worker."""
-    # This needs to be imported locally to not trigger Providers Manager 
initialization
-    from airflow.providers.celery.executors.celery_executor import app as 
celery_app
+    team_config = None
+    if hasattr(args, "team") and args.team:
+        # Multi-team is enabled, create team-specific Celery app and use team 
based config
+        # This requires Airflow 3.2+, and core.multi_team config to be true to 
be enabled.
+        if not AIRFLOW_V_3_2_PLUS:
+            raise AirflowConfigException(
+                "Multi-team Celery workers require Airflow version 3.2 or 
higher. "
+                "Please upgrade your Airflow installation or remove the --team 
argument."
+            )
+        if not conf.getboolean("core", "multi_team", fallback=False):
+            raise AirflowConfigException(
+                "Multi-team Celery workers require core.multi_team 
configuration to be enabled. "

Review Comment:
   ```suggestion
                   "Error: Multi-team Celery workers require core.multi_team configuration to be enabled. "
   ```



##########
providers/celery/src/airflow/providers/celery/cli/celery_command.py:
##########
@@ -189,8 +189,32 @@ def filter(self, record):
 @_providers_configuration_loaded
 def worker(args):
     """Start Airflow Celery worker."""
-    # This needs to be imported locally to not trigger Providers Manager 
initialization
-    from airflow.providers.celery.executors.celery_executor import app as 
celery_app
+    team_config = None
+    if hasattr(args, "team") and args.team:
+        # Multi-team is enabled, create team-specific Celery app and use team 
based config
+        # This requires Airflow 3.2+, and core.multi_team config to be true to 
be enabled.
+        if not AIRFLOW_V_3_2_PLUS:
+            raise AirflowConfigException(
+                "Multi-team Celery workers require Airflow version 3.2 or 
higher. "

Review Comment:
   ```suggestion
                   "Error: Multi-team Celery workers require Airflow version 3.2 or higher. "
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to