Hi Team,

We are working on a use case where we have to process multiple files in parallel in Airflow and run a series of tasks for each file. For this we are using a TaskGroup with expand(). Whenever we try to access the data passed via expand() outside the runtime context of a task (i.e., anywhere other than inside a PythonOperator callable), we get the error:

    TypeError: 'MappedArgument' object is not subscriptable
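To illustrate the two access patterns, here is a stripped-down sketch (a hypothetical minimal DAG, not our real one; it assumes Airflow 2.5+, which is required for expanding task groups):

    import pendulum
    from airflow import DAG
    from airflow.decorators import task_group
    from airflow.operators.python import PythonOperator

    with DAG(
        dag_id="mapped_arg_repro",
        start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
        schedule=None,
    ):
        @task_group(group_id="per_file")
        def per_file(file):
            def use_file(file, **context):
                # Works: op_kwargs are resolved when the task instance runs,
                # so `file` is the real (name, index) tuple here.
                print(file[0])

            PythonOperator(
                task_id="use_file",
                python_callable=use_file,
                op_kwargs={"file": file},
            )

            # Fails: at DAG-parse time `file` is still a MappedArgument
            # placeholder, so subscripting it raises the TypeError above.
            # index = file[1]

        per_file.expand(file=[("a.csv", 0), ("b.csv", 1)])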
Attaching the sample reference code. In the attached code I am able to access "file" (passed via expand) inside get_file_id, the PythonOperator callable, whereas using it to build the query for insert_into_audit_table, a BigQueryInsertJobOperator, raises the error above (the failing line is marked with a comment in the code). This is blocking a critical delivery, and we are looking for a solution or an alternative approach. Thank you in advance.

Regards,
Sunny Arora
Customer Solutions Engineer - Data
sunn...@google.com
Sample reference code (excerpt from the DAG file; constants such as PROCESSING_BUCKET, PROCESSING_FILE_PATH, FILE_NAME_PREFIX, LINEAGE_ID and SOURCE_ID are defined elsewhere in the file):

    from airflow.decorators import task_group
    from airflow.models.xcom_arg import XComArg
    from airflow.operators.python import PythonOperator
    from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
    from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
    from google.cloud import bigquery, storage

    storage_client = storage.Client()
    bigquery_client = bigquery.Client()

    # List the files waiting in the processing bucket.
    GCS_Files = GCSListObjectsOperator(
        task_id='GCS_Files',
        bucket=PROCESSING_BUCKET,
        prefix=PROCESSING_FILE_PATH + FILE_NAME_PREFIX,
    )

    def get_file_iter(**context):
        # Count the matching blobs and return their indexes [0, 1, ..., n-1],
        # so each file can later be zipped with its position.
        files_to_process = 0
        source_bucket = storage_client.get_bucket(PROCESSING_BUCKET)
        for _ in source_bucket.list_blobs(prefix=PROCESSING_FILE_PATH + FILE_NAME_PREFIX):
            files_to_process += 1
        return list(range(0, files_to_process))

    get_file_iter_list = PythonOperator(
        task_id='get_file_iter_list',
        python_callable=get_file_iter,
    )

    @task_group(group_id="process_files_in_parallel")
    def files_parallel_process(file):
        # `file` is a (gcs_object_name, index) tuple for each mapped
        # instance; inside this function body it is still a MappedArgument.

        def get_file_id(LINEAGE_ID, file, **context):
            # Accessing file[0] here works: op_kwargs are resolved at runtime.
            files_query = """
                SELECT DISTINCT f.file_id
                FROM `dh_operations.file_lookup` f
                WHERE f.lineage_id = '{}'
                  AND f.file_name = '{}'
                  AND f.source_id = '{}'
            """.format(LINEAGE_ID, file[0].split("/")[-1], SOURCE_ID)
            query_job = bigquery_client.query(files_query)
            result = query_job.result()  # wait for the job to complete
            file_ids = [row.file_id for row in result]
            context['ti'].xcom_push(key='file_id', value=file_ids[0])

        get_file_id_task = PythonOperator(
            task_id='get_file_id',
            python_callable=get_file_id,
            op_kwargs={'LINEAGE_ID': LINEAGE_ID, 'file': file},
        )

        # TypeError raised here: `file` is a MappedArgument at DAG-parse
        # time, so file[1] is not subscriptable outside a task.
        file_id = """{{{{ task_instance.xcom_pull(task_ids='get_file_id', key='file_id', map_indexes=[{index}]) }}}}""".format(index=file[1])

        insert_into_audit_table = BigQueryInsertJobOperator(
            task_id='insert_into_audit_table',
            configuration={
                "query": {
                    "query": """INSERT INTO `dh_operations.auditlogs` values (
                        '{}', '{}', '{}', 'raw', 0, 0, 0,
                        CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), 'STARTED'
                    )""".format(LINEAGE_ID, SOURCE_ID, file_id),
                    "useLegacySql": False,
                }
            },
            location='EU',
        )

        get_file_id_task >> insert_into_audit_table

    # Pair each listed file with its index and fan out one task group per pair.
    files_parallel_process_obj = files_parallel_process.expand(
        file=XComArg(GCS_Files).zip(XComArg(get_file_iter_list))
    )
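One alternative we have sketched but not yet verified: since configuration is a templated field of BigQueryInsertJobOperator, the file_id could be pulled at runtime via Jinja instead of being interpolated at parse time, using ti.map_index so each mapped instance reads the XCom pushed by its own get_file_id. Note the task id carries the task-group prefix (process_files_in_parallel.get_file_id). This would replace the file_id assignment and the insert operator inside the task group:

    # Sketch (unverified): resolve file_id at runtime through templating
    # rather than subscripting the MappedArgument at parse time.
    # `ti.map_index` is the index of the current mapped instance.
    AUDIT_QUERY = (
        "INSERT INTO `dh_operations.auditlogs` values ("
        "'{lineage}', '{source}', "
        "'{{{{ ti.xcom_pull(task_ids='process_files_in_parallel.get_file_id', "
        "key='file_id', map_indexes=ti.map_index) }}}}', "
        "'raw', 0, 0, 0, CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), 'STARTED')"
    ).format(lineage=LINEAGE_ID, source=SOURCE_ID)

    insert_into_audit_table = BigQueryInsertJobOperator(
        task_id='insert_into_audit_table',
        configuration={
            "query": {"query": AUDIT_QUERY, "useLegacySql": False},
        },
        location='EU',
    )

If that is viable, the zip with get_file_iter_list might also become unnecessary, since ti.map_index already carries the position of each mapped instance. Is this a reasonable direction, or is there a recommended pattern for this?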