rsotogar opened a new issue #17636:
URL: https://github.com/apache/airflow/issues/17636


   Hi!
   
   I am getting this error in the EmrStepSensor:
   
   `botocore.errorfactory.InvalidRequestException: An error occurred 
(InvalidRequestException) when calling the DescribeStep operation: Step id '{ 
task_instance.xcom_pull(task_ids='users_pipeline', key='return_value')[0] }' is 
not valid`.
   
   Here's the code:
   
          `pipelines = ["events", "users", "eventsstaged", "activities", 
"feeds", "attendances", "friendships", "media", "downtoclown", "agents"]
   
           for pipeline in pipelines:
   
           watcher_operator = EmrStepSensor(task_id=f"{pipeline}_sensor",
                                            job_flow_id="{{ 
task_instance.xcom_pull(task_ids='start_emr_cluster', key='return_value')}}",
                                            step_id= f"{{ 
task_instance.xcom_pull(task_ids='{pipeline}_pipeline', key='return_value')[0] 
}}",
                                            target_states=["COMPLETED"])
   
           watcher_operators.append(watcher_operator)`
   
   I am using Spark 3.1.1 running on EMR 6.3.0 and Airflow is deployed on EC2 
using Docker. When I use the XCOM method to extract the job flow id seems fine, 
it only gives me problems when extracting the step id. Any help would be 
appreciated.
   
   Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to