ketozhang opened a new issue, #35969:
URL: https://github.com/apache/airflow/issues/35969

   ### Description
   
   Allow filtering of mappable XComs so that dynamic task mapping expands over 
only a subset of the XCom's items.
   
   ```python
   from airflow.decorators import task
   
   @task
   def get_integers():
       return [-1, 0, 1]
   
   @task
   def print_x(x):
       print(x)
   
   # expand() only accepts keyword arguments, so the mapped parameter is named.
   print_x.expand(x=get_integers().filter(lambda x: x > 0))
   # Prints:
   # 1
   ```
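
The intended semantics can be sketched in plain Python, without Airflow. This is only an illustration under assumptions: `LazyFilter` and its `resolve()` method are hypothetical names, not part of Airflow's API. The point is that the predicate is stored when the DAG is defined and applied only when the upstream value is read.

```python
from typing import Any, Callable, Iterable, List


class LazyFilter:
    """Hypothetical stand-in for a filtered XCom reference (not an Airflow class)."""

    def __init__(self, source: Callable[[], Iterable[Any]], predicate: Callable[[Any], bool]):
        self._source = source
        self._predicate = predicate

    def resolve(self) -> List[Any]:
        # The upstream value is read only here, mimicking resolution at run time.
        return [item for item in self._source() if self._predicate(item)]


def get_integers() -> List[int]:
    return [-1, 0, 1]


# Nothing is evaluated when the filter is declared.
positives = LazyFilter(get_integers, lambda x: x > 0)

# One mapped task instance would be created per surviving item.
mapped_inputs = positives.resolve()
for x in mapped_inputs:
    print(x)
```

With the input above, only the item `1` survives the filter, matching the expected output in the proposal.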
   
   ### Use case/motivation
   
   Currently, this can only be implemented with a dedicated task whose sole 
job is filtering. That becomes cumbersome when the goal is to partition one 
XCom output into the inputs of multiple tasks (i.e., branching/partitioning), 
because every partition needs its own filter task:
   
   ### Dynamic task branching with XCOM filtering
   ```python
   from airflow.decorators import task
   
   @task
   def get_integers():
       return [-1, 0, 1]
   
   @task
   def print_x(x):
       print(x)
   
   integers = get_integers()
   # .filter() is the proposed API; expand() requires keyword arguments.
   task_print_positive = print_x.expand(x=integers.filter(lambda x: x > 0))
   task_print_negatives = print_x.expand(x=integers.filter(lambda x: x < 0))
   task_print_zeros = print_x.expand(x=integers.filter(lambda x: x == 0))
   
   # The flow logic is effectively:
   # integers >> [task_print_positive, task_print_negatives, task_print_zeros]
   ```
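
As a rough check of the partitioning idea, the three predicates can be applied to the same list in plain Python. This is a sketch, not Airflow code, and the `predicates` and `branches` names are made up for illustration. Each element lands in exactly one branch, so the branches together cover the original list.

```python
integers = [-1, 0, 1]

# One predicate per downstream branch, mirroring the three filters above.
predicates = {
    "positive": lambda x: x > 0,
    "negative": lambda x: x < 0,
    "zero": lambda x: x == 0,
}

# Every branch sees the same upstream list and keeps only its own subset.
branches = {
    name: [x for x in integers if keep(x)]
    for name, keep in predicates.items()
}

for name, items in branches.items():
    print(name, items)
```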
   
   ### Dynamic task branching with TaskFlow API
   ```python
   from airflow.decorators import task
   
   @task
   def get_integers():
       return [-1, 0, 1]
   
   @task
   def print_x(x):
       print(x)
   
   # Each filter task receives the whole list and returns the matching subset.
   @task
   def filter_positive(xs):
       return [x for x in xs if x > 0]
   
   @task
   def filter_negative(xs):
       return [x for x in xs if x < 0]
   
   @task
   def filter_zero(xs):
       return [x for x in xs if x == 0]
   
   integers = get_integers()
   task_print_positive = print_x.expand(x=filter_positive(integers))
   task_print_negatives = print_x.expand(x=filter_negative(integers))
   task_print_zeros = print_x.expand(x=filter_zero(integers))
   
   # The flow logic is effectively:
   # integers >> [
   #     filter_positive >> task_print_positive,
   #     filter_negative >> task_print_negatives,
   #     filter_zero >> task_print_zeros,
   # ]
   ```
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

