anishgirianish commented on issue #44146:
URL: https://github.com/apache/airflow/issues/44146#issuecomment-4009506728
Hi, I would like to pick this up. I have been working on AllowedKeyMapper
(#61931, in review) and ProductMapper (#61937, merged), and I would like to pick
up PartitionAtRuntime (#44146) next. The AIP spec says the partition
context object "has not been fully hashed out yet", so I wanted to check
my thinking before writing any code.
Right now, partition keys are always known before a task runs, either from
CronPartitionTimetable or from the upstream DagRun. But the AIP describes a case
where partitions are discovered at runtime (the AIP example with
`add_partition(key=customer)`).
The task side currently can't attach per-event partition keys: outlet events all
inherit the single dag_run.partition_key (#58474). That issue is closely
related; solving per-event partition key overrides would also give us the
transport for PartitionAtRuntime, so I think these should be tackled together.
How I think this should work:
1. A task calls add_partition(key=...) during execution. Each key produces a
separate asset event with that partition key. Downstream partition-aware DAGs
get triggered per key, the same as any other partition source. The APDR / scheduler
infra is already agnostic to how the key originated.
2. Keys are only emitted on task success. Asset events go through the
SucceedTask path (supervisor, Execution API,
register_asset_changes_in_db). If the task fails, nothing gets created.
3. The upstream DagRun would have partition_key=None since the keys aren't
known at schedule time. This matches existing behavior for non-partitioned
asset-driven DAGs.
4. PartitionAtRuntime can't be combined with PartitionByProduct, per the
spec. We should catch this at parse time.
5. Rollup/windowing and to_upstream() are deferred to 3.3 (#59294), so this
only needs to handle the simple case. Each add_partition() call produces one
asset event.
Transport
The outlet events pipeline (SucceedTask, supervisor,
task_instances.succeed()) already carries per-event extras from worker
to server, but partition keys aren't part of that flow yet. They're injected
server-side from dag_run.partition_key at
taskinstance.py:1346. I think the simplest approach is extending
OutletEventAccessor with an optional partition_key field so tasks can set
per-event keys. The serialization path (_serialize_outlet_events) would
include it alongside extra and register_asset_changes_in_db would use it
instead of falling back to dag_run.partition_key. That solves both #58474 and
PartitionAtRuntime without a new Execution API endpoint.
Does that seem right or is there a reason to keep PartitionAtRuntime
transport separate from the general partition override mechanism?
Open questions:
Should add_partition() allow arbitrary keys or only keys from a
predefined set? The AIP example suggests arbitrary (dynamic customer list), but
unconstrained keys could make the partition tables hard to reason about.
AllowedKeyMapper (#61931) already handles predefined-set validation for
mappers. Should PartitionAtRuntime optionally accept
one?
PartitionMapper is a consumer-side concept -- to_downstream() maps an
incoming upstream key to the downstream DAG's partition space.
PartitionAtRuntime is producer-side -- it creates keys during execution. These
feel like different roles to me. I'm thinking PartitionAtRuntime should be a
separate abstraction rather than a PartitionMapper subclass, but given that
user-defined mappers aren't publicly advertised for 3.2 (#59294), I'm not sure
introducing a new public abstraction makes sense right now. Should this be
internal-only for 3.2?
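To spell out the role distinction I mean, here is a rough contrast with entirely hypothetical signatures; the method names are only illustrative of where each concept runs.

```python
# Hypothetical signatures to contrast the two roles; not proposed API.

class PartitionMapper:
    # Consumer side: evaluated at scheduling time on the downstream DAG,
    # mapping an incoming upstream key into the downstream partition space.
    def to_downstream(self, upstream_key: str) -> str:
        raise NotImplementedError


class RuntimePartitionSource:
    # Producer side: runs inside the executing task and creates keys as it
    # discovers them. There is no key-to-key mapping involved, which is why
    # a PartitionMapper subclass feels like a poor fit.
    def add_partition(self, key: str) -> None:
        raise NotImplementedError
```

A mapper transforms a key it is given; a runtime source originates keys that didn't exist before the task ran. That asymmetry is why I lean toward a separate abstraction.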
Thoughts?
cc @uranusjr @Lee-W @dstandish
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.