vincbeck commented on code in PR #67605:
URL: https://github.com/apache/airflow/pull/67605#discussion_r3312359413
##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py:
##########
@@ -1529,6 +1529,60 @@ def test_ti_update_state_to_deferred(
else:
assert t[0].queue is None
+ @pytest.mark.parametrize(
+ ("multi_team_enabled", "expect_team"),
+ [
+ pytest.param("False", False, id="multi-team-disabled"),
+ pytest.param("True", True, id="multi-team-enabled"),
+ ],
+ )
+ def test_ti_update_state_to_deferred_populates_trigger_team_name(
+ self, client, session, create_task_instance, time_machine,
multi_team_enabled, expect_team
+ ):
+ """Trigger created on deferral gets team_name from the TI's bundle."""
+ from airflow.models.dagbundle import DagBundleModel
+ from airflow.models.team import Team
+ from airflow.models.trigger import Trigger
+
+ instant = timezone.datetime(2024, 11, 22)
+ time_machine.move_to(instant, tick=False)
+
+ ti = create_task_instance(
+ task_id="test_ti_deferred_team",
+ state=State.RUNNING,
+ session=session,
+ )
+
+ bundle_name = "bundle_deferred_team_test"
+ team_name = "team_deferred_test"
+ bundle = session.get(DagBundleModel, bundle_name) or
DagBundleModel(name=bundle_name)
+ team = session.get(Team, team_name) or Team(name=team_name)
+ if team not in bundle.teams:
+ bundle.teams.append(team)
+ session.add(bundle)
+ session.flush()
+ session.execute(update(DagModel).where(DagModel.dag_id ==
ti.dag_id).values(bundle_name=bundle_name))
+ session.commit()
+
+ payload = {
+ "state": "deferred",
+ "trigger_kwargs": {"key": "value"},
+ "classpath": "my-classpath",
+ "next_method": "execute_callback",
+ }
+
+ with conf_vars({("core", "multi_team"): multi_team_enabled}):
Review Comment:
nit: apply this as a decorator on the test instead; that makes the test
smaller (this applies to other locations in your PR as well).
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -1718,9 +1718,16 @@ def defer_task(self, session: Session = NEW_SESSION) ->
bool:
else:
self.trigger_timeout = None
+ team_name: str | None = None
+ if conf.getboolean("core", "multi_team"):
Review Comment:
This is not covered by a unit test.
##########
airflow-core/src/airflow/models/dagbundle.py:
##########
@@ -108,3 +113,11 @@ def render_url(self, version: str | None = None) -> str |
None:
except (KeyError, ValueError) as e:
self.log.warning("Failed to render URL template for bundle %s:
%s", self.name, e)
return None
+
+ @staticmethod
+ @provide_session
+ def get_team_name(bundle_name: str, *, session: Session = NEW_SESSION) ->
str | None:
Review Comment:
Please add a unit test for this method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]