Thank you for bringing this up Hussein! Similar to James, I recently stumbled upon this realization when I was investigating the Airflow best practice on 're-running' a Dag Run, whether that be to clear an existing Dag Run, or to create a Manually triggered Dag Run of the same execution date: https://github.com/apache/airflow/discussions/33677
As the rest of the folks have already mentioned on the thread, I think that creating additional runs on the same execution dates is a rather common use case that would be worth supporting properly in the longer term. And once implemented, I believe this would be the preferred mode for some users who want to submit re-runs (instead of clearing an existing Dag Run) because it will allow them to keep the run history of the initial scheduled run, as well as the newly triggered one. Hence, I see this as a change that would improve the visibility and the reliability of the workflows that users will build on top of Airflow. I agree that it's not a simple undertaking to remove a unique constraint. Would it make sense to invest in the work to remove the usage of execution_date as unique keys in Airflow core, views and built-in operators now, and tag the dag_id & execution_date unique constraint for deprecation in Airflow 3.0? This will get us ready for its deprecation while safe-guarding the users that might be relying on the constraint in their custom code. And until then, maybe we just leave it to the users to rely on less optimal workarounds: like triggering with a slightly different execution date, or clearing existing dag runs but leave the choice to them. On the suggestion of automatically issuing slightly different artificial execution dates: I think we can implement a safe way to do it for timetables with Data Intervals. However, I'm a bit worried that it might have unintended consequences for trigger timetables with no intervals, where the data interval start and end date are simply equal to the execution date. In the case of trigger timetables, we would be creating a new dag run that has a different actual execution date and data interval. And depending on how this execution date is used within their workflow, this might have some undesired downstream impact. Sung On Fri, Aug 25, 2023 at 1:28 AM Scheffler Jens (XC-DX/ETV5) <jens.scheff...@de.bosch.com.invalid> wrote: > Hi, > > I/we suffered multiple times in the past by the unique constraint of > execution date and I would also hope the uniqueness can be removed. So +1 > here. > But I was also stumbling multiple times in the past that e.g. the log view > and a lot of references still use the execution date. > I propose to start deprecating the uniqueness and start migrating over > what we could find. The list from Jarek is already very good 😃 > > As we had a discussion recently about the "legacy" trigger button (was > just a button) I fear there need to be at least an option to keep > uniqueness for use cases still depending on it (whereas a configurable DB > index might be a challenge with current tooling but still there are ways to > make this possible) - but realistically speaking I assume such "breaking > change" is something that can only be made in Airflow 3.0. So we can > realistically only achieve either a default or optional relaxation after a > cleanup. > > Mit freundlichen Grüßen / Best regards > > Jens Scheffler > > Deterministik open Loop (XC-DX/ETV5) > Robert Bosch GmbH | Hessbruehlstraße 21 | 70565 Stuttgart-Vaihingen | > GERMANY | http://www.bosch.com/ > Tel. +49 711 811-91508 | Mobil +49 160 90417410 | > jens.scheff...@de.bosch.com > > Sitz: Stuttgart, Registergericht: Amtsgericht Stuttgart, HRB 14000; > Aufsichtsratsvorsitzender: Prof. Dr. Stefan Asenkerschbaumer; > Geschäftsführung: Dr. Stefan Hartung, > Dr. Christian Fischer, Dr. Markus Forschner, Stefan Grosch, Dr. Markus > Heyn, Dr. Tanja Rückert > > -----Original Message----- > From: Jarek Potiuk <ja...@potiuk.com> > Sent: Freitag, 25. August 2023 02:44 > To: dev@airflow.apache.org > Subject: Re: [DISCUSS] Allowing Multiple DAG Runs with the Same Execution > Date > > I think with dag_run.id being uniquem, it's not essential any more for > scheduling. But it matters (a little) for UI and (a lot) for execution. > > I think the easy thing to change would be the UI: > > a) managing sorting and displaying things in a grid with some stable > sequence (currently execution date is used I believe) > b) making sure run_id is used always when referring to a particular > dag_run (maybe some older - not migrated views still have it). > > But there is potentially a problem with execution. I think there are many > operators that might use the "execution_date" as part of whatever unique > key they want to use in their operations - and this is widely distributed > across multiple operators potentially This potentially causes a problem > with idempotency of output - generally speaking the "output" of such a task > is supposed to override the output of the other task with the same > execution_date (of course when there is an > output) > > But the biggest potential problem (which I only have a vague idea how to > solve) might be using it for any kind of unique ID used to store and query > stuff outside of airflow that is relevant to current dag run. > > We probably could. remove all those for our built-in operators - we have a > few of those left (see below) not many to be honest, and we could likely > find and fix all of them, maybe with some backward-compatibility hacks. > > But I am afraid we have completely no control over custom > operators/integrations our users have. Looking at the way we still use it > as a source of uniqueness after 2 years, I am afraid our users use it even > more. The "run_id" is a relatively new key that has not been available > before 2021 https://github.com/apache/airflow/issues/16302 as part of > AIP-39 in Airflow 2.2. I am afraid many of our users still use it as well > - until 2021 that was the only way they could determine the dag_run > uniqueness. And that uniqueness is pretty important - as you can see below > - even some of our operators till today use it for things like determining > the unique id of some external entity - like workflow_id - that we can > query, kill, check if it is still running etc. etc. > > The only way I **think** this could be solved without sacrificing > backwards compatibility is artificially generated runtime execution_date > that could be **slightly** different (but stable) for different dag_runs > that have the same "actual" execution_date. That could be quite possible > actually. > > Here are all the usages of execution_date as part of some unique key I > could find quickly in our code base: > > Hive operator: > > self.hook.mapred_job_name = self.mapred_job_name_template.format( > dag_id=ti.dag_id, > task_id=ti.task_id, > execution_date=ti.execution_date.isoformat(), > hostname=ti.hostname.split(".")[0], > ) > > > Kubenetes executor seems to use it for labeling to find PODs (but likely > this is just a legacy case ad not valid for new PODs:: > > if not annotation_run_id and "execution_date" in annotations: > execution_date = pendulum.parse(annotations["execution_date"]) > > Elasticsearch log_id als has execution_date in the example - leading > likely to mixing logs from several run_ids with the same execution_date: > > While undocumented, previously ``[elasticsearch] log_id`` supported a > Jinja templated string. > Support for Jinja templates has now been removed. ``log_id`` should be a > template string instead, for example: > ``{dag_id}-{task_id}-{execution_date}-{try_number}``. > > Similarly Stackdriver handler: > > def _task_instance_to_labels(cls, ti: TaskInstance) -> dict[str, str]: > return { > cls.LABEL_TASK_ID: ti.task_id, > cls.LABEL_DAG_ID: ti.dag_id, > cls.LABEL_EXECUTION_DATE: str(ti.execution_date.isoformat()), > cls.LABEL_TRY_NUMBER: str(ti.try_number), > } > > Google Workflow uses it for uniqueness of workflow_id as well: > > # We are limited by allowed length of workflow_id so # we use hash of > whole information exec_date = context["execution_date"].isoformat() > base = f"airflow_{self.dag_id}_{self.task_id}_{exec_date}_{hash_base}" > workflow_id = md5(base.encode()).hexdigest() return re.sub(r"[:\-+.]", > "_", workflow_id) > > > Similarly Google Cloud ObjectUpdatedSensor: > > def ts_function(context): > """ > Default callback for the GoogleCloudStorageObjectUpdatedSensor. > > The default behaviour is check for the object being updated after the > data interval's end, > or execution_date + interval on Airflow versions prior to 2.2 (before > AIP-39 implementation). > """ > try: > return context["data_interval_end"] > except KeyError: > return context["dag"].following_schedule(context["execution_date"]) > > > Even the recent openlineage adapter uses it as uniqueness key: > > def build_task_instance_run_id(task_id, execution_date, try_number): > return str( > uuid.uuid3( > uuid.NAMESPACE_URL, > f"{_DAG_NAMESPACE}.{task_id}.{execution_date}.{try_number}", > ) > ) > > with this code: > > task_uuid = OpenLineageAdapter.build_task_instance_run_id( > task.task_id, task_instance.execution_date, task_instance.try_number - > 1 > ) > > > Finally presto and trino use it: > > def generate_presto_client_info() -> str: > """Return json string with dag_id, task_id, execution_date and > try_number.""" > context_var = { > format_map["default"].replace(DEFAULT_FORMAT_PREFIX, ""): > os.environ.get( > format_map["env_var_format"], "" > ) > for format_map in AIRFLOW_VAR_NAME_FORMAT_MAPPING.values() > } > task_info = { > "dag_id": context_var["dag_id"], > "task_id": context_var["task_id"], > "execution_date": context_var["execution_date"], > "try_number": context_var["try_number"], > "dag_run_id": context_var["dag_run_id"], > "dag_owner": context_var["dag_owner"], > } > return json.dumps(task_info, sort_keys=True) > > > J. > > On Fri, Aug 25, 2023 at 1:14 AM Hussein Awala <huss...@awala.fr> wrote: > > > Is the assumption that we should have only one DAG run per execution > > date still valid? > > > > In recent years, Airflow DAG has gained various features making it > > more dynamic, including branching operators, DAG run configuration > > (DAG params), and dynamic task mapping. While the scheduler creates > > most Airflow DAG runs based on defined `schedule`, users also trigger > > runs externally using the Webserver, REST API, and Python API. > > > > As users employ Airflow for diverse use cases, sometimes they need > > more than one DAG run for the same data interval (like event-based > > DAGs and asynchronous requests processing, ...), but they are blocked > > by the unique constraint on the execution date column. To work around > > this, users often implement hacks such as introducing a new logical > > date param and using it instead of the standard variable provided in the > context. > > > > Sometimes, a scheduled DAG's results need to be overridden using > > manually triggered DAG runs that use different parameters values. > > Unfortunately, the current system doesn't accommodate this without > > deleting the original DAG run, leading to a loss of some of the > historical execution records. > > > > How essential is it to maintain a unique execution/logical date, and > > why isn't the uniqueness of the run ID sufficient to address this? > > > -- Sung Yun Cornell Tech '20 Master of Engineering in Computer Science