[ 
https://issues.apache.org/jira/browse/AIRFLOW-5295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16916568#comment-16916568
 ] 

Pierrick Boitel commented on AIRFLOW-5295:
------------------------------------------

Hi Ash, thanks for being so responsive.

We have a task that builds an EMR cluster (a Spark cluster) and then sends 
the cluster id via XCom.

Other tasks then retrieve that id in order to submit steps to EMR. The code 
that builds the operator looks like this:


{code:python}
step_operator = TemplatedEmrAddStepsOperator(
    task_id=task_id,
    aws_conn_id="aws_thefork",
    job_flow_id=get_job_flow_id(main_dag=dag, running_cluster=running_cluster),
    steps=json_emr_steps,
    queue="default",
    trigger_rule=TriggerRule.ALL_DONE,
)
{code}

And here is the get_job_flow_id function, which builds the XCom lookup:

{code:python}
def get_job_flow_id(main_dag, running_cluster):
    if running_cluster is not None:
        job_flow_id = running_cluster
    else:
        job_flow_id = (
            "{{ task_instance.xcom_pull('start_emr_cluster', dag_id='"
            + MAIN_DAG_NAME
            + ".initialization_emr_cluster"
            + "', key='return_value') }}"
        )
    return job_flow_id
{code}
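
To make the behaviour concrete, here is a minimal standalone sketch of what the function hands to the operator. It runs without Airflow. The value of MAIN_DAG_NAME and the cluster id "j-EXAMPLE" are placeholders I made up for illustration. As far as I understand, when no running cluster is given the function only returns a Jinja template string, and I am assuming the operator renders job_flow_id as a templated field when the task runs:

```python
# Hypothetical value: the real MAIN_DAG_NAME is not shown in this comment.
MAIN_DAG_NAME = "my_main_dag"


def get_job_flow_id(main_dag, running_cluster):
    """Return a fixed cluster id, or a Jinja template that pulls it from XCom."""
    if running_cluster is not None:
        # An already running cluster was given: use its id directly.
        return running_cluster
    # No XCom is read here. This is only a template string; the actual
    # xcom_pull would happen later, if and when the field is rendered.
    return (
        "{{ task_instance.xcom_pull('start_emr_cluster', dag_id='"
        + MAIN_DAG_NAME
        + ".initialization_emr_cluster"
        + "', key='return_value') }}"
    )


# Without a running cluster we get the unrendered template.
template = get_job_flow_id(main_dag=None, running_cluster=None)
print(template)

# With a (made-up) cluster id we get that id back unchanged.
fixed = get_job_flow_id(main_dag=None, running_cluster="j-EXAMPLE")
print(fixed)
```

So at DAG parse time the function itself never holds an XCom value, only the template text.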

Could it be the result of get_job_flow_id that stays cached? Thanks for 
your help.

> DAG doesn't take the last value of XCOM
> ---------------------------------------
>
>                 Key: AIRFLOW-5295
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-5295
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DAG
>    Affects Versions: 1.8.0, 1.10.3
>            Reporter: Pierrick Boitel
>            Priority: Minor
>
> I sometimes have failures in my DAG and need to re-run some tasks. Some of 
> these emit a new value for an XCom, but when the following tasks run, they 
> generally take the old value for this XCom (although the value is correct in 
> Airflow's database). To get the correct value, I have to clear my 
> tasks several times (sometimes more than 10), which is really painful.
> I suppose that the DAG on the scheduler keeps the value in cache, but I am 
> not sure about this. I also have not enabled pickling for the JSON.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
