[ https://issues.apache.org/jira/browse/AIRFLOW-1814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Galak updated AIRFLOW-1814: --------------------------- Description: *I'm wondering if "_op_args_" and "_op_kwargs_" PythonOperator parameters could be templated.* I have 2 different use cases where this change could help a lot: +1/ Provide some job execution information as a python callable argument:+ let's explain it through a simple example: {code} simple_task = PythonOperator( task_id='simple_task', provide_context=True, python_callable=extract_data, op_args=[ "my_db_connection_id" "select * from my_table" "/data/{{ dag.dag_id }}/{{ ts }}/my_export.csv" ], dag=dag ) {code} "extract_data" python function seems to be simple here, but it could be anything re-usable in multiple dags... +2/ Provide some XCom value as a python callable argument:+ Let's say I a have a task which is retrieving or calculating a value, and then storing it in an XCom for further use by other tasks: {code} value_producer_task = PythonOperator( task_id='value_producer_task', provide_context=True, python_callable=produce_value, op_args=[ "my_db_connection_id", "some_other_static_parameter", "my_xcom_key" ], dag=dag ) {code} Then I can just configure a PythonCallable task to use the produced value: {code} value_consumer_task = PythonOperator( task_id='value_consumer_task', provide_context=True, python_callable=consume_value, op_args=[ "{{ task_instance.xcom_pull(task_ids=None, key='my_xcom_key') }}" ], dag=dag ) {code} I quickly tried the following class: {code} from airflow.operators.python_operator import PythonOperator class MyPythonOperator(PythonOperator): template_fields = PythonOperator.template_fields + ('op_args', 'op_kwargs') {code} and it worked like a charm. So could these 2 arguments be added to templated_fields? Or did I miss some major drawback to this change? was: *I'm wondering if "_op_args_" and "_op_kwargs_" PythonOperator parameters could be templated.* I have 2 different use cases where this change could help a lot: +1/ Provide some job execution information as a python callable argument:+ let's explain it through a simple example: {code} simple_task = PythonOperator( task_id='simple_task', provide_context=True, python_callable=extract_data, op_args=[ "my_db_connection_id" "select * from my_table" "/data/{dag.dag_id}/{ts}/my_export.csv" ], dag=dag ) {code} "extract_data" python function seems to be simple here, but it could be anything re-usable in multiple dags... +2/ Provide some XCom value as a python callable argument:+ Let's say I a have a task which is retrieving or calculating a value, and then storing it in an XCom for further use by other tasks: {code} value_producer_task = PythonOperator( task_id='value_producer_task', provide_context=True, python_callable=produce_value, op_args=[ "my_db_connection_id", "some_other_static_parameter", "my_xcom_key" ], dag=dag ) {code} Then I can just configure a PythonCallable task to use the produced value: {code} value_consumer_task = PythonOperator( task_id='value_consumer_task', provide_context=True, python_callable=consume_value, op_args=[ "{{ task_instance.xcom_pull(task_ids=None, key='my_xcom_key') }}" ], dag=dag ) {code} I quickly tried the following class: {code} from airflow.operators.python_operator import PythonOperator class MyPythonOperator(PythonOperator): template_fields = PythonOperator.template_fields + ('op_args', 'op_kwargs') {code} and it worked like a charm. So could these 2 arguments be added to templated_fields? Or did I miss some major drawback to this change? > Add op_args and op_kwargs in PythonOperator templated fields > ------------------------------------------------------------ > > Key: AIRFLOW-1814 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1814 > Project: Apache Airflow > Issue Type: Wish > Components: operators > Affects Versions: Airflow 1.8, 1.8.0 > Reporter: Galak > Priority: Minor > > *I'm wondering if "_op_args_" and "_op_kwargs_" PythonOperator parameters > could be templated.* > I have 2 different use cases where this change could help a lot: > +1/ Provide some job execution information as a python callable argument:+ > let's explain it through a simple example: > {code} > simple_task = PythonOperator( > task_id='simple_task', > provide_context=True, > python_callable=extract_data, > op_args=[ > "my_db_connection_id" > "select * from my_table" > "/data/{{ dag.dag_id }}/{{ ts }}/my_export.csv" > ], > dag=dag > ) > {code} > "extract_data" python function seems to be simple here, but it could be > anything re-usable in multiple dags... > +2/ Provide some XCom value as a python callable argument:+ > Let's say I a have a task which is retrieving or calculating a value, and > then storing it in an XCom for further use by other tasks: > {code} > value_producer_task = PythonOperator( > task_id='value_producer_task', > provide_context=True, > python_callable=produce_value, > op_args=[ > "my_db_connection_id", > "some_other_static_parameter", > "my_xcom_key" > ], > dag=dag > ) > {code} > Then I can just configure a PythonCallable task to use the produced value: > {code} > value_consumer_task = PythonOperator( > task_id='value_consumer_task', > provide_context=True, > python_callable=consume_value, > op_args=[ > "{{ task_instance.xcom_pull(task_ids=None, key='my_xcom_key') }}" > ], > dag=dag > ) > {code} > I quickly tried the following class: > {code} > from airflow.operators.python_operator import PythonOperator > class MyPythonOperator(PythonOperator): > template_fields = PythonOperator.template_fields + ('op_args', > 'op_kwargs') > {code} > and it worked like a charm. > So could these 2 arguments be added to templated_fields? Or did I miss some > major drawback to this change? -- This message was sent by Atlassian JIRA (v6.4.14#64029)