Syar123 edited a comment on issue #21714:
URL: https://github.com/apache/airflow/issues/21714#issuecomment-1047101944
```
def trigger_gcs_to_bq_dag(context, dag_run_obj):
dag_run_obj.payload = {
"gcs_bucket" : context['ti'].xcom_pull(key='gcs_bucket',
task_ids='parse_params'),"gcs_objects_list" :
context['ti'].xcom_pull(key='gcs_objects_list',
task_ids='filter_gcs_files'),"bq_auto_detect_schema" :
context['ti'].xcom_pull(key='bq_auto_detect_schema',
task_ids='parse_params'),"bq_schema" : context['ti'].xcom_pull(key='bq_schema',
task_ids='parse_params')
}
return dag_run_obj
trigger_gcs_to_bq_dag = TriggerDagRunOperator(
task_id='trigger_gcs_to_bq_dag',
trigger_dag_id="gcs_to_bq",
python_callable=trigger_gcs_to_bq_dag,
retries=1
)
```
in airflow 2.1.4 as python_callable is deprecated we needed to pass
parameters through 'conf' under TriggerDagRunOperator instead of
dag_run_obj.payload
```
trigger_gcs_to_bq_dag = TriggerDagRunOperator(
task_id='trigger_gcs_to_bq_dag',
trigger_dag_id="gcs_to_bq",
conf={"gcs_bucket" : "{{ ti.xcom_pull(key='gcs_bucket',
task_ids='parse_params') }}","gcs_objects_list" : "{{
ti.xcom_pull(key='gcs_objects_list', task_ids='filter_gcs_files')
}}","bq_auto_detect_schema" : "{{ ti.xcom_pull(key='bq_auto_detect_schema',
task_ids='parse_params') }}","bq_schema" : "{{ xcom_pull(key='bq_schema',
task_ids='parse_params') }}" },
retries=1
)
```
when passing through conf the parameter values are going as strings instead
of their own values (None is passing as 'None')
in the external dag ( gcs_to_bq) we are using logic like
```
def parse_params(ti, **context):
if context['dag_run'].conf:
params['gcs_bucket'] =
context['dag_run'].conf.get('gcs_bucket', None)
params['gcs_objects_list'] =
context['dag_run'].conf.get('gcs_objects_list', None)
params['bq_auto_detect_schema'] =
context['dag_run'].conf.get('bq_auto_detect_schema', None)
params['bq_schema'] =
context['dag_run'].conf.get('bq_schema', None)
else: # fail without retry
raise AirflowFailException("config missing. please trigger
the dag with proper config")
# set defaults:
if (params['bq_auto_detect_schema'] is None and
params['bq_schema'] is None):
params['bq_auto_detect_schema'] = True
if params['bq_schema'] is not None:
schema_fields=[
{
'name':f[0],
'type':f[1]
} for f in params['bq_schema']]
```
in our external dag (gcs_to_bq) [ as parsing parameters are strings we are
having issues while reading values like None, integers (0,1) and boolean values
(true)]
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]