[
https://issues.apache.org/jira/browse/AIRFLOW-1814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Galak updated AIRFLOW-1814:
---------------------------
Description:
*I'm wondering if "_op_args_" and "_op_kwargs_" PythonOperator parameters could
be templated.*
I have 2 different use cases where this change could help a lot:
+1/ Provide some job execution information as a python callable argument:+
Let me illustrate it with a simple example:
{code}
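simple_task = PythonOperator(
    task_id='simple_task',
    provide_context=True,
    python_callable=extract_data,
    # Commas separate the three positional arguments; without them Python
    # would concatenate the adjacent string literals into a single string.
    op_args=[
        "my_db_connection_id",
        "select * from my_table",
        "/data/{{ dag.dag_id }}/{{ ts }}/my_export.csv"
    ],
    dag=dag
)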
{code}
The "extract_data" Python function looks simple here, but it could be
anything reusable across multiple DAGs.
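To make the expected result concrete, here is a minimal sketch of element-wise rendering of op_args. It uses neither Airflow nor Jinja: "render" is a hypothetical stand-in that only substitutes "{{ name }}" placeholders from a flat dict, and the context values are made up for illustration.

```python
import re

# Hypothetical stand-in for Jinja: replaces "{{ key }}" with context[key].
_PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def render(value, context):
    """Recursively render strings found inside lists, tuples and dicts."""
    if isinstance(value, str):
        return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), value)
    if isinstance(value, (list, tuple)):
        return type(value)(render(v, context) for v in value)
    if isinstance(value, dict):
        return {k: render(v, context) for k, v in value.items()}
    return value  # non-string leaves are passed through untouched


# Made-up context values, keyed by the names used in the template.
context = {"dag.dag_id": "my_dag", "ts": "2017-11-14T00:00:00"}

op_args = [
    "my_db_connection_id",
    "select * from my_table",
    "/data/{{ dag.dag_id }}/{{ ts }}/my_export.csv",
]

rendered_op_args = render(op_args, context)
print(rendered_op_args[2])
```

Only the third argument contains placeholders, so the first two are passed to the callable unchanged.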
+2/ Provide some XCom value as a python callable argument:+
Let's say I have a task that retrieves or calculates a value and then
stores it in an XCom for further use by other tasks:
{code}
value_producer_task = PythonOperator(
    task_id='value_producer_task',
    provide_context=True,
    python_callable=produce_value,
    op_args=[
        "my_db_connection_id",
        "some_other_static_parameter",
        "my_xcom_key"
    ],
    dag=dag
)
{code}
Then I can just configure a PythonOperator task to use the produced value:
{code}
value_consumer_task = PythonOperator(
    task_id='value_consumer_task',
    provide_context=True,
    python_callable=consume_value,
    op_args=[
        "{{ task_instance.xcom_pull(task_ids=None, key='my_xcom_key') }}"
    ],
    dag=dag
)
{code}
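One thing worth keeping in mind with this second use case: a rendered template is text, so the consumer would receive the XCom value as a string. The sketch below illustrates that under stated assumptions. "FakeTaskInstance" is a hypothetical in-memory stand-in (not Airflow's TaskInstance), the bodies of "produce_value" and "consume_value" are invented, and rendering is simulated with str().

```python
class FakeTaskInstance:
    """Hypothetical in-memory stand-in for a task instance's XCom methods."""

    def __init__(self):
        self._xcoms = {}

    def xcom_push(self, key, value):
        self._xcoms[key] = value

    def xcom_pull(self, task_ids=None, key=None):
        # task_ids is ignored here; this fake keeps one value per key.
        return self._xcoms[key]


def produce_value(conn_id, static_param, xcom_key, task_instance):
    # Invented body: pretend 42 was read from the database.
    task_instance.xcom_push(key=xcom_key, value=42)


def consume_value(value):
    # The argument arrives as rendered text, so convert it explicitly.
    return int(value) + 1


ti = FakeTaskInstance()
produce_value("my_db_connection_id", "some_other_static_parameter",
              "my_xcom_key", ti)

# Simulates rendering
# "{{ task_instance.xcom_pull(task_ids=None, key='my_xcom_key') }}"
rendered_arg = str(ti.xcom_pull(task_ids=None, key="my_xcom_key"))
result = consume_value(rendered_arg)
print(repr(rendered_arg), result)
```

So a callable like consume_value would need to parse its argument if it expects anything other than a string.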
I quickly tried the following class:
{code}
from airflow.operators.python_operator import PythonOperator

class MyPythonOperator(PythonOperator):
    # Extend the inherited tuple so op_args and op_kwargs are rendered too.
    template_fields = PythonOperator.template_fields + ('op_args', 'op_kwargs')
{code}
and it worked like a charm.
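For readers without an Airflow install at hand, here is a rough sketch of why extending the tuple is enough. Everything in it is a stand-in: "FakePythonOperator", "render_templates" and "fake_render" are hypothetical names, and the base tuple ('templates_dict',) is my assumption about what the real operator declares. The only point is that a loop over template_fields picks up whatever names the subclass adds.

```python
class FakePythonOperator:
    # Assumption: the real operator templates only this one field.
    template_fields = ("templates_dict",)

    def __init__(self, op_args=None, op_kwargs=None, templates_dict=None):
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}
        self.templates_dict = templates_dict

    def render_templates(self, render):
        # Render every attribute named in template_fields, nothing else.
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name)))


class MyFakePythonOperator(FakePythonOperator):
    template_fields = FakePythonOperator.template_fields + ("op_args", "op_kwargs")


def fake_render(value):
    """Hypothetical renderer: only knows how to replace '{{ ts }}'."""
    if isinstance(value, str):
        return value.replace("{{ ts }}", "2017-11-14")
    if isinstance(value, list):
        return [fake_render(v) for v in value]
    if isinstance(value, dict):
        return {k: fake_render(v) for k, v in value.items()}
    return value


base = FakePythonOperator(op_args=["/data/{{ ts }}"])
base.render_templates(fake_render)

sub = MyFakePythonOperator(op_args=["/data/{{ ts }}"], op_kwargs={"day": "{{ ts }}"})
sub.render_templates(fake_render)

print(base.op_args, sub.op_args, sub.op_kwargs)
```

The base class leaves op_args untouched, while the subclass renders both op_args and op_kwargs.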
So could these 2 arguments be added to template_fields? Or did I miss some
major drawback to this change?
was:
*I'm wondering if "_op_args_" and "_op_kwargs_" PythonOperator parameters could
be templated.*
I have 2 different use cases where this change could help a lot:
+1/ Provide some job execution information as a python callable argument:+
Let me illustrate it with a simple example:
{code}
simple_task = PythonOperator(
    task_id='simple_task',
    provide_context=True,
    python_callable=extract_data,
    op_args=[
        "my_db_connection_id",
        "select * from my_table",
        "/data/{dag.dag_id}/{ts}/my_export.csv"
    ],
    dag=dag
)
{code}
The "extract_data" Python function looks simple here, but it could be
anything reusable across multiple DAGs.
+2/ Provide some XCom value as a python callable argument:+
Let's say I have a task that retrieves or calculates a value and then
stores it in an XCom for further use by other tasks:
{code}
value_producer_task = PythonOperator(
    task_id='value_producer_task',
    provide_context=True,
    python_callable=produce_value,
    op_args=[
        "my_db_connection_id",
        "some_other_static_parameter",
        "my_xcom_key"
    ],
    dag=dag
)
{code}
Then I can just configure a PythonOperator task to use the produced value:
{code}
value_consumer_task = PythonOperator(
    task_id='value_consumer_task',
    provide_context=True,
    python_callable=consume_value,
    op_args=[
        "{{ task_instance.xcom_pull(task_ids=None, key='my_xcom_key') }}"
    ],
    dag=dag
)
{code}
I quickly tried the following class:
{code}
from airflow.operators.python_operator import PythonOperator

class MyPythonOperator(PythonOperator):
    template_fields = PythonOperator.template_fields + ('op_args', 'op_kwargs')
{code}
and it worked like a charm.
So could these 2 arguments be added to template_fields? Or did I miss some
major drawback to this change?
> Add op_args and op_kwargs in PythonOperator templated fields
> ------------------------------------------------------------
>
> Key: AIRFLOW-1814
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1814
> Project: Apache Airflow
> Issue Type: Wish
> Components: operators
> Affects Versions: Airflow 1.8, 1.8.0
> Reporter: Galak
> Priority: Minor
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)