Valeriys Soloviov created AIRFLOW-6061:
------------------------------------------

             Summary: xcom is not created
                 Key: AIRFLOW-6061
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6061
             Project: Apache Airflow
          Issue Type: Bug
          Components: xcom
    Affects Versions: 1.10.3
            Reporter: Valeriys Soloviov
         Attachments: image-2019-11-25-15-09-52-646.png, 
image-2019-11-25-15-24-20-610.png

I faced a strange bug:

We are using:
 * Airflow *1.10.3* 
[all_dbs,crypto,async,jdbc,hdfs,hive,ldap,mssql,mysql,password,s3,slack,gcp,google_auth,stats]

 * mysql: 5.7.22 
{code:java}
mysql> select @@version;
+------------+
| @@version  |
+------------+
| 5.7.22-log |
+------------+
{code}

 * *LocalExecutor*

I have a DAG with one main task that submits a job flow to EMR via 
EmrCreateJobFlowOperator, then 12 tasks (EmrJobFlowSensor) that wait for its 
completion, then 12 more step tasks, and finally tasks that read the EMR log 
from S3 regardless of upstream state (trigger_rule=TriggerRule.ALL_DONE).

!image-2019-11-25-15-09-52-646.png!
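For reference, the wiring described above has roughly this shape (a pure-Python sketch of the dependency structure only; the task names are hypothetical and Airflow is not imported):

{code:python}
# Pure-Python sketch of the DAG shape described above; task names are
# hypothetical, this is not the actual DAG file.
deps = {}

def set_downstream(upstream, downstream):
    """Record an upstream >> downstream edge, like Airflow's >> operator."""
    deps.setdefault(upstream, []).append(downstream)

create_cluster = 'create_cluster'            # EmrCreateJobFlowOperator
for i in range(12):
    sensor = 'wait_for_step_%d' % i          # EmrJobFlowSensor
    step = 'spark_step_%d' % i               # SparkStepOperator
    read_log = 'read_emr_log_%d' % i         # runs with trigger_rule=ALL_DONE
    set_downstream(create_cluster, sensor)
    set_downstream(sensor, step)
    set_downstream(step, read_log)
{code}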

For some reason 3 of the 12 tasks failed and no XCom was written.

  !image-2019-11-25-15-24-20-610.png!
The log:
{noformat}

--------------------------------------------------------------------------------
 
[2019-11-25 11:09:18,434] {__init__.py:1354} INFO - Starting attempt 3 of 3 
[2019-11-25 11:09:18,434] {__init__.py:1355} INFO - 
--------------------------------------------------------------------------------
 
[2019-11-25 11:09:18,448] {__init__.py:1374} INFO - Executing 
<Task(SparkStepOperator): stage_cs_ext_api_myheritage_tickets_volume> on 
2019-11-25T09:46:10.998426+00:00 
[2019-11-25 11:09:18,449] {base_task_runner.py:119} INFO - Running: ['airflow', 
'run', 'api', 'api_myheritage_tickets_volume', 
'2019-11-25T09:46:10.998426+00:00', '--job_id', '98486', '--raw', '-sd', 
'DAGS_FOLDER/api_export.py', '--cfg_path', '/tmp/tmp4tfkxfsq'] 
[2019-11-25 11:09:18,963] {base_task_runner.py:101} INFO - Job 98486: Subtask 
stage_cs_ext_api_myheritage_tickets_volume 
[2019-11-25 11:09:18,963] {settings.py:182} INFO - settings.configure_orm(): 
Using pool settings. pool_size=5, pool_recycle=1800, pid=26857 
[2019-11-25 11:09:19,929] {base_task_runner.py:101} INFO - Job 98486: Subtask 
stage_cs_ext_api_myheritage_tickets_volume 
[2019-11-25 11:09:19,929] {__init__.py:51} INFO - Using executor LocalExecutor 
[2019-11-25 11:09:20,091] {base_task_runner.py:101} INFO - Job 98486: Subtask 
stage_cs_ext_api_myheritage_tickets_volume 
[2019-11-25 11:09:20,091] {__init__.py:305} INFO - Filling up the DagBag from 
/home/airflow/dags/export.py 
[2019-11-25 11:09:20,219] {base_task_runner.py:101} INFO - Job 98486: Subtask 
stage_cs_ext_api_myheritage_tickets_volume 
[2019-11-25 11:09:20,218] {cli.py:517} INFO - Running <TaskInstance: 
wap-cs-external-api.stage_cs_ext_api_myheritage_tickets_volume 
2019-11-25T09:46:10.998426+00:00 [running]> on host airflow 
[2019-11-25 11:09:20,312] {emr.py:348} INFO - Adding step [{'Name': 
'stage_cs_ext_api_myheritage_tickets_volume', 'ActionOnFailure': 'CONTINUE', 
'HadoopJarStep': {'Jar': 'command-runner.jar', 'Args': ['spark-submit', 
'--deploy-mode', 'client', 's3://releases/airflow/parquet_to_csv.py', 'prod', 
'marketing', 'cs_ext_api_myheritage_tickets_volume', 
's3://data/cs_ext_api_myheritage_tickets_volume.csv', 'false', 'true']}}] to 
the cluster [j-1UVOKFN0F1QC2] [2019-11-25 11:09:20,315] {logging_mixin.py:95} 
INFO - 
[2019-11-25 11:09:20,315] {connectionpool.py:735} INFO - Starting new HTTPS 
connection (1): elasticmapreduce.us-east-1.amazonaws.com 
[2019-11-25 11:11:20,485] {logging_mixin.py:95} INFO - 
[2019-11-25 11:11:20,484] {connectionpool.py:238} INFO - Resetting dropped 
connection: elasticmapreduce.us-east-1.amazonaws.com 
[2019-11-25 11:11:20,534] {__init__.py:1580} ERROR - Waiter StepComplete 
failed: Waiter encountered a terminal failure state
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models/__init__.py", 
line 1436, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/flow/operators/emr.py", line 
350, in execute
    hook.wait_for_step_completion(cluster_id, step_id)
  File "/usr/local/lib/python3.6/site-packages/wixflow/aws/emr_lib.py", line 
55, in wait_for_step_completion
    'MaxAttempts': 60
  File "/usr/local/lib/python3.6/site-packages/botocore/waiter.py", line 53, in 
wait
    Waiter.wait(self, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/botocore/waiter.py", line 323, 
in wait
    last_response=response,
botocore.exceptions.WaiterError: Waiter StepComplete failed: Waiter encountered 
a terminal failure state
{noformat}
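The traceback above may explain the missing XCom: Airflow pushes an operator's return value as an XCom only after execute() returns, so when execute() raises (here, the botocore WaiterError from wait_for_step_completion), the push is never reached. A minimal simulation of that push-on-return logic (plain Python, not actual Airflow code):

{code:python}
# Simplified model of _run_raw_task's behaviour; not actual Airflow code.
class WaiterError(Exception):
    pass

def run_raw_task(execute, xcom_table):
    """Push the return value as an XCom only if execute() succeeds."""
    try:
        result = execute()
    except Exception:
        return 'failed'                          # XCom push is skipped
    if result is not None:
        xcom_table['return_value'] = result      # push happens only here
    return 'success'

def failing_execute():
    # Models hook.wait_for_step_completion raising on a terminal step state.
    raise WaiterError('Waiter StepComplete failed')

xcom_table = {}
state = run_raw_task(failing_execute, xcom_table)
# state == 'failed' and xcom_table stays empty: no XCom row is created
{code}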
  
 flow/operators/emr.py
{code:python}
# Imports added here for completeness; EmrHook and default_emr_conn_id
# come from elsewhere in our internal package.
from typing import List

from airflow.models import BaseOperator


class SparkStepOperator(BaseOperator):
    ui_color = '#cce6ff'
    template_fields = ['app_args']

    def __init__(self,
                 step_name,
                 source_path,
                 deploy_mode='client',
                 main_class=None,
                 app_args: List[str] = None,
                 spark_submit_args: List[str] = None,
                 action_on_failure='CONTINUE',
                 cluster_id=None,
                 emr_conn_id=default_emr_conn_id,
                 *args, **kwargs):
        """
        :param step_name: name of the step to submit
        :param source_path: path to jar or python file on s3
        :param action_on_failure: 'TERMINATE_JOB_FLOW'|'TERMINATE_CLUSTER'|'CANCEL_AND_WAIT'|'CONTINUE'
        :param emr_conn_id: emr connection id
        """
        super(SparkStepOperator, self).__init__(task_id=step_name, *args, **kwargs)
        self.emr_conn_id = emr_conn_id
        self.cluster_id = cluster_id
        self.action_on_failure = action_on_failure
        self.step_name = step_name
        self.source_path = source_path
        self.deploy_mode = deploy_mode
        self.spark_submit_args = spark_submit_args if spark_submit_args else []
        self.app_args = app_args if app_args else []
        self.main_class_args = ['--class', main_class] if main_class else []

    def execute(self, context):
        cluster_id = self.cluster_id if self.cluster_id else self.xcom_pull(context, task_ids='create_cluster')
        hook = EmrHook(self.emr_conn_id)
        args = (['spark-submit', '--deploy-mode', self.deploy_mode]
                + self.spark_submit_args + self.main_class_args
                + [self.source_path] + self.app_args)
        step = {
            'Name': self.step_name,
            'ActionOnFailure': self.action_on_failure,
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': args
            }
        }
        self.log.info(f"Adding step [{step}] to the cluster [{cluster_id}]")
        step_id = hook.add_step(cluster_id, step)
        hook.wait_for_step_completion(cluster_id, step_id)
        self.log.info(f"Step [{step_id}] finished on the cluster [{cluster_id}]")
        return step_id
{code}
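Since the operator returns step_id only when execute() succeeds, any downstream task that pulls it (e.g. the ALL_DONE log readers) gets None for a failed step. A hedged sketch of the defensive check such a task would need (plain Python; xcom_pull modelled as a dict lookup, the helper name is hypothetical):

{code:python}
# xcom_pull modelled as a dict lookup: returns None when the upstream
# task never pushed (i.e. it raised before returning step_id).
def xcom_pull(xcom_table, task_id):
    return xcom_table.get(task_id)

xcom_table = {}  # upstream step failed, so nothing was pushed
step_id = xcom_pull(xcom_table, 'stage_cs_ext_api_myheritage_tickets_volume')
message = ('no step_id in XCom; upstream task probably failed'
           if step_id is None else 'fetching logs for step %s' % step_id)
{code}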
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
