EssKayz commented on issue #26619:
URL: https://github.com/apache/airflow/issues/26619#issuecomment-1256048784
This should be a working example; essentially:
```
# NOTE(review): `dag_args` is not defined in this snippet; it presumably holds
# the DAG's default_args (owner, start_date, ...) and must be defined before
# this statement.
dag = DAG('mockup_example',
          # With this option, even when the start date is "one day ago" we
          # instruct the Airflow scheduler to only create a DAG run for the
          # most current instance of the DAG interval series.
          # NOTE(review): with schedule_interval=None the scheduler creates no
          # runs on its own (the DAG is only triggered externally/manually),
          # so catchup has no practical effect here -- confirm whether a
          # schedule was intended.
          catchup=False,
          default_args=dag_args,
          default_view='graph',
          schedule_interval=None,
          # Time out a DAG run once it has been running for 30 minutes.
          dagrun_timeout=timedelta(minutes=30))
def gen_multiprocess_example(
    dag: DAG,
    gen_task_id,
    **kwargs
) -> TaskGroup:
    """Build a task group that splits data and processes each piece in a mapped task.

    The group contains a ``split_<gen_task_id>_data`` task that runs
    ``split_example`` and a dynamically mapped ``process_<gen_task_id>_data``
    task that runs ``process_split_data`` once per element of the split
    task's XCom result.

    :param dag: DAG the task group is attached to.
    :param gen_task_id: Suffix used in the group id and in both task ids.
    :param kwargs: Accepted but unused.
    :return: The created TaskGroup.
    """
    with TaskGroup(dag=dag, group_id=f'mockup_{gen_task_id}',
                   prefix_group_id=False) as grp:
        task = PythonOperator(
            task_id=f'split_{gen_task_id}_data',
            python_callable=split_example,
            op_kwargs={
            }
        )
        # This works, by utilizing a dirty hack: each dict returned by the
        # split task is mapped onto ``templates_dict``, and the callable
        # unpacks that dict itself.
        PythonOperator.partial(
            task_id=f'process_{gen_task_id}_data',
            python_callable=process_split_data,
            op_kwargs={
            }
        ).expand(
            templates_dict=XComArg(task)
        )
        # This doesn't work, since the kwargs get expanded to the
        # PythonOperator, not to the callable.
        # NOTE(review): mapping over ``op_kwargs`` instead -- i.e.
        # ``.expand(op_kwargs=XComArg(task))`` with ``op_kwargs`` removed from
        # ``.partial()`` -- may be the supported way to hand each dict to the
        # callable; confirm against the Airflow dynamic task mapping docs.
        # PythonOperator.partial(
        #     task_id=f'process_{gen_task_id}_data',
        #     python_callable=process_split_data,
        #     op_kwargs={
        #     }
        # ).expand_kwargs(
        #     XComArg(task)
        # )
    return grp
def split_example(**kwargs):
    """Return the sample records to fan out, one record per mapped task.

    Keyword arguments are accepted and ignored.
    """
    prod_record = {'data': '1234', 'bucket_name': 'prod'}
    test_record = {'data': '4321', 'bucket_name': 'test'}
    return [prod_record, test_record]
def process_split_data(data=None, bucket_name=None, templates_dict=None,
                       **kwargs):
    """Print the received arguments, then forward the mapped dict to ``get_data``.

    :param data: Printed only; it is not forwarded to ``get_data``.
    :param bucket_name: Printed only; it is not forwarded to ``get_data``.
    :param templates_dict: Dict whose items are unpacked as keyword arguments
        to ``get_data``. ``None`` (the default) is treated as an empty dict.
    :param kwargs: Any other keyword arguments; forwarded to ``get_data``
        alongside the ``templates_dict`` items. A key present in both raises
        ``TypeError`` (duplicate keyword argument).
    """
    # A literal ``{}`` default would be one dict shared across calls; also
    # tolerate an explicit None (NOTE(review): Airflow may pass
    # templates_dict=None when none is configured -- confirm).
    if templates_dict is None:
        templates_dict = {}
    print('-' * 20)
    print('data in process_func:', data)
    print('bucket_name in process_func:', bucket_name)
    print('templates_dict in process_func:', templates_dict)
    print('-' * 20)
    print(get_data(**templates_dict, **kwargs))
def get_data(data=None, bucket_name=None, **kwargs):
    """Print ``data`` and ``bucket_name`` between separator lines and return ``data``.

    Extra keyword arguments are accepted and ignored.
    """
    separator = '-' * 20
    print(separator)
    for label, value in (('data in get_data:', data),
                         ('bucket_name in get_data:', bucket_name)):
        print(label, value)
    print(separator)
    return data
# Build the example task group inside the DAG's context manager.
with dag:
    tg1 = gen_multiprocess_example(dag, gen_task_id='hope_this_helps')
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]