Hi Team,

We are working on one of the use cases where in airflow we have to process
multiple files in parallel and run a series of tasks for each file. For
this we are using TaskGroup and expand. When we are trying to access the
data passed via expand function outside/inside the context of any
task(except PythonOperator), I am getting an error "TypeError:
'*MappedArgument'
object is not subscriptable*".

Attaching the sample reference code.

In the attached code, I am able to access "file"(passed via expand) in
get_file_id in PythonOperator(*line 31 in attached code*), whereas when
trying to use it in insert_into_audit_table BigQueryInsertJobOperator(*line
58/77 in attached code*) I am getting the above mentioned error.
This is blocking critical delivery, and we are looking for solution or
alternative approach to this.

Thank you in advance.

Regards,

Sunny Arora

Customer Solutions Engineer - Data

sunn...@google.com
GCS_Files = GCSListObjectsOperator(
    task_id='GCS_Files',
    bucket=PROCESSING_BUCKET,
    prefix=PROCESSING_FILE_PATH+FILE_NAME_PREFIX
)

def get_file_iter(**context):
    files_to_process = 0
    source_bucket = storage_client.get_bucket(PROCESSING_BUCKET)
    for i in enumerate(source_bucket.list_blobs(prefix=PROCESSING_FILE_PATH+FILE_NAME_PREFIX)):
        files_to_process += 1

    return list(range(0,files_to_process))

get_file_iter_list = PythonOperator(
    task_id = 'get_file_iter_list',
    python_callable = get_file_iter,
    provide_context=True
)

@task_group(group_id="process_files_in_parallel")
def files_parallel_process(file):

    def get_file_id(LINEAGE_ID,file,**context):
        files_query = """
            SELECT distinct f.file_id
            from `dh_operations.file_lookup` f
            WHERE f.lineage_id = '{}'
            and f.file_name = '{}'
            and f.source_id = '{}'
        """.format(LINEAGE_ID,file[0].split("/")[-1],SOURCE_ID)

        query_job = bigquery_client.query(files_query)

        # Wait for job to complete
        result = query_job.result()

        file_ids = []

        if result != None:
            for row in result:
                file_ids.append(row.file_id)

        file_id = file_ids[0]

        context['ti'].xcom_push(key='file_id', value=file_id)


    get_file_id = PythonOperator(
        task_id = f'get_file_id',
        python_callable = get_file_id,
        op_kwargs={'LINEAGE_ID': LINEAGE_ID,
                    'file': file},
        provide_context=True

    )

    file_id = """{{{{ task_instance.xcom_pull(task_ids = 'get_file_id', key='file_id', map_indexes = [index]) }}}}""".format(index=file[1])

    insert_into_audit_table = BigQueryInsertJobOperator(
            task_id=f'insert_into_audit_table',
            configuration={
                "query": {
                    "query": """INSERT INTO `dh_operations.auditlogs`
                                        values
                                        (
                                        '{}',
                                        '{}',
                                        '{}',
                                        'raw',
                                        0,
                                        0,
                                        0,
                                        CURRENT_TIMESTAMP(),
                                        CURRENT_TIMESTAMP(),
                                        'STARTED'
                                        )""".format(LINEAGE_ID,SOURCE_ID,file_id),

                    "useLegacySql": False
                }
            },
            location='EU'
        )

    get_file_id >> insert_into_audit_table

files_parallel_process_obj = files_parallel_process.expand(file=XComArg(GCS_Files).zip(XComArg(get_file_iter_list)))
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@airflow.apache.org
For additional commands, e-mail: dev-h...@airflow.apache.org

Reply via email to