The correct way to pass kwargs is through op_kwargs.
for log_type in LOG_TYPES:
raw_log_py_operator = PythonOperator(
task_id='{}-logs-copy-to-s3'.format(log_type),
provide_context=True,
python_callable=raw_log_callable,
dag=raw_log_dag,
op_kwargs={'log_type': log_type},
)
See the PythonOperator example,
https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/example_dags/example_python_operator.py#L54
.
On Thu, Jul 20, 2017 at 11:26 AM, Kevin Pamplona <[email protected]> wrote:
> I'm trying to pass a custom arg to a python operator, but when trying to
> resolve *kwargs.get('log_type')*, I get 'None'.
>
> Anyone have any issues with this before?
>
> *LOG_TYPES = ['abc_log', 'xyz_log']*
>
> def raw_log_callable(ds, **kwargs):
> exec_date = str(kwargs.get('task_instance').execution_date.date() -
> timedelta(days=1)).split()[0]
> task_logic.main({'<date>' : exec_date, '<log_type>' :
> *kwargs.get('log_type')*})
>
>
> for log_type in LOG_TYPES:
> raw_log_py_operator = PythonOperator(
> task_id='{}-logs-copy-to-s3'.format(log_type),
> provide_context=True,
> python_callable=raw_log_callable,
> dag=raw_log_dag,
> * log_type=log_type*
> )
>