Hi, I/we have suffered multiple times in the past from the unique constraint on execution date, and I would also hope the uniqueness can be removed. So +1 here. But I have also stumbled multiple times over the fact that e.g. the log view and a lot of other places still reference the execution date. I propose to start by deprecating the uniqueness and then migrating over what we can find. The list from Jarek is already very good 😃
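For reference, the uniqueness being discussed comes down to a table-level constraint on dag_run, roughly like the sketch below (simplified for illustration - not the actual model code from airflow/models/dagrun.py, which has more columns and named constraints):

    from sqlalchemy import Column, DateTime, Integer, String, UniqueConstraint
    from sqlalchemy.orm import declarative_base

    Base = declarative_base()

    class DagRun(Base):
        __tablename__ = "dag_run"
        id = Column(Integer, primary_key=True)
        dag_id = Column(String(250))
        run_id = Column(String(250))
        execution_date = Column(DateTime)
        __table_args__ = (
            # The constraint this thread proposes to relax:
            UniqueConstraint("dag_id", "execution_date"),
            # The constraint that would remain as the source of uniqueness:
            UniqueConstraint("dag_id", "run_id"),
        )

Relaxing would mean dropping the (dag_id, execution_date) constraint while keeping (dag_id, run_id) as the stable identifier.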
As we had a discussion recently about the "legacy" trigger button (it was just a button), I fear there needs to be at least an option to keep uniqueness for use cases still depending on it (a configurable DB index might be a challenge with current tooling, but there are ways to make this possible). Realistically speaking, though, I assume such a breaking change can only be made in Airflow 3.0 - so what we can realistically achieve, after a cleanup, is either a default or an optional relaxation.

Best regards
Jens Scheffler (Robert Bosch GmbH)

-----Original Message-----
From: Jarek Potiuk <ja...@potiuk.com>
Sent: Friday, 25 August 2023 02:44
To: dev@airflow.apache.org
Subject: Re: [DISCUSS] Allowing Multiple DAG Runs with the Same Execution Date

I think with dag_run.id being unique, it's not essential any more for scheduling. But it matters (a little) for the UI and (a lot) for execution.

I think the easy things to change would be in the UI:

a) managing sorting and displaying things in the grid with some stable sequence (currently execution date is used, I believe)
b) making sure run_id is always used when referring to a particular dag_run (maybe some older, not-yet-migrated views still use execution date)

But there is potentially a problem with execution. There are many operators that might use the execution_date as part of whatever unique key they use in their operations - and this is potentially widely distributed across multiple operators. This causes a problem with idempotency of output - generally speaking, the output of such a task is supposed to override the output of another task with the same execution_date (when there is an output, of course).

But the biggest potential problem (which I only have a vague idea how to solve) might be the use of execution_date in any kind of unique ID used to store and query things outside of Airflow that are relevant to the current dag run. We could probably remove all of those from our built-in operators - we have a few of them left (see below), not many to be honest, and we could likely find and fix all of them, maybe with some backwards-compatibility hacks. But I am afraid we have completely no control over the custom operators/integrations our users have. Looking at the way we still use it as a source of uniqueness after 2 years, I am afraid our users use it even more.

The "run_id" is a relatively new key that was not available before 2021 (https://github.com/apache/airflow/issues/16302, part of AIP-39 in Airflow 2.2). I am afraid many of our users still use execution_date for this as well - until 2021 it was the only way they could determine dag_run uniqueness. And that uniqueness is pretty important - as you can see below, even some of our operators use it to this day for things like determining the unique ID of some external entity - like a workflow_id - that we can query, kill, check whether it is still running, etc.
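To make that failure mode concrete: any external ID derived only from (dag_id, task_id, execution_date) collides as soon as two dag_runs share an execution_date. A distilled sketch (illustrative only, not taken from any provider):

    from datetime import datetime
    from hashlib import md5

    def external_workflow_id(dag_id: str, task_id: str, execution_date: datetime) -> str:
        # Mimics the common pattern: hash the "unique" triple into an external ID.
        base = f"airflow_{dag_id}_{task_id}_{execution_date.isoformat()}"
        return md5(base.encode()).hexdigest()

    first = external_workflow_id("my_dag", "my_task", datetime(2023, 8, 25))
    second = external_workflow_id("my_dag", "my_task", datetime(2023, 8, 25))
    # Two distinct dag_runs with the same execution_date map to the SAME
    # external workflow - the second run would query/kill/overwrite the first's.
    assert first == second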
The only way I **think** this could be solved without sacrificing backwards compatibility is an artificially generated runtime execution_date that could be **slightly** different (but stable) for different dag_runs that have the same "actual" execution_date. That could actually be quite possible.

Here are all the usages of execution_date as part of some unique key that I could quickly find in our code base:

Hive operator:

    self.hook.mapred_job_name = self.mapred_job_name_template.format(
        dag_id=ti.dag_id,
        task_id=ti.task_id,
        execution_date=ti.execution_date.isoformat(),
        hostname=ti.hostname.split(".")[0],
    )

The Kubernetes executor seems to use it in labels to find PODs (but this is likely just a legacy case and not valid for new PODs):

    if not annotation_run_id and "execution_date" in annotations:
        execution_date = pendulum.parse(annotations["execution_date"])

The Elasticsearch log_id also has execution_date in the example - likely leading to mixing logs from several run_ids with the same execution_date:

    While undocumented, previously ``[elasticsearch] log_id`` supported a
    Jinja templated string. Support for Jinja templates has now been removed.
    ``log_id`` should be a template string instead, for example:
    ``{dag_id}-{task_id}-{execution_date}-{try_number}``.

Similarly the Stackdriver handler:

    def _task_instance_to_labels(cls, ti: TaskInstance) -> dict[str, str]:
        return {
            cls.LABEL_TASK_ID: ti.task_id,
            cls.LABEL_DAG_ID: ti.dag_id,
            cls.LABEL_EXECUTION_DATE: str(ti.execution_date.isoformat()),
            cls.LABEL_TRY_NUMBER: str(ti.try_number),
        }

Google Workflows uses it for the uniqueness of workflow_id as well:

    # We are limited by allowed length of workflow_id so
    # we use hash of whole information
    exec_date = context["execution_date"].isoformat()
    base = f"airflow_{self.dag_id}_{self.task_id}_{exec_date}_{hash_base}"
    workflow_id = md5(base.encode()).hexdigest()
    return re.sub(r"[:\-+.]", "_", workflow_id)

Similarly the Google Cloud Storage ObjectUpdatedSensor:

    def ts_function(context):
        """
        Default callback for the GoogleCloudStorageObjectUpdatedSensor.

        The default behaviour is to check for the object being updated after
        the data interval's end, or execution_date + interval on Airflow
        versions prior to 2.2 (before the AIP-39 implementation).
        """
        try:
            return context["data_interval_end"]
        except KeyError:
            return context["dag"].following_schedule(context["execution_date"])

Even the recent OpenLineage adapter uses it as a uniqueness key:

    def build_task_instance_run_id(task_id, execution_date, try_number):
        return str(
            uuid.uuid3(
                uuid.NAMESPACE_URL,
                f"{_DAG_NAMESPACE}.{task_id}.{execution_date}.{try_number}",
            )
        )

with this code:

    task_uuid = OpenLineageAdapter.build_task_instance_run_id(
        task.task_id, task_instance.execution_date, task_instance.try_number - 1
    )

Finally, Presto and Trino use it:

    def generate_presto_client_info() -> str:
        """Return json string with dag_id, task_id, execution_date and try_number."""
        context_var = {
            format_map["default"].replace(DEFAULT_FORMAT_PREFIX, ""): os.environ.get(
                format_map["env_var_format"], ""
            )
            for format_map in AIRFLOW_VAR_NAME_FORMAT_MAPPING.values()
        }
        task_info = {
            "dag_id": context_var["dag_id"],
            "task_id": context_var["task_id"],
            "execution_date": context_var["execution_date"],
            "try_number": context_var["try_number"],
            "dag_run_id": context_var["dag_run_id"],
            "dag_owner": context_var["dag_owner"],
        }
        return json.dumps(task_info, sort_keys=True)

J.

On Fri, Aug 25, 2023 at 1:14 AM Hussein Awala <huss...@awala.fr> wrote:
> Is the assumption that we should have only one DAG run per execution
> date still valid?
>
> In recent years, Airflow DAGs have gained various features making them
> more dynamic, including branching operators, DAG run configuration
> (DAG params), and dynamic task mapping. While the scheduler creates
> most Airflow DAG runs based on a defined `schedule`, users also trigger
> runs externally using the Webserver, REST API, and Python API.
>
> As users employ Airflow for diverse use cases, they sometimes need
> more than one DAG run for the same data interval (like event-based
> DAGs and asynchronous request processing, ...), but they are blocked
> by the unique constraint on the execution date column. To work around
> this, users often implement hacks such as introducing a new logical
> date param and using it instead of the standard variable provided in the
> context.
>
> Sometimes, a scheduled DAG's results need to be overridden using
> manually triggered DAG runs that use different parameter values.
> Unfortunately, the current system doesn't accommodate this without
> deleting the original DAG run, leading to the loss of some historical
> execution records.
>
> How essential is it to maintain a unique execution/logical date, and
> why isn't the uniqueness of the run ID sufficient to address this?
>
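For concreteness, the workaround Hussein mentions - a user-supplied logical-date param used in place of the context-provided execution_date - might look roughly like this hypothetical minimal DAG (a sketch assuming Airflow 2.4+ TaskFlow; the DAG and param names are illustrative, not from the thread):

    import pendulum
    from airflow.decorators import dag, task

    @dag(
        schedule=None,  # triggered externally, not by the scheduler
        start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
        catchup=False,
        params={"logical_date": ""},  # caller supplies the "real" date
    )
    def event_driven_dag():
        @task
        def process(**context):
            # Use the caller-supplied date for downstream uniqueness instead
            # of the unique-constrained execution_date from the context.
            effective_date = context["params"]["logical_date"]
            print(f"processing for {effective_date}")

        process()

    event_driven_dag()

Each trigger can then pass a distinct value, e.g. airflow dags trigger event_driven_dag --conf '{"logical_date": "2023-08-25T01:00:00"}' - at the cost of every task having to read the param instead of the standard context variable.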