Looks nice👍 I do think this will allow much higher flexibility for the users.
Will have a look at the details as well, and get back to you if I have any questions/feedback. Thanks! XD

On Fri, Nov 5, 2021 at 19:52 Jarek Potiuk <[email protected]> wrote:
> Exciting and yeah - I've heard MANY requests from our users for this
> kind of feature!
>
> Looks good - I will take a close look over the weekend for sure :)
>
> On Fri, Nov 5, 2021 at 6:29 PM Kaxil Naik <[email protected]> wrote:
> >
> > Well done folks, this is exciting and will solve a lot of the current
> > issues with "Dynamic DAGs" (especially ones that need fan-out)
> >
> > On Fri, Nov 5, 2021 at 5:02 PM Daniel Imberman <[email protected]> wrote:
> >>
> >> Hello everyone!
> >>
> >> I'd like to start a discussion about a new feature that we've been
> >> thinking about for a while, and that Ash mentioned in his Airflow
> >> Summit keynote:
> >>
> >> Dynamic Task Mapping -- having the number of tasks vary at run time
> >> based on the result of a previous step
> >>
> >> The tl;dr of this is that it adds a form of "dynamism" to Airflow
> >> DAGs that wasn't previously possible, and opens up a lot of exciting
> >> new workflows for DAGs.
> >>
> >> A short example to whet your appetites:
> >>
> >> @task
> >> def my_task(x, y):
> >>     return x + y
> >>
> >> @task
> >> def sum_values(x: Iterable[int]) -> int:
> >>     return sum(x)
> >>
> >> add_values = [1, 2, 3]
> >>
> >> added_values = my_task.partial(x=5).map(y=add_values)
> >> result = sum_values(added_values)
> >>
> >> Now this example is trivial and doesn't need multiple tasks, but it
> >> shows the power of what we can do. At run time this would result in
> >> 4 task instances being run, and the end result is 21.
> >>
> >> A more useful example would be to use an S3PrefixSensor, then operate
> >> on each file in a separate task:
> >>
> >> with dag:
> >>     sensor = S3PrefixSensor(
> >>         bucket_name='test-airflow-bucket',
> >>         prefix='vendor_a/incoming/{{ data_interval_start }}/',
> >>     )
> >>
> >>     @task
> >>     def process_file(key: str):
> >>         return len(S3Hook().read_key(key).splitlines())
> >>
> >>     processed = process_file.map(key=sensor.output)
> >>
> >>     @task
> >>     def sum_values(x: Iterable[int]) -> int:
> >>         return sum(x)
> >>
> >>     result = sum_values(processed)
> >>
> >> This API change is (hopefully) deceptively simple, but implementing
> >> it is not straightforward -- we've tried to go into lots of detail
> >> in the AIP document:
> >>
> >> AIP-42: Dynamic Task Mapping - Airflow - Apache Software Foundation
> >>
> >> We look forward to discussing this further as this is a feature
> >> we're all really excited about :).
> >>
> >> Happy Friday!
> >>
> >> Ash Berlin-Taylor, TP Chung, and Daniel Imberman
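
To make sure I follow the semantics of the first example: here is a minimal plain-Python sketch of what partial()/map() should expand to at run time. This is just an illustration of the behavior described above (the dict-based expansion is my own construction, not how Airflow would actually schedule the mapped task instances):

# Plain Python, no Airflow -- illustration of the expansion only.
def my_task(x, y):
    return x + y

def sum_values(xs):
    return sum(xs)

fixed = {"x": 5}       # corresponds to .partial(x=5)
mapped_y = [1, 2, 3]   # corresponds to .map(y=add_values)

# One task instance per mapped value:
# my_task(5, 1), my_task(5, 2), my_task(5, 3)
added_values = [my_task(y=y, **fixed) for y in mapped_y]  # [6, 7, 8]

result = sum_values(added_values)  # the 4th task instance
print(result)                      # 21

That matches the "4 task instances, end result 21" described in the proposal: three mapped instances of my_task plus one downstream sum_values.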
