barrywhart commented on code in PR #25829:
URL: https://github.com/apache/airflow/pull/25829#discussion_r950907137
##########
kubernetes_tests/test_kubernetes_pod_operator.py:
##########
@@ -702,6 +704,59 @@ def test_env_vars(self):
]
assert self.expected_pod == actual_pod
+ @staticmethod
+ def propagate_run_id(conf, run_id):
+ """Jinja filter. Test DAG uses this via user_defined_filters."""
+ if run_id:
+ conf = deepcopy(conf)
+ # In a real DAG, this value might be provided by XCOM output of an
+ # upstream task.
+ conf["run_id"] = run_id
+ return conf
+
+ def test_env_vars_are_templatized(self):
+ # WHEN
+ task_id = "task" + self.get_current_task_name()
+ conf = {
+ task_id: {
+ # These become environment variables passed to the pod. There
+ # could be *any number* of variables.
+ "foo1": "bar1",
+ "foo2": "bar2",
+ },
+ }
+
+ # Templated environment variables from two sources:
+ # - dag_run.conf: Arbitrary number of variables
+ # - Jinja filter propagate_run_id: Could add an arbitrary number of
+ # additional variables. In this case, it's 0 or 1 variables.
+ env_vars = f'{{{{ dag_run.conf["{task_id}"] | propagate_run_id(run_id)
}}}}'
Review Comment:
This is very similar to our real-world MLOps use case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]