I think that with dag_run.id being unique, a unique execution_date is no
longer essential for scheduling. But it matters (a little) for the UI and
(a lot) for execution.

I think the easy thing to change would be the UI:

a) managing sorting and displaying things in a grid with some stable
sequence (currently execution date is used, I believe)
b) making sure run_id is always used when referring to a particular dag_run
(maybe some older, not yet migrated views still use execution_date).

But there is potentially a problem with execution. I think there are many
operators that might use the "execution_date" as part of whatever unique
key they want to use in their operations, and this is potentially widely
distributed across multiple operators.
This potentially causes a problem with idempotency of output: generally
speaking, the "output" of such a task is supposed to override the output of
the other task with the same execution_date (when there is an output, of
course).

But the biggest potential problem (which I only have a vague idea how to
solve) might be using it for any kind of unique ID used to store and query
stuff outside of Airflow that is relevant to the current dag run.

We probably could remove all those for our built-in operators. We have a
few of those left (see below), not many to be honest, and we could likely
find and fix all of them, maybe with some backward-compatibility hacks.
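
To make the "find and fix" idea concrete, here is a rough sketch of what
such a backward-compatibility hack could look like. The helper name
unique_run_key and its signature are made up for illustration and are not
part of Airflow. It prefers run_id when one is available and falls back to
the execution_date only otherwise. Note that switching the suffix changes
the key for already existing runs, which is exactly the compatibility
problem discussed here.

```python
from datetime import datetime, timezone
from typing import Optional


def unique_run_key(
    dag_id: str,
    task_id: str,
    run_id: Optional[str],
    execution_date: Optional[datetime],
) -> str:
    """Hypothetical helper: build a per-run key, preferring run_id."""
    if run_id:
        # run_id is unique per dag_run, so it is the safer discriminator.
        suffix = run_id
    elif execution_date is not None:
        # Legacy fallback: the only option when no run_id is known.
        suffix = execution_date.isoformat()
    else:
        raise ValueError("either run_id or execution_date is required")
    return f"{dag_id}.{task_id}.{suffix}"


legacy = unique_run_key(
    "my_dag", "my_task", None, datetime(2023, 8, 25, tzinfo=timezone.utc)
)
current = unique_run_key("my_dag", "my_task", "manual__2023-08-25", None)
```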

But I am afraid we have no control at all over the custom
operators/integrations our users have. Looking at the way we still use
execution_date as a source of uniqueness after 2 years, I am afraid our
users use it even more. The "run_id" is a relatively new key that was not
available before 2021 (https://github.com/apache/airflow/issues/16302,
part of AIP-39 in Airflow 2.2). I am afraid many of our users still use
execution_date as well: until 2021 it was the only way they could
determine dag_run uniqueness. And that uniqueness is pretty important. As
you can see below, even some of our operators still use it today for
things like determining the unique id of some external entity, like
workflow_id, that we can query, kill, check if it is still running, etc.

The only way I **think** this could be solved without sacrificing backwards
compatibility is an artificially generated runtime execution_date that
could be **slightly** different (but stable) for different dag_runs that
have the same "actual" execution_date. That could actually be quite
feasible.
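
A minimal sketch of that idea, just to show it is cheap to do. Nothing
here exists in Airflow: the function name runtime_execution_date is
hypothetical, and deriving the offset from a hash of run_id is only one
possible choice. The offset stays below one second and is the same every
time for the same run_id. Two run_ids can still hash to the same offset,
so a real implementation would need to detect and resolve collisions.

```python
import hashlib
from datetime import datetime, timedelta, timezone


def runtime_execution_date(execution_date: datetime, run_id: str) -> datetime:
    """Hypothetical: shift execution_date by a stable sub-second offset."""
    digest = hashlib.sha256(run_id.encode("utf-8")).digest()
    # Map the hash to 1..999_999 microseconds: never zero, always < 1 second.
    offset_us = int.from_bytes(digest[:8], "big") % 999_999 + 1
    return execution_date + timedelta(microseconds=offset_us)


actual = datetime(2023, 8, 25, tzinfo=timezone.utc)
first = runtime_execution_date(actual, "scheduled__2023-08-25T00:00:00+00:00")
again = runtime_execution_date(actual, "scheduled__2023-08-25T00:00:00+00:00")
other = runtime_execution_date(actual, "manual__2023-08-25T07:31:12+00:00")
```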

Here are all the usages of execution_date as part of some unique key I
could find quickly in our code base:

Hive operator:

self.hook.mapred_job_name = self.mapred_job_name_template.format(
    dag_id=ti.dag_id,
    task_id=ti.task_id,
    execution_date=ti.execution_date.isoformat(),
    hostname=ti.hostname.split(".")[0],
)


Kubernetes executor seems to use it for labeling to find Pods (but likely
this is just a legacy case and not valid for new Pods):

if not annotation_run_id and "execution_date" in annotations:
    execution_date = pendulum.parse(annotations["execution_date"])

Elasticsearch log_id also has execution_date in the example, likely leading
to mixing logs from several run_ids with the same execution_date:

While undocumented, previously ``[elasticsearch] log_id`` supported a
Jinja templated string.
Support for Jinja templates has now been removed. ``log_id`` should be
a template string instead,
for example: ``{dag_id}-{task_id}-{execution_date}-{try_number}``.

Similarly Stackdriver handler:

def _task_instance_to_labels(cls, ti: TaskInstance) -> dict[str, str]:
    return {
        cls.LABEL_TASK_ID: ti.task_id,
        cls.LABEL_DAG_ID: ti.dag_id,
        cls.LABEL_EXECUTION_DATE: str(ti.execution_date.isoformat()),
        cls.LABEL_TRY_NUMBER: str(ti.try_number),
    }

Google Workflow uses it for uniqueness of workflow_id as well:

# We are limited by allowed length of workflow_id so
# we use hash of whole information
exec_date = context["execution_date"].isoformat()
base = f"airflow_{self.dag_id}_{self.task_id}_{exec_date}_{hash_base}"
workflow_id = md5(base.encode()).hexdigest()
return re.sub(r"[:\-+.]", "_", workflow_id)


Similarly Google Cloud ObjectUpdatedSensor:

def ts_function(context):
    """
    Default callback for the GoogleCloudStorageObjectUpdatedSensor.

    The default behaviour is check for the object being updated after
    the data interval's end, or execution_date + interval on Airflow
    versions prior to 2.2 (before AIP-39 implementation).
    """
    try:
        return context["data_interval_end"]
    except KeyError:
        return context["dag"].following_schedule(context["execution_date"])


Even the recent OpenLineage adapter uses it as a uniqueness key:

def build_task_instance_run_id(task_id, execution_date, try_number):
    return str(
        uuid.uuid3(
            uuid.NAMESPACE_URL,
            f"{_DAG_NAMESPACE}.{task_id}.{execution_date}.{try_number}",
        )
    )

with this code:

task_uuid = OpenLineageAdapter.build_task_instance_run_id(
    task.task_id, task_instance.execution_date, task_instance.try_number - 1
)


Finally, Presto and Trino use it:

def generate_presto_client_info() -> str:
    """Return json string with dag_id, task_id, execution_date and try_number."""
    context_var = {
        format_map["default"].replace(DEFAULT_FORMAT_PREFIX, ""): os.environ.get(
            format_map["env_var_format"], ""
        )
        for format_map in AIRFLOW_VAR_NAME_FORMAT_MAPPING.values()
    }
    task_info = {
        "dag_id": context_var["dag_id"],
        "task_id": context_var["task_id"],
        "execution_date": context_var["execution_date"],
        "try_number": context_var["try_number"],
        "dag_run_id": context_var["dag_run_id"],
        "dag_owner": context_var["dag_owner"],
    }
    return json.dumps(task_info, sort_keys=True)


J.

On Fri, Aug 25, 2023 at 1:14 AM Hussein Awala <huss...@awala.fr> wrote:

> Is the assumption that we should have only one DAG run per execution date
> still valid?
>
> In recent years, Airflow DAG has gained various features making it more
> dynamic, including branching operators, DAG run configuration (DAG params),
> and dynamic task mapping. While the scheduler creates most Airflow DAG runs
> based on defined `schedule`, users also trigger runs externally using the
> Webserver, REST API, and Python API.
>
> As users employ Airflow for diverse use cases, sometimes they need more
> than one DAG run for the same data interval (like event-based DAGs and
> asynchronous requests processing, ...), but they are blocked by the unique
> constraint on the execution date column. To work around this, users often
> implement hacks such as introducing a new logical date param and using it
> instead of the standard variable provided in the context.
>
> Sometimes, a scheduled DAG's results need to be overridden using manually
> triggered DAG runs that use different parameters values. Unfortunately, the
> current system doesn't accommodate this without deleting the original DAG
> run, leading to a loss of some of the historical execution records.
>
> How essential is it to maintain a unique execution/logical date, and why
> isn't the uniqueness of the run ID sufficient to address this?
>
